// Transaction.java
/*******************************************************************************
* Copyright (c) 2016 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.http.server.repository.transaction;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.rdf4j.common.transaction.IsolationLevel;
import org.eclipse.rdf4j.common.transaction.TransactionSetting;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.vocabulary.RDF4J;
import org.eclipse.rdf4j.model.vocabulary.SESAME;
import org.eclipse.rdf4j.query.BooleanQuery;
import org.eclipse.rdf4j.query.Dataset;
import org.eclipse.rdf4j.query.GraphQuery;
import org.eclipse.rdf4j.query.GraphQueryResult;
import org.eclipse.rdf4j.query.Query;
import org.eclipse.rdf4j.query.QueryLanguage;
import org.eclipse.rdf4j.query.TupleQuery;
import org.eclipse.rdf4j.query.TupleQueryResult;
import org.eclipse.rdf4j.query.Update;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.util.RDFInserter;
import org.eclipse.rdf4j.rio.ParserConfig;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.RDFHandlerException;
import org.eclipse.rdf4j.rio.RDFParser;
import org.eclipse.rdf4j.rio.RDFWriter;
import org.eclipse.rdf4j.rio.Rio;
import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
import org.eclipse.rdf4j.rio.helpers.BasicParserSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * A transaction encapsulates a single {@link Thread} and a {@link RepositoryConnection}, to enable executing all
 * operations that are part of the transaction from a single, dedicated thread. This is necessary because
 * {@link RepositoryConnection} is not guaranteed thread-safe and we may run into concurrency issues if we attempt to
 * share it between the various HTTP Request worker threads.
 * <p>
 * Every operation on the connection, including opening and closing it, is submitted to a single-threaded
 * {@link ExecutorService}; the calling thread blocks until the submitted task has finished and receives its result
 * (or its failure, wrapped in an {@link ExecutionException}).
 *
 * @author Jeen Broekstra
 */
class Transaction implements AutoCloseable {

    private static final Logger logger = LoggerFactory.getLogger(Transaction.class);

    /**
     * Set to true when entering the {@link #close()} method for the first time, to ensure that only a single thread
     * executes the close operations.
     */
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    /**
     * Set to true when the {@link #close()} method is about to complete for the first invocation.
     */
    private final AtomicBoolean closeCompleted = new AtomicBoolean(false);

    private final UUID id;

    private final Repository rep;

    private final RepositoryConnection txnConnection;

    /**
     * The {@link ExecutorService} that performs all of the operations related to this Transaction.
     */
    private final ExecutorService executor = Executors
            .newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("rdf4j-transaction-%d").build());

    /**
     * Counter of the operations that have been submitted to the executor and whose result has not yet been
     * collected through {@link #getFromFuture(Future)}.
     */
    private final AtomicInteger activeOperations = new AtomicInteger();

    /**
     * Create a new Transaction for the given {@link Repository}.
     *
     * @param repository the {@link Repository} on which to open a transaction.
     * @throws InterruptedException if the transaction thread is interrupted while opening a connection.
     * @throws ExecutionException   if an error occurs while opening the connection.
     */
    Transaction(Repository repository) throws InterruptedException, ExecutionException {
        this.id = UUID.randomUUID();
        this.rep = repository;

        final RepositoryConnection connection;
        try {
            connection = getTransactionConnection();
        } catch (InterruptedException | ExecutionException | RuntimeException e) {
            // No Transaction instance is handed out when construction fails, so nobody could ever call close():
            // stop the executor here instead of leaking it.
            executor.shutdownNow();
            throw e;
        }
        this.txnConnection = connection;
    }

    /**
     * The identifier of this transaction object.
     *
     * @return a {@link UUID} that identifies this Transaction.
     */
    UUID getID() {
        return id;
    }

    /**
     * Start the transaction.
     *
     * @param settings the {@link TransactionSetting}s to use for this transaction (including {@link IsolationLevel}).
     *                 Optional vararg argument.
     * @throws InterruptedException if the transaction thread is interrupted
     * @throws ExecutionException   if an error occurs while starting the transaction.
     */
    void begin(TransactionSetting... settings) throws InterruptedException, ExecutionException {
        Future<Boolean> result = submit(() -> {
            txnConnection.begin(settings);
            return true;
        });
        getFromFuture(result);
    }

    /**
     * Rolls back all updates in the transaction.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the operation.
     * @throws ExecutionException   if an error occurs while rolling back.
     */
    void rollback() throws InterruptedException, ExecutionException {
        Future<Boolean> result = submit(() -> {
            txnConnection.rollback();
            return true;
        });
        getFromFuture(result);
    }

    /**
     * Invokes {@link RepositoryConnection#prepare()} on the transaction's connection.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the operation.
     * @throws ExecutionException   if an error occurs while preparing the transaction.
     */
    void prepare() throws InterruptedException, ExecutionException {
        Future<Boolean> result = submit(() -> {
            txnConnection.prepare();
            return true;
        });
        getFromFuture(result);
    }

    /**
     * Commits the transaction on the transaction's connection.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the operation.
     * @throws ExecutionException   if an error occurs while committing.
     */
    void commit() throws InterruptedException, ExecutionException {
        Future<Boolean> result = submit(() -> {
            txnConnection.commit();
            return true;
        });
        getFromFuture(result);
    }

    /**
     * Prepares a query for evaluation on this transaction.
     *
     * @param queryLanguage The {@link QueryLanguage query language} in which the query is formulated.
     * @param query         The query string.
     * @param baseURI       The base URI to resolve any relative URIs that are in the query against, can be
     *                      <var>null</var> if the query does not contain any relative URIs.
     * @return A query ready to be evaluated on this repository.
     * @throws InterruptedException if the transaction thread is interrupted
     * @throws ExecutionException   if an error occurs while executing the operation.
     */
    Query prepareQuery(QueryLanguage queryLanguage, String query, String baseURI)
            throws InterruptedException, ExecutionException {
        Future<Query> result = submit(() -> txnConnection.prepareQuery(queryLanguage, query, baseURI));
        return getFromFuture(result);
    }

    /**
     * Evaluate a TupleQuery in this transaction and return the result.
     *
     * @param tQuery a {@link TupleQuery} prepared on this transaction.
     * @return a {@link TupleQueryResult}
     * @throws InterruptedException if the transaction thread is interrupted
     * @throws ExecutionException   if an error occurs while executing the operation.
     */
    TupleQueryResult evaluate(TupleQuery tQuery) throws InterruptedException, ExecutionException {
        Future<TupleQueryResult> result = submit(tQuery::evaluate);
        return getFromFuture(result);
    }

    /**
     * Evaluate a {@link GraphQuery} in this transaction and return the result.
     *
     * @param gQuery a {@link GraphQuery} prepared on this transaction.
     * @return a {@link GraphQueryResult}
     * @throws InterruptedException if the transaction thread is interrupted
     * @throws ExecutionException   if an error occurs while executing the operation.
     */
    GraphQueryResult evaluate(GraphQuery gQuery) throws InterruptedException, ExecutionException {
        Future<GraphQueryResult> result = submit(gQuery::evaluate);
        return getFromFuture(result);
    }

    /**
     * Evaluate a {@link BooleanQuery} in this transaction and return the result.
     *
     * @param bQuery a {@link BooleanQuery} prepared on this transaction.
     * @return the query result as a boolean
     * @throws InterruptedException if the transaction thread is interrupted
     * @throws ExecutionException   if an error occurs while executing the operation.
     */
    boolean evaluate(BooleanQuery bQuery) throws InterruptedException, ExecutionException {
        Future<Boolean> result = submit(() -> bQuery.evaluate());
        return getFromFuture(result);
    }

    /**
     * Exports the statements matching the given pattern by delegating to
     * {@link RepositoryConnection#exportStatements} on the transaction's connection. All arguments are passed
     * through unchanged.
     *
     * @param subj           the subject to match, as accepted by the underlying connection.
     * @param pred           the predicate to match, as accepted by the underlying connection.
     * @param obj            the object to match, as accepted by the underlying connection.
     * @param useInferencing forwarded as the connection's "include inferred" flag.
     * @param rdfWriter      the {@link RDFWriter} that receives the exported statements.
     * @param contexts       the context(s) to export from. Optional vararg argument.
     * @throws InterruptedException if the calling thread is interrupted while waiting for the operation.
     * @throws ExecutionException   if an error occurs while exporting.
     */
    void exportStatements(Resource subj, IRI pred, Value obj, boolean useInferencing, RDFWriter rdfWriter,
            Resource... contexts) throws InterruptedException, ExecutionException {
        Future<Boolean> result = submit(() -> {
            txnConnection.exportStatements(subj, pred, obj, useInferencing, rdfWriter, contexts);
            return true;
        });
        getFromFuture(result);
    }

    /**
     * Returns the number of statements reported by {@link RepositoryConnection#size(Resource...)} for the specified
     * contexts in this transaction.
     *
     * @param contexts The context(s) to get the data from, passed through unchanged to the connection.
     * @return The size reported by the transaction's connection for the specified contexts.
     * @throws InterruptedException if the calling thread is interrupted while waiting for the operation.
     * @throws ExecutionException   if an error occurs while determining the size.
     */
    long getSize(Resource[] contexts) throws InterruptedException, ExecutionException {
        Future<Long> result = submit(() -> txnConnection.size(contexts));
        return getFromFuture(result);
    }

    /**
     * Adds RDF data from an {@link InputStream} to the transaction.
     *
     * @param inputStream    the stream to read the RDF data from.
     * @param baseURI        the base URI handed to the parser / connection.
     * @param format         the {@link RDFFormat} of the data in the stream.
     * @param preserveBNodes if true, the data is parsed with {@link BasicParserSettings#PRESERVE_BNODE_IDS} enabled
     *                       and inserted through an {@link RDFInserter} that preserves blank node identifiers;
     *                       otherwise the connection's own {@code add} method is used.
     * @param contexts       the context(s) to add the data to. Optional vararg argument.
     * @throws InterruptedException if the calling thread is interrupted while waiting for the operation.
     * @throws ExecutionException   if an error occurs while adding the data. An {@link IOException} raised while
     *                              reading is wrapped in a {@link RuntimeException} first.
     */
    void add(InputStream inputStream, String baseURI, RDFFormat format, boolean preserveBNodes, Resource... contexts)
            throws InterruptedException, ExecutionException {
        Future<Boolean> result = submit(() -> {
            logger.debug("executing add operation");
            try {
                if (preserveBNodes) {
                    // create a reconfigured parser + inserter instead of relying on the standard
                    // RepositoryConnection add method.
                    RDFParser parser = Rio.createParser(format);
                    parser.getParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true);
                    RDFInserter inserter = new RDFInserter(txnConnection);
                    inserter.setPreserveBNodeIDs(true);
                    if (contexts.length > 0) {
                        inserter.enforceContext(contexts);
                    }
                    parser.setRDFHandler(inserter);
                    parser.parse(inputStream, baseURI);
                } else {
                    txnConnection.add(inputStream, baseURI, format, contexts);
                }
                return true;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        getFromFuture(result);
    }

    /**
     * Removes the statements described by the RDF data in the given stream from the transaction. Each parsed
     * statement is treated as a removal pattern in which {@link SESAME#WILDCARD} matches any value.
     *
     * @param contentType the {@link RDFFormat} of the data in the stream.
     * @param inputStream the stream to read the removal patterns from.
     * @param baseURI     the base URI handed to the parser.
     * @throws InterruptedException if the calling thread is interrupted while waiting for the operation.
     * @throws ExecutionException   if an error occurs while deleting. An {@link IOException} raised while reading
     *                              is logged and wrapped in a {@link RuntimeException} first.
     */
    void delete(RDFFormat contentType, InputStream inputStream, String baseURI)
            throws InterruptedException, ExecutionException {
        Future<Boolean> result = submit(() -> {
            logger.debug("executing delete operation");
            RDFParser parser = Rio.createParser(contentType, txnConnection.getValueFactory());
            parser.setRDFHandler(new WildcardRDFRemover(txnConnection));
            parser.getParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true);
            try {
                parser.parse(inputStream, baseURI);
                return true;
            } catch (IOException e) {
                logger.error("error during txn delete operation", e);
                throw new RuntimeException(e);
            }
        });
        getFromFuture(result);
    }

    /**
     * Prepares and executes an update operation in this transaction.
     *
     * @param queryLn            the {@link QueryLanguage} in which the update is formulated.
     * @param sparqlUpdateString the update string.
     * @param baseURI            the base URI handed to the connection when preparing the update.
     * @param includeInferred    forwarded to {@link Update#setIncludeInferred(boolean)}.
     * @param dataset            the {@link Dataset} to execute the update against; not set on the update when
     *                           <var>null</var>.
     * @param bindings           variable bindings to set on the update before execution; ignored when
     *                           <var>null</var>.
     * @throws InterruptedException if the calling thread is interrupted while waiting for the operation.
     * @throws ExecutionException   if an error occurs while preparing or executing the update.
     */
    void executeUpdate(QueryLanguage queryLn, String sparqlUpdateString, String baseURI, boolean includeInferred,
            Dataset dataset, Map<String, Value> bindings) throws InterruptedException, ExecutionException {
        Future<Boolean> result = submit(() -> {
            Update update = txnConnection.prepareUpdate(queryLn, sparqlUpdateString, baseURI);
            update.setIncludeInferred(includeInferred);
            if (dataset != null) {
                update.setDataset(dataset);
            }
            // treat a missing bindings map like an empty one, consistent with the optional dataset above
            if (bindings != null) {
                bindings.forEach(update::setBinding);
            }
            update.execute();
            return true;
        });
        getFromFuture(result);
    }

    /**
     * Checks if there are operations submitted for this transaction whose result has not yet been collected.
     *
     * @return True if at least one operation is still active (submitted, but not yet completed and collected by its
     *         caller) and false if there are none.
     */
    boolean hasActiveOperations() {
        return activeOperations.get() > 0;
    }

    /**
     * Checks if close has been called for this transaction.
     *
     * @return True if the close method has been called for this transaction.
     */
    boolean isClosed() {
        return isClosed.get();
    }

    /**
     * Checks if close has been completed for this transaction.
     *
     * @return True if the close operations have been completed.
     */
    boolean isComplete() {
        return closeCompleted.get();
    }

    /**
     * Close this transaction: closes the connection on the transaction thread and shuts down the executor. Only the
     * first invocation has any effect; later invocations return immediately.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the connection to close.
     * @throws ExecutionException   if an error occurs while closing the connection.
     */
    @Override
    public void close() throws InterruptedException, ExecutionException {
        if (isClosed.compareAndSet(false, true)) {
            try {
                // The connection must be closed by the same dedicated thread that performed every other operation
                // on it, so the close is queued behind any operation that is still pending. Shutting the executor
                // down right after submitting rejects any later submission.
                Future<Boolean> result = submitAndShutdown(() -> {
                    txnConnection.close();
                    return true;
                });
                getFromFuture(result);
            } finally {
                try {
                    if (!executor.isTerminated()) {
                        executor.shutdownNow();
                    }
                } finally {
                    closeCompleted.set(true);
                }
            }
        }
    }

    /**
     * Obtains a {@link RepositoryConnection} through the {@link ExecutorService}.
     *
     * @return A new {@link RepositoryConnection} to use for this Transaction.
     * @throws InterruptedException If the execution of the task was interrupted.
     * @throws ExecutionException   If the execution of the task failed for any reason.
     */
    private RepositoryConnection getTransactionConnection() throws InterruptedException, ExecutionException {
        // create a new RepositoryConnection with correct parser settings
        Future<RepositoryConnection> result = submit(() -> {
            RepositoryConnection conn = rep.getConnection();
            ParserConfig config = conn.getParserConfig();
            config.addNonFatalError(BasicParserSettings.VERIFY_DATATYPE_VALUES);
            config.addNonFatalError(BasicParserSettings.VERIFY_LANGUAGE_TAGS);
            return conn;
        });
        return getFromFuture(result);
    }

    /**
     * Submit the task to the executor and count it as an active operation. The count is decremented again by
     * {@link #getFromFuture(Future)}, so every returned {@link Future} must be passed to that method.
     *
     * @param callable The task to submit
     * @return A {@link Future} that can be used to track whether the operation has succeeded and get the result.
     */
    private <T> Future<T> submit(final Callable<T> callable) {
        // Count the operation before handing it to the executor, so that hasActiveOperations() cannot report
        // false while the task is already queued or running.
        activeOperations.incrementAndGet();
        try {
            return executor.submit(callable);
        } catch (RuntimeException e) {
            // the task was not accepted (e.g. the executor has been shut down): it will never be collected
            activeOperations.decrementAndGet();
            throw e;
        }
    }

    /**
     * Submit the task to the executor, count it as an active operation, and then shut down the
     * {@link ExecutorService} to prevent future submissions from succeeding. Tasks that were already submitted,
     * including this one, are still executed.
     *
     * @param callable The task to submit
     * @return A {@link Future} that can be used to track whether the operation has succeeded and get the result.
     */
    private <T> Future<T> submitAndShutdown(final Callable<T> callable) {
        final Future<T> result = submit(callable);
        executor.shutdown();
        return result;
    }

    /**
     * Waits for the given operation to finish and marks it as no longer active, whether it succeeded or failed.
     *
     * @param result the {@link Future} returned by {@link #submit(Callable)} or
     *               {@link #submitAndShutdown(Callable)}.
     * @return the value produced by the operation.
     * @throws InterruptedException if the calling thread is interrupted while waiting.
     * @throws ExecutionException   if the operation threw an exception.
     */
    private <T> T getFromFuture(Future<T> result) throws InterruptedException, ExecutionException {
        try {
            return result.get();
        } finally {
            activeOperations.decrementAndGet();
        }
    }

    /**
     * An RDF handler that removes every statement it receives from a {@link RepositoryConnection}, interpreting
     * {@link SESAME#WILDCARD} in subject, predicate or object position as "match anything".
     */
    private static class WildcardRDFRemover extends AbstractRDFHandler {

        /** Empty context array, used when the statement has no context. */
        private static final Resource[] ALL_CONTEXT = {};

        /** Context array holding only <var>null</var>, used when the statement's context is {@link RDF4J#NIL}. */
        private static final Resource[] DEFAULT_CONTEXT = { null };

        private final RepositoryConnection conn;

        /**
         * @param conn the connection from which the handled statements are removed.
         */
        public WildcardRDFRemover(RepositoryConnection conn) {
            super();
            this.conn = conn;
        }

        /**
         * Removes the statements matching the given statement pattern from the connection.
         *
         * @param st the removal pattern; wildcard positions are translated to <var>null</var>.
         * @throws RDFHandlerException if the connection reports a {@link RepositoryException}.
         */
        @Override
        public void handleStatement(Statement st) throws RDFHandlerException {
            Resource subject = SESAME.WILDCARD.equals(st.getSubject()) ? null : st.getSubject();
            IRI predicate = SESAME.WILDCARD.equals(st.getPredicate()) ? null : st.getPredicate();
            Value object = SESAME.WILDCARD.equals(st.getObject()) ? null : st.getObject();

            Resource[] context;
            if (st.getContext() == null) {
                context = ALL_CONTEXT;
            } else if (RDF4J.NIL.equals(st.getContext())) {
                context = DEFAULT_CONTEXT;
            } else {
                context = new Resource[] { st.getContext() };
            }

            try {
                if (subject == null && predicate == null && object == null) {
                    // use the RepositoryConnection.clear operation if we're removing all statements
                    conn.clear(context);
                } else {
                    conn.remove(subject, predicate, object, context);
                }
            } catch (RepositoryException e) {
                throw new RDFHandlerException(e);
            }
        }
    }
}