SailDatasetImpl.java

/*******************************************************************************
 * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.base;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Function;

import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteratorIteration;
import org.eclipse.rdf4j.common.iteration.DistinctIteration;
import org.eclipse.rdf4j.common.iteration.DualUnionIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.FilterIteration;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Namespace;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Triple;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.impl.SimpleNamespace;
import org.eclipse.rdf4j.sail.SailException;

/**
 * A view of an {@link SailSource} that is derived from a backing {@link SailDataset}.
 *
 * @author James Leigh
 */
class SailDatasetImpl implements SailDataset {

	private static final EmptyIteration<Triple> TRIPLE_EMPTY_ITERATION = new EmptyIteration<>();
	private static final EmptyIteration<Namespace> NAMESPACES_EMPTY_ITERATION = new EmptyIteration<>();
	private static final EmptyIteration<Statement> STATEMENT_EMPTY_ITERATION = new EmptyIteration<>();

	/**
	 * {@link SailDataset} of the backing {@link SailSource}.
	 */
	private final SailDataset derivedFrom;

	/**
	 * Changes that have not yet been {@link SailSource#flush()}ed to the backing {@link SailDataset}.
	 */
	private final Changeset changes;

	/**
	 * Create a derivative dataset that applies the given changeset. The life cycle of this and the given
	 * {@link SailDataset} are bound.
	 *
	 * @param derivedFrom will be released when this object is released
	 * @param changes     changeset to be observed with the given dataset
	 */
	public SailDatasetImpl(SailDataset derivedFrom, Changeset changes) {
		this.derivedFrom = derivedFrom;
		this.changes = changes;
		changes.addRefback(this);
	}

	@Override
	public String toString() {
		return changes + "\n" + derivedFrom;
	}

	@Override
	public void close() throws SailException {
		changes.removeRefback(this);
		derivedFrom.close();
	}

	@Override
	public String getNamespace(String prefix) throws SailException {
		Map<String, String> addedNamespaces = changes.getAddedNamespaces();
		if (addedNamespaces != null && addedNamespaces.containsKey(prefix)) {
			return addedNamespaces.get(prefix);
		}
		Set<String> removedPrefixes = changes.getRemovedPrefixes();
		if (removedPrefixes != null && removedPrefixes.contains(prefix) || changes.isNamespaceCleared()) {
			return null;
		}
		return derivedFrom.getNamespace(prefix);
	}

	@Override
	public CloseableIteration<? extends Namespace> getNamespaces() throws SailException {
		final CloseableIteration<? extends Namespace> namespaces;
		if (changes.isNamespaceCleared()) {
			namespaces = NAMESPACES_EMPTY_ITERATION;
		} else {
			namespaces = derivedFrom.getNamespaces();
		}
		Iterator<Map.Entry<String, String>> added = null;
		Set<String> removed;
		synchronized (this) {
			Map<String, String> addedNamespaces = changes.getAddedNamespaces();
			if (addedNamespaces != null) {
				added = addedNamespaces.entrySet().iterator();
			}
			removed = changes.getRemovedPrefixes();
		}
		if (added == null && removed == null) {
			return namespaces;
		}
		final Iterator<Map.Entry<String, String>> addedIter = added;
		final Set<String> removedSet = removed;
		return new AbstractCloseableIteration<>() {

			volatile Namespace next;

			@Override
			public boolean hasNext() throws SailException {
				if (isClosed()) {
					return false;
				}
				if (addedIter != null && addedIter.hasNext()) {
					return true;
				}
				Namespace toCheckNext = next;
				while (toCheckNext == null && namespaces.hasNext()) {
					toCheckNext = next = namespaces.next();
					if (removedSet != null && removedSet.contains(toCheckNext.getPrefix())) {
						toCheckNext = next = null;
					}
				}
				return toCheckNext != null;
			}

			@Override
			public Namespace next() throws SailException {
				if (isClosed()) {
					throw new NoSuchElementException("The iteration has been closed.");
				}
				if (addedIter != null && addedIter.hasNext()) {
					Entry<String, String> e = addedIter.next();
					return new SimpleNamespace(e.getKey(), e.getValue());
				}
				try {
					if (hasNext()) {
						Namespace toCheckNext = next;
						if (toCheckNext != null) {
							return toCheckNext;
						}
					}
					close();
					throw new NoSuchElementException("The iteration has been closed.");
				} finally {
					next = null;
				}
			}

			@Override
			public void remove() {
				throw new UnsupportedOperationException();
			}

			@Override
			public void handleClose() throws SailException {
				namespaces.close();
			}
		};
	}

	@Override
	public CloseableIteration<? extends Resource> getContextIDs() throws SailException {
		final CloseableIteration<? extends Resource> contextIDs;
		contextIDs = derivedFrom.getContextIDs();
		Iterator<Resource> added = null;
		Set<Resource> removed = null;
		synchronized (this) {
			Set<Resource> approvedContexts = changes.getApprovedContexts();
			if (approvedContexts != null) {
				added = approvedContexts.iterator();
			}
			Set<Resource> deprecatedContexts = changes.getDeprecatedContexts();
			if (deprecatedContexts != null) {
				removed = deprecatedContexts;
			}
		}
		if (added == null && removed == null) {
			return contextIDs;
		}
		final Iterator<Resource> addedIter = added;
		final Set<Resource> removedSet = removed;

		return new AbstractCloseableIteration<>() {

			volatile Resource next;

			@Override
			public boolean hasNext() throws SailException {
				if (isClosed()) {
					return false;
				}
				if (addedIter != null && addedIter.hasNext()) {
					return true;
				}
				Resource toCheckNext = next;
				while (toCheckNext == null && contextIDs.hasNext()) {
					toCheckNext = next = contextIDs.next();
					if (removedSet != null && removedSet.contains(toCheckNext)) {
						toCheckNext = next = null;
					}
				}
				return toCheckNext != null;
			}

			@Override
			public Resource next() throws SailException {
				if (isClosed()) {
					throw new NoSuchElementException("The iteration has been closed.");
				}
				if (addedIter != null && addedIter.hasNext()) {
					return addedIter.next();
				}
				try {
					if (hasNext()) {
						Resource toCheckNext = next;
						if (toCheckNext != null) {
							return toCheckNext;
						}
					}
					close();
					throw new NoSuchElementException("The iteration has been closed.");
				} finally {
					next = null;
				}
			}

			@Override
			public void remove() throws SailException {
				throw new UnsupportedOperationException();
			}

			@Override
			public void handleClose() throws SailException {
				contextIDs.close();
			}
		};
	}

	@Override
	public CloseableIteration<? extends Statement> getStatements(Resource subj, IRI pred, Value obj,
			Resource... contexts) throws SailException {
		Set<Resource> deprecatedContexts = changes.getDeprecatedContexts();
		CloseableIteration<? extends Statement> iter;
		if (changes.isStatementCleared()
				|| contexts == null && deprecatedContexts != null && deprecatedContexts.contains(null)
				|| contexts != null && contexts.length > 0 && deprecatedContexts != null
						&& deprecatedContexts.containsAll(Arrays.asList(contexts))) {
			iter = null;
		} else if (contexts != null && contexts.length > 0 && deprecatedContexts != null) {
			List<Resource> remaining = new ArrayList<>(Arrays.asList(contexts));
			remaining.removeAll(deprecatedContexts);
			iter = derivedFrom.getStatements(subj, pred, obj, remaining.toArray(new Resource[0]));
		} else {
			iter = derivedFrom.getStatements(subj, pred, obj, contexts);
		}
		if (changes.hasDeprecated() && iter != null) {
			iter = difference(iter, changes::hasDeprecated);
		}

		if (changes.hasApproved() && iter != null) {

			return new DistinctModelReducingUnionIteration(
					iter,
					changes::removeApproved,
					() -> changes.getApprovedStatements(subj, pred, obj, contexts));

		} else if (changes.hasApproved()) {
			Iterator<Statement> i = changes.getApprovedStatements(subj, pred, obj, contexts).iterator();
			return new CloseableIteratorIteration<>(i);
		} else if (iter != null) {
			return iter;
		} else {
			return STATEMENT_EMPTY_ITERATION;
		}
	}

	@Override
	public CloseableIteration<? extends Triple> getTriples(Resource subj, IRI pred, Value obj)
			throws SailException {

		CloseableIteration<? extends Triple> iter;
		if (changes.isStatementCleared()) {
			// nothing in the backing source is relevant, but we may still need to return approved data
			// from the changeset
			iter = null;
		} else {
			iter = derivedFrom.getTriples(subj, pred, obj);
		}

		if (changes.hasDeprecated() && iter != null) {
			iter = triplesDifference(iter, triple -> isDeprecated(triple, changes.getDeprecatedStatements()));
		}

		if (changes.hasApproved()) {
			if (iter != null) {
				CloseableIteratorIteration<? extends Triple> tripleExceptionCloseableIteratorIteration = new CloseableIteratorIteration<>(
						changes.getApprovedTriples(subj, pred, obj).iterator());

				// merge newly approved triples in the changeset with data from the backing source
				// TODO: see if use of collection factory is possible here.
				return new DistinctIteration<>(
						DualUnionIteration.getWildcardInstance(iter, tripleExceptionCloseableIteratorIteration),
						new HashSet<>());
			}

			// nothing relevant in the backing source, just return all matching approved triples from the changeset
			return new CloseableIteratorIteration<>(changes.getApprovedTriples(subj, pred, obj).iterator());
		} else if (iter != null) {
			return iter;
		} else {
			return TRIPLE_EMPTY_ITERATION;
		}
	}

	private CloseableIteration<? extends Statement> difference(
			CloseableIteration<? extends Statement> result, Function<Statement, Boolean> excluded) {
		return new FilterIteration<Statement>(result) {

			@Override
			protected boolean accept(Statement stmt) {
				return !excluded.apply(stmt);
			}

			@Override
			protected void handleClose() {

			}
		};
	}

	private CloseableIteration<? extends Triple> triplesDifference(
			CloseableIteration<? extends Triple> result, Function<Triple, Boolean> excluded) {
		return new FilterIteration<Triple>(result) {

			@Override
			protected boolean accept(Triple stmt) {
				return !excluded.apply(stmt);
			}

			@Override
			protected void handleClose() {

			}
		};
	}

	private boolean isDeprecated(Triple triple, List<Statement> deprecatedStatements) {
		// the triple is deprecated if the changeset deprecates all existing statements in the backing dataset that
		// involve this triple.
		try (CloseableIteration<? extends Statement> subjectStatements = derivedFrom
				.getStatements(triple, null, null)) {
			while (subjectStatements.hasNext()) {
				Statement st = subjectStatements.next();
				if (!deprecatedStatements.contains(st)) {
					return false;
				}
			}
		}
		try (CloseableIteration<? extends Statement> objectStatements = derivedFrom
				.getStatements(null, null, triple)) {
			while (objectStatements.hasNext()) {
				Statement st = objectStatements.next();
				if (!deprecatedStatements.contains(st)) {
					return false;
				}
			}
		}
		return true;
	}
}