// TripleStoreRDF4J.java

/**
 * Copyright (c) 2017-2018, RTE (http://www.rte-france.com)
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 * SPDX-License-Identifier: MPL-2.0
 */

package com.powsybl.triplestore.impl.rdf4j;

import com.powsybl.commons.datasource.DataSource;
import com.powsybl.triplestore.api.*;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.*;
import org.eclipse.rdf4j.model.util.URIUtil;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.query.*;
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.ParentReferenceChecker;
import org.eclipse.rdf4j.query.explanation.Explanation;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.RepositoryResult;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.RDFWriter;
import org.eclipse.rdf4j.rio.Rio;
import org.eclipse.rdf4j.rio.helpers.BasicParserSettings;
import org.eclipse.rdf4j.rio.helpers.BasicWriterSettings;
import org.eclipse.rdf4j.rio.helpers.XMLParserSettings;
import org.eclipse.rdf4j.sail.memory.MemoryStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;

/**
 * @author Luma Zamarreño {@literal <zamarrenolm at aia.es>}
 */
public class TripleStoreRDF4J extends AbstractPowsyblTripleStore {

    static final String NAME = "rdf4j";

    /**
     * Creates an in-memory RDF4J triplestore with default options.
     */
    public TripleStoreRDF4J() {
        this(new TripleStoreOptions());
    }

    /**
     * Creates an in-memory RDF4J triplestore.
     *
     * @param options triplestore configuration options (identifier handling, ...)
     */
    public TripleStoreRDF4J(TripleStoreOptions options) {
        super(options);

        // This boolean is used to deactivate the ParentReferenceChecker optimizers added in testing environment.
        // If you see serialization issues during the tests, do not hesitate to comment this line, at a cost of
        // computation performances IN TEST ENVIRONMENT ONLY, it does not affect production environment
        ParentReferenceChecker.skip = true;

        repo = new SailRepository(new MemoryStore());
        repo.init();
    }

    @Override
    public String getImplementationName() {
        return NAME;
    }

    /**
     * Returns the underlying RDF4J repository (in-memory sail repository).
     *
     * @return the backing RDF4J {@link Repository}
     */
    public Repository getRepository() {
        return repo;
    }

    /**
     * Controls whether serialization groups all statements of a subject together
     * (see {@link #writeBySubject(Model, RDFWriter)}) or delegates to plain Rio writing.
     *
     * @param writeBySubject true to group statements by subject when writing (default)
     */
    public void setWriteBySubject(boolean writeBySubject) {
        this.writeBySubject = writeBySubject;
    }

    /**
     * Loads RDF data from the given stream into the context (named graph) derived from contextName.
     *
     * @param is          input stream with the RDF content
     * @param baseName    base URI used to resolve relative references; also registered as "data" namespace
     * @param contextName name of the context receiving the statements; its extension selects the parser format
     * @throws TripleStoreException if the stream cannot be read
     */
    @Override
    public void read(InputStream is, String baseName, String contextName) {
        try (RepositoryConnection conn = repo.getConnection()) {
            conn.setIsolationLevel(IsolationLevels.NONE);

            // Report invalid identifiers but do not fail
            // (sometimes RDF identifiers contain spaces or begin with #)
            // This is the default behavior for other triple store engines (e.g. Jena)
            conn.getParserConfig().addNonFatalError(XMLParserSettings.FAIL_ON_INVALID_NCNAME);
            conn.getParserConfig().addNonFatalError(BasicParserSettings.VERIFY_URI_SYNTAX);
            conn.getParserConfig().addNonFatalError(XMLParserSettings.FAIL_ON_DUPLICATE_RDF_ID);

            Resource context = context(conn, contextName);
            // We add data with a context (graph) to keep the source of information
            // When we write we want to keep data split by graph
            conn.add(is, baseName, guessFormatFromName(contextName), context);
            addNamespaceForBase(conn, baseName);
        } catch (IOException x) {
            throw new TripleStoreException(String.format("Reading %s %s", baseName, contextName), x);
        }
    }

    // Only Turtle is recognized explicitly; anything else (including ".xml") falls back to RDF/XML
    private static RDFFormat guessFormatFromName(String name) {
        return name.endsWith(".ttl") ? RDFFormat.TURTLE : RDFFormat.RDFXML;
    }

    /**
     * Writes every context (named graph) of the repository to the data source, one output per context.
     */
    @Override
    public void write(DataSource ds) {
        // RepositoryResult is a lazy, closeable iteration: close it to release repository resources
        try (RepositoryConnection conn = repo.getConnection();
             RepositoryResult<Resource> contexts = conn.getContextIDs()) {
            for (Resource context : contexts) {
                write(ds, conn, context);
            }
        }
    }

    /**
     * Writes only the context whose string value equals contextName to the data source.
     */
    @Override
    public void write(DataSource ds, String contextName) {
        try (RepositoryConnection conn = repo.getConnection();
             RepositoryResult<Resource> contexts = conn.getContextIDs()) {
            for (Resource context : contexts) {
                if (context.stringValue().equals(contextName)) {
                    write(ds, conn, context);
                }
            }
        }
    }

    // Serializes a single context to the data source, using the context name as output name
    private void write(DataSource ds, RepositoryConnection conn, Resource context) {
        LOGGER.info("Writing context {}", context);

        // QueryResults.asModel consumes and closes the underlying repository result
        Model model = QueryResults.asModel(conn.getStatements(null, null, null, context));
        copyNamespacesToModel(conn, model);

        String outname = context.toString();
        write(model, outputStream(ds, outname));
    }

    /**
     * Prints a summary of the triplestore (one line per context with its statement count).
     */
    @Override
    public void print(PrintStream out) {
        out.println("TripleStore based on RDF4J. Graph names and sizes");
        try (RepositoryConnection conn = repo.getConnection();
             RepositoryResult<Resource> ctxs = conn.getContextIDs()) {
            for (Resource ctx : ctxs) {
                out.println("    " + ctx + " : " + statementsCount(conn, ctx));
            }
        }
    }

    /**
     * @return the string values of all context (named graph) identifiers in the repository
     */
    @Override
    public Set<String> contextNames() {
        try (RepositoryConnection conn = repo.getConnection();
             RepositoryResult<Resource> ctxs = conn.getContextIDs()) {
            return ctxs.stream().map(Resource::stringValue).collect(Collectors.toSet());
        }
    }

    /**
     * Removes all statements stored in the given context.
     *
     * @param contextName name of the context to clear (with or without the contexts namespace prefix)
     */
    @Override
    public void clear(String contextName) {
        try (RepositoryConnection conn = repo.getConnection()) {
            conn.clear(context(conn, contextName));
        }
    }

    /**
     * Evaluates a SPARQL tuple query and collects the distinct, non-empty rows.
     *
     * @param query SPARQL query text (adjusted by {@code adjustedQuery} before evaluation)
     * @return one {@link PropertyBag} per distinct result row; bound variables become properties
     */
    @Override
    public PropertyBags query(String query) {
        String query1 = adjustedQuery(query);
        PropertyBags results = new PropertyBags();
        try (RepositoryConnection conn = repo.getConnection()) {
            // Default language is SPARQL
            TupleQuery q = conn.prepareTupleQuery(query1);

            // Print the optimization plan for the query
            // Explaining queries take some time, so we change the execution timeout
            if (EXPLAIN_QUERIES && LOGGER.isDebugEnabled()) {
                Explanation explanation = q.explain(Explanation.Level.Timed);
                LOGGER.debug("Query explanation:\n{}\n{}", query, explanation);
            }

            // Duplicated triplets are returned in queries
            // when an object is defined in a file and referenced in another (rdf:ID and
            // rdf:about)
            // and data has been added to repository with contexts
            // and we query without using explicit GRAPH clauses
            // This means that we have to filter distinct results
            try (TupleQueryResult r = QueryResults.distinctResults(q.evaluate())) {
                List<String> names = r.getBindingNames();
                while (r.hasNext()) {
                    BindingSet s = r.next();
                    PropertyBag result = new PropertyBag(names, getOptions().isRemoveInitialUnderscoreForIdentifiers(), getOptions().unescapeIdentifiers());

                    names.forEach(name -> {
                        if (s.hasBinding(name)) {
                            String value = s.getBinding(name).getValue().stringValue();
                            result.put(name, value);
                        }
                    });
                    // Rows where no requested variable is bound are skipped
                    if (!result.isEmpty()) {
                        results.add(result);
                    }
                }
            }
        }
        return results;
    }

    /**
     * Copies all namespaces and all statements (context by context) from another RDF4J triplestore.
     *
     * @param source the triplestore to copy from; must also be a {@link TripleStoreRDF4J}
     * @throws TripleStoreException if the source is a different triplestore implementation
     */
    @Override
    public void add(TripleStore source) {
        Objects.requireNonNull(source);
        if (!(source instanceof TripleStoreRDF4J tripleStoreRDF4J)) {
            throw new TripleStoreException(String.format("Add to %s from source %s is not supported",
                getImplementationName(), source.getImplementationName()));
        }
        try (RepositoryConnection sourceConn = tripleStoreRDF4J.repo.getConnection();
             RepositoryConnection targetConn = repo.getConnection()) {
            copyNamespacesToRepository(sourceConn, targetConn);
            // copy statements, keeping the split by context (named graph)
            try (RepositoryResult<Resource> contexts = sourceConn.getContextIDs()) {
                for (Resource sourceContext : contexts) {
                    Resource targetContext = context(targetConn, sourceContext.stringValue());
                    try (RepositoryResult<Statement> statements =
                             sourceConn.getStatements(null, null, null, sourceContext)) {
                        // add statements to the new repository
                        for (Statement statement : statements) {
                            targetConn.add(statement, targetContext);
                        }
                    }
                }
            }
        }
    }

    // Replicates every prefix/namespace binding from the source connection into the target one
    private static void copyNamespacesToRepository(RepositoryConnection sourceConn, RepositoryConnection targetConn) {
        try (RepositoryResult<Namespace> ns = sourceConn.getNamespaces()) {
            for (Namespace namespace : ns) {
                targetConn.setNamespace(namespace.getPrefix(), namespace.getName());
            }
        }
    }

    /**
     * Executes a SPARQL update (INSERT/DELETE) statement.
     *
     * @param query SPARQL update text (adjusted by {@code adjustedQuery} before execution)
     * @throws TripleStoreException if the update is malformed or fails to execute
     */
    @Override
    public void update(String query) {
        try (RepositoryConnection conn = repo.getConnection()) {
            conn.prepareUpdate(QueryLanguage.SPARQL, adjustedQuery(query)).execute();
        } catch (MalformedQueryException | UpdateExecutionException | RepositoryException e) {
            throw new TripleStoreException(String.format("Query [%s]", query), e);
        }
    }

    /**
     * Adds one resource per property bag, typed objNs:objType, to the given context.
     */
    @Override
    public void add(String contextName, String objNs, String objType, PropertyBags objects) {
        try (RepositoryConnection conn = repo.getConnection()) {
            conn.setIsolationLevel(IsolationLevels.NONE);
            objects.forEach(object -> createStatements(conn, objNs, objType, object, context(conn, contextName)));
        }
    }

    /**
     * Adds a single resource, typed objNs:objType, to the given context.
     *
     * @return the local name of the created resource
     */
    @Override
    public String add(String contextName, String objNs, String objType, PropertyBag object) {
        try (RepositoryConnection conn = repo.getConnection()) {
            conn.setIsolationLevel(IsolationLevels.NONE);
            return createStatements(conn, objNs, objType, object, context(conn, contextName));
        }
    }

    // Creates the subject resource with its rdf:type statement, then the statements for its properties.
    // Returns the local name of the created resource.
    private static String createStatements(RepositoryConnection cnx, String objNs, String objType,
        PropertyBag statement,
        Resource context) {
        IRI resource;
        if (objType.equals(rdfDescriptionClass())) {
            // rdf:Description subjects are identified by a fresh UUID URN
            resource = cnx.getValueFactory().createIRI("urn:uuid:" + UUID.randomUUID());
        } else {
            // Identifiers stored in the triplestore are RDF:ids
            resource = cnx.getValueFactory().createIRI(cnx.getNamespace("data"),
                AbstractPowsyblTripleStore.createRdfId());
        }
        IRI parentPredicate = RDF.TYPE;
        IRI parentObject = cnx.getValueFactory().createIRI(objNs + objType);
        Statement parentSt = cnx.getValueFactory().createStatement(resource, parentPredicate, parentObject);
        cnx.add(parentSt, context);
        // add rest of statements for subject
        createStatements(cnx, objNs, objType, statement, context, resource);
        return resource.getLocalName();
    }

    // Adds one statement per property of the bag; resource-valued properties become IRIs, the rest literals
    private static void createStatements(RepositoryConnection cnx, String objNs, String objType,
        PropertyBag statement, Resource context, IRI resource) {
        List<String> names = statement.propertyNames();
        names.forEach(name -> {
            String property = statement.isClassProperty(name) ? name : objType + "." + name;
            String value = statement.get(name);
            IRI predicate = cnx.getValueFactory().createIRI(objNs + property);
            if (statement.isResource(name)) {
                if (statement.isMultivaluedProperty(name)) {
                    addMultivaluedProperty(cnx, value, resource, predicate, context);
                } else {
                    IRI object;
                    if (URIUtil.isValidURIReference(value)) { // the value already contains the namespace
                        object = cnx.getValueFactory().createIRI(value);
                    } else { // the value is an id, add the base namespace
                        String namespace = cnx.getNamespace(statement.namespacePrefix(name));
                        object = cnx.getValueFactory().createIRI(namespace, value);
                    }
                    Statement st = cnx.getValueFactory().createStatement(resource, predicate, object);
                    cnx.add(st, context);
                }
            } else {
                Literal object = cnx.getValueFactory().createLiteral(value);
                Statement st = cnx.getValueFactory().createStatement(resource, predicate, object);
                cnx.add(st, context);
            }
        });
    }

    // A multivalued property is encoded as a comma-separated list of ids or urn:uuid references
    private static void addMultivaluedProperty(RepositoryConnection cnx, String value, IRI resource, IRI predicate, Resource context) {
        String[] objs = value.split(",");
        for (String o : objs) {
            if (!o.startsWith("urn:uuid:")) {
                o = cnx.getNamespace("data") + o;
            }
            IRI object = cnx.getValueFactory().createIRI(o);
            Statement st = cnx.getValueFactory().createStatement(resource, predicate, object);
            cnx.add(st, context);
        }
    }

    // Serializes the model to the output stream, pretty-printed, optionally grouped by subject
    private void write(Model model, OutputStream out) {
        // Write in UTF-8 explicitly rather than relying on the platform default charset
        try (PrintStream pout = new PrintStream(out, false, StandardCharsets.UTF_8)) {
            RDFWriter writer = new PowsyblWriter(pout);
            writer.getWriterConfig().set(BasicWriterSettings.PRETTY_PRINT, true);
            if (writeBySubject) {
                writeBySubject(model, writer);
            } else {
                Rio.write(model, writer);
            }
        }
    }

    // Writes the model subject by subject, emitting rdf:type statements first for each subject.
    // Fails if a subject has no rdf:type statement.
    private void writeBySubject(Model model, RDFWriter writer) {
        writer.startRDF();
        if (model instanceof NamespaceAware) {
            for (Namespace nextNamespace : model.getNamespaces()) {
                writer.handleNamespace(nextNamespace.getPrefix(), nextNamespace.getName());
            }
        }
        for (final Resource subject : model.subjects()) {
            // First write the statements RDF.TYPE for this subject
            // A resource may be described as an instance of more than one class
            boolean rdfTypeFound = false;
            for (final Statement st0 : model.filter(subject, RDF.TYPE, null)) {
                writer.handleStatement(st0);
                rdfTypeFound = true;
            }
            if (!rdfTypeFound) {
                String message = "subject is missing an rdfType " + subject;
                LOGGER.error(message);
                for (final Statement st : model.filter(subject, null, null)) {
                    LOGGER.error("    {} {} {}", st.getSubject(), st.getPredicate(), st.getObject());
                }
                throw new TripleStoreException(message);
            }
            // Then all the other statements
            writeSubjectStatements(model, writer, subject);
        }
        writer.endRDF();
    }

    // Writes every statement of the subject except the rdf:type ones (already written)
    private void writeSubjectStatements(Model model, RDFWriter writer, Resource subject) {
        for (final Statement st : model.filter(subject, null, null)) {
            if (st.getPredicate().equals(RDF.TYPE)) {
                continue;
            }
            writer.handleStatement(st);
        }
    }

    // Counts the statements of a context by iterating its (lazy) repository result
    private static int statementsCount(RepositoryConnection conn, Resource ctx) {
        try (RepositoryResult<Statement> statements = conn.getStatements(null, null, null, ctx)) {
            int counter = 0;
            while (statements.hasNext()) {
                statements.next();
                counter++;
            }
            return counter;
        }
    }

    // Replicates every prefix/namespace binding of the connection into the model
    private static void copyNamespacesToModel(RepositoryConnection conn, Model m) {
        try (RepositoryResult<Namespace> ns = conn.getNamespaces()) {
            for (Namespace namespace : ns) {
                m.setNamespace(namespace);
            }
        }
    }

    // Registers the base URI under the "data" prefix so created identifiers can be resolved against it
    private static void addNamespaceForBase(RepositoryConnection cnx, String base) {
        cnx.setNamespace("data", base + "/#");
    }

    // Builds the context IRI inside the contexts namespace, tolerating names that already carry it
    private static Resource context(RepositoryConnection conn, String contextName) {
        // Remove the namespaceForContexts from contextName if it already starts with it
        String name1 = contextName.replace(namespaceForContexts(), "");
        return conn.getValueFactory().createIRI(namespaceForContexts(), name1);
    }

    /**
     * Registers (or redefines) a prefix/namespace binding in the repository.
     */
    @Override
    public void addNamespace(String prefix, String namespace) {
        try (RepositoryConnection conn = repo.getConnection()) {
            conn.setNamespace(prefix, namespace);
        }
    }

    /**
     * @return all prefix/namespace bindings currently defined in the repository
     */
    @Override
    public List<PrefixNamespace> getNamespaces() {
        List<PrefixNamespace> namespaces = new ArrayList<>();
        try (RepositoryConnection conn = repo.getConnection();
             RepositoryResult<Namespace> ns = conn.getNamespaces()) {
            for (Namespace namespace : ns) {
                namespaces.add(new PrefixNamespace(namespace.getPrefix(), namespace.getName()));
            }
        }
        return namespaces;
    }

    // Backing in-memory repository, initialized in the constructor
    private final Repository repo;
    // When true (default), serialization groups statements by subject with rdf:type first
    private boolean writeBySubject = true;

    private static final boolean EXPLAIN_QUERIES = false;

    private static final Logger LOGGER = LoggerFactory.getLogger(TripleStoreRDF4J.class);
}