ParallelExecutorBase.java
/*******************************************************************************
* Copyright (c) 2019 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.concurrent;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
import org.eclipse.rdf4j.federated.evaluation.join.JoinExecutorBase;
import org.eclipse.rdf4j.federated.evaluation.union.UnionExecutorBase;
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
import org.eclipse.rdf4j.federated.structures.QueryInfo;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.QueryInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class for common parallel executors such as {@link JoinExecutorBase} and {@link UnionExecutorBase}.
*
* @param <T>
* @author Andreas Schwarte
* @see JoinExecutorBase
* @see UnionExecutorBase
*/
public abstract class ParallelExecutorBase<T> extends LookAheadIteration<T>
implements ParallelExecutor<T> {
protected static final Logger log = LoggerFactory.getLogger(ParallelExecutorBase.class);
protected static final AtomicLong NEXT_EXECUTOR_ID = new AtomicLong(0L);
/* Constants */
protected final FederationEvalStrategy strategy; // the evaluation strategy
protected final long executorId; // the executor id
protected final QueryInfo queryInfo;
/* Variables */
protected volatile Thread evaluationThread;
protected FedXQueueCursor<T> rightQueue = FedXQueueCursor.create(1024);
protected volatile CloseableIteration<T> rightIter;
protected volatile boolean finished = false;
public ParallelExecutorBase(QueryInfo queryInfo) throws QueryEvaluationException {
this.strategy = queryInfo.getStrategy();
this.executorId = NEXT_EXECUTOR_ID.incrementAndGet();
this.queryInfo = queryInfo;
}
@Override
public final void run() {
if (isClosed()) {
return;
}
evaluationThread = Thread.currentThread();
if (evaluationThread.isInterrupted()) {
return;
}
if (log.isTraceEnabled()) {
log.trace("Performing execution of " + getDisplayId() + ", thread: " + evaluationThread.getName());
}
try {
performExecution();
checkTimeout();
if (log.isTraceEnabled()) {
log.trace(getDisplayId() + " is finished.");
}
done();
} catch (InterruptedException e) {
toss(ExceptionUtil.toException(e));
evaluationThread.interrupt();
} catch (Throwable t) {
toss(ExceptionUtil.toException(t));
} finally {
// signal DONE to queue: unblock polling
rightQueue.done();
finished = true;
evaluationThread = null;
}
}
/**
* Perform the parallel execution.
* <p>
* Note that this method must block until the execution is completed.
*
* @throws Exception
*/
protected abstract void performExecution() throws Exception;
@Override
public void addResult(CloseableIteration<T> res) {
try {
/* optimization: avoid adding empty results */
if (res instanceof EmptyIteration) {
return;
}
if (isClosed() || rightQueue.isClosed()) {
res.close();
return;
}
rightQueue.put(res);
if (isClosed() || rightQueue.isClosed()) {
res.close();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
res.close();
throw new RuntimeException("Error adding element to right queue", e);
}
}
@Override
public void done() {
// no-op
}
@Override
public void toss(Exception e) {
rightQueue.toss(e);
if (log.isTraceEnabled()) {
log.trace("Tossing exception of " + getDisplayId() + ": " + e.getMessage());
}
}
@Override
public T getNextElement() throws QueryEvaluationException {
// TODO check if we need to protect rightQueue from synchronized access
// wasn't done in the original implementation either
// if we see any weird behavior check here !!
while (rightIter != null || rightQueue.hasNext()) {
if (rightIter == null) {
rightIter = rightQueue.next();
}
if (rightIter.hasNext()) {
return rightIter.next();
} else {
rightIter.close();
rightIter = null;
}
}
rightQueue.checkException();
return null;
}
/**
* Checks whether the query execution has run into a timeout. If so, a {@link QueryInterruptedException} is thrown.
*
* @throws QueryInterruptedException
*/
protected void checkTimeout() throws QueryInterruptedException {
long maxTimeLeft = queryInfo.getMaxRemainingTimeMS();
if (maxTimeLeft <= 0) {
throw new QueryInterruptedException("Query evaluation has run into a timeout");
}
}
@Override
public void handleClose() throws QueryEvaluationException {
try {
rightQueue.close();
} finally {
if (rightIter != null) {
try {
rightIter.close();
rightIter = null;
} catch (Throwable ignore) {
if (ignore instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
log.trace("Failed to send interrupt signal:", ignore);
}
}
}
}
/**
* Return true if this executor is finished or aborted
*
* @return whether the executor is finished
*/
@Override
public boolean isFinished() {
return finished;
}
@Override
public QueryInfo getQueryInfo() {
return queryInfo;
}
/**
* @return a unique identifier of this execution
*/
protected String getId() {
return "#" + executorId + " (Query: " + queryInfo.getQueryID() + ")";
}
public String getDisplayId() {
return getExecutorType() + " " + getId();
}
/**
* @return the executor type, e.g. "Join". Default "Executor"
*/
protected String getExecutorType() {
return "Executor";
}
@Override
public String toString() {
return getExecutorType() + " " + getClass().getSimpleName() + " {id: " + getId() + "}";
}
}