JoinExecutorBase.java
/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.repository.sparql.federation;
import org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.impl.QueueCursor;
/**
* Base class for any join parallel join executor. Note that this class extends {@link LookAheadIteration} and thus any
* implementation of this class is applicable for pipelining when used in a different thread (access to shared variables
* is synchronized).
*
* @author Andreas Schwarte
*/
public abstract class JoinExecutorBase<T> extends LookAheadIteration<T> {
/**
* @deprecated No replacement, don't use static shared int variables.
*/
protected static int NEXT_JOIN_ID = 1;
/* Constants */
protected final TupleExpr rightArg; // the right argument for the join
protected final BindingSet bindings; // the bindings
protected final CloseableIteration<T> leftIter;
protected volatile CloseableIteration<T> rightIter;
/**
* @deprecated Use {@link AbstractCloseableIteration#isClosed()} instead.
*/
protected volatile boolean closed = false;
/**
* @deprecated Use {@link #isFinished()} instead.
*/
protected volatile boolean finished = false;
protected final QueueCursor<CloseableIteration<T>> rightQueue = new QueueCursor<>(1024);
protected JoinExecutorBase(CloseableIteration<T> leftIter, TupleExpr rightArg,
BindingSet bindings) throws QueryEvaluationException {
this.leftIter = leftIter;
this.rightArg = rightArg;
this.bindings = bindings;
}
public final void run() {
try {
handleBindings();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
toss(e);
} finally {
finished = true;
rightQueue.done();
}
}
/**
* Implementations must implement this method to handle bindings. Use the following as a template <code>
* while (!closed && leftIter.hasNext()) {
* // your code
* }
* </code> and add results to rightQueue. Note that addResult() is implemented synchronized and thus thread safe. In
* case you can guarantee sequential access, it is also possible to directly access rightQueue
*/
protected abstract void handleBindings() throws Exception;
public void addResult(CloseableIteration<T> res) {
/* optimization: avoid adding empty results */
if (res instanceof EmptyIteration) {
return;
}
try {
rightQueue.put(res);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Error adding element to right queue", e);
}
}
public void done() {
// no-op
}
public void toss(Exception e) {
rightQueue.toss(e);
}
@Override
public T getNextElement() throws QueryEvaluationException {
// TODO check if we need to protect rightQueue from synchronized access
// wasn't done in the original implementation either
// if we see any weird behavior check here !!
while (rightIter != null || rightQueue.hasNext()) {
CloseableIteration<T> nextRightIter = rightIter;
if (nextRightIter == null) {
nextRightIter = rightIter = rightQueue.next();
}
if (nextRightIter != null) {
if (nextRightIter.hasNext()) {
return nextRightIter.next();
} else {
rightIter = null;
nextRightIter.close();
}
}
}
return null;
}
@Override
public void handleClose() throws QueryEvaluationException {
closed = true;
try {
rightQueue.close();
} finally {
try {
CloseableIteration<T> toCloseRightIter = rightIter;
rightIter = null;
if (toCloseRightIter != null) {
toCloseRightIter.close();
}
} finally {
CloseableIteration<T> toCloseLeftIter = leftIter;
if (toCloseLeftIter != null) {
toCloseLeftIter.close();
}
}
}
}
/**
* Gets whether this executor is finished or aborted.
*
* @return true if this executor is finished or aborted
*/
public boolean isFinished() {
return finished;
}
}