InnerMergeJoinIterator.java

/*******************************************************************************
 * Copyright (c) 2023 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 ******************************************************************************/

package org.eclipse.rdf4j.query.algebra.evaluation.iterator;

import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.function.Function;

import org.eclipse.rdf4j.common.annotation.Experimental;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.Binding;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.MutableBindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.evaluation.ArrayBindingSet;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryEvaluationStep;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;

/**
 * @author H��vard M. Ottestad
 */
@Experimental
public class InnerMergeJoinIterator implements CloseableIteration<BindingSet> {

	private final PeekMarkIterator<BindingSet> leftIterator;
	private final PeekMarkIterator<BindingSet> rightIterator;
	private final Comparator<Value> cmp;
	private final Function<BindingSet, Value> valueFunction;
	private final QueryEvaluationContext context;

	private BindingSet next;
	private BindingSet currentLeft;
	private Value currentLeftValue;
	private Value leftPeekValue;

	// -1 for unset, 0 for equal, 1 for different
	int currentLeftValueAndPeekEquals = -1;

	private boolean closed = false;

	InnerMergeJoinIterator(CloseableIteration<BindingSet> leftIterator, CloseableIteration<BindingSet> rightIterator,
			Comparator<Value> cmp, Function<BindingSet, Value> valueFunction, QueryEvaluationContext context)
			throws QueryEvaluationException {

		this.leftIterator = new PeekMarkIterator<>(leftIterator);
		this.rightIterator = new PeekMarkIterator<>(rightIterator);
		this.cmp = cmp;
		this.valueFunction = valueFunction;
		this.context = context;
	}

	public static CloseableIteration<BindingSet> getInstance(QueryEvaluationStep leftPrepared,
			QueryEvaluationStep preparedRight, BindingSet bindings, Comparator<Value> cmp,
			Function<BindingSet, Value> value, QueryEvaluationContext context) {

		CloseableIteration<BindingSet> leftIter = leftPrepared.evaluate(bindings);
		if (leftIter == QueryEvaluationStep.EMPTY_ITERATION) {
			return leftIter;
		}

		CloseableIteration<BindingSet> rightIter = preparedRight.evaluate(bindings);
		if (rightIter == QueryEvaluationStep.EMPTY_ITERATION) {
			leftIter.close();
			return rightIter;
		}

		return new InnerMergeJoinIterator(leftIter, rightIter, cmp, value, context);
	}

	private BindingSet join(BindingSet left, BindingSet right, boolean createNewBindingSet) {
		MutableBindingSet joined;
		if (!createNewBindingSet && left instanceof MutableBindingSet) {
			joined = (MutableBindingSet) left;
		} else {
			joined = context.createBindingSet(left);
		}
		if (joined instanceof ArrayBindingSet && right instanceof ArrayBindingSet) {
			((ArrayBindingSet) joined).addAll((ArrayBindingSet) right);
			return joined;
		}

		for (Binding binding : right) {
			if (!joined.hasBinding(binding.getName())) {
				joined.addBinding(binding);
			}
		}
		return joined;
	}

	private void calculateNext() {
		if (next != null) {
			return;
		}

		if (currentLeft == null && leftIterator.hasNext()) {
			currentLeft = leftIterator.next();
			currentLeftValue = null;
			leftPeekValue = null;
			currentLeftValueAndPeekEquals = -1;

		}

		if (currentLeft == null) {
			return;
		}

		loop();

	}

	private void loop() {
		while (next == null) {
			if (rightIterator.hasNext()) {
				BindingSet peekRight = rightIterator.peek();

				if (currentLeftValue == null) {
					currentLeftValue = valueFunction.apply(currentLeft);
					leftPeekValue = null;
					currentLeftValueAndPeekEquals = -1;
				}

				int compare = compare(currentLeftValue, valueFunction.apply(peekRight));

				if (compare == 0) {
					equal();
					return;
				} else if (compare < 0) {
					// leftIterator is behind, or in other words, rightIterator is ahead
					if (leftIterator.hasNext()) {
						lessThan();
					} else {
						close();
						return;
					}
				} else {
					// rightIterator is behind, skip forward
					rightIterator.next();

				}

			} else if (rightIterator.isResettable() && leftIterator.hasNext()) {
				rightIterator.reset();
				currentLeft = leftIterator.next();
				currentLeftValue = null;
				leftPeekValue = null;
				currentLeftValueAndPeekEquals = -1;
			} else {
				close();
				return;
			}

		}
	}

	private int compare(Value left, Value right) {
		int compareTo;

		if (left == right) {
			compareTo = 0;
		} else {
			compareTo = cmp.compare(left, right);
		}
		return compareTo;
	}

	private void lessThan() {
		Value oldLeftValue = currentLeftValue;
		currentLeft = leftIterator.next();
		if (leftPeekValue != null) {
			currentLeftValue = leftPeekValue;
		} else {
			currentLeftValue = valueFunction.apply(currentLeft);
		}

		leftPeekValue = null;
		currentLeftValueAndPeekEquals = -1;

		if (oldLeftValue.equals(currentLeftValue)) {
			// we have duplicate keys on the leftIterator and need to reset the rightIterator (if it
			// is resettable)
			if (rightIterator.isResettable()) {
				rightIterator.reset();
			}
		} else {
			rightIterator.unmark();
		}
	}

	private void equal() {
		if (rightIterator.isResettable()) {
			next = join(currentLeft, rightIterator.next(), true);
		} else {
			doLeftPeek();

			if (currentLeftValueAndPeekEquals == 0) {
				rightIterator.mark();
				next = join(currentLeft, rightIterator.next(), true);
			} else {
				next = join(rightIterator.next(), currentLeft, false);
			}
		}
	}

	private void doLeftPeek() {
		if (leftPeekValue == null) {
			BindingSet leftPeek = leftIterator.peek();
			leftPeekValue = leftPeek != null ? valueFunction.apply(leftPeek) : null;
			currentLeftValueAndPeekEquals = -1;
		}

		if (currentLeftValueAndPeekEquals == -1) {
			boolean equals = currentLeftValue.equals(leftPeekValue);
			if (equals) {
				currentLeftValue = leftPeekValue;
				currentLeftValueAndPeekEquals = 0;
			} else {
				currentLeftValueAndPeekEquals = 1;
			}
		}
	}

	@Override
	public final boolean hasNext() {
		if (isClosed()) {
			return false;
		}

		calculateNext();

		return next != null;
	}

	@Override
	public final BindingSet next() {
		if (isClosed()) {
			throw new NoSuchElementException("The iteration has been closed.");
		}
		calculateNext();

		if (next == null) {
			close();
		}

		BindingSet result = next;

		if (result != null) {
			next = null;
			return result;
		} else {
			throw new NoSuchElementException();
		}
	}

	/**
	 * Throws an {@link UnsupportedOperationException}.
	 */
	@Override
	public void remove() {
		throw new UnsupportedOperationException();
	}

	/**
	 * Checks whether this CloseableIteration has been closed.
	 *
	 * @return <var>true</var> if the CloseableIteration has been closed, <var>false</var> otherwise.
	 */
	public final boolean isClosed() {
		return closed;
	}

	@Override
	public final void close() {
		if (!closed) {
			closed = true;
			try {
				leftIterator.close();
			} finally {
				rightIterator.close();
			}
		}
	}
}