AsyncIteratorReadAhead.java
/*******************************************************************************
* Copyright (c) 2023 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
******************************************************************************/
package org.eclipse.rdf4j.query.algebra.evaluation.iterator;
import java.util.ArrayDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.eclipse.rdf4j.common.annotation.Experimental;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryEvaluationStep;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
/**
* @author H��vard M. Ottestad
*/
@Experimental
public class AsyncIteratorReadAhead extends LookAheadIteration<BindingSet> {
private final int READ_AHEAD_LIMIT = 1024 * 1024 * 16;
private final ExecutorService executorService;
private int readAhead = 4;
private final CloseableIteration<BindingSet> iteration;
private Future<ArrayDeque<BindingSet>> future;
public AsyncIteratorReadAhead(CloseableIteration<BindingSet> iteration)
throws QueryEvaluationException {
this.iteration = iteration;
this.executorService = Executors.newSingleThreadExecutor();
}
public static CloseableIteration<BindingSet> getInstance(QueryEvaluationStep iterationPrepared, BindingSet bindings,
QueryEvaluationContext context) {
CloseableIteration<BindingSet> iter = iterationPrepared.evaluate(bindings);
if (iter == QueryEvaluationStep.EMPTY_ITERATION) {
return iter;
}
return new AsyncIteratorReadAhead(iter);
}
ArrayDeque<BindingSet> nextBuffer;
BindingSet next;
void calculateNext() {
if (next != null) {
return;
}
if (nextBuffer != null && !nextBuffer.isEmpty()) {
next = nextBuffer.removeFirst();
return;
}
try {
nextBuffer = async();
if (nextBuffer != null && !nextBuffer.isEmpty()) {
next = nextBuffer.removeFirst();
return;
}
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
private ArrayDeque<BindingSet> async() throws ExecutionException, InterruptedException {
ArrayDeque<BindingSet> ret = null;
if (future != null) {
ret = future.get();
future = null;
} else {
if (iteration.hasNext()) {
ret = new ArrayDeque<>(1);
ret.add(iteration.next());
} else {
return null;
}
}
if (readAhead < READ_AHEAD_LIMIT) {
readAhead *= 2;
}
ArrayDeque<BindingSet> buffer;
if (nextBuffer != null) {
nextBuffer.clear();
buffer = nextBuffer;
} else {
buffer = new ArrayDeque<>();
}
future = executorService.submit(() -> {
int currentReadAhead = readAhead;
for (int i = 0; i < currentReadAhead && iteration.hasNext(); i++) {
buffer.addLast(iteration.next());
}
if (buffer.isEmpty()) {
return null;
}
return buffer;
});
return ret;
}
@Override
protected BindingSet getNextElement() throws QueryEvaluationException {
calculateNext();
BindingSet temp = next;
next = null;
return temp;
}
@Override
protected void handleClose() throws QueryEvaluationException {
try {
if (future != null) {
future.cancel(true);
}
} finally {
try {
executorService.shutdownNow();
} finally {
iteration.close();
}
}
}
}