ReadCommittedWrapper.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.extensiblestore;

import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.common.iteration.SingletonIteration;
import org.eclipse.rdf4j.common.order.StatementOrder;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatement;
import org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatementHelper;

/**
 * A {@link DataStructureInterface} that wraps another {@link DataStructureInterface} and buffers all added and removed
 * statements in memory until {@link #flushForCommit()} is called, at which point the buffered changes are applied to
 * the wrapped data structure.
 * <p>
 * Reads ({@code getStatements}) overlay the buffered changes on top of the wrapped data structure: buffered additions
 * are returned, buffered removals are hidden.
 * <p>
 * Every call into the wrapped data structure is made while holding its monitor ({@code synchronized (dataStructure)}).
 * Note that only the <em>creation</em> of an underlying iteration happens under that monitor; the returned iteration
 * is consumed by the caller afterwards.
 *
 * @author Håvard Mikkelsen Ottestad
 */
class ReadCommittedWrapper implements DataStructureInterface {

	private final DataStructureInterface dataStructure;

	// Pending changes, keyed by the statement itself so that the stored instance can be looked up by equality.
	// addStatement/removeStatement keep the two maps disjoint.
	private Map<ExtensibleStatement, ExtensibleStatement> internalAdded = new HashMap<>();
	private Map<ExtensibleStatement, ExtensibleStatement> internalRemoved = new HashMap<>();

	/**
	 * @param dataStructure the data structure that buffered changes are eventually flushed to
	 */
	ReadCommittedWrapper(DataStructureInterface dataStructure) {
		this.dataStructure = dataStructure;
	}

	/**
	 * Buffers the statement as added. If the statement was not already buffered as added, any pending removal of an
	 * equal statement is cancelled.
	 */
	@Override
	public void addStatement(ExtensibleStatement statement) {
		ExtensibleStatement previous = internalAdded.put(statement, statement);
		if (previous == null) {
			internalRemoved.remove(statement);
		}
	}

	/**
	 * Buffers the statement as removed. If the statement was not already buffered as removed, any pending addition of
	 * an equal statement is cancelled.
	 */
	@Override
	public void removeStatement(ExtensibleStatement statement) {
		ExtensibleStatement previous = internalRemoved.put(statement, statement);
		if (previous == null) {
			internalAdded.remove(statement);
		}
	}

	/**
	 * Returns the statements matching the pattern, combining buffered changes with the wrapped data structure.
	 * Buffered additions are returned first, followed by statements from the wrapped data structure that are neither
	 * buffered as added (to avoid duplicates) nor buffered as removed.
	 *
	 * @param subject   subject to match, or {@code null} for any
	 * @param predicate predicate to match, or {@code null} for any
	 * @param object    object to match, or {@code null} for any
	 * @param inferred  when {@code false}, buffered statements flagged as inferred are excluded
	 * @param context   contexts to match; {@code null} or empty matches any context
	 * @return an iteration over the matching statements; must be closed by the caller
	 */
	@Override
	public CloseableIteration<? extends ExtensibleStatement> getStatements(Resource subject,
			IRI predicate, Value object, boolean inferred, Resource... context) {

		// Fast path: a fully bound pattern can match at most one statement, so the buffers are probed directly.
		if (isExactPattern(subject, predicate, object, context)) {
			Statement key = toLookupKey(subject, predicate, object, inferred, context[0]);

			ExtensibleStatement added = internalAdded.get(key);
			if (added != null) {
				return new SingletonIteration<>(added);
			}
			if (internalRemoved.containsKey(key)) {
				return new EmptyIteration<>();
			}
			synchronized (dataStructure) {
				return dataStructure.getStatements(subject, predicate, object, inferred, context);
			}
		}

		synchronized (dataStructure) {

			return new LookAheadIteration<>() {

				// Snapshots of the buffers so later writes do not affect this iteration.
				final Set<ExtensibleStatement> internalAddedLocal = new HashSet<>(internalAdded.values());
				final Set<ExtensibleStatement> internalRemovedLocal = new HashSet<>(internalRemoved.values());

				// Buffered additions that match the requested pattern.
				final Iterator<ExtensibleStatement> left = internalAddedLocal.stream()
						.filter(statement -> matchesPattern(statement, subject, predicate, object, inferred,
								context))
						.iterator();

				// Matching statements from the wrapped data structure (opened while holding its monitor).
				final CloseableIteration<? extends ExtensibleStatement> right = dataStructure
						.getStatements(subject, predicate, object, inferred, context);

				@Override
				protected void handleClose() throws SailException {
					right.close();
				}

				@Override
				protected ExtensibleStatement getNextElement() throws SailException {
					while (left.hasNext() || right.hasNext()) {
						ExtensibleStatement candidate;

						if (left.hasNext()) {
							candidate = left.next();
						} else {
							candidate = right.next();
							if (internalAddedLocal.contains(candidate)) {
								// Also buffered as added; skipped here so it is not reported twice.
								continue;
							}
						}

						if (!internalRemovedLocal.contains(candidate)) {
							return candidate;
						}
					}

					// null signals the end of the iteration
					return null;
				}

			};

		}
	}

	/**
	 * Ordered variant of {@link #getStatements(Resource, IRI, Value, boolean, Resource...)}. Buffered additions are
	 * sorted with the comparator obtained from {@code statementOrder} and merged with the (already ordered) result of
	 * the wrapped data structure. On ties the buffered statement is returned first.
	 *
	 * @param statementOrder the requested order
	 * @param subject        subject to match, or {@code null} for any
	 * @param predicate      predicate to match, or {@code null} for any
	 * @param object         object to match, or {@code null} for any
	 * @param inferred       when {@code false}, buffered statements flagged as inferred are excluded
	 * @param context        contexts to match; {@code null} or empty matches any context
	 * @return an iteration over the matching statements; must be closed by the caller
	 */
	@Override
	public CloseableIteration<? extends ExtensibleStatement> getStatements(StatementOrder statementOrder,
			Resource subject,
			IRI predicate, Value object, boolean inferred, Resource... context) {

		// Fast path: a fully bound pattern can match at most one statement, so the buffers are probed directly.
		if (isExactPattern(subject, predicate, object, context)) {
			Statement key = toLookupKey(subject, predicate, object, inferred, context[0]);

			ExtensibleStatement added = internalAdded.get(key);
			if (added != null) {
				return new SingletonIteration<>(added);
			}
			if (internalRemoved.containsKey(key)) {
				return new EmptyIteration<>();
			}
			synchronized (dataStructure) {
				return dataStructure.getStatements(statementOrder, subject, predicate, object, inferred,
						context);
			}
		}

		synchronized (dataStructure) {

			return new LookAheadIteration<>() {

				final Comparator<Statement> statementComparator = statementOrder
						.getComparator(dataStructure.getComparator());

				// Membership checks must use equals/hashCode. The order comparator may treat distinct statements
				// as equal (e.g. when it only looks at one component), so it must not be used as a set's
				// identity: that would drop buffered statements and hide unrelated committed ones.
				final Set<ExtensibleStatement> internalAddedLocal = new HashSet<>(internalAdded.values());
				final Set<ExtensibleStatement> internalRemovedLocal = new HashSet<>(internalRemoved.values());

				// Buffered additions that match the requested pattern, in the requested order.
				final List<ExtensibleStatement> sortedAdded = internalAddedLocal.stream()
						.filter(statement -> matchesPattern(statement, subject, predicate, object, inferred,
								context))
						.sorted(statementComparator)
						.collect(Collectors.toList());

				final Iterator<ExtensibleStatement> left = sortedAdded.iterator();

				// Matching statements from the wrapped data structure (opened while holding its monitor).
				final CloseableIteration<? extends ExtensibleStatement> right = dataStructure
						.getStatements(statementOrder, subject, predicate, object, inferred, context);

				// One-element look-ahead buffers for the merge.
				ExtensibleStatement nextLeft = null;
				ExtensibleStatement nextRight = null;

				@Override
				protected void handleClose() throws SailException {
					right.close();
				}

				@Override
				protected ExtensibleStatement getNextElement() throws SailException {
					while (true) {
						if (nextLeft == null && left.hasNext()) {
							nextLeft = left.next();
						}
						if (nextRight == null && right.hasNext()) {
							nextRight = right.next();
						}

						// Terminate only when both sources AND both look-ahead buffers are drained; checking
						// just hasNext() would silently drop an element that is still buffered.
						if (nextLeft == null && nextRight == null) {
							return null;
						}

						boolean takeLeft = nextLeft != null
								&& (nextRight == null || statementComparator.compare(nextLeft, nextRight) <= 0);

						ExtensibleStatement candidate;
						if (takeLeft) {
							candidate = nextLeft;
							nextLeft = null;
						} else {
							candidate = nextRight;
							nextRight = null;
							if (internalAddedLocal.contains(candidate)) {
								// Also buffered as added; skipped here so it is not reported twice.
								continue;
							}
						}

						if (!internalRemovedLocal.contains(candidate)) {
							return candidate;
						}
					}
				}

			};

		}
	}

	/**
	 * Tells whether the pattern is fully bound, i.e. subject, predicate and object are given together with exactly
	 * one context entry.
	 */
	private static boolean isExactPattern(Resource subject, IRI predicate, Value object, Resource[] context) {
		return subject != null && predicate != null && object != null && context != null && context.length == 1;
	}

	/**
	 * Builds the statement used to probe the buffers for a fully bound pattern.
	 */
	private static Statement toLookupKey(Resource subject, IRI predicate, Value object, boolean inferred,
			Resource context) {
		Statement statement = SimpleValueFactory.getInstance()
				.createStatement(subject, predicate, object, context);
		return ExtensibleStatementHelper.getDefaultImpl().fromStatement(statement, inferred);
	}

	/**
	 * Tells whether a buffered statement matches the given pattern. {@code null} components and a {@code null} or
	 * empty context array act as wildcards. When {@code inferred} is {@code false}, statements flagged as inferred
	 * are rejected; when it is {@code true}, the inferred flag is not checked.
	 */
	private static boolean matchesPattern(ExtensibleStatement statement, Resource subject, IRI predicate,
			Value object, boolean inferred, Resource[] context) {
		if (subject != null && !statement.getSubject().equals(subject)) {
			return false;
		}
		if (predicate != null && !statement.getPredicate().equals(predicate)) {
			return false;
		}
		if (object != null && !statement.getObject().equals(object)) {
			return false;
		}
		if (context != null && context.length > 0 && !containsContext(context, statement.getContext())) {
			return false;
		}
		return inferred || !statement.isInferred();
	}

	/**
	 * Tells whether {@code needle} occurs in {@code haystack}; a {@code null} entry matches a {@code null} needle.
	 */
	private static boolean containsContext(Resource[] haystack, Resource needle) {
		for (Resource resource : haystack) {
			if (resource == null && needle == null) {
				return true;
			}
			if (resource != null && resource.equals(needle)) {
				return true;
			}
		}
		return false;
	}

	/**
	 * Applies all buffered additions and removals to the wrapped data structure, calls its
	 * {@link DataStructureInterface#flushForReading()}, and then clears the buffers. Does nothing when there are no
	 * buffered changes.
	 */
	@Override
	public void flushForCommit() {

		if (internalAdded.isEmpty() && internalRemoved.isEmpty()) {
			return;
		}

		// NOTE(review): this uses the map keys while getStatements returns the map values; the two are equal
		// but not necessarily the same instance if a statement was added more than once - confirm intended.
		List<ExtensibleStatement> internalAddedEffective = internalAdded
				.keySet()
				.stream()
				.filter(statement -> !internalRemoved.containsKey(statement))
				.collect(Collectors.toList());

		synchronized (dataStructure) {
			internalAddedEffective.forEach(dataStructure::addStatement);
			internalRemoved.values().forEach(dataStructure::removeStatement);
			dataStructure.flushForReading();
		}

		// Fresh maps rather than clear().
		internalAdded = new HashMap<>();
		internalRemoved = new HashMap<>();

	}

	/**
	 * No-op: buffered changes are readable through this wrapper without flushing.
	 */
	@Override
	public void flushForReading() {
	}

	/**
	 * @return the value comparator of the wrapped data structure
	 */
	@Override
	public Comparator<Value> getComparator() {
		return dataStructure.getComparator();
	}

	/**
	 * Delegates to the wrapped data structure.
	 */
	@Override
	public Set<StatementOrder> getSupportedOrders(Resource subj, IRI pred, Value obj, boolean inferred,
			Resource... contexts) {
		return dataStructure.getSupportedOrders(subj, pred, obj, inferred, contexts);
	}

	/**
	 * Initializes the wrapped data structure.
	 */
	@Override
	public void init() {
		dataStructure.init();
	}

}