Daten Import via Java - OrientDB Real-Beispiel Tutorial

Nun möchte ich euch gerne einmal zeigen wie mein derzeitiger Java-Code aussieht, welchen ich zum Importieren großer Datenmengen in OrientDB verwende. Dies stellt einen Real-Beispiel Import Code dar. Genutzt werden könnte er für eine bestimmte kostenpflichtige Datenbank aus dem Internet, welche ich hier nicht weiter erwähne.

Vorwissen

Der Java-Code ist stark an die Fallbeispiel-Datenbank angelehnt. Jedoch weicht er auch in einigen Punkten ab. Ich denke aber, dass es auch ganz interessant sein kann von dem strikten Beispiel abzuweichen. Dazu gehören:

  • Lucene Indexe statt den Normalen
  • erhöhte Anzahl an Eigenschaften pro Klasse
  • Einlesen mehrerer Dateien pro Klasse/Relation

OrientDBMain.java

Die OrientDBMain ist die Stelleinheit. Hier legst du fest

  • welche Argumente der .jar-Datei übergeben werden
  • Anzahl an Dateien pro Klasse/Relation
  • Reihenfolge der importierten Daten
  • Anweisung zum Erzeugen von Klassen und Relationen
  • Dokumentierungen (z.B. vergangene Zeit)

public class OrientDBMain {
    private static OrientDB orientDB;
    private static String datafiles;
    public static void main(String[] args) {
        System.out.println("Creating the database files...");
        orientDB = new OrientDB("plocal:" + args[1]);
        long completeTime = System.currentTimeMillis();
        datafiles = args[0];
        boolean TESTBETRIEB = false;

        orientDB.createObjectAppln();
        if (TESTBETRIEB){
            importFiles("Appln", 1);
        }else{
            importFiles("Appln", 4);
        }
        System.out.println("All APPLNs imported successfully.");
       
        if (TESTBETRIEB){
            importFiles("Title", 1);
        }else{
            importFiles("Title", 3);
        }
        System.out.println("All TITLEs imported successfully.");

        orientDB.createObjectAbstract();
        if (TESTBETRIEB){
            importFiles("Abstract", 1);
        }else{
            importFiles("Abstract", 24);
        }
        System.out.println("All ABSTRACTs imported successfully.");

        orientDB.createObjectPerson();
        if (TESTBETRIEB){
            importFiles("Person", 1);
        }else{
            importFiles("Person", 2);
        }
        System.out.println("All PERSONs imported successfully.");

        orientDB.createObjectWROTE();
        if (TESTBETRIEB){
            importFiles("WROTE", 1);
        }else{
            importFiles("WROTE", 2);
        }
        System.out.println("All WROTE imported successfully.");

        System.out.println("-------------------------------------");
        System.out.println(((System.currentTimeMillis() - completeTime)) + " ms");
        System.out.println(((System.currentTimeMillis() - completeTime) / 1000) + " sec");
        System.out.println(((System.currentTimeMillis() - completeTime) / 1000 / 60) + " min");
    }

    private static void importFiles(String className, int fileCount) {
        int i = 1;
        int categoryNumber = 0;
        if (className == "Appln") {
            categoryNumber = 201;
        } else if (className == "Title") {
            categoryNumber = 202;
        } else if (className == "Abstract") {
            categoryNumber = 203;
        } else if (className == "Person") {
            categoryNumber = 206;
        } else if (className == "WROTE") {
            categoryNumber = 207;
        }
        while (i <= fileCount) {
            String realNumber = "" + i;
            if (i < 10){
                realNumber = "0" + i;
            }
            long overallTime = System.currentTimeMillis();
            orientDB.setCsvPath(datafiles + "/tls" + categoryNumber + "_part" + realNumber + ".txt");
            orientDB.writeData(className);
            System.out.println(className + "#" + realNumber + " imported in " + (System.currentTimeMillis() - overallTime) + "ms");
            i++;
        }
    }
}

OrientDB.java

Die OrientDB hingegen enthält die gesamte Logik. Dazu zählen

  • die Import-Methode, welche je nach derzeitigen Auftrag die Abarbeitung des Imports verändert
  • Hilfsmethoden
    • Klassen/Relationen erzeugen und löschen
    • Indexe erzeugen und löschen
    • Eigenschaften mappen, damit sie in einem Rutsch importiert werden können

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import com.orientechnologies.common.util.OCallable;
import com.orientechnologies.orient.core.command.script.OCommandScript;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.intent.OIntentMassiveInsert;
import com.orientechnologies.orient.core.metadata.schema.OSchema;
import com.orientechnologies.orient.core.metadata.schema.OType;
import com.orientechnologies.orient.core.metadata.schema.OClass.INDEX_TYPE;
import com.orientechnologies.orient.core.sql.OCommandSQL;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph;
import com.tinkerpop.blueprints.impls.orient.OrientElement;
import com.tinkerpop.blueprints.impls.orient.OrientGraph;
import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory;
import com.tinkerpop.blueprints.impls.orient.OrientVertex;
public class OrientDB {
    private OrientGraphFactory factoryGraph;
    private ODatabaseDocumentTx db;
    private String csvPath;
    private String splitRegex = ",(?=([^\"]*\"[^\"]*\")*[^\"]*$)";
    public OrientDB(String dbPath) {
        OGlobalConfiguration.STORAGE_KEEP_OPEN.setValue(true);
        OGlobalConfiguration.ENVIRONMENT_CONCURRENT.setValue(false);
        db = new ODatabaseDocumentTx(dbPath);
        if (!db.exists()) {
            db.create();
        }
        factoryGraph = new OrientGraphFactory(dbPath, "admin", "admin").setupPool(1, 10);
    }
    public void setCsvPath(String csvPath) {
        this.csvPath = csvPath;
    }
    void createObjectAppln() {
        try {
            createClass("Appln");
            createProperty("Appln", "ID", OType.INTEGER);
            createProperty("Appln", "title", OType.STRING);
            createProperty("Appln", "auth", OType.STRING);
            createProperty("Appln", "nr", OType.STRING);
            createProperty("Appln", "kind", OType.STRING);
            createProperty("Appln", "filingDate", OType.DATE);
            createProperty("Appln", "nrEpodoc", OType.STRING);
            createProperty("Appln", "iprType", OType.STRING);
            createProperty("Appln", "titleLg", OType.STRING);
            createProperty("Appln", "abstrLg", OType.STRING);
            createProperty("Appln", "internatApplnID", OType.INTEGER);
            createIndex("Appln", INDEX_TYPE.UNIQUE, "Appln.ID", "ID");
            db.command(new OCommandScript("sql", "CREATE INDEX Appln.title on Appln (title) FULLTEXT ENGINE LUCENE")).execute();
            System.out.println("Class APPLN created successfully.");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Class APPLN already exists (not created).");
        }
    }
    void createObjectAbstract() {
        try {
            createClass("Abstract");
            createProperty("Abstract", "ID", OType.INTEGER);
            createProperty("Abstract", "abstract", OType.STRING);
            createIndex("Abstract", INDEX_TYPE.UNIQUE, "Abstract.ID", "ID");
            db.command(new OCommandScript("sql", "CREATE INDEX Abstract.abstract on Abstract (abstract) FULLTEXT ENGINE LUCENE")).execute();
            System.out.println("Class Abstract created successfully.");
        } catch (Exception e) {
            System.out.println("Class ABSTRACT already exists (not created).");
        }
    }
    void createObjectPerson() {
        try {
            createClass("Person");
            createProperty("Person", "ID", OType.INTEGER);
            createProperty("Person", "countryCode", OType.STRING);
            createProperty("Person", "docStdNameID", OType.INTEGER);
            createProperty("Person", "name", OType.STRING);
            createProperty("Person", "address", OType.STRING);
            createIndex("Person", INDEX_TYPE.UNIQUE, "Person.ID", "ID");
            db.command(new OCommandScript("sql", "CREATE INDEX Person.name on Person (name) FULLTEXT ENGINE LUCENE")).execute();
            System.out.println("Class PERSON created successfully.");
        } catch (Exception e) {
            System.out.println("Class PERSON already exists (not created).");
        }
    }
    void createObjectWROTE() {
        try {
            createEdge("WROTE");
            createProperty("WROTE", "appltSeqNr", OType.INTEGER);
            createProperty("WROTE", "intrSeqNr", OType.INTEGER);
            System.out.println("Edge WROTE created successfully.");
        } catch (Exception e) {
            System.out.println("Class WROTE already exists (not created).");
        }
    }
    void writeData(String currentTask) {
        OrientGraph txGraph = factoryGraph.getTx();
        try {
            txGraph.getRawGraph().getTransaction().setUsingLog(false);
            txGraph.getRawGraph().declareIntent(new OIntentMassiveInsert());
            long innerTime = System.currentTimeMillis();
            BufferedReader readerLines = new BufferedReader(new InputStreamReader(new FileInputStream(this.csvPath), "UTF-8"));
            @SuppressWarnings("unused")
            String firstLine = readerLines.readLine();
            String nextLine;
            long counter = 0;
            while ((nextLine = readerLines.readLine()) != null && counter < 1000) {
                counter++;
                if (currentTask == "WROTE") {
                    if (counter % 1000 == 0) {
                        txGraph.commit();
                    }
                } else if (currentTask == "Title") {
                    if (counter % 10000 == 0) {
                        txGraph.commit();
                    }
                } else {
                    if (counter % 8000 == 0) {
                        txGraph.commit();
                    }
                }
                if (counter % 10000 == 0) {
                    System.out.println(counter + " in " + (System.currentTimeMillis() - innerTime));
                    innerTime = System.currentTimeMillis();
                }
                String[] splitted = nextLine.split(splitRegex);
                for (int i = 0; i < splitted.length; i++) {
                    splitted[i] = splitted[i].replaceAll("\"", "");
                }
                OrientElement vertex;
                if (currentTask != "WROTE" && currentTask != "Title") {
                    vertex = txGraph.addVertex("class:" + currentTask);
                    Map<String, Object> props = new HashMap<String, Object>();
                    try {
                        if (currentTask == "Appln") {
                            props = definePropsAppln(splitted, props);
                        } else if (currentTask == "Person") {
                            props = definePropsPerson(splitted, props);
                        } else if (currentTask == "Abstract") {
                            props = definePropsAbstract(splitted, props);

                        }
                    } catch (Exception e) {
                        System.out.println(nextLine + " IS not UTF8 Standard!");
                        txGraph.rollback();
                        continue;
                    }
                    vertex.setProperties(props);
                    vertex.save();
                    if (currentTask == "Abstract") {
                        for (Vertex appln : txGraph.getVertices("Appln.ID", splitted[0])) {
                            appln.addEdge("HAS_ABSTRACT", (OrientVertex) vertex);
                        }
                    }
                } else {
                    try {
                        if (currentTask == "Title") {
                            for (Vertex title : txGraph.getVertices("Appln.ID", splitted[0])) {
                                title.setProperty("title", splitted[1]);
                            }
                        } else if (currentTask == "WROTE") {
                            for (Vertex person : txGraph.getVertices("Person.ID", splitted[0])) {
                                for (Vertex appln : txGraph.getVertices("Appln.ID", splitted[1])) {
                                    ((OrientVertex) person).addEdge("WROTE", (OrientVertex) appln, new Object[] { "appltSeqNr", splitted[2], "intrSeqNr", splitted[3] });
                                }
                            }
                        }
                    } catch (Exception e) {
                        System.out.println(nextLine + " HAS A WROTE problem!");
                        System.out.println(e);
                        txGraph.rollback();
                        continue;
                    }
                }
            }
            readerLines.close();
            txGraph.commit();
            txGraph.getRawGraph().declareIntent(null);
        } catch (Exception e) {
            System.out.println(e);
            txGraph.rollback();
        } finally {
            txGraph.shutdown();
        }
    }
    Map<String, Object> definePropsAppln(String[] splitted, Map<String, Object> props) throws IOException {
        String[] columns = { "ID", "auth", "nr", "kind", "filingDate", "nrEpodoc", "iprType", "titleLg", "abstrLg", "internatApplnID" };
        props.put(columns[0], splitted[0]);
        props.put(columns[1], splitted[1]);
        props.put(columns[2], splitted[2]);
        props.put(columns[3], splitted[3]);
        props.put(columns[4], splitted[4]);
        props.put(columns[5], splitted[5]);
        props.put(columns[6], splitted[6]);
        props.put(columns[7], splitted[7]);
        props.put(columns[8], splitted[8]);
        props.put(columns[9], splitted[9]);
        return props;
    }
    Map<String, Object> definePropsPerson(String[] splitted, Map<String, Object> props) throws IOException {
        String[] columns = { "ID", "countryCode", "docStdNameID", "name", "address" };
        props.put(columns[0], splitted[0]);
        props.put(columns[1], splitted[1]);
        props.put(columns[2], splitted[2]);
        props.put(columns[3], splitted[3]);
        props.put(columns[4], splitted[4]);
        return props;
    }
    Map<String, Object> definePropsAbstract(String[] splitted, Map<String, Object> props) throws IOException {
        String[] columns = { "ID", "abstract" };
        props.put(columns[0], splitted[0]);
        props.put(columns[1], splitted[1]);
        return props;
    }
    void createClass(final String className) {
        final OrientGraph txGraph = factoryGraph.getTx();
        txGraph.executeOutsideTx(new OCallable<Object, OrientBaseGraph>() {
            public Object call(OrientBaseGraph iArgument) {
                txGraph.createVertexType(className).setClusterSelection("default");
                return null;
            }
        });
    }
    void createEdge(final String className) {
        final OrientGraph txGraph = factoryGraph.getTx();
        txGraph.executeOutsideTx(new OCallable<Object, OrientBaseGraph>() {
            public Object call(OrientBaseGraph iArgument) {
                txGraph.createEdgeType(className).setClusterSelection("default");
                return null;
            }
        });
    }
    void createProperty(String className, String propertyName, OType oType) {
        OrientGraph txGraph = factoryGraph.getTx();
        OSchema schema = db.getMetadata().getSchema();
        if (!schema.existsClass(className)) {
            txGraph.createVertexType(className);
        }
        txGraph.commit();
        txGraph.shutdown();
        db.getMetadata().getSchema().getClass(className).createProperty(propertyName, oType);
    }
    void createIndex(String className, INDEX_TYPE indexType, String propertyName, String indexName) {
        db.getMetadata().getSchema().getClass(className).createIndex(propertyName, indexType, indexName);
    }
    void dropClass(String className) {
        db.getMetadata().getSchema().dropClass(className);
    }
    void dropProperty(String className, String propertyName) {
        db.getMetadata().getSchema().getClass(className).dropProperty(propertyName);
    }
    void deleteClass(String className) {
        db.command(new OCommandSQL("DELETE FROM " + className)).execute();
        db.getMetadata().getSchema().dropClass(className);
    }
    void deleteProperty(String className, String propertyName) {
        db.getMetadata().getSchema().getClass(className).dropProperty(propertyName);
        db.command(new OCommandSQL("UPDATE " + className + " REMOVE " + propertyName)).execute();
    }
}

Fazit

Hier findest du den aktuell schnellsten Import-Code in Java für OrientDB. Natürlich sind noch einigen Optimierungen wie Multithreading und Mapping offen. Dennoch ist dieser Code in der Lage in ca. 30 Studen 500mio Zeilen von Echtdaten zu lesen und zu importieren.

Kommentar schreiben

Kommentare: 2
  • #1

    James (Dienstag, 18 November 2014 10:40)

    Great Code example man. Really helpful because you nearly showed anything important for a real life example. Really appreciate it.

    Keep it on.

  • #2

    kwoxer (Dienstag, 02 Dezember 2014 13:26)

    Thanks James, sorry for just writing in German. But the code is compeltely in English. So yeah cool that you like it.

    Cheers =)