SailSourceBranch.java

/*******************************************************************************
 * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.base;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.eclipse.rdf4j.common.transaction.IsolationLevel;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.ModelFactory;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.impl.DynamicModelFactory;
import org.eclipse.rdf4j.sail.SailException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SailSource} that keeps a delta of its state from a backing {@link SailSource}.
 *
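 * <p>
 * A rough usage sketch (the backing {@code SailSource} and the statement values {@code subj}, {@code pred},
 * {@code obj} and {@code ctx} are assumed to come from elsewhere):
 *
 * <pre>{@code
 * SailSourceBranch branch = new SailSourceBranch(backingSource);
 * SailSink sink = branch.sink(IsolationLevels.NONE);
 * sink.approve(subj, pred, obj, ctx); // buffered in a Changeset, not yet visible to the backing source
 * sink.flush();                       // merges the buffered Changeset into this branch
 * sink.close();
 * branch.flush();                     // pushes the accumulated changes down to the backing source
 * }</pre>
 *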
 * @author James Leigh
 */
class SailSourceBranch implements SailSource {

	private static final Logger logger = LoggerFactory.getLogger(SailSourceBranch.class);

	/**
	 * Used to prevent concurrent changes to this object's fields from multiple threads.
	 */
	private final ReentrantLock semaphore = new ReentrantLock();

	/**
	 * The difference between this {@link SailSource} and the backing {@link SailSource}.
	 */
	private final ArrayDeque<Changeset> changes = new ArrayDeque<>();

	/**
	 * {@link SailSink}s that have been created but not yet {@link SailSink#flush()}ed to this {@link SailSource}.
	 */
	private final Set<Changeset> pending = Collections
			.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));

	/**
	 * Set of open {@link SailDataset}s for this {@link SailSource}.
	 */
	private final Collection<SailDataset> observers = new ArrayList<>();

	/**
	 * The underlying {@link SailSource} this {@link SailSource} is derived from.
	 */
	private final SailSource backingSource;

	/**
	 * The {@link ModelFactory} used to create the {@link Model} instances that store
	 * {@link SailSink#approve(Resource, IRI, Value, Resource)} and
	 * {@link SailSink#deprecate(Resource, IRI, Value, Resource)} statements.
	 */
	private final ModelFactory modelFactory;

	/**
	 * If this {@link SailSource} should be flushed to the backing {@link SailSource} when it is not in use.
	 */
	private final boolean autoFlush;

	/**
	 * Non-null when in {@link IsolationLevels#SNAPSHOT} (or higher) mode.
	 */
	private SailDataset snapshot;

	/**
	 * Non-null when in {@link IsolationLevels#SERIALIZABLE} (or higher) mode.
	 */
	private SailSink serializable;

	/**
	 * Non-null after {@link #prepare()}, but before {@link #flush()}.
	 */
	private SailSink prepared;

	/**
	 * Creates a new in-memory {@link SailSource} derived from the given {@link SailSource}.
	 *
	 * @param backingSource the {@link SailSource} this branch is derived from
	 */
	public SailSourceBranch(SailSource backingSource) {
		this(backingSource, new DynamicModelFactory(), false);
	}

	/**
	 * Creates a new {@link SailSource} derived from the given {@link SailSource}.
	 *
	 * @param backingSource the {@link SailSource} this branch is derived from
	 * @param modelFactory  the {@link ModelFactory} used to create the {@link Model} instances that buffer changes
	 */
	public SailSourceBranch(SailSource backingSource, ModelFactory modelFactory) {
		this(backingSource, modelFactory, false);
	}

	/**
	 * Creates a new {@link SailSource} derived from the given {@link SailSource}. If <code>autoFlush</code> is
	 * <code>true</code>, {@link #flush()} is called automatically when this {@link SailSource} is not in use.
	 *
	 * @param backingSource the {@link SailSource} this branch is derived from
	 * @param modelFactory  the {@link ModelFactory} used to create the {@link Model} instances that buffer changes
	 * @param autoFlush     if <code>true</code>, {@link #flush()} is called automatically when this
	 *                      {@link SailSource} is not in use
	 */
	public SailSourceBranch(SailSource backingSource, ModelFactory modelFactory, boolean autoFlush) {
		this.backingSource = backingSource;
		this.modelFactory = modelFactory;
		this.autoFlush = autoFlush;
	}

	@Override
	public void close() throws SailException {
		semaphore.lock();
		try {
			try {
				try {
					SailDataset toCloseSnapshot = snapshot;
					snapshot = null;
					if (toCloseSnapshot != null) {
						toCloseSnapshot.close();
					}
				} finally {
					SailSink toCloseSerializable = serializable;
					serializable = null;
					if (toCloseSerializable != null) {
						toCloseSerializable.close();
					}
				}
			} finally {
				SailSink toClosePrepared = prepared;
				prepared = null;
				if (toClosePrepared != null) {
					toClosePrepared.close();
				}

			}
		} finally {
			semaphore.unlock();
		}
	}

	@Override
	public SailSink sink(IsolationLevel level) throws SailException {
		Changeset changeset = new Changeset() {

			private boolean prepared;

			@Override
			public void prepare() throws SailException {
				if (!prepared) {
					preparedChangeset(this);
					prepared = true;
				}
				super.prepare();
			}

			@Override
			public void flush() throws SailException {
				merge(this);
			}

			@Override
			public void close() throws SailException {
				try {
					// `this` Changeset should have been removed from `pending` already, unless we are rolling back a
					// transaction, in which case we need to remove it when closing the Changeset.
					if (pending.contains(this)) {
						removeThisFromPendingWithoutCausingDeadlock();
					}
				} finally {
					try {
						super.close();
					} finally {
						if (prepared) {
							closeChangeset(this);
							prepared = false;
						}
						autoFlush();
					}
				}
			}

			/**
			 * The outer SailSourceBranch could be in use in a SERIALIZABLE transaction, so we don't want to cause any
			 * deadlocks by taking the {@code semaphore} if {@code this} Changeset is already in the process of being
			 * removed from {@code pending}.
			 */
			private void removeThisFromPendingWithoutCausingDeadlock() {
				long tryLockMillis = 10;
				while (pending.contains(this)) {
					boolean locked = false;
					try {
						locked = semaphore.tryLock(tryLockMillis *= 2, TimeUnit.MILLISECONDS);
						if (locked) {
							pending.remove(this);
						}
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt();
						throw new SailException(e);
					} finally {
						if (locked) {
							semaphore.unlock();
						}
					}

				}
			}

			@Override
			public Model createEmptyModel() {
				return modelFactory.createEmptyModel();
			}
		};
		try {
			semaphore.lock();
			pending.add(changeset);
		} finally {
			semaphore.unlock();
		}
		return changeset;
	}

	@Override
	public SailDataset dataset(IsolationLevel level) throws SailException {
		SailDataset dataset = new DelegatingSailDataset(derivedFromSerializable(level)) {

			@Override
			public void close() throws SailException {
				super.close();
				try {
					semaphore.lock();
					observers.remove(this);
					compressChanges();
					autoFlush();
				} finally {
					semaphore.unlock();
				}
			}
		};
		try {
			semaphore.lock();
			observers.add(dataset);
		} finally {
			semaphore.unlock();
		}
		return dataset;
	}

	@Override
	public SailSource fork() {
		return new SailSourceBranch(this, modelFactory);
	}

	@Override
	public void prepare() throws SailException {
		try {
			semaphore.lock();
			if (!changes.isEmpty()) {
				if (prepared == null && serializable == null) {
					prepared = backingSource.sink(IsolationLevels.NONE);
				} else if (prepared == null) {
					prepared = serializable;
				}
				prepare(prepared);
				prepared.prepare();
			}
		} finally {
			semaphore.unlock();
		}
	}

	@Override
	public void flush() throws SailException {
		try {
			semaphore.lock();
			if (!changes.isEmpty()) {
				if (prepared == null) {
					prepare();
				}
				flush(prepared);
				prepared.flush();
				try {
					if (prepared != serializable) {
						prepared.close();
					}
				} finally {
					prepared = null;
				}
			}
		} catch (SailException e) {
			// clear changes if flush fails
			changes.clear();
			prepared = null;
			throw e;
		} finally {
			semaphore.unlock();
		}
	}

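	/**
	 * Checks whether this branch holds changes that have not yet been flushed to the backing {@link SailSource}.
	 *
	 * @return <code>true</code> if there are outstanding changes
	 */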
	public boolean isChanged() {
		try {
			semaphore.lock();
			return !changes.isEmpty();
		} finally {
			semaphore.unlock();
		}
	}

	@Override
	public String toString() {
		return backingSource.toString() + "\n" + changes;
	}

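	/**
	 * Acquires the {@link #semaphore} when a {@link Changeset} created by {@link #sink(IsolationLevel)} is prepared.
	 * The lock is held until {@link #closeChangeset(Changeset)} releases it when that {@link Changeset} is closed.
	 */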
	void preparedChangeset(Changeset changeset) {
		semaphore.lock();
	}

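	/**
	 * Merges the given {@link Changeset} into this branch: a shallow clone is appended to {@link #changes}, the queue
	 * is compressed, and the merged result is prepended to every still-pending {@link Changeset} so that later sinks
	 * build on top of it.
	 */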
	void merge(Changeset change) {
		try {
			semaphore.lock();
			pending.remove(change);
			if (isChanged(change)) {
				Changeset merged;
				changes.add(change.shallowClone());
				compressChanges();
				merged = changes.getLast();

				// `pending` is a synchronized collection, so we should in theory use a synchronized block here to
				// protect our iterator. The `semaphore` is already protecting all writes, and we have already acquired
				// the `semaphore`. Synchronizing on the `pending` collection could potentially lead to a deadlock when
				// closing a Changeset during rollback.
				for (Changeset c : pending) {
					c.prepend(merged);
				}
			}
		} finally {
			semaphore.unlock();
		}
	}

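	/**
	 * Folds the newest entries of {@link #changes} into the previous entry, stopping as soon as the previous entry is
	 * still referenced by an open dataset (see {@link Changeset#isRefback()}).
	 */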
	void compressChanges() {
		try {
			semaphore.lock();
			while (changes.size() > 1) {
				Changeset pop = changes.removeLast();
				if (changes.peekLast().isRefback()) {
					changes.addLast(pop);
					break;
				}

				try {
					prepare(pop, changes.getLast());
					flush(pop, changes.getLast());
				} catch (SailException e) {
					// Changeset does not throw SailException
					throw new AssertionError(e);
				}
			}

		} finally {
			semaphore.unlock();
		}
	}

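	/**
	 * Releases the {@link #semaphore} acquired in {@link #preparedChangeset(Changeset)}.
	 */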
	void closeChangeset(Changeset changeset) {
		semaphore.unlock();
	}

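	/**
	 * Flushes this branch to the backing {@link SailSource} if {@link #autoFlush} is enabled, the {@link #semaphore}
	 * can be acquired without blocking, and no {@link SailDataset} observers are currently open.
	 */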
	void autoFlush() throws SailException {
		if (autoFlush && semaphore.tryLock()) {
			try {
				if (observers.isEmpty()) {
					flush();
				}
			} finally {
				semaphore.unlock();
			}
		}
	}

	private boolean isChanged(Changeset change) {
		return change.isChanged();
	}

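	/**
	 * Opens a {@link SailDataset} over the current state of this branch. For {@link IsolationLevels#SERIALIZABLE} (or
	 * stricter) a sink on the backing source is retained and the dataset is wrapped in an
	 * {@link ObservingSailDataset} so that the statements it reads are observed.
	 */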
	private SailDataset derivedFromSerializable(IsolationLevel level) throws SailException {
		try {
			semaphore.lock();
			if (serializable == null && level.isCompatibleWith(IsolationLevels.SERIALIZABLE)) {
				serializable = backingSource.sink(level);
			}
			SailDataset derivedFrom = derivedFromSnapshot(level);
			if (serializable == null) {
				return derivedFrom;
			} else {
				return new ObservingSailDataset(derivedFrom, sink(level));
			}
		} finally {
			semaphore.unlock();
		}
	}

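	/**
	 * Opens a {@link SailDataset} over the backing source, reusing the retained {@link #snapshot} when present, and
	 * layers all outstanding {@link #changes} on top of it.
	 */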
	private SailDataset derivedFromSnapshot(IsolationLevel level) throws SailException {
		try {
			semaphore.lock();
			SailDataset derivedFrom;
			if (this.snapshot != null) {
				// this object already has at least snapshot isolation
				derivedFrom = new DelegatingSailDataset(this.snapshot) {

					@Override
					public void close() throws SailException {
						// don't close snapshot yet
					}
				};
			} else {
				derivedFrom = backingSource.dataset(level);
				if (level.isCompatibleWith(IsolationLevels.SNAPSHOT)) {
					this.snapshot = derivedFrom;
					// don't release snapshot until this SailSource is released
					derivedFrom = new DelegatingSailDataset(derivedFrom) {

						@Override
						public void close() throws SailException {
							// don't close snapshot yet
						}
					};
				}
			}
			Iterator<Changeset> iter = changes.iterator();
			while (iter.hasNext()) {
				derivedFrom = new SailDatasetImpl(derivedFrom, iter.next());
			}
			return derivedFrom;
		} finally {
			semaphore.unlock();
		}
	}

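	/**
	 * Forwards the observations recorded by each outstanding {@link Changeset} to the given sink.
	 */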
	private void prepare(SailSink sink) throws SailException {
		try {
			semaphore.lock();
			for (Changeset change : changes) {
				prepare(change, sink);
			}
		} finally {
			semaphore.unlock();
		}
	}

	private void prepare(Changeset change, SailSink sink) throws SailException {
		change.sinkObserved(sink);
	}

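	/**
	 * Applies all outstanding {@link #changes} to the given sink and clears the queue. When there is exactly one
	 * change that is not in use and the sink is an empty {@link Changeset}, the change is handed over directly
	 * instead of being replayed.
	 */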
	private void flush(SailSink sink) throws SailException {
		try {
			semaphore.lock();
			if (changes.size() == 1 && !changes.getFirst().isRefback() && sink instanceof Changeset
					&& !isChanged((Changeset) sink)) {
				// exactly one change that is not in use, applied directly to an empty target Changeset
				Changeset dst = (Changeset) sink;
				dst.setChangeset(changes.pop());
			} else {
				Iterator<Changeset> iter = changes.iterator();
				while (iter.hasNext()) {
					flush(iter.next(), sink);
					iter.remove();
				}
			}
		} finally {
			semaphore.unlock();
		}
	}

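	/**
	 * Replays a single {@link Changeset} into the given sink: observations first, then namespace changes, cleared
	 * statements and contexts, and finally the deprecated and approved statements.
	 */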
	private void flush(Changeset change, SailSink sink) throws SailException {
		prepare(change, sink);
		if (change.isNamespaceCleared()) {
			sink.clearNamespaces();
		}
		Set<String> removedPrefixes = change.getRemovedPrefixes();
		if (removedPrefixes != null) {
			for (String prefix : removedPrefixes) {
				sink.removeNamespace(prefix);
			}
		}
		Map<String, String> addedNamespaces = change.getAddedNamespaces();
		if (addedNamespaces != null) {
			for (Map.Entry<String, String> e : addedNamespaces.entrySet()) {
				sink.setNamespace(e.getKey(), e.getValue());
			}
		}
		if (change.isStatementCleared()) {
			sink.clear();
		}
		Set<Resource> deprecatedContexts = change.getDeprecatedContexts();
		if (deprecatedContexts != null && !deprecatedContexts.isEmpty()) {
			sink.clear(deprecatedContexts.toArray(new Resource[0]));
		}

		change.sinkDeprecated(sink);
		change.sinkApproved(sink);
	}

}