AsyncIteratorDirect.java
/*******************************************************************************
* Copyright (c) 2023 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
******************************************************************************/
package org.eclipse.rdf4j.query.algebra.evaluation.iterator;
import java.util.ArrayDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.eclipse.rdf4j.common.annotation.Experimental;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryEvaluationStep;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
/**
* @author H��vard M. Ottestad
*/
@Experimental
public class AsyncIteratorDirect extends LookAheadIteration<BindingSet> {
private final CloseableIteration<BindingSet> iteration;
private final ExecutorService executorService;
private Future<ArrayDeque<BindingSet>> future;
public AsyncIteratorDirect(CloseableIteration<BindingSet> iteration)
throws QueryEvaluationException {
this.iteration = iteration;
this.executorService = Executors.newSingleThreadExecutor();
}
public static CloseableIteration<BindingSet> getInstance(QueryEvaluationStep iterationPrepared, BindingSet bindings,
QueryEvaluationContext context) {
CloseableIteration<BindingSet> iter = iterationPrepared.evaluate(bindings);
if (iter == QueryEvaluationStep.EMPTY_ITERATION) {
return iter;
}
return new AsyncIteratorDirect(iter);
}
ArrayDeque<BindingSet> nextBuffer;
volatile BindingSet next;
void calculateNext() {
if (next != null) {
return;
}
if (future == null) {
try {
async();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
while (!(future.isDone() || future.isCancelled()) && next == null) {
Thread.onSpinWait();
}
}
private void async() throws ExecutionException, InterruptedException {
future = executorService.submit(() -> {
while (iteration.hasNext()) {
while (next != null) {
Thread.onSpinWait();
}
next = iteration.next();
}
return null;
});
}
@Override
protected BindingSet getNextElement() throws QueryEvaluationException {
calculateNext();
BindingSet temp = next;
next = null;
return temp;
}
@Override
protected void handleClose() throws QueryEvaluationException {
try {
if (future != null) {
future.cancel(true);
}
} finally {
try {
executorService.shutdownNow();
} finally {
iteration.close();
}
}
}
}