AsyncIteratorBuffer.java
/*******************************************************************************
* Copyright (c) 2023 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
******************************************************************************/
package org.eclipse.rdf4j.query.algebra.evaluation.iterator;
import static java.util.concurrent.ForkJoinPool.commonPool;
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.eclipse.rdf4j.common.annotation.Experimental;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryEvaluationStep;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
/**
* @author H��vard M. Ottestad
*/
@Experimental
public class AsyncIteratorBuffer extends LookAheadIteration<BindingSet> {
private final CloseableIteration<BindingSet> iteration;
private final ConcurrentLinkedQueue<BindingSet> queue = new ConcurrentLinkedQueue<>();
private Future<ArrayDeque<BindingSet>> future;
public AsyncIteratorBuffer(CloseableIteration<BindingSet> iteration)
throws QueryEvaluationException {
this.iteration = iteration;
}
public static CloseableIteration<BindingSet> getInstance(QueryEvaluationStep iterationPrepared, BindingSet bindings,
QueryEvaluationContext context) {
CloseableIteration<BindingSet> iter = iterationPrepared.evaluate(bindings);
if (iter == QueryEvaluationStep.EMPTY_ITERATION) {
return iter;
}
return new AsyncIteratorBuffer(iter);
}
ArrayDeque<BindingSet> nextBuffer;
BindingSet next;
void calculateNext() {
if (next != null) {
return;
}
if (future == null) {
try {
async();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
next = queue.poll();
if (next != null) {
return;
}
while (!(future.isDone() || future.isCancelled()) && queue.isEmpty()) {
Thread.onSpinWait();
}
next = queue.poll();
}
private void async() throws ExecutionException, InterruptedException {
future = commonPool().submit(() -> {
while (iteration.hasNext()) {
queue.add(iteration.next());
}
return null;
});
}
@Override
protected BindingSet getNextElement() throws QueryEvaluationException {
calculateNext();
BindingSet temp = next;
next = null;
return temp;
}
@Override
protected void handleClose() throws QueryEvaluationException {
try {
if (future != null) {
future.cancel(true);
}
} finally {
iteration.close();
}
}
}