Data Import via Java - Neo4j Real-World Example Tutorial

Now I would like to show you what my current Java code looks like, the code I use to import large amounts of data into Neo4j. It is a real-world import example. It could be used for a certain commercial database from the internet, which I will not name here.

Prerequisites

The Java code closely follows the case-study database, but it also deviates from it in a few points. I think it can be quite interesting to depart from the strict example. The deviations include (a short sketch follows the list):

  • Lucene indexes instead of the regular ones
  • a larger number of properties per class
  • reading several files per class/relationship
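
For orientation, this is the difference between the two index flavours used below: a Lucene fulltext index is obtained from the LuceneBatchInserterIndexProvider and has to be filled explicitly for every node, whereas a "normal" schema index is only declared as deferred and gets built when the store starts up for the first time. A minimal sketch, assuming the imports, the batch/index objects and the Labels enum from Importer.java further down:

// Lucene fulltext index: created via the index provider and filled by hand.
BatchInserterIndex titles = index.nodeIndex("titles", LuceneIndexImplementation.FULLTEXT_CONFIG);
titles.add(nodeId, map("title", "some example title"));
// "Normal" schema index: only declared here, populated on first startup of the store.
batch.createDeferredSchemaIndex(Labels.Appln).on("ID").create();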

Importer.java

Importer.java is the only file you need for the import. It combines:

  • creating the mappings (a representation of the node IDs)
  • creating the label and relationship properties
  • the actual data import

A big thank-you goes to Michael Hunger. Without him, the following code would probably never have been completed.
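
The "mappings" mentioned above are nothing more than primitive-typed GNU Trove maps that remember which Neo4j node ID was created for which CSV ID, so that the later files can attach titles and relationships to the right nodes. Condensed to its core, the pattern looks like this (excerpt-style sketch; the full code follows):

TIntIntMap appln = new TIntIntHashMap();               // CSV appln_id -> Neo4j node id
int applnId = (Integer) Appln.appln_id.value(line);    // key taken from the current CSV row
if (!appln.containsKey(applnId)) {
    appln.put(applnId, (int) batch.createNode(toMap(Appln.values(), line), Labels.Appln));
}
long applnNode = appln.get(applnId);                   // later files: resolve the node id again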

import au.com.bytecode.opencsv.CSVReader;
import gnu.trove.map.TIntIntMap;
import gnu.trove.map.hash.TIntIntHashMap;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.index.impl.lucene.LuceneIndexImplementation;
import org.neo4j.index.lucene.unsafe.batchinsert.LuceneBatchInserterIndexProvider;
import org.neo4j.kernel.impl.util.FileUtils;
import org.neo4j.unsafe.batchinsert.BatchInserter;
import org.neo4j.unsafe.batchinsert.BatchInserterIndex;
import org.neo4j.unsafe.batchinsert.BatchInserters;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static java.util.Arrays.asList;
import static org.neo4j.helpers.collection.MapUtil.map;
@SuppressWarnings("SpellCheckingInspection")
public class Importer {
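    // Empty property map, used when creating relationships that carry no properties.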
    public static final Map<String, Object> NO_PROPS = Collections.emptyMap();
    private final String database;
    private final File csvDir;
    private int count;
    private long time;
    public Importer(String database, String csvDir) {
        this.database = database;
        this.csvDir = new File(csvDir);
    }
    public static void main(String[] args) throws IOException {
        String database = args[0];
        String csvDir = args[1];
        Importer importer = new Importer(database, csvDir);
        importer.doImport();
    }
    enum Labels implements Label {Appln, Title, Person}
    enum Types implements RelationshipType {HAS_TITLE, WROTE}
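    // Maps one CSV column to one node property: pos() is the column index within the
    // row (-1 for synthetic values), property() the property name, value() the value.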
    interface Property {
        Object value(String[] row);
        String property();
        int pos();
    }
    enum Appln implements Property {
        appln_id(0, "ID") {
            public Integer value(String[] row) {
                String value = super.value(row).toString();
                if (value.trim().isEmpty()) return -1;
                return Integer.valueOf(value);
            }
        },
        appln_auth(1, "auth"),
        appln_nr(2, "nr"),
        appln_kind(3, "kind"),
        appln_filing_date(4, "filingDate"),
        appln_nr_epodoc(5, "nrEpodoc"),
        ipr_type(6, "iprType"),
        appln_title_lg(7, "titleLg"),
        appln_abstract_lg(8, "abstrLg"),
        internat_appln_id(9, "internatApplnID"),
        title(-1, "title") {
            public Object value(String[] row) {
                return "APPLN TITLE NOT SET";
            }
        };
        private final int pos;
        final String property;
        Appln(int pos, String property) {
            this.pos = pos;
            this.property = property;
        }
        public Object value(String[] row) {
            if (pos == -1) return 0;
            return row[pos];
        }
        @Override
        public String property() {
            return property;
        }
        @Override
        public int pos() {
            return pos;
        }
    }
    private static Map<String, Object> toMap(Property[] properties, String[] row) {
        Map<String, Object> map = new HashMap<String, Object>(properties.length);
        for (Property property : properties) {
            map.put(property.property(), property.value(row));
        }
        return map;
    }
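    // Full import: wipe any existing store, create application and person nodes,
    // connect them, declare the deferred schema indexes and shut everything down
    // (the shutdown is what actually flushes the batch-inserted data to disk).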
    private void doImport() throws IOException {
        FileUtils.deleteRecursively(new File(database));
        System.out.println("Importing data from " + csvDir + " into " + database);
        time = System.currentTimeMillis();
        long start = time;
        TIntIntMap appln = new TIntIntHashMap();
        TIntIntMap persons = new TIntIntHashMap();
        BatchInserter batch = BatchInserters.inserter(database, config());
        LuceneBatchInserterIndexProvider index = new LuceneBatchInserterIndexProvider(batch);
        try {
            importApplns(appln, batch);
            importTitles(appln, batch, index);
            importPeople(persons, batch, index);
            importPeopleAppln(appln, persons, batch);
            batch.createDeferredSchemaIndex(Labels.Appln).on("ID").create();
            System.out.println("Appln.ID index finished");
            batch.createDeferredSchemaIndex(Labels.Person).on("ID").create();
            System.out.println("Person.ID index finished");
        } finally {
            index.shutdown();
            batch.shutdown();
            System.out.println("Total " + count + " Rows " + appln.size() + " Applns " + persons.size() + " People took " + (System.currentTimeMillis() - start) / 1000 + " seconds.");
        }
    }
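    // Reads the tls201 application files, creates one Appln node per distinct
    // appln_id and records the CSV id -> node id mapping.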
    private void importApplns(TIntIntMap appln, BatchInserter batch) throws IOException {
        System.out.println("Applns start");
        trace(true);
        for (int part : asList(1, 2, 3, 4)) {
            int lines = 0;
            File file = new File(csvDir, "tls201_part0" + part + ".txt");
            if (!file.exists()) {
                System.out.println("File " + file + " does not exist, skipping");
                continue;
            }
            CSVReader csv = new CSVReader(new FileReader(file));
            csv.readNext(); // header
            String[] line;
            while ((line = csv.readNext()) != null) {
                if (line.length == 0) break;
                lines++;
                // if (lines > 1000) break;
                int applnId = (Integer) Appln.appln_id.value(line);
                if (!appln.containsKey(applnId)) {
                    Map<String, Object> data = toMap(Appln.values(), line);
                    appln.put(applnId, (int) batch.createNode(data, Labels.Appln));
                }
                trace(false);
            }
            csv.close();
            System.out.println("Applns end part " + part);
            trace(true);
        }
        System.out.println("Applns end");
    }
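    // Reads the tls202 title files, stores each title as a property on the already
    // created Appln node and adds it to the Lucene fulltext index "titles".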
    private void importTitles(TIntIntMap appln, BatchInserter batch, LuceneBatchInserterIndexProvider index) throws IOException {
        System.out.println("Title start");
        trace(true);
        BatchInserterIndex titleIndex = index.nodeIndex("titles", LuceneIndexImplementation.FULLTEXT_CONFIG);
        for (int part : asList(1, 2, 3)) {
            int lines = 0;
            File file = new File(csvDir, "tls202_part0" + part + ".txt");
            if (!file.exists()) {
                System.out.println("File " + file + " does not exist, skipping");
                continue;
            }
            CSVReader csv = new CSVReader(new FileReader(file));
            csv.readNext(); // header
            String[] line;
            while ((line = csv.readNext()) != null) {
                if (line.length == 0) break;
                lines++;
                // if (lines > 1000) break;
                if (line[0].trim().isEmpty()) continue;
                int applnId = Integer.parseInt(line[0]);
                if (!appln.containsKey(applnId)) continue;
                long applnNode = appln.get(applnId);
                String title = line[1];
                titleIndex.add(applnNode, map("title", title));
                batch.setNodeProperty(applnNode, "title", title);
                trace(false);
            }
            csv.close();
            System.out.println("Titles end part " + part);
            trace(true);
        }
        System.out.println("Title end");
    }
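    // Older variant, no longer called from doImport(): creates separate Title nodes
    // and links them to their application via HAS_TITLE.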
    private void importTitles_Old(Map<Integer, Long> appln, BatchInserter batch, LuceneBatchInserterIndexProvider index) throws IOException {
        System.out.println("Title start");
        trace(true);
        BatchInserterIndex titleIndex = index.nodeIndex("titles", LuceneIndexImplementation.FULLTEXT_CONFIG);
        for (int part : asList(1, 2, 3)) {
            int lines = 0;
            File file = new File(csvDir, "tls202_part0" + part + ".txt");
            if (!file.exists()) {
                System.out.println("File " + file + " does not exist, skipping");
                continue;
            }
            CSVReader csv = new CSVReader(new FileReader(file));
            csv.readNext(); // header
            String[] line;
            while ((line = csv.readNext()) != null) {
                if (line.length == 0) break;
                lines++;
                // if (lines > 1000) break;
                String title = line[1];
                Map<String, Object> props = map("title", title);
                long titleNode = batch.createNode(props, Labels.Title);
                titleIndex.add(titleNode, props);
                if (line[0].trim().isEmpty()) continue;
                int applnId = Integer.parseInt(line[0]);
                if (!appln.containsKey(applnId)) continue;
                long applnNode = appln.get(applnId);
                batch.createRelationship(applnNode, titleNode, Types.HAS_TITLE, NO_PROPS);
                trace(false);
            }
            csv.close();
            System.out.println("Titles end part " + part);
            trace(true);
        }
        System.out.println("Title end");
    }
    enum Person implements Property {
        person_id(0, "ID") {
            public Integer value(String[] row) {
                String value = super.value(row).toString();
                if (value.trim().isEmpty()) return -1;
                return Integer.valueOf(value);
            }
        },
        person_name(3, "name"),
        person_ctry_code(1, "countryCode"),
        doc_std_name_id(2, "docStdNameID"),
        person_address(4, "address");
        final String property;
        final int pos;
        Person(int pos, String property) {
            this.pos = pos;
            this.property = property;
        }
        public Object value(String[] row) {
            if (pos == -1) return 0;
            return row[pos];
        }
        @Override
        public String property() {
            return property;
        }
        @Override
        public int pos() {
            return pos;
        }
    }
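    // Reads the tls206 person files, creates one Person node per distinct person_id
    // and adds it to the Lucene fulltext index "people".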
    private void importPeople(TIntIntMap people, BatchInserter batch, LuceneBatchInserterIndexProvider index) throws IOException {
        System.out.println("People start");
        trace(true);
        BatchInserterIndex peopleIndex = index.nodeIndex("people", LuceneIndexImplementation.FULLTEXT_CONFIG);
        for (int part : asList(1, 2)) {
            int lines = 0;
            File file = new File(csvDir, "tls206_part0" + part + ".txt");
            if (!file.exists()) {
                System.out.println("File " + file + " does not exist, skipping");
                continue;
            }
            CSVReader csv = new CSVReader(new FileReader(file));
            csv.readNext(); // header
            String[] line;
            while ((line = csv.readNext()) != null) {
                if (line.length == 0) break;
                lines++;
                // if (lines > 1000) break;
                int personId = (Integer) Person.person_id.value(line);
                if (personId != -1 && !people.containsKey(personId)) {
                    Map<String, Object> props = toMap(Person.values(), line);
                    long personNode = batch.createNode(props, Labels.Person);
                    people.put(personId, (int) personNode);
                    peopleIndex.add(personNode, props);
                }
                trace(false);
            }
            csv.close();
            System.out.println("People end part " + part);
        }
        System.out.println("People end ");
        trace(true);
    }
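    // Reads the tls207 link files and creates a WROTE relationship from each person
    // to the applications associated with it.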
    private void importPeopleAppln(TIntIntMap appln, TIntIntMap people, BatchInserter batch) throws IOException {
        System.out.println("PeopleAppln start");
        trace(true);
        for (int part : asList(1, 2)) {
            int lines = 0;
            File file = new File(csvDir, "tls207_part0" + part + ".txt");
            if (!file.exists()) {
                System.out.println("File " + file + " does not exist, skipping");
                continue;
            }
            CSVReader csv = new CSVReader(new FileReader(file));
            csv.readNext(); // header
            String[] line;
            while ((line = csv.readNext()) != null) {
                lines++;
                // if (lines > 1000) break;
                int personId = Integer.parseInt(line[0]);
                if (!people.containsKey(personId)) continue;
                long personNode = people.get(personId);
                int applnId = Integer.parseInt(line[1]);
                if (!appln.containsKey(applnId)) continue;
                long applnNode = appln.get(applnId);
                batch.createRelationship(personNode,applnNode, Types.WROTE, NO_PROPS);
                trace(false);
            }
            csv.close();
            System.out.println("PeopleAppln end part " + part);
            trace(true);
        }
        System.out.println("PeopleAppln end");
    }
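    // Progress output: prints the total row count and the time since the last trace,
    // either on demand (output == true) or after every million processed rows.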
    private void trace(boolean output) {
        if (output || ++count % 1000000 == 0) {
            long now = System.currentTimeMillis();
            System.out.println(count + " rows " + (now - time) + " ms");
            time = now;
        }
    }
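    // Batch inserter configuration: memory-mapped buffer sizes for the individual
    // store files; the object cache is disabled and logical logs are not kept.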
    private static Map<String, String> config() {
        return MapUtil.stringMap(
                "use_memory_mapped_buffers", "true",
                "neostore.nodestore.db.mapped_memory", "250M",
                "neostore.relationshipstore.db.mapped_memory", "1G",
                "neostore.propertystore.db.mapped_memory", "500M",
                "neostore.propertystore.db.strings.mapped_memory", "500M",
                "neostore.propertystore.db.arrays.mapped_memory", "0M",
                "cache_type", "none",
                "keep_logical_logs","false",
                "dump_config", "true"
        );
    }
}

This code imports 300 million rows in roughly 5 hours, which is a very fast import.
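
For completeness: the importer expects exactly two arguments, the target store directory and the directory containing the CSV files. A minimal, hypothetical launcher (the paths are placeholders for your environment, and the Neo4j, opencsv and Trove jars have to be on the classpath):

public class RunImport {
    public static void main(String[] args) throws java.io.IOException {
        // equivalent to: java Importer <storeDir> <csvDir>
        Importer.main(new String[]{"target/graph.db", "/data/csv"});
    }
}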

Conclusion

So this is a very compact Java import for Neo4j. If you want, everything fits into a single file. Of course it is debatable whether that makes sense, since the file becomes rather hard to read. Still, once you have worked through it, you will find the methods and helper functions very quickly. Configuration tweaks can also be defined right in the same file.


Comments: 2
  • #1

    Karsten S. (Monday, 01 December 2014 17:36)

    I find it really remarkable how much you share with your readers here. Definitely keep it up, I think it's great and I really enjoy reading your blog.

    Very best regards, Karsten

  • #2

    kwoxer (Tuesday, 02 December 2014 12:14)

    And that is exactly what I will do. Thanks ;)