FedXConnection.java
/*******************************************************************************
* Copyright (c) 2019 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.DistinctIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.ExceptionConvertingIteration;
import org.eclipse.rdf4j.common.iteration.Iterations;
import org.eclipse.rdf4j.common.transaction.TransactionSetting;
import org.eclipse.rdf4j.federated.algebra.PassThroughTupleExpr;
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
import org.eclipse.rdf4j.federated.evaluation.FederationEvaluationStatistics;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTask;
import org.eclipse.rdf4j.federated.evaluation.iterator.StopRemainingExecutionsOnCloseIteration;
import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion;
import org.eclipse.rdf4j.federated.evaluation.union.WorkerUnionBase;
import org.eclipse.rdf4j.federated.repository.FedXRepositoryConnection;
import org.eclipse.rdf4j.federated.structures.QueryInfo;
import org.eclipse.rdf4j.federated.structures.QueryType;
import org.eclipse.rdf4j.federated.util.FedXUtil;
import org.eclipse.rdf4j.federated.write.WriteStrategy;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Namespace;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.util.Literals;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.Dataset;
import org.eclipse.rdf4j.query.Operation;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.explanation.Explanation;
import org.eclipse.rdf4j.query.impl.EmptyBindingSet;
import org.eclipse.rdf4j.query.impl.MapBindingSet;
import org.eclipse.rdf4j.query.impl.SimpleDataset;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.sparql.federation.CollectionIteration;
import org.eclipse.rdf4j.sail.SailConnection;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.helpers.AbstractSail;
import org.eclipse.rdf4j.sail.helpers.AbstractSailConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Sail connection implementation (it extends {@link AbstractSailConnection}) that uses
* {@link FederationEvalStrategy} to evaluate provided queries. Prior to evaluation various optimizations are
* performed, see {@link org.eclipse.rdf4j.federated.optimizer.FedXOptimizer} for further details.
* <p>
* Since 4.0 FedX supports write operations using the supplied {@link WriteStrategy}, e.g. by writing to a designated
* federation member. Note: the {@link WriteStrategy} is initialized lazily upon first access to a write operation, see
* {@link #getWriteStrategyInternal()}.
* <p>
* Implementation notes: - not all methods are implemented as of now
*
* @author Andreas Schwarte
* @see FederationEvalStrategy
* @see WriteStrategy
*/
public class FedXConnection extends AbstractSailConnection {
private static final Logger log = LoggerFactory.getLogger(FedXConnection.class);
protected final FedX federation;
protected final FederationContext federationContext;
/**
* If set, contains the write strategy. Always access via {@link #getWriteStrategyInternal()}
*/
private WriteStrategy writeStrategy;
/**
 * Creates a connection for the given federation.
 *
 * @param federation        the federation this connection operates on; it is also passed to the superclass
 *                          constructor
 * @param federationContext the context from which this connection obtains evaluation strategies, the monitoring
 *                          service and the configuration
 * @throws SailException declared by this constructor; no throw site is visible in the constructor body itself
 */
public FedXConnection(FedX federation, FederationContext federationContext) throws SailException {
	super(federation);
	this.federationContext = federationContext;
	this.federation = federation;
}
/**
 * Applies the given transaction settings to this connection and forwards them to the write strategy.
 * <p>
 * The write strategy is obtained through {@link #getWriteStrategyInternal()}, which fetches it from the
 * federation if this connection has not done so before.
 *
 * @param settings the transaction settings to apply
 */
@Override
public void setTransactionSettings(TransactionSetting... settings) {
	super.setTransactionSettings(settings);
	WriteStrategy strategy = getWriteStrategyInternal();
	strategy.setTransactionSettings(settings);
}
/**
 * Evaluates the given tuple expression using a {@link FederationEvalStrategy} created for the dataset.
 * <p>
 * The method proceeds in these steps:
 * <ol>
 * <li>build a {@link QueryInfo} from the FedX specific bindings (original query string, base URI, query type and
 * max execution time),</li>
 * <li>unwrap a {@link PassThroughTupleExpr} and register its result handler on the query info,</li>
 * <li>register the query with the monitoring service and optimize it,</li>
 * <li>log the optimized plan,</li>
 * <li>evaluate the optimized expression with only those bindings whose names are not contained in
 * {@link FedXRepositoryConnection#FEDX_BINDINGS}.</li>
 * </ol>
 *
 * @param query           the expression to evaluate, possibly wrapped in a {@link PassThroughTupleExpr}
 * @param dataset         the dataset used to create the evaluation strategy and the query info
 * @param bindings        the bindings supplied with the query, including the FedX specific ones
 * @param includeInferred passed on to the {@link QueryInfo}
 * @return the evaluation result wrapped in a {@link StopRemainingExecutionsOnCloseIteration}
 * @throws SailException wrapping any exception raised during optimization and any
 *                       {@link QueryEvaluationException} raised during evaluation
 */
@Override
protected CloseableIteration<? extends BindingSet> evaluateInternal(TupleExpr query,
		Dataset dataset, BindingSet bindings, boolean includeInferred) throws SailException {
	final TupleExpr originalQuery = query;
	FederationEvalStrategy strategy = federationContext.createStrategy(dataset);

	String queryString = getOriginalQueryString(bindings);
	if (queryString == null) {
		log.warn("Query string is null. Please check your FedX setup.");
	}
	QueryInfo queryInfo = new QueryInfo(queryString, getOriginalBaseURI(bindings), getOriginalQueryType(bindings),
			getOriginalMaxExecutionTime(bindings), includeInferred, federationContext, strategy, dataset);

	// single source queries may carry pass-through result handler information: unwrap the actual expression
	TupleExpr expr = query;
	if (query instanceof PassThroughTupleExpr) {
		PassThroughTupleExpr passThrough = (PassThroughTupleExpr) query;
		queryInfo.setResultHandler(passThrough.getResultHandler());
		expr = passThrough.getExpr();
	}

	// the start timestamp is only taken (and later only reported) when debug logging is enabled
	long optimizationStart = 0;
	if (log.isDebugEnabled()) {
		log.debug("Optimization start (Query: " + queryInfo.getQueryID() + ")");
		optimizationStart = System.currentTimeMillis();
	}

	TupleExpr optimized;
	try {
		federationContext.getMonitoringService().monitorQuery(queryInfo);
		FederationEvaluationStatistics stats = new FederationEvaluationStatistics(queryInfo, dataset);
		optimized = strategy.optimize(expr, stats, bindings);
	} catch (Exception e) {
		log.warn(
				"Exception occured during optimization (Query: " + queryInfo.getQueryID() + "): " + e.getMessage());
		log.debug("Details: ", e);
		throw new SailException(e);
	}

	if (log.isDebugEnabled()) {
		log.debug(("Optimization duration: " + ((System.currentTimeMillis() - optimizationStart))) + " (Query: "
				+ queryInfo.getQueryID() + ")");
	}

	// log the optimized query plan, if Config#isLogQueryPlan(), otherwise void operation
	federationContext.getMonitoringService().logQueryPlan(optimized);
	if (federationContext.getConfig().isDebugQueryPlan()) {
		System.out.println("Optimized query execution plan: \n" + optimized);
	}
	if (log.isDebugEnabled()) {
		log.debug("Optimized query execution plan (Query: " + queryInfo.getQueryID() + ");" + optimized);
	}

	try {
		// make sure to apply any external bindings, i.e. all bindings that are not FedX specific
		BindingSet queryBindings;
		if (FedXRepositoryConnection.FEDX_BINDINGS.containsAll(bindings.getBindingNames())) {
			queryBindings = EmptyBindingSet.getInstance();
		} else {
			MapBindingSet externalBindings = new MapBindingSet();
			bindings.forEach(binding -> {
				if (FedXRepositoryConnection.FEDX_BINDINGS.contains(binding.getName())) {
					return;
				}
				externalBindings.addBinding(binding);
			});
			queryBindings = externalBindings;
		}

		CloseableIteration<? extends BindingSet> result = null;
		try {
			result = strategy.evaluate(optimized, queryBindings);

			// mark the query as PassedThrough, such that outer result handlers are aware of this
			// Note: for SingleSourceQuery (i.e. where we use pass through) the result is explicitly
			// EmptyIteration. Thus we can use it as indicator
			boolean passThroughRequested = originalQuery instanceof PassThroughTupleExpr;
			if (passThroughRequested && result instanceof EmptyIteration) {
				((PassThroughTupleExpr) originalQuery).setPassedThrough(true);
			}
			return new StopRemainingExecutionsOnCloseIteration(result, queryInfo);
		} catch (Throwable t) {
			// do not leave an already opened result behind when anything fails afterwards
			if (result != null) {
				result.close();
			}
			throw t;
		}
	} catch (QueryEvaluationException e) {
		throw new SailException(e);
	}
}
/**
 * Forwards the clear request for the given contexts to the write strategy.
 *
 * @param contexts the contexts passed on to the write strategy's {@code clear} operation
 * @throws SailException wrapping any {@link RepositoryException} raised while delegating
 */
@Override
protected void clearInternal(Resource... contexts) throws SailException {
	try {
		WriteStrategy strategy = getWriteStrategyInternal();
		strategy.clear(contexts);
	} catch (RepositoryException cause) {
		throw new SailException(cause);
	}
}
/**
 * Forwards the request to clear all namespaces to the write strategy.
 *
 * @throws SailException wrapping any {@link RepositoryException} raised while delegating
 */
@Override
protected void clearNamespacesInternal() throws SailException {
	try {
		WriteStrategy strategy = getWriteStrategyInternal();
		strategy.clearNamespaces();
	} catch (RepositoryException cause) {
		throw new SailException(cause);
	}
}
/**
 * Closes the write strategy, provided this connection has obtained one.
 * <p>
 * The {@link #writeStrategy} field is read directly here (not through {@link #getWriteStrategyInternal()}), so
 * closing a connection never triggers the lazy lookup of a write strategy.
 *
 * @throws SailException wrapping any {@link RepositoryException} raised while closing the write strategy
 */
@Override
protected void closeInternal() throws SailException {
	/*
	 * think about it: the federation connection should remain open until the federation is shutdown. we use a
	 * singleton connection!!
	 */
	if (this.writeStrategy == null) {
		// no write strategy was ever obtained by this connection: nothing needs to be closed
		return;
	}
	// the write strategy needs to be closed
	try {
		this.writeStrategy.close();
	} catch (RepositoryException cause) {
		throw new SailException(cause);
	}
}
/**
 * Forwards the commit to the write strategy.
 *
 * @throws SailException wrapping any {@link RepositoryException} raised while delegating
 */
@Override
protected void commitInternal() throws SailException {
	try {
		WriteStrategy strategy = getWriteStrategyInternal();
		strategy.commit();
	} catch (RepositoryException cause) {
		throw new SailException(cause);
	}
}
/**
 * Collects the context identifiers of all federation members.
 * <p>
 * For every member a task is added to a {@link SynchronousWorkerUnion}; each task opens a connection to its
 * endpoint, materializes the endpoint's context IDs into a list and closes the connection again. The union is
 * handed to the executor of the federation manager, and its results are exposed through a
 * {@link DistinctIteration} so that every context is reported once. Runtime exceptions surfacing from the union
 * are converted to {@link SailException}.
 *
 * @return an iteration over the distinct context identifiers; closing it also closes the
 *         {@link CollectionFactory} backing the distinct set
 * @throws SailException declared by the overridden method
 */
@Override
protected CloseableIteration<? extends Resource> getContextIDsInternal() throws SailException {
	FederationEvalStrategy strategy = federationContext.createStrategy(new SimpleDataset());
	QueryInfo queryInfo = new QueryInfo("getContextIDsInternal", null,
			QueryType.UNKNOWN, 0, federationContext.getConfig().getIncludeInferredDefault(), federationContext,
			strategy, new SimpleDataset());
	WorkerUnionBase<Resource> union = new SynchronousWorkerUnion<>(queryInfo);

	for (Endpoint member : federation.getMembers()) {
		union.addTask(new ParallelTask<>() {
			@Override
			public CloseableIteration<Resource> performTask() {
				try (RepositoryConnection conn = member.getConnection()) {
					// we need to materialize the contexts as they are only accessible
					// while the connection is open
					List<Resource> contextIds = Iterations.asList(conn.getContextIDs());
					return new CollectionIteration<>(contextIds);
				}
			}

			@Override
			public ParallelExecutor<Resource> getControl() {
				return union;
			}

			@Override
			public void cancel() {
			}
		});
	}

	// execute the union in a separate thread
	federationContext.getManager().getExecutor().execute(union);

	CollectionFactory collectionFactory = federation.getCollectionFactory().get();
	ExceptionConvertingIteration<Resource, SailException> converting = new ExceptionConvertingIteration<>(union) {
		@Override
		protected SailException convert(RuntimeException e) {
			return new SailException(e);
		}
	};

	return new DistinctIteration<Resource>(converting, collectionFactory::createSet) {
		@Override
		protected void handleClose() {
			// release the collection factory first, but always run the regular close handling afterwards
			try {
				collectionFactory.close();
			} finally {
				super.handleClose();
			}
		}
	};
}
/**
* Namespace lookup is not supported by this connection; the method is a no-op that reports no namespace.
*
* @param prefix the namespace prefix; ignored
* @return always {@code null}
*/
@Override
protected String getNamespaceInternal(String prefix) throws SailException {
// do not support this feature, but also do not throw an exception
// as this method is expected for the RDF4J workbench to work
return null;
}
/**
* Listing namespaces is not supported by this connection; an empty result is returned instead of failing.
*
* @return always a new {@link EmptyIteration}
*/
@Override
protected CloseableIteration<? extends Namespace> getNamespacesInternal() throws SailException {
// do not support this feature, but also do not throw an exception
// as this method is expected for the RDF4J workbench to work
return new EmptyIteration<>();
}
/**
 * Retrieves the statements matching the given pattern through a {@link FederationEvalStrategy}.
 * <p>
 * A fresh {@link SimpleDataset} is used both for creating the strategy and for the {@link QueryInfo}, which is
 * registered with the monitoring service before the lookup. Runtime exceptions raised while iterating the result
 * are converted to {@link SailException}.
 *
 * @param subj            the subject of the pattern, passed on to the strategy
 * @param pred            the predicate of the pattern, passed on to the strategy
 * @param obj             the object of the pattern, passed on to the strategy
 * @param includeInferred passed on to the {@link QueryInfo}
 * @param contexts        the contexts passed on to the strategy
 * @return an iteration over the matching statements
 * @throws SailException wrapping any checked exception raised during the lookup; runtime exceptions are rethrown
 *                       unchanged
 */
@Override
protected CloseableIteration<? extends Statement> getStatementsInternal(Resource subj, IRI pred,
		Value obj, boolean includeInferred, Resource... contexts) throws SailException {
	try {
		Dataset emptyDataset = new SimpleDataset();
		FederationEvalStrategy strategy = federationContext.createStrategy(emptyDataset);
		QueryInfo queryInfo = new QueryInfo(subj, pred, obj, 0, includeInferred, federationContext, strategy,
				emptyDataset);
		federationContext.getMonitoringService().monitorQuery(queryInfo);

		CloseableIteration<Statement> statements = null;
		try {
			statements = strategy.getStatements(queryInfo, subj, pred, obj, contexts);
			return new ExceptionConvertingIteration<>(statements) {
				@Override
				protected SailException convert(RuntimeException e) {
					return new SailException(e);
				}
			};
		} catch (Throwable t) {
			// do not leave an already opened iteration behind when wrapping it fails
			if (statements != null) {
				statements.close();
			}
			throw t;
		}
	} catch (RuntimeException unchecked) {
		throw unchecked;
	} catch (Exception checked) {
		// keep the interrupt status visible to callers further up the stack
		if (checked instanceof InterruptedException) {
			Thread.currentThread().interrupt();
		}
		throw new SailException(checked);
	}
}
/**
 * Checks through a {@link FederationEvalStrategy} whether statements matching the given pattern exist.
 * <p>
 * A fresh {@link SimpleDataset} is used both for creating the strategy and for the {@link QueryInfo}, which is
 * registered with the monitoring service before the check.
 *
 * @param subj            the subject of the pattern, passed on to the strategy
 * @param pred            the predicate of the pattern, passed on to the strategy
 * @param obj             the object of the pattern, passed on to the strategy
 * @param includeInferred passed on to the {@link QueryInfo}
 * @param contexts        the contexts passed on to the strategy
 * @return the result of the strategy's {@code hasStatements} call
 * @throws SailException wrapping any checked exception raised during the check; runtime exceptions are rethrown
 *                       unchanged
 */
@Override
protected boolean hasStatementInternal(Resource subj, IRI pred, Value obj, boolean includeInferred,
		Resource[] contexts) {
	try {
		Dataset emptyDataset = new SimpleDataset();
		FederationEvalStrategy strategy = federationContext.createStrategy(emptyDataset);
		QueryInfo queryInfo = new QueryInfo(subj, pred, obj, 0, includeInferred, federationContext, strategy,
				emptyDataset);
		federationContext.getMonitoringService().monitorQuery(queryInfo);

		boolean found = strategy.hasStatements(queryInfo, subj, pred, obj, contexts);
		return found;
	} catch (RuntimeException unchecked) {
		throw unchecked;
	} catch (Exception checked) {
		// keep the interrupt status visible to callers further up the stack
		if (checked instanceof InterruptedException) {
			Thread.currentThread().interrupt();
		}
		throw new SailException(checked);
	}
}
/**
 * Forwards the addition of a statement to the write strategy.
 *
 * @param subj     the subject of the statement
 * @param pred     the predicate of the statement
 * @param obj      the object of the statement
 * @param contexts the contexts passed on to the write strategy
 * @throws SailException wrapping any {@link RepositoryException} raised while delegating
 */
@Override
protected void addStatementInternal(Resource subj, IRI pred, Value obj, Resource... contexts) throws SailException {
	try {
		WriteStrategy strategy = getWriteStrategyInternal();
		strategy.addStatement(subj, pred, obj, contexts);
	} catch (RepositoryException cause) {
		throw new SailException(cause);
	}
}
/**
* Removing a namespace is not supported by this connection; the call is silently ignored.
*
* @param prefix the namespace prefix; ignored
*/
@Override
protected void removeNamespaceInternal(String prefix) throws SailException {
// do not support this feature, but also do not throw an exception
}
/**
 * Forwards the removal of statements matching the given pattern to the write strategy.
 *
 * @param subj     the subject of the pattern
 * @param pred     the predicate of the pattern
 * @param obj      the object of the pattern
 * @param contexts the contexts passed on to the write strategy
 * @throws SailException wrapping any {@link RepositoryException} raised while delegating
 */
@Override
protected void removeStatementsInternal(Resource subj, IRI pred, Value obj, Resource... contexts)
		throws SailException {
	try {
		WriteStrategy strategy = getWriteStrategyInternal();
		strategy.removeStatement(subj, pred, obj, contexts);
	} catch (RepositoryException cause) {
		throw new SailException(cause);
	}
}
/**
 * Forwards the rollback to the write strategy.
 *
 * @throws SailException wrapping any {@link RepositoryException} raised while delegating
 */
@Override
protected void rollbackInternal() throws SailException {
	try {
		WriteStrategy strategy = getWriteStrategyInternal();
		strategy.rollback();
	} catch (RepositoryException cause) {
		throw new SailException(cause);
	}
}
/**
* Setting a namespace is not supported by this connection; the call is silently ignored.
*
* @param prefix the namespace prefix; ignored
* @param name the namespace name; ignored
*/
@Override
protected void setNamespaceInternal(String prefix, String name) throws SailException {
// do not support this feature, but also do not throw an exception
}
/**
 * Computes the size of the federation as the sum of the sizes reported by all members.
 * <p>
 * All members are queried even if some of them fail. If at least one member cannot report its size, a
 * {@link SailException} naming the failing members is thrown; the individual {@link RepositoryException}s are
 * attached to it as suppressed exceptions so that the root causes remain diagnosable.
 *
 * @param contexts must be {@code null} or empty, context specific sizes are not supported
 * @return the summed size of all federation members
 * @throws UnsupportedOperationException if any context is supplied
 * @throws SailException                 if the size of one or more members could not be determined
 */
@Override
protected long sizeInternal(Resource... contexts) throws SailException {
	if (contexts != null && contexts.length > 0) {
		throw new UnsupportedOperationException("Context handling for size() not supported");
	}
	long size = 0;
	List<String> errorEndpoints = new ArrayList<>();
	// keep the underlying failures instead of dropping them, they are attached to the thrown exception below
	List<RepositoryException> failures = new ArrayList<>();
	for (Endpoint e : federation.getMembers()) {
		try {
			size += e.size();
		} catch (RepositoryException e1) {
			errorEndpoints.add(e.getId());
			failures.add(e1);
		}
	}
	if (!errorEndpoints.isEmpty()) {
		SailException failure = new SailException("Could not determine size for members " + errorEndpoints
				+ " (Supported for NativeStore and RemoteRepository only). Computed size: " + size);
		failures.forEach(failure::addSuppressed);
		throw failure;
	}
	return size;
}
/**
 * Starts a transaction by forwarding {@code begin} to the write strategy.
 *
 * @throws SailException wrapping any {@link RepositoryException} raised while delegating
 */
@Override
protected void startTransactionInternal() throws SailException {
	try {
		WriteStrategy strategy = getWriteStrategyInternal();
		strategy.begin();
	} catch (RepositoryException cause) {
		throw new SailException(cause);
	}
}
/**
 * Returns the {@link WriteStrategy} of this connection, obtaining it from the federation on first use.
 * <p>
 * On the first call the strategy is fetched via {@code federation.getWriteStrategy()} and cached in
 * {@link #writeStrategy}; every later call returns the cached instance. Callers within this class are the write
 * and transaction operations as well as {@link #setTransactionSettings(TransactionSetting...)}.
 *
 * @return the {@link WriteStrategy}
 * @throws SailException declared for callers; no throw site is visible in this method itself
 */
protected synchronized WriteStrategy getWriteStrategyInternal() throws SailException {
	if (writeStrategy != null) {
		return writeStrategy;
	}
	writeStrategy = federation.getWriteStrategy();
	return writeStrategy;
}
/**
 * Reads the original query string from the binding {@link FedXRepositoryConnection#BINDING_ORIGINAL_QUERY}.
 *
 * @param b the bindings supplied with the query, may be {@code null}
 * @return the string value of the binding, or {@code null} if the bindings or the binding are absent
 */
private static String getOriginalQueryString(BindingSet b) {
	Value queryValue = (b == null) ? null : b.getValue(FedXRepositoryConnection.BINDING_ORIGINAL_QUERY);
	return (queryValue == null) ? null : queryValue.stringValue();
}
/**
 * Reads the original base URI from the binding {@link FedXRepositoryConnection#BINDING_ORIGINAL_BASE_URI}.
 * <p>
 * The binding value is converted with {@code Literals.getLabel(value, null)}; presumably {@code null} is the
 * fallback returned when no label can be obtained (confirm against the {@link Literals} API).
 *
 * @param b the bindings supplied with the query, may be {@code null}
 * @return the label of the binding value, or {@code null} if the bindings are absent
 */
private static String getOriginalBaseURI(BindingSet b) {
	if (b != null) {
		Value baseUri = b.getValue(FedXRepositoryConnection.BINDING_ORIGINAL_BASE_URI);
		return Literals.getLabel(baseUri, null);
	}
	return null;
}
/**
 * Reads the original query type from the binding {@link FedXRepositoryConnection#BINDING_ORIGINAL_QUERY_TYPE}.
 * <p>
 * The string value of the binding is resolved with {@link QueryType#valueOf(String)}.
 *
 * @param b the bindings supplied with the query, may be {@code null}
 * @return the query type, or {@code null} if the bindings or the binding are absent
 */
private static QueryType getOriginalQueryType(BindingSet b) {
	Value typeValue = (b == null) ? null : b.getValue(FedXRepositoryConnection.BINDING_ORIGINAL_QUERY_TYPE);
	return (typeValue == null) ? null : QueryType.valueOf(typeValue.stringValue());
}
/**
 * Return the original explicit {@link Operation#getMaxExecutionTime()} in seconds, 0 if
 * {@link FedXConfig#getEnforceMaxQueryTime()} should be applied.
 * <p>
 * The value is read from the binding {@link FedXRepositoryConnection#BINDING_ORIGINAL_MAX_EXECUTION_TIME}. A
 * binding value that cannot be parsed as an integer is reported with a warning and treated like an absent
 * binding, so a malformed value does not abort the query with a {@link NumberFormatException}.
 *
 * @param b the bindings supplied with the query, may be {@code null}
 * @return the explicit max execution time, or 0 if the bindings or the binding are absent or the value is not a
 *         valid integer
 */
private static int getOriginalMaxExecutionTime(BindingSet b) {
	if (b == null) {
		return 0;
	}
	Value q = b.getValue(FedXRepositoryConnection.BINDING_ORIGINAL_MAX_EXECUTION_TIME);
	if (q == null) {
		return 0;
	}
	try {
		return Integer.parseInt(q.stringValue());
	} catch (NumberFormatException e) {
		// fall back to the default (0) rather than failing the whole evaluation
		log.warn("Illegal value for max execution time: {}", q.stringValue());
		return 0;
	}
}
/**
* A default implementation for {@link AbstractSail}. This implementation has no further use, however it is needed
* for the constructor call.
* <p>
* All operations are stubs: no connection is provided, shutdown does nothing and the sail reports itself as not
* writable.
* <p>
* NOTE(review): the {@link FedXConnection} constructor visible in this file passes the federation itself to the
* superclass constructor, so this class does not appear to be used within this file; confirm before relying on
* or removing it.
*
* @author as
*/
protected static class SailBaseDefaultImpl extends AbstractSail {
// stub: this sail never hands out connections
@Override
protected SailConnection getConnectionInternal() throws SailException {
return null;
}
// stub: nothing to release on shutdown
@Override
protected void shutDownInternal() throws SailException {
}
// delegates to the value factory provided by FedXUtil
@Override
public ValueFactory getValueFactory() {
return FedXUtil.valueFactory();
}
// this sail always reports itself as read-only
@Override
public boolean isWritable() throws SailException {
return false;
}
@Override
protected void connectionClosed(SailConnection connection) {
// we do not need this in FedX
}
}
/**
* Query explanation is not supported by this connection.
*
* @throws UnsupportedOperationException always
*/
@Override
public Explanation explain(Explanation.Level level, TupleExpr tupleExpr, Dataset dataset, BindingSet bindings,
boolean includeInferred, int timeoutSeconds) {
throw new UnsupportedOperationException();
}
}