SailSourceConnection.java

/*******************************************************************************
 * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.base;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.order.StatementOrder;
import org.eclipse.rdf4j.common.transaction.IsolationLevel;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.common.transaction.QueryEvaluationMode;
import org.eclipse.rdf4j.common.transaction.TransactionSetting;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Namespace;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.Dataset;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.QueryRoot;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategyFactory;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryEvaluationStep;
import org.eclipse.rdf4j.query.algebra.evaluation.TripleSource;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedServiceResolver;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedServiceResolverClient;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.DefaultEvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.DefaultEvaluationStrategyFactory;
import org.eclipse.rdf4j.query.algebra.helpers.QueryModelTreeToGenericPlanNode;
import org.eclipse.rdf4j.query.explanation.Explanation;
import org.eclipse.rdf4j.query.explanation.ExplanationImpl;
import org.eclipse.rdf4j.query.impl.EmptyBindingSet;
import org.eclipse.rdf4j.sail.SailConnection;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.UnknownSailTransactionStateException;
import org.eclipse.rdf4j.sail.UpdateContext;
import org.eclipse.rdf4j.sail.helpers.AbstractNotifyingSailConnection;
import org.eclipse.rdf4j.sail.helpers.AbstractSail;
import org.eclipse.rdf4j.sail.inferencer.InferencerConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SailConnection} implementation that is based on a {@link SailStore}.
 *
 * @author James Leigh
 */
public abstract class SailSourceConnection extends AbstractNotifyingSailConnection
		implements InferencerConnection, FederatedServiceResolverClient {

	private static final Logger logger = LoggerFactory.getLogger(SailSourceConnection.class);

	/*-----------*
	 * Variables *
	 *-----------*/

	/**
	 * The state of store for outstanding operations, keyed by the {@link UpdateContext} they belong to. The
	 * {@code null} key is used for modifications made without an update context. This map is also the monitor that the
	 * update methods of this class synchronize on.
	 */
	private final Map<UpdateContext, SailDataset> datasets = new HashMap<>();

	/**
	 * Outstanding changes that are underway, but not yet realized, by an active operation. Keyed the same way as
	 * {@link #datasets}.
	 */
	private final Map<UpdateContext, SailSink> explicitSinks = new HashMap<>();

	/**
	 * Set of explicit statements that must not be inferred. Opened lazily by the inferred-statement operations and
	 * consulted before an inferred statement is added.
	 */
	private volatile SailDataset explicitOnlyDataset;

	/**
	 * Set of inferred statements that have already been inferred earlier. Opened lazily together with
	 * {@link #inferredOnlySink}.
	 */
	private volatile SailDataset inferredOnlyDataset;

	/**
	 * Outstanding inferred statements that are not yet flushed by a read operation. A {@code null} value means the
	 * inferred-statement state has not been opened yet.
	 */
	private volatile SailSink inferredOnlySink;

	/**
	 * {@link ValueFactory} used by this connection.
	 */
	private final ValueFactory vf;

	/**
	 * The backing {@link SailStore} used to manage the state.
	 */
	private final SailStore store;

	/**
	 * The default {@link IsolationLevel} when not otherwise specified, i.e. when no transaction is active.
	 */
	private final IsolationLevel defaultIsolationLevel;

	/**
	 * A {@link SailSource} of only explicit statements when in an isolated transaction; {@code null} otherwise.
	 */
	private volatile SailSource explicitOnlyBranch;

	/**
	 * A {@link SailSource} of only inferred statements when in an isolated transaction; {@code null} otherwise.
	 */
	private volatile SailSource inferredOnlyBranch;

	/**
	 * A {@link SailSource} of all statements when in an isolated transaction; {@code null} otherwise. Built as the
	 * union of {@link #inferredOnlyBranch} and {@link #explicitOnlyBranch}.
	 */
	private volatile SailSource includeInferredBranch;

	/**
	 * {@link EvaluationStrategyFactory} to use.
	 */
	private final EvaluationStrategyFactory evalStratFactory;

	/**
	 * Connection specific resolver.
	 */
	private volatile FederatedServiceResolver federatedServiceResolver;

	// The context that represents the unnamed graph
	static final Resource[] NULL_CTX = new Resource[] { null };

	// Track the result sizes generated when evaluating a query, used by explain(...)
	private boolean trackResultSize;

	// By default all tuple expressions are cloned before being optimized and executed. We don't want to do this for
	// .explain(...) since we need to retrieve the optimized or executed plan.
	private boolean cloneTupleExpression = true;

	// Track the time used when evaluating a query, used by explain(...)
	private boolean trackTime;

	// current query evaluation mode; reset to the sail's default on commit and rollback
	private QueryEvaluationMode queryEvaluationMode;

	/**
	 * Creates a new {@link SailConnection}, using the given {@link SailStore} to manage the state. Queries are
	 * evaluated with a {@link DefaultEvaluationStrategyFactory} built around the given resolver.
	 *
	 * @param sail     the sail this connection belongs to.
	 * @param store    the {@link SailStore} that manages the state of this connection.
	 * @param resolver the FederatedServiceResolver to use with the {@link DefaultEvaluationStrategy default
	 *                 EvaluationStrategy}.
	 */
	protected SailSourceConnection(AbstractSail sail, SailStore store, FederatedServiceResolver resolver) {
		this(sail, store, new DefaultEvaluationStrategyFactory(resolver));

	}

	/**
	 * Creates a new {@link SailConnection} whose state is managed by the given {@link SailStore} and whose queries are
	 * evaluated by strategies obtained from the given factory.
	 *
	 * @param sail             the sail this connection belongs to; supplies the value factory, the default isolation
	 *                         level and the collection factory.
	 * @param store            the {@link SailStore} that manages the state of this connection.
	 * @param evalStratFactory the {@link EvaluationStrategyFactory} to use.
	 */
	protected SailSourceConnection(AbstractSail sail, SailStore store, EvaluationStrategyFactory evalStratFactory) {
		super(sail);
		this.vf = sail.getValueFactory();
		this.store = store;
		this.defaultIsolationLevel = sail.getDefaultIsolationLevel();
		this.evalStratFactory = evalStratFactory;

		// Adopt the factory's resolver when it exposes one; otherwise start without a resolver.
		if (evalStratFactory instanceof FederatedServiceResolverClient) {
			FederatedServiceResolverClient resolverClient = (FederatedServiceResolverClient) evalStratFactory;
			this.federatedServiceResolver = resolverClient.getFederatedServiceResolver();
		} else {
			this.federatedServiceResolver = null;
		}

		this.queryEvaluationMode = getSailBase().getDefaultQueryEvaluationMode();
		evalStratFactory.setCollectionFactory(sail.getCollectionFactory());
	}

	/**
	 * Returns the {@link FederatedServiceResolver} being used.
	 *
	 * @return the resolver currently set on this connection. Initially this is the resolver of the
	 *         {@link EvaluationStrategyFactory} when that factory implements {@link FederatedServiceResolverClient},
	 *         and {@code null} otherwise.
	 */
	@Override
	public FederatedServiceResolver getFederatedServiceResolver() {
		return federatedServiceResolver;
	}

	/**
	 * Sets the {@link FederatedServiceResolver} to use. The resolver is handed to evaluation strategies created by
	 * {@link #getEvaluationStrategy(Dataset, TripleSource)}, but only when it is non-null and the created strategy
	 * implements {@link FederatedServiceResolverClient}.
	 *
	 * @param resolver the resolver to use for subsequently created evaluation strategies.
	 */
	@Override
	public void setFederatedServiceResolver(FederatedServiceResolver resolver) {
		this.federatedServiceResolver = resolver;
	}

	/**
	 * Creates an {@link EvaluationStrategy} for the given dataset and triple source using the configured factory and
	 * the store's evaluation statistics, then applies this connection's resolver (if any) and query evaluation mode.
	 *
	 * @param dataset      the query dataset, passed through to the factory.
	 * @param tripleSource the source of statements the strategy evaluates against.
	 * @return the configured strategy.
	 */
	protected EvaluationStrategy getEvaluationStrategy(Dataset dataset, TripleSource tripleSource) {
		EvaluationStrategy strategy = evalStratFactory.createEvaluationStrategy(dataset, tripleSource,
				store.getEvaluationStatistics());

		// only strategies that accept a resolver get one, and a missing resolver is never pushed
		if (strategy instanceof FederatedServiceResolverClient && federatedServiceResolver != null) {
			FederatedServiceResolverClient resolverClient = (FederatedServiceResolverClient) strategy;
			resolverClient.setFederatedServiceResolver(federatedServiceResolver);
		}

		strategy.setQueryEvaluationMode(queryEvaluationMode);
		return strategy;
	}

	/**
	 * Optimizes and evaluates the given query model against a snapshot of this connection's data.
	 * <p>
	 * The returned iteration owns the snapshot and its source: closing it closes both. If anything fails before the
	 * iteration is handed to the caller, everything opened so far is closed here.
	 *
	 * @param tupleExpr       the query model; cloned first unless cloning has been switched off for explain(...).
	 * @param dataset         the query dataset.
	 * @param bindings        bindings passed to the optimizer.
	 * @param includeInferred whether inferred statements take part in the evaluation.
	 * @return the query result.
	 * @throws SailException if evaluation fails; a {@link QueryEvaluationException} is wrapped.
	 */
	@Override
	protected CloseableIteration<? extends BindingSet> evaluateInternal(TupleExpr tupleExpr,
			Dataset dataset, BindingSet bindings, boolean includeInferred) throws SailException {
		logger.trace("Incoming query model:\n{}", tupleExpr);

		// Work on a copy (unless disabled) so that optimizers can rewrite aggressively
		TupleExpr queryModel = cloneTupleExpression ? tupleExpr.clone() : tupleExpr;

		// A dummy root lets the optimizers replace the actual root node
		if (!(queryModel instanceof QueryRoot)) {
			queryModel = new QueryRoot(queryModel);
		}

		SailSource source = null;
		SailDataset snapshot = null;
		CloseableIteration<BindingSet> result = null;
		boolean handedOver = false;
		try {
			source = branch(IncludeInferred.fromBoolean(includeInferred));
			snapshot = source.dataset(getIsolationLevel());

			EvaluationStrategy strategy = getEvaluationStrategy(dataset, new SailDatasetTripleSource(vf, snapshot));
			if (trackResultSize) {
				strategy.setTrackResultSize(true);
			}
			if (trackTime) {
				strategy.setTrackTime(true);
			}

			queryModel = strategy.optimize(queryModel, store.getEvaluationStatistics(), bindings);
			logger.trace("Optimized query model:\n{}", queryModel);

			QueryEvaluationStep compiled = strategy.precompile(queryModel);
			result = compiled.evaluate(EmptyBindingSet.getInstance());
			result = interlock(result, snapshot, source);

			handedOver = true;
			return result;
		} catch (QueryEvaluationException e) {
			throw new SailException(e);
		} finally {
			if (!handedOver) {
				// release in reverse order of acquisition, attempting every close
				try {
					if (result != null) {
						result.close();
					}
				} finally {
					try {
						if (snapshot != null) {
							snapshot.close();
						}
					} finally {
						if (source != null) {
							source.close();
						}
					}
				}
			}
		}
	}

	/**
	 * Produces an explanation (query plan) for the given query at the requested level.
	 * <ul>
	 * <li>{@code Unoptimized}: the plan is built from the query model as passed in.</li>
	 * <li>{@code Optimized}: the query is evaluated without cloning and closed immediately, so the passed-in model
	 * carries the optimized plan.</li>
	 * <li>{@code Executed}: additionally runs the query to completion (or timeout) while tracking result sizes.</li>
	 * <li>{@code Timed}: like {@code Executed}, and also tracks time.</li>
	 * </ul>
	 * The tracking and cloning flags are always restored afterwards.
	 *
	 * @param timeoutSeconds timeout applied when the query is actually executed.
	 * @return the explanation, flagged as timed out when execution was cut short by the timeout.
	 * @throws UnsupportedOperationException for an unknown level.
	 */
	@Override
	public Explanation explain(Explanation.Level level, TupleExpr tupleExpr, Dataset dataset,
			BindingSet bindings, boolean includeInferred, int timeoutSeconds) {
		boolean queryTimedOut = false;

		try {
			switch (level) {
			case Timed:
				this.trackTime = true;
				// fall through: Timed is Executed plus time tracking
			case Executed:
				this.trackResultSize = true;
				this.cloneTupleExpression = false;
				queryTimedOut = runQueryForExplain(tupleExpr, dataset, bindings, includeInferred, timeoutSeconds);
				break;

			case Optimized:
				this.cloneTupleExpression = false;
				evaluate(tupleExpr, dataset, bindings, includeInferred).close();
				break;

			case Unoptimized:
				break;

			default:
				throw new UnsupportedOperationException("Unsupported query explanation level: " + level);
			}
		} finally {
			// restore the defaults used by regular query evaluation
			this.trackTime = false;
			this.trackResultSize = false;
			this.cloneTupleExpression = true;
		}

		QueryModelTreeToGenericPlanNode planBuilder = new QueryModelTreeToGenericPlanNode(tupleExpr);
		tupleExpr.visit(planBuilder);
		return new ExplanationImpl(planBuilder.getGenericPlanNode(), queryTimedOut);
	}

	/**
	 * Runs the query to completion, or until the timeout expires, so that the executed plan carries runtime
	 * statistics. A helper thread interrupts the calling thread once the timeout has elapsed.
	 *
	 * @param timeoutSeconds number of seconds after which the query is interrupted.
	 * @return {@code true} if the timeout fired, {@code false} otherwise.
	 */
	private boolean runQueryForExplain(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings,
			boolean includeInferred, int timeoutSeconds) {

		AtomicBoolean timedOut = new AtomicBoolean(false);

		Thread currentThread = Thread.currentThread();

		// selfInterruptOnTimeoutThread will interrupt the current thread after a set timeout to stop the query
		// execution
		Thread selfInterruptOnTimeoutThread = new Thread(() -> {
			try {
				TimeUnit.SECONDS.sleep(timeoutSeconds);
				// Record the timeout BEFORE interrupting: the query thread reads this flag as soon as it notices the
				// interrupt, and must not observe "interrupted but not timed out".
				timedOut.set(true);
				currentThread.interrupt();
			} catch (InterruptedException ignored) {
				// the query finished (or failed) before the timeout; nothing to do
			}
		});

		try {
			selfInterruptOnTimeoutThread.start();

			try (CloseableIteration<? extends BindingSet> evaluate = evaluate(tupleExpr,
					dataset, bindings, includeInferred)) {
				while (evaluate.hasNext()) {
					if (Thread.interrupted()) {
						break;
					}
					evaluate.next();
				}
			} catch (Exception e) {
				if (e instanceof InterruptedException) {
					Thread.currentThread().interrupt();
				}
				// failures caused by the timeout interrupt are expected; anything else is a real error
				if (!timedOut.get()) {
					throw e;
				}
			}

			return timedOut.get();

		} finally {
			// stop the watchdog if it is still sleeping
			selfInterruptOnTimeoutThread.interrupt();
			try {
				// make sure selfInterruptOnTimeoutThread finishes
				selfInterruptOnTimeoutThread.join();
			} catch (InterruptedException ignored) {
				// the interrupted flag is cleared below in any case
			}

			// clear interrupted flag;
			Thread.interrupted();
		}

	}

	/**
	 * Intentionally empty: this class performs no work of its own when the connection is closed.
	 */
	@Override
	protected void closeInternal() throws SailException {
		// no-op
	}

	/**
	 * Returns the context identifiers of the explicit statements. The returned iteration owns the underlying snapshot
	 * and source and closes them when it is closed.
	 *
	 * @throws SailException if the snapshot or the iteration cannot be opened; anything opened so far is closed first.
	 */
	@Override
	protected CloseableIteration<? extends Resource> getContextIDsInternal() throws SailException {
		SailSource branch = null;
		SailDataset snapshot = null;
		boolean allGood = false;
		try {
			branch = branch(IncludeInferred.explicitOnly);
			snapshot = branch.dataset(getIsolationLevel());
			CloseableIteration<? extends Resource> contextIDs = SailClosingIteration
					.makeClosable(snapshot.getContextIDs(), snapshot, branch);
			allGood = true;
			return contextIDs;
		} finally {
			if (!allGood) {
				// nobody else will close these if we failed before handing them to the iteration
				try {
					if (snapshot != null) {
						snapshot.close();
					}
				} finally {
					if (branch != null) {
						branch.close();
					}
				}
			}
		}
	}

	/**
	 * Returns the statements matching the given pattern. The returned iteration owns the underlying snapshot and
	 * source and closes them when it is closed.
	 *
	 * @param includeInferred whether inferred statements are included in the result.
	 * @throws SailException if the snapshot or the iteration cannot be opened; anything opened so far is closed first.
	 */
	@Override
	protected CloseableIteration<? extends Statement> getStatementsInternal(Resource subj, IRI pred,
			Value obj, boolean includeInferred, Resource... contexts) throws SailException {
		SailSource branch = null;
		SailDataset snapshot = null;
		boolean allGood = false;
		try {
			branch = branch(IncludeInferred.fromBoolean(includeInferred));
			snapshot = branch.dataset(getIsolationLevel());
			CloseableIteration<? extends Statement> statements = SailClosingIteration
					.makeClosable(snapshot.getStatements(subj, pred, obj, contexts), snapshot, branch);
			allGood = true;
			return statements;
		} finally {
			if (!allGood) {
				// nobody else will close these if we failed before handing them to the iteration
				try {
					if (snapshot != null) {
						snapshot.close();
					}
				} finally {
					if (branch != null) {
						branch.close();
					}
				}
			}
		}
	}

	/**
	 * Returns the statements matching the given pattern in the requested order. The returned iteration owns the
	 * underlying snapshot and source and closes them when it is closed.
	 *
	 * @param order           the order in which the statements are to be returned.
	 * @param includeInferred whether inferred statements are included in the result.
	 * @throws SailException if the snapshot or the iteration cannot be opened; anything opened so far is closed first.
	 */
	@Override
	protected CloseableIteration<? extends Statement> getStatementsInternal(StatementOrder order, Resource subj,
			IRI pred,
			Value obj, boolean includeInferred, Resource... contexts) throws SailException {
		SailSource branch = null;
		SailDataset snapshot = null;
		boolean allGood = false;
		try {
			branch = branch(IncludeInferred.fromBoolean(includeInferred));
			snapshot = branch.dataset(getIsolationLevel());
			CloseableIteration<? extends Statement> statements = SailClosingIteration
					.makeClosable(snapshot.getStatements(order, subj, pred, obj, contexts), snapshot, branch);
			allGood = true;
			return statements;
		} finally {
			if (!allGood) {
				// nobody else will close these if we failed before handing them to the iteration
				try {
					if (snapshot != null) {
						snapshot.close();
					}
				} finally {
					if (branch != null) {
						branch.close();
					}
				}
			}
		}
	}

	/**
	 * Returns the value comparator of a snapshot of the explicit statements. Snapshot and source are closed before
	 * returning.
	 */
	@Override
	public Comparator<Value> getComparator() {
		try (SailSource explicitBranch = branch(IncludeInferred.explicitOnly);
				SailDataset explicitSnapshot = explicitBranch.dataset(getIsolationLevel())) {
			return explicitSnapshot.getComparator();
		}
	}

	/**
	 * Counts the explicit statements in the given contexts by iterating over all of them.
	 *
	 * @param contexts the contexts to count in, passed through to the statement lookup.
	 * @return the number of explicit statements found.
	 */
	@Override
	protected long sizeInternal(Resource... contexts) throws SailException {
		CloseableIteration<? extends Statement> explicitStatements = getStatementsInternal(null, null, null, false,
				contexts);
		try (Stream<? extends Statement> statementStream = explicitStatements.stream()) {
			long size = statementStream.count();
			return size;
		}
	}

	/**
	 * Returns the namespaces visible through the explicit-statement source. The returned iteration owns the underlying
	 * snapshot and source and closes them when it is closed.
	 *
	 * @throws SailException if the snapshot or the iteration cannot be opened; anything opened so far is closed first.
	 */
	@Override
	protected CloseableIteration<? extends Namespace> getNamespacesInternal() throws SailException {
		SailSource branch = null;
		SailDataset snapshot = null;
		boolean allGood = false;
		try {
			branch = branch(IncludeInferred.explicitOnly);
			snapshot = branch.dataset(getIsolationLevel());
			CloseableIteration<? extends Namespace> namespaces = SailClosingIteration
					.makeClosable(snapshot.getNamespaces(), snapshot, branch);
			allGood = true;
			return namespaces;
		} finally {
			if (!allGood) {
				// nobody else will close these if we failed before handing them to the iteration
				try {
					if (snapshot != null) {
						snapshot.close();
					}
				} finally {
					if (branch != null) {
						branch.close();
					}
				}
			}
		}
	}

	/**
	 * Looks up the namespace name registered for the given prefix in a snapshot of the explicit-statement source.
	 * <p>
	 * Uses try-with-resources so that the snapshot and then the source are always closed, and so that a failure while
	 * closing is attached as a suppressed exception instead of replacing the exception thrown by the lookup itself.
	 *
	 * @param prefix the namespace prefix to look up.
	 * @return whatever the snapshot reports for the prefix.
	 */
	@Override
	protected String getNamespaceInternal(String prefix) throws SailException {
		try (SailSource branch = branch(IncludeInferred.explicitOnly);
				SailDataset snapshot = branch.dataset(getIsolationLevel())) {
			return snapshot.getNamespace(prefix);
		}
	}

	/**
	 * Applies the given transaction settings. The query evaluation mode is reset to the sail's default and then
	 * overridden by the last {@link QueryEvaluationMode} found among the settings, if any.
	 */
	@Override
	public void setTransactionSettings(TransactionSetting... settings) {
		this.queryEvaluationMode = getSailBase().getDefaultQueryEvaluationMode();

		// Scanning backwards and stopping at the first hit picks the last evaluation mode that was supplied.
		for (int i = settings.length - 1; i >= 0; i--) {
			TransactionSetting candidate = settings[i];
			if (candidate instanceof QueryEvaluationMode) {
				this.queryEvaluationMode = (QueryEvaluationMode) candidate;
				break;
			}
		}

		super.setTransactionSettings(settings);
	}

	/**
	 * Prepares this connection for a new transaction. Transaction branches are forked from the store only when the
	 * transaction's isolation level is not compatible with {@link IsolationLevels#NONE}.
	 */
	@Override
	protected void startTransactionInternal() throws SailException {
		assert explicitOnlyBranch == null;
		assert inferredOnlyBranch == null;
		assert includeInferredBranch == null;

		IsolationLevel transactionLevel = getTransactionIsolation();
		if (IsolationLevels.NONE.isCompatibleWith(transactionLevel)) {
			// not isolated: operate on the store's sources directly, no branches needed
			return;
		}

		explicitOnlyBranch = store.getExplicitSailSource().fork();
		inferredOnlyBranch = store.getInferredSailSource().fork();
		includeInferredBranch = new UnionSailSource(inferredOnlyBranch, explicitOnlyBranch);
	}

	/**
	 * Prepares the transaction branch for commit, if there is one.
	 */
	@Override
	protected void prepareInternal() throws SailException {
		SailSource transactionBranch = includeInferredBranch;
		if (transactionBranch == null) {
			// no isolated transaction branch, nothing to prepare
			return;
		}
		transactionBranch.prepare();
	}

	/**
	 * Commits the transaction: detaches the transaction branches, resets the query evaluation mode to the sail's
	 * default and, if a transaction branch exists, flushes it and then closes it (the close happens even when the
	 * flush fails).
	 */
	@Override
	protected void commitInternal() throws SailException {
		SailSource transactionBranch = includeInferredBranch;

		includeInferredBranch = null;
		inferredOnlyBranch = null;
		explicitOnlyBranch = null;

		queryEvaluationMode = getSailBase().getDefaultQueryEvaluationMode();

		if (transactionBranch == null) {
			return;
		}
		try {
			transactionBranch.flush();
		} finally {
			transactionBranch.close();
		}
	}

	/**
	 * Rolls back the transaction by discarding all pending state without flushing it.
	 * <p>
	 * All fields holding pending state are detached first and the query evaluation mode is reset to the sail's default.
	 * Then the resources are closed one after the other: the dataset and sink registered under the {@code null} key,
	 * the explicit-only dataset, the inferred dataset, the inferred sink and finally the include-inferred transaction
	 * branch. The nested {@code finally} blocks make sure every close is attempted even if an earlier one throws.
	 */
	@Override
	protected void rollbackInternal() throws SailException {
		synchronized (datasets) {
			SailDataset toCloseDataset = null;
			SailSink toCloseExplicitSink = null;
			// detach everything up front so the connection no longer refers to state that is about to be closed
			SailDataset toCloseExplicitOnlyDataset = explicitOnlyDataset;
			explicitOnlyDataset = null;
			SailDataset toCloseInferredDataset = inferredOnlyDataset;
			inferredOnlyDataset = null;
			SailSink toCloseInferredSink = inferredOnlySink;
			inferredOnlySink = null;
			SailSource toCloseIncludeInferredBranch = includeInferredBranch;
			includeInferredBranch = null;
			// only the include-inferred branch is closed below
			// NOTE(review): presumably closing the union also closes these two member branches - confirm
			explicitOnlyBranch = null;
			inferredOnlyBranch = null;

			queryEvaluationMode = getSailBase().getDefaultQueryEvaluationMode();

			try {
				// only the entry for operations without an update context (null key) is discarded here
				if (datasets.containsKey(null)) {
					toCloseDataset = datasets.remove(null);
				}
			} finally {
				try {
					if (toCloseDataset != null) {
						toCloseDataset.close();
					}
				} finally {
					try {
						if (explicitSinks.containsKey(null)) {
							toCloseExplicitSink = explicitSinks.remove(null);
						}
					} finally {
						try {
							// closed without flush(): the pending explicit changes are dropped
							if (toCloseExplicitSink != null) {
								toCloseExplicitSink.close();
							}
						} finally {
							try {
								if (toCloseExplicitOnlyDataset != null) {
									toCloseExplicitOnlyDataset.close();
								}
							} finally {
								try {
									if (toCloseInferredDataset != null) {
										toCloseInferredDataset.close();
									}
								} finally {
									try {
										// closed without flush(): the pending inferred changes are dropped
										if (toCloseInferredSink != null) {
											toCloseInferredSink.close();
										}
									} finally {
										if (toCloseIncludeInferredBranch != null) {
											toCloseIncludeInferredBranch.close();
										}
									}
								}
							}
						}
					}
				}
			}
		}

	}

	/**
	 * Opens the dataset and sink used by the given update operation and registers them under that operation. Does
	 * nothing when {@code op} is {@code null}.
	 *
	 * @param op the update operation that is about to start, may be {@code null}.
	 */
	@Override
	public void startUpdate(UpdateContext op) throws SailException {
		if (op == null) {
			return;
		}

		IsolationLevel level = getIsolationLevel();
		flush();
		synchronized (datasets) {
			assert !datasets.containsKey(op);

			SailSource updateSource;
			if (!op.isIncludeInferred()) {
				updateSource = branch(IncludeInferred.explicitOnly);
			} else if (inferredOnlyBranch == null) {
				// IsolationLevels.NONE: no transaction branches, read and write the store directly
				updateSource = new UnionSailSource(store.getExplicitSailSource(), store.getInferredSailSource());
			} else {
				updateSource = new UnionSailSource(explicitOnlyBranch, inferredOnlyBranch);
			}

			datasets.put(op, updateSource.dataset(level));
			explicitSinks.put(op, updateSource.sink(level));
		}
	}

	/**
	 * Adds an explicit statement as part of the given update operation. When called without an update context, the
	 * dataset and sink for the {@code null} key are opened on first use.
	 *
	 * @param op the update operation this addition belongs to, may be {@code null}.
	 */
	@Override
	public void addStatement(UpdateContext op, Resource subj, IRI pred, Value obj, Resource... contexts)
			throws SailException {
		verifyIsOpen();
		verifyIsActive();
		synchronized (datasets) {
			boolean needsImplicitContext = op == null && !datasets.containsKey(null);
			if (needsImplicitContext) {
				SailSource explicitSource = branch(IncludeInferred.explicitOnly);
				datasets.put(null, explicitSource.dataset(getIsolationLevel()));
				explicitSinks.put(null, explicitSource.sink(getIsolationLevel()));
			}
			assert explicitSinks.containsKey(op);

			SailDataset currentState = datasets.get(op);
			SailSink pendingChanges = explicitSinks.get(op);
			add(subj, pred, obj, currentState, pendingChanges, contexts);
		}
		addStatementInternal(subj, pred, obj, contexts);
	}

	/**
	 * Removes the matching explicit statements as part of the given update operation. When called without an update
	 * context, the dataset and sink for the {@code null} key are opened on first use.
	 *
	 * @param op the update operation this removal belongs to, may be {@code null}.
	 */
	@Override
	public void removeStatement(UpdateContext op, Resource subj, IRI pred, Value obj, Resource... contexts)
			throws SailException {
		verifyIsOpen();
		verifyIsActive();
		synchronized (datasets) {
			boolean needsImplicitContext = op == null && !datasets.containsKey(null);
			if (needsImplicitContext) {
				SailSource explicitSource = branch(IncludeInferred.explicitOnly);
				datasets.put(null, explicitSource.dataset(getIsolationLevel()));
				explicitSinks.put(null, explicitSource.sink(getIsolationLevel()));
			}
			assert explicitSinks.containsKey(op);

			SailDataset currentState = datasets.get(op);
			SailSink pendingChanges = explicitSinks.get(op);
			remove(subj, pred, obj, currentState, pendingChanges, contexts);
		}
		removeStatementsInternal(subj, pred, obj, contexts);
	}

	/**
	 * Finishes the given update operation by flushing and releasing the state that was opened for it.
	 * <p>
	 * In order: the inferred sink is flushed and closed, the explicit-only and inferred datasets are closed, the
	 * explicit sink registered for {@code op} is flushed and closed, and finally the dataset registered for {@code op}
	 * is closed. The nested {@code finally} blocks make sure every step is attempted even if an earlier one throws.
	 *
	 * @param op the update operation that has ended; {@code null} addresses the state opened without an update context.
	 */
	@Override
	protected void endUpdateInternal(UpdateContext op) throws SailException {
		synchronized (datasets) {
			// fast path: nothing was opened, so there is nothing to flush or close
			if (inferredOnlySink == null && explicitOnlyDataset == null && inferredOnlyDataset == null
					&& datasets.isEmpty() && explicitSinks.isEmpty()) {
				return;
			}

			// detach the inferred-statement state so that it is re-opened lazily by the next inferred operation
			SailSink toCloseInferredSink = inferredOnlySink;
			inferredOnlySink = null;
			SailDataset toCloseExplicitOnlyDataset = explicitOnlyDataset;
			explicitOnlyDataset = null;
			SailDataset toCloseInferredDataset = inferredOnlyDataset;
			inferredOnlyDataset = null;

			try {
				if (toCloseInferredSink != null) {
					toCloseInferredSink.flush();
				}
			} finally {
				try {
					if (toCloseInferredSink != null) {
						toCloseInferredSink.close();
					}
				} finally {
					try {
						if (toCloseExplicitOnlyDataset != null) {
							toCloseExplicitOnlyDataset.close();
						}
					} finally {
						try {
							if (toCloseInferredDataset != null) {
								toCloseInferredDataset.close();
							}
						} finally {
							SailSink explicit = null;
							try {
								// the sink is unregistered before it is flushed
								explicit = explicitSinks.remove(op);
								if (explicit != null) {
									explicit.flush();
								}
							} finally {
								try {
									if (explicit != null) {
										explicit.close();
									}
								} finally {
									SailDataset toCloseDataset = null;
									try {
										toCloseDataset = datasets.remove(op);
									} finally {
										if (toCloseDataset != null) {
											toCloseDataset.close();
										}
									}
								}
							}
						}
					}
				}
			}
		}
	}

	/**
	 * Adds an inferred statement to each of the given contexts (or to the unnamed graph when no context, or only the
	 * {@code null} context, is given).
	 * <p>
	 * A statement that already exists explicitly in a context is skipped for that context. Otherwise it is approved
	 * in the inferred sink, and reported (internal hook, listeners, statements-added flag) only if it was not already
	 * present among the inferred statements.
	 *
	 * @return {@code true} if at least one previously unknown inferred statement was added.
	 */
	@Override
	public boolean addInferredStatement(Resource subj, IRI pred, Value obj, Resource... contexts) throws SailException {
		verifyIsOpen();
		verifyIsActive();
		IsolationLevel level = getIsolationLevel();
		synchronized (datasets) {
			if (inferredOnlySink == null) {
				// first inferred operation since the last flush: open the state lazily
				SailSource inferredBranch = branch(IncludeInferred.inferredOnly);
				inferredOnlyDataset = inferredBranch.dataset(level);
				inferredOnlySink = inferredBranch.sink(level);
				explicitOnlyDataset = branch(IncludeInferred.explicitOnly).dataset(level);
			}

			boolean unnamedGraphOnly = contexts.length == 0 || (contexts.length == 1 && contexts[0] == null);
			if (unnamedGraphOnly) {
				if (hasStatement(explicitOnlyDataset, subj, pred, obj, NULL_CTX)) {
					// already explicit, must not be inferred as well
					return false;
				}
				boolean alreadyInferred = hasStatement(inferredOnlyDataset, subj, pred, obj, NULL_CTX);
				inferredOnlySink.approve(subj, pred, obj, null);
				if (alreadyInferred) {
					// nothing new to report
					return false;
				}
				addStatementInternal(subj, pred, obj, contexts);
				notifyStatementAdded(vf.createStatement(subj, pred, obj));
				setStatementsAdded();
				return true;
			}

			boolean modified = false;
			for (Resource ctx : contexts) {
				// avoid allocating a new array when the caller's array already holds exactly this context
				Resource[] contextsToCheck = contexts.length == 1 ? contexts : new Resource[] { ctx };

				if (hasStatement(explicitOnlyDataset, subj, pred, obj, contextsToCheck)) {
					// already explicit in this context, must not be inferred as well
					continue;
				}
				boolean alreadyInferred = hasStatement(inferredOnlyDataset, subj, pred, obj, contextsToCheck);
				inferredOnlySink.approve(subj, pred, obj, ctx);
				if (!alreadyInferred) {
					addStatementInternal(subj, pred, obj, ctx);
					notifyStatementAdded(vf.createStatement(subj, pred, obj, ctx));
					setStatementsAdded();
					modified = true;
				}
			}
			return modified;
		}
	}

	/**
	 * Approves the statement in the given sink for each of the given contexts (or for the unnamed graph when no
	 * context, or only the {@code null} context, is given) and notifies connection listeners about statements that are
	 * effectively new.
	 * <p>
	 * Listeners are notified when the statement is not yet in the dataset, or when it is in the dataset but the sink
	 * is a {@link Changeset} in which it has been deprecated. The statement is always approved, even if it already
	 * exists.
	 *
	 * @param dataset the current state, used to detect statements that already exist.
	 * @param sink    the sink that receives the approved statements.
	 * @throws SailException if a context is a triple, or if the dataset or sink fails.
	 */
	private void add(Resource subj, IRI pred, Value obj, SailDataset dataset, SailSink sink, Resource... contexts)
			throws SailException {
		if (contexts.length == 0 || (contexts.length == 1 && contexts[0] == null)) {
			if (hasConnectionListeners()) {
				boolean effectivelyNew = !hasStatement(dataset, subj, pred, obj, NULL_CTX)
						|| (sink instanceof Changeset && ((Changeset) sink).hasDeprecated(subj, pred, obj, NULL_CTX));
				if (effectivelyNew) {
					notifyStatementAdded(vf.createStatement(subj, pred, obj));
				}
			}

			// always approve the statement, even if it already exists
			sink.approve(subj, pred, obj, null);
		} else {
			for (Resource ctx : contexts) {
				if (ctx != null && ctx.isTriple()) {
					throw new SailException("context argument can not be of type Triple: " + ctx.stringValue());
				}

				// avoid allocating a new array when the caller's array already holds exactly this context
				Resource[] contextsToCheck = contexts.length == 1 ? contexts : new Resource[] { ctx };

				if (hasConnectionListeners()) {
					boolean effectivelyNew = !hasStatement(dataset, subj, pred, obj, contextsToCheck)
							|| (sink instanceof Changeset
									&& ((Changeset) sink).hasDeprecated(subj, pred, obj, contextsToCheck));
					if (effectivelyNew) {
						// report the statement in the context it is actually added to
						notifyStatementAdded(vf.createStatement(subj, pred, obj, ctx));
					}
				}

				// always approve the statement, even if it already exists
				sink.approve(subj, pred, obj, ctx);
			}
		}
	}

	/**
	 * Removes the matching inferred statements, opening the inferred-statement state lazily if needed.
	 *
	 * @return {@code true} if the removal reported that statements were removed.
	 */
	@Override
	public boolean removeInferredStatement(Resource subj, IRI pred, Value obj, Resource... contexts)
			throws SailException {
		verifyIsOpen();
		verifyIsActive();
		synchronized (datasets) {
			IsolationLevel level = getIsolationLevel();
			if (inferredOnlySink == null) {
				SailSource inferredBranch = branch(IncludeInferred.inferredOnly);
				inferredOnlyDataset = inferredBranch.dataset(level);
				inferredOnlySink = inferredBranch.sink(level);
				explicitOnlyDataset = branch(IncludeInferred.explicitOnly).dataset(level);
			}

			removeStatementsInternal(subj, pred, obj, contexts);

			if (!remove(subj, pred, obj, inferredOnlyDataset, inferredOnlySink, contexts)) {
				return false;
			}
			setStatementsRemoved();
			return true;
		}
	}

	/**
	 * Deprecates all statements matching the given pattern in the given sink.
	 * <p>
	 * When nobody listens for removals and the sink supports it, the removal is delegated to the sink as a single
	 * query. Otherwise the matching statements are read from the dataset and deprecated one by one, notifying the
	 * listeners for each of them.
	 *
	 * @param dataset the current state, used to enumerate the matching statements.
	 * @param sink    the sink that receives the deprecations.
	 * @return the sink's answer when deprecating by query, otherwise {@code true} if at least one statement matched.
	 */
	private boolean remove(Resource subj, IRI pred, Value obj, SailDataset dataset, SailSink sink, Resource... contexts)
			throws SailException {

		boolean canDeprecateByQuery = !hasConnectionListeners() && sink.supportsDeprecateByQuery();
		if (canDeprecateByQuery) {
			return sink.deprecateByQuery(subj, pred, obj, contexts);
		}

		boolean removedAny = false;
		try (CloseableIteration<? extends Statement> matches = dataset.getStatements(subj, pred, obj, contexts)) {
			while (matches.hasNext()) {
				Statement match = matches.next();
				sink.deprecate(match);
				removedAny = true;
				notifyStatementRemoved(match);
			}
		}
		return removedAny;
	}

	/**
	 * Clears the explicit statements in the given contexts using the dataset and sink registered under the
	 * {@code null} key, opening them on first use. When connection listeners are registered, the statements are first
	 * removed one by one so that each removal is reported.
	 */
	@Override
	protected void clearInternal(Resource... contexts) throws SailException {
		verifyIsOpen();
		verifyIsActive();
		synchronized (datasets) {
			if (!datasets.containsKey(null)) {
				SailSource explicitSource = branch(IncludeInferred.explicitOnly);
				datasets.put(null, explicitSource.dataset(getIsolationLevel()));
				explicitSinks.put(null, explicitSource.sink(getIsolationLevel()));
			}
			assert explicitSinks.containsKey(null);

			SailSink pendingChanges = explicitSinks.get(null);
			if (hasConnectionListeners()) {
				remove(null, null, null, datasets.get(null), pendingChanges, contexts);
			}
			pendingChanges.clear(contexts);
		}
	}

	/**
	 * Clears the inferred statements in the given contexts, opening the inferred-statement state lazily if needed.
	 * When connection listeners are registered, the statements are first removed one by one so that each removal is
	 * reported. The statements-removed flag is always set.
	 */
	@Override
	public void clearInferred(Resource... contexts) throws SailException {
		verifyIsOpen();
		verifyIsActive();
		synchronized (datasets) {
			if (inferredOnlySink == null) {
				IsolationLevel level = getIsolationLevel();
				SailSource inferredBranch = branch(IncludeInferred.inferredOnly);
				inferredOnlyDataset = inferredBranch.dataset(level);
				inferredOnlySink = inferredBranch.sink(level);
				explicitOnlyDataset = branch(IncludeInferred.explicitOnly).dataset(level);
			}

			boolean mustReportRemovals = hasConnectionListeners();
			if (mustReportRemovals) {
				remove(null, null, null, inferredOnlyDataset, inferredOnlySink, contexts);
			}

			inferredOnlySink.clear(contexts);
			setStatementsRemoved();
		}
	}

	/**
	 * Flushes pending updates by delegating to {@code flush()}.
	 */
	@Override
	public void flushUpdates() throws SailException {
		flush();
	}

	/**
	 * Registers the namespace through a sink on the explicit-statement source and flushes it right away. The sink and
	 * then the source are closed afterwards, also on failure.
	 *
	 * @param prefix the namespace prefix.
	 * @param name   the namespace name the prefix maps to.
	 */
	@Override
	protected void setNamespaceInternal(String prefix, String name) throws SailException {
		SailSource explicitSource = branch(IncludeInferred.explicitOnly);
		try {
			SailSink namespaceSink = explicitSource.sink(getTransactionIsolation());
			try {
				namespaceSink.setNamespace(prefix, name);
				namespaceSink.flush();
			} finally {
				namespaceSink.close();
			}
		} finally {
			explicitSource.close();
		}
	}

	/**
	 * Removes the namespace registered for the prefix through a sink on the explicit-statement source and flushes it
	 * right away. The sink and then the source are closed afterwards, also on failure.
	 *
	 * @param prefix the namespace prefix to remove.
	 */
	@Override
	protected void removeNamespaceInternal(String prefix) throws SailException {
		SailSource explicitSource = branch(IncludeInferred.explicitOnly);
		try {
			SailSink namespaceSink = explicitSource.sink(getTransactionIsolation());
			try {
				namespaceSink.removeNamespace(prefix);
				namespaceSink.flush();
			} finally {
				namespaceSink.close();
			}
		} finally {
			explicitSource.close();
		}
	}

	/**
	 * Removes all namespace registrations through a sink on the explicit-statement source and flushes it right away.
	 * The sink and then the source are closed afterwards, also on failure.
	 */
	@Override
	protected void clearNamespacesInternal() throws SailException {
		SailSource explicitSource = branch(IncludeInferred.explicitOnly);
		try {
			SailSink namespaceSink = explicitSource.sink(getTransactionIsolation());
			try {
				namespaceSink.clearNamespaces();
				namespaceSink.flush();
			} finally {
				namespaceSink.close();
			}
		} finally {
			explicitSource.close();
		}
	}

	/*------------------*
	 * Internal helpers *
	 *------------------*/

	/**
	 * @return the isolation level of the active transaction, or the default isolation level when no transaction is
	 *         active.
	 */
	private IsolationLevel getIsolationLevel() throws UnknownSailTransactionStateException {
		return isActive() ? getTransactionIsolation() : defaultIsolationLevel;
	}

	/**
	 * Selects which statements an operation works on: all of them, only the explicit ones or only the inferred ones.
	 */
	enum IncludeInferred {
		all,
		explicitOnly,
		inferredOnly;

		/**
		 * Maps the usual {@code includeInferred} flag to a selection.
		 *
		 * @return {@link #all} for {@code true}, {@link #explicitOnly} for {@code false}.
		 */
		public static IncludeInferred fromBoolean(boolean includeInferred) {
			if (includeInferred) {
				return all;
			}
			return explicitOnly;
		}
	}

	/**
	 * Selects the {@link SailSource} an operation should work on, depending on the transaction state:
	 * <ul>
	 * <li>active and isolated transaction: a non-closing delegate to the matching transaction branch;</li>
	 * <li>active but not isolated transaction: the store's own source(s), without branching;</li>
	 * <li>no active transaction: a fresh fork of the store's source(s).</li>
	 * </ul>
	 * Any value other than {@code all} and {@code inferredOnly} selects the explicit statements.
	 *
	 * @param includeinferred which statements the source should expose.
	 * @return read operation {@link SailSource}
	 * @throws SailException
	 */
	private SailSource branch(IncludeInferred includeinferred) throws SailException {
		boolean active = isActive();
		IsolationLevel level = getIsolationLevel();
		boolean isolated = !IsolationLevels.NONE.isCompatibleWith(level);

		boolean wantAll = includeinferred == IncludeInferred.all;
		boolean wantInferredOnly = includeinferred == IncludeInferred.inferredOnly;

		if (active && isolated) {
			// use the transaction branch
			SailSource transactionBranch;
			if (wantAll) {
				transactionBranch = includeInferredBranch;
			} else if (wantInferredOnly) {
				transactionBranch = inferredOnlyBranch;
			} else {
				transactionBranch = explicitOnlyBranch;
			}
			return new DelegatingSailSource(transactionBranch, false);
		}

		if (active) {
			// don't actually branch source
			if (wantAll) {
				return new UnionSailSource(store.getInferredSailSource(), store.getExplicitSailSource());
			}
			return wantInferredOnly ? store.getInferredSailSource() : store.getExplicitSailSource();
		}

		// create a new branch for read operation
		if (wantAll) {
			return new UnionSailSource(store.getInferredSailSource().fork(), store.getExplicitSailSource().fork());
		}
		SailSource storeSource = wantInferredOnly ? store.getInferredSailSource() : store.getExplicitSailSource();
		return storeSource.fork();
	}

	/**
	 * Wraps the given iteration in a {@link SailClosingIteration} over the given resources, reporting any
	 * {@link SailException} as a {@link QueryEvaluationException}.
	 *
	 * @param results   the iteration to wrap.
	 * @param resources the resources handed to the wrapping iteration.
	 * @return the wrapping iteration.
	 */
	private <T> CloseableIteration<T> interlock(
			CloseableIteration<T> results, SailClosable... resources) {
		CloseableIteration<T> guarded = new SailClosingIteration<>(results, resources) {

			@Override
			protected void handleSailException(SailException e) throws QueryEvaluationException {
				// callers of query evaluation expect query exceptions, not sail exceptions
				throw new QueryEvaluationException(e);
			}
		};
		return guarded;
	}

	/**
	 * Checks whether the dataset contains at least one statement matching the given pattern. The lookup iteration is
	 * closed before returning.
	 *
	 * @return {@code true} if a matching statement exists.
	 */
	private boolean hasStatement(SailDataset dataset, Resource subj, IRI pred, Value obj, Resource... contexts)
			throws SailException {
		try (CloseableIteration<? extends Statement> matches = dataset.getStatements(subj, pred, obj, contexts)) {
			boolean found = matches.hasNext();
			return found;
		}
	}

}