DistinctModelReducingUnionIteration.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.base;

import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.sail.SailException;

/**
 * <p>
 * An Iteration that takes two source. An iterator and a model.
 * </p>
 * <p>
 * For every statement returned by the iterator, that statement is removed from the model. Once the iterator is
 * exhausted, a new filteredStatementsIterator is created by applying the filterable function to the model.
 * </p>
 * <p>
 * The point of this iteration is to create a distinct iterator that produces only distinct results in a lazy and
 * mutable manner. This is useful when iterating in a transaction, since the user may have added duplicate statements.
 * On a potential second iteration there will be no need for further deduplication, since the initial deduplication was
 * mutable.
 * </p>
 * <p>
 * Model will throw a ConcurrentModificationException if two threads call .remove(...) at the same time or one thread
 * calls .next() on an iterator while another calls .remove(...). This is resolved by synchronizing access to the model
 * and by consuming the entire iterator into an ArrayList, effectively caching the filtered part of the model in memory.
 * There is no overflow to disk for this cache.
 * </p>
 **/
public class DistinctModelReducingUnionIteration extends LookAheadIteration<Statement> {

	private final CloseableIteration<? extends Statement> iterator;
	private final Consumer<Statement> approvedRemover;
	private final Supplier<Iterable<Statement>> approvedSupplier;

	DistinctModelReducingUnionIteration(CloseableIteration<? extends Statement> iterator,
			Consumer<Statement> approvedRemover,
			Supplier<Iterable<Statement>> approvedSupplier) {
		this.iterator = iterator;
		this.approvedRemover = approvedRemover;
		this.approvedSupplier = approvedSupplier;
	}

	private Iterator<? extends Statement> filteredStatementsIterator;

	@Override
	protected Statement getNextElement() throws SailException {
		Statement next = null;

		// first run through the statements from the base store
		if (iterator.hasNext()) {
			next = iterator.next();

			// remove the statement from the approved model in the Changeset, in case the approved model has a duplicate
			approvedRemover.accept(next);
		} else {
			// we have now exhausted the base store and will start returning data added in this transaction but not yet
			// committed, eg. approved model in the Changeset

			if (filteredStatementsIterator == null) {
				filteredStatementsIterator = approvedSupplier.get().iterator();
			}

			if (filteredStatementsIterator.hasNext()) {
				next = filteredStatementsIterator.next();
			}
		}

		return next;
	}

	@Override
	protected void handleClose() throws SailException {
		iterator.close();
	}

}