RepositoryFederatedService.java
/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.repository.sparql.federation;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.SilentIteration;
import org.eclipse.rdf4j.query.Binding;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.BooleanQuery;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.QueryLanguage;
import org.eclipse.rdf4j.query.TupleQuery;
import org.eclipse.rdf4j.query.TupleQueryResult;
import org.eclipse.rdf4j.query.algebra.Service;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedService;
import org.eclipse.rdf4j.query.impl.EmptyBindingSet;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.sparql.query.InsertBindingSetCursor;
import org.eclipse.rdf4j.repository.sparql.query.QueryStringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Federated Service wrapping the {@link Repository} to communicate with a SPARQL endpoint.
*
* @author Andreas Schwarte
*/
public class RepositoryFederatedService implements FederatedService {
private static final String ROW_IDX_VAR = "__rowIdx";
final static Logger logger = LoggerFactory.getLogger(RepositoryFederatedService.class);
/**
* A convenience iteration for SERVICE expression which evaluates intermediate results in batches and manages all
* results. Uses {@link JoinExecutorBase} facilities to guarantee correct access to the final results
*
* @author as
*/
private class BatchingServiceIteration extends JoinExecutorBase<BindingSet> {
private final int blockSize;
private final Service service;
/**
* @param inputBindings
* @throws QueryEvaluationException
*/
public BatchingServiceIteration(CloseableIteration<BindingSet> inputBindings,
int blockSize, Service service) throws QueryEvaluationException {
super(inputBindings, null, EmptyBindingSet.getInstance());
this.blockSize = blockSize;
this.service = service;
run();
}
@Override
protected void handleBindings() throws Exception {
while (!isClosed() && leftIter.hasNext()) {
ArrayList<BindingSet> blockBindings = new ArrayList<>(blockSize);
for (int i = 0; i < blockSize; i++) {
if (!leftIter.hasNext()) {
break;
}
blockBindings.add(leftIter.next());
}
CloseableIteration<BindingSet> materializedIter = new CollectionIteration<>(
blockBindings);
addResult(evaluateInternal(service, materializedIter, service.getBaseURI()));
}
}
}
/**
* Helper iteration to evaluate a block of {@link BindingSet}s using the simple
* {@link RepositoryFederatedService#select(Service, Set, BindingSet, String)} routine.
*
* @author Andreas Schwarte
*/
private class FallbackServiceIteration extends JoinExecutorBase<BindingSet> {
private final Service service;
private final List<BindingSet> allBindings;
private final String baseUri;
public FallbackServiceIteration(Service service,
List<BindingSet> allBindings, String baseUri) {
super(null, null, null);
this.service = service;
this.allBindings = allBindings;
this.baseUri = baseUri;
run();
}
@Override
protected void handleBindings() throws Exception {
Set<String> projectionVars = new HashSet<>(service.getServiceVars());
for (BindingSet b : allBindings) {
addResult(select(service, projectionVars, b, baseUri));
}
}
}
/**
* Wrapper iteration which closes a {@link RepositoryConnection} upon {@link #close()}
*
* @author Andreas Schwarte
*/
private static class CloseConnectionIteration implements CloseableIteration<BindingSet> {
private final CloseableIteration<BindingSet> delegate;
private final RepositoryConnection connection;
private CloseConnectionIteration(CloseableIteration<BindingSet> delegate,
RepositoryConnection connection) {
super();
this.delegate = delegate;
this.connection = connection;
}
@Override
public boolean hasNext() throws QueryEvaluationException {
return delegate.hasNext();
}
@Override
public BindingSet next() throws QueryEvaluationException {
return delegate.next();
}
@Override
public void remove() throws QueryEvaluationException {
delegate.remove();
}
@Override
public void close() throws QueryEvaluationException {
try {
delegate.close();
} finally {
closeQuietly(connection);
}
}
}
private final Repository rep;
/**
* The number of bindings sent in a single subquery in {@link #evaluate(Service, CloseableIteration, String)} If
* blockSize is set to 0, the entire input stream is used as block input the block size effectively determines the
* number of remote requests
*/
protected int boundJoinBlockSize = 15;
/**
* Whether to use a fresh repository connection for individual queries
*/
private boolean useFreshConnection = true;
// flag indicating whether the repository shall be closed in #shutdown()
protected boolean shutDown;
private RepositoryConnection managedConn = null;
/**
* @param repo the repository to be used
*/
public RepositoryFederatedService(Repository repo) {
this(repo, true);
}
/**
* @param repo the repository to be used
* @param shutDown a flag indicating whether the repository shall be closed in {@link #shutdown()}
*/
public RepositoryFederatedService(Repository repo, boolean shutDown) {
super();
this.rep = repo;
this.shutDown = shutDown;
}
/**
* Evaluate the provided sparqlQueryString at the initialized {@link Repository} of this {@link FederatedService}.
* Insert bindings into SELECT query and evaluate
*/
@Override
public CloseableIteration<BindingSet> select(Service service, Set<String> projectionVars,
BindingSet bindings, String baseUri) throws QueryEvaluationException {
RepositoryConnection conn = null;
try {
String sparqlQueryString = service.getSelectQueryString(projectionVars);
conn = useFreshConnection ? freshConnection() : getConnection();
TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL, sparqlQueryString, baseUri);
Iterator<Binding> bIter = bindings.iterator();
while (bIter.hasNext()) {
Binding b = bIter.next();
if (service.getServiceVars().contains(b.getName())) {
query.setBinding(b.getName(), b.getValue());
}
}
TupleQueryResult res = query.evaluate();
// insert original bindings again
CloseableIteration<BindingSet> result = new InsertBindingSetCursor(res, bindings);
if (useFreshConnection) {
result = new CloseConnectionIteration(result, conn);
}
if (service.isSilent()) {
return new SilentIteration<>(result);
} else {
return result;
}
} catch (MalformedQueryException e) {
if (useFreshConnection) {
closeQuietly(conn);
}
throw new QueryEvaluationException(e);
} catch (RepositoryException e) {
if (useFreshConnection) {
closeQuietly(conn);
}
throw new QueryEvaluationException(
"Repository for endpoint " + rep.toString() + " could not be initialized.", e);
} catch (RuntimeException e) {
if (useFreshConnection) {
closeQuietly(conn);
}
throw e;
}
}
/**
* Evaluate the provided sparqlQueryString at the initialized {@link Repository} of this {@link FederatedService}.
* Insert bindings, send ask query and return final result
*/
@Override
public boolean ask(Service service, BindingSet bindings, String baseUri) throws QueryEvaluationException {
RepositoryConnection conn = null;
try {
String sparqlQueryString = service.getAskQueryString();
conn = useFreshConnection ? freshConnection() : getConnection();
BooleanQuery query = conn.prepareBooleanQuery(QueryLanguage.SPARQL, sparqlQueryString, baseUri);
Iterator<Binding> bIter = bindings.iterator();
while (bIter.hasNext()) {
Binding b = bIter.next();
if (service.getServiceVars().contains(b.getName())) {
query.setBinding(b.getName(), b.getValue());
}
}
return query.evaluate();
} catch (MalformedQueryException e) {
throw new QueryEvaluationException(e);
} catch (RepositoryException e) {
throw new QueryEvaluationException(
"Repository for endpoint " + rep.toString() + " could not be initialized.", e);
} finally {
if (useFreshConnection) {
closeQuietly(conn);
}
}
}
@Override
public CloseableIteration<BindingSet> evaluate(Service service,
CloseableIteration<BindingSet> bindings, String baseUri)
throws QueryEvaluationException {
if (boundJoinBlockSize > 0) {
return new BatchingServiceIteration(bindings, boundJoinBlockSize, service);
} else {
// if blocksize is 0 (i.e. disabled) the entire iteration is used as
// block
return evaluateInternal(service, bindings, service.getBaseURI());
}
}
/**
* Evaluate the SPARQL query that can be constructed from the SERVICE node at the initialized {@link Repository} of
* this {@link FederatedService}. Use specified bindings as constraints to the query. Try to evaluate using VALUES
* clause, if this yields an exception fall back to the naive implementation. This method deals with SILENT
* SERVICEs.
*/
protected CloseableIteration<BindingSet> evaluateInternal(Service service,
CloseableIteration<BindingSet> bindings, String baseUri)
throws QueryEvaluationException {
// materialize all bindings (to allow for fallback in case of errors)
// note that this may be blocking depending on the underlying iterator
List<BindingSet> allBindings = new LinkedList<>();
while (bindings.hasNext()) {
allBindings.add(bindings.next());
}
if (allBindings.isEmpty()) {
return new EmptyIteration<>();
}
// projection vars
Set<String> projectionVars = new HashSet<>(service.getServiceVars());
projectionVars.removeAll(allBindings.get(0).getBindingNames());
// below we need to take care for SILENT services
RepositoryConnection conn = null;
CloseableIteration<BindingSet> result = null;
try {
// fallback to simple evaluation (just a single binding)
if (allBindings.size() == 1) {
result = select(service, projectionVars, allBindings.get(0), baseUri);
result = service.isSilent() ? new SilentIteration(result) : result;
return result;
}
// To be able to insert the input bindings again later on, we need some
// means to identify the row of each binding. hence, we use an
// additional
// projection variable, which is also passed in the VALUES clause
// with the value of the actual row. The value corresponds to the index
// of the binding in the index list
projectionVars.add(ROW_IDX_VAR);
String queryString = service.getSelectQueryString(projectionVars);
List<String> relevantBindingNames = getRelevantBindingNames(allBindings, service.getServiceVars());
if (!relevantBindingNames.isEmpty()) {
// insert VALUES clause into the query
queryString = insertValuesClause(queryString, buildVALUESClause(allBindings, relevantBindingNames));
}
conn = useFreshConnection ? freshConnection() : getConnection();
TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString, baseUri);
TupleQueryResult res;
query.setMaxExecutionTime(60); // TODO how to retrieve max query value
// from actual setting?
res = query.evaluate();
if (relevantBindingNames.isEmpty()) {
result = new SPARQLCrossProductIteration(res, allBindings); // cross
// product
} else {
result = new ServiceJoinConversionIteration(res, allBindings); // common
// join
}
if (useFreshConnection) {
result = new CloseConnectionIteration(result, conn);
}
result = service.isSilent() ? new SilentIteration(result) : result;
return result;
} catch (RepositoryException e) {
if (useFreshConnection) {
closeQuietly(conn);
}
if (result != null) {
result.close();
}
if (service.isSilent()) {
return new CollectionIteration<>(allBindings);
}
throw new QueryEvaluationException(
"Repository for endpoint " + rep.toString() + " could not be initialized.", e);
} catch (MalformedQueryException e) {
if (useFreshConnection) {
closeQuietly(conn);
}
// this exception must not be silenced, bug in our code
// => try a fallback to the simple evaluation
logger.debug("Encounted malformed query exception: " + e.getMessage()
+ ". Falling back to simple SERVICE evaluation.");
return evaluateInternalFallback(service, allBindings, baseUri);
} catch (QueryEvaluationException e) {
if (useFreshConnection) {
closeQuietly(conn);
}
if (result != null) {
result.close();
}
if (service.isSilent()) {
return new CollectionIteration<>(allBindings);
}
throw e;
} catch (RuntimeException e) {
if (useFreshConnection) {
closeQuietly(conn);
}
if (result != null) {
result.close();
}
// suppress special exceptions (e.g. UndeclaredThrowable with wrapped
// QueryEval) if silent
if (service.isSilent()) {
return new CollectionIteration<>(allBindings);
}
throw e;
}
}
/**
* Evaluate the service expression for the given lists of bindings using {@link FallbackServiceIteration}, i.e.
* basically as a simple join without VALUES clause.
*
* @param service the SERVICE
* @param allBindings all bindings to be processed
* @param baseUri the base URI
* @return resulting iteration
*/
private CloseableIteration<BindingSet> evaluateInternalFallback(Service service,
List<BindingSet> allBindings, String baseUri) {
CloseableIteration<BindingSet> res = new FallbackServiceIteration(service,
allBindings, baseUri);
if (service.isSilent()) {
res = new SilentIteration(res);
}
return res;
}
/**
* Insert the constructed VALUES clause in the beginning of the WHERE block. Also adds the {@link #ROW_IDX_VAR}
* projection if it is not already present.
*
* @param queryString the SELECT query string from the SERVICE node
* @param valuesClause the constructed VALUES clause
* @return the final String
*/
protected String insertValuesClause(String queryString, String valuesClause) {
StringBuilder sb = new StringBuilder(queryString);
if (sb.indexOf(ROW_IDX_VAR) == -1) {
// Note: we also explicitly check on "SELECT *", however, this
// check is heuristics based. If the generated query is invalid
// after this, the fallback evaluation will jump in
// This currently does not cover things like "SELECT *"
if (sb.indexOf("SELECT * ") == -1) {
sb.insert(sb.indexOf("SELECT") + 6, " ?" + ROW_IDX_VAR);
}
}
sb.insert(sb.indexOf("{") + 1, " " + valuesClause);
return sb.toString();
}
@Override
public void initialize() throws QueryEvaluationException {
try {
rep.init();
} catch (RepositoryException e) {
throw new QueryEvaluationException(e);
}
}
@Override
public boolean isInitialized() {
return rep.isInitialized();
}
public int getBoundJoinBlockSize() {
return boundJoinBlockSize;
}
/**
* @param boundJoinBlockSize the bound join block size, 0 to evaluate all in a single request
*/
public void setBoundJoinBlockSize(int boundJoinBlockSize) {
this.boundJoinBlockSize = boundJoinBlockSize;
}
/**
* @param flag whether to use a fresh {@link RepositoryConnection} for each individual query
*/
public void setUseFreshConnection(boolean flag) {
this.useFreshConnection = flag;
}
@Override
public void shutdown() throws QueryEvaluationException {
boolean foundException = false;
try {
if (managedConn != null) {
managedConn.close();
}
} catch (RepositoryException e) {
foundException = true;
throw new QueryEvaluationException(e);
} finally {
try {
// shutdown only if desired, e.g. do not
// invoke shutDown for managed repositories
if (shutDown) {
rep.shutDown();
}
} catch (RepositoryException e) {
// Try not to clobber the initial exception that may be more useful
if (!foundException) {
throw new QueryEvaluationException(e);
}
}
}
}
/**
* Return a fresh {@link RepositoryConnection} from the configured repository.
*
* @return connection
* @throws RepositoryException
*/
private RepositoryConnection freshConnection() throws RepositoryException {
return rep.getConnection();
}
/**
* Retrieve a (re-usable) connection. If it is not yet created, open a fresh connection. Note that this connection
* is closed automatically when shutting this service.
*
* @return connection
* @throws RepositoryException
*/
protected synchronized RepositoryConnection getConnection() throws RepositoryException {
if (managedConn == null) {
managedConn = freshConnection();
}
return managedConn;
}
/**
* Compute the relevant binding names using the variables occurring in the service expression and the input
* bindings. The idea is find all variables which need to be projected in the subquery, i.e. those that will not be
* bound by an input binding.
* <p>
* If the resulting list is empty, the cross product needs to be formed.
*
* @param bindings
* @param serviceVars
* @return the list of relevant bindings (if empty: the cross product needs to be formed)
*/
private List<String> getRelevantBindingNames(List<BindingSet> bindings, Set<String> serviceVars) {
// get the bindings variables
// TODO CHECK: does the first bindingset give all relevant names
List<String> relevantBindingNames = new ArrayList<>(5);
for (String bName : bindings.get(0).getBindingNames()) {
if (serviceVars.contains(bName)) {
relevantBindingNames.add(bName);
}
}
return relevantBindingNames;
}
/**
* Computes the VALUES clause for the set of relevant input bindings. The VALUES clause is attached to a subquery
* for block-nested-loop evaluation. Implementation note: we use a special binding to mark the rowIndex of the input
* binding.
*
* @param bindings
* @param relevantBindingNames
* @return a string with the VALUES clause for the given set of relevant input bindings
* @throws QueryEvaluationException
*/
private String buildVALUESClause(List<BindingSet> bindings, List<String> relevantBindingNames)
throws QueryEvaluationException {
StringBuilder sb = new StringBuilder();
sb.append(" VALUES (?__rowIdx"); // __rowIdx: see comment in evaluate()
for (String bName : relevantBindingNames) {
sb.append(" ?").append(bName);
}
sb.append(") { ");
int rowIdx = 0;
for (BindingSet b : bindings) {
sb.append(" (");
sb.append("\"").append(rowIdx++).append("\" "); // identification of
// the row for post
// processing
for (String bName : relevantBindingNames) {
QueryStringUtil.appendValueAsString(sb, b.getValue(bName)).append(" ");
}
sb.append(")");
}
sb.append(" }");
return sb.toString();
}
private static void closeQuietly(RepositoryConnection conn) {
if (conn == null) {
return;
}
try {
conn.close();
} catch (Throwable t) {
logger.warn("Failed to close connection:" + t.getMessage());
logger.debug("Details: ", t);
}
}
}