TimeLimitIteration.java
/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.common.iteration;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * An {@link IterationWrapper} that puts a time limit on the use of the wrapped iteration.
 * <p>
 * On construction a task is scheduled on a shared daemon {@link Timer}. Once the iteration has been flagged as
 * interrupted (see {@link #interrupt()}), every subsequent call to {@link #hasNext()}, {@link #next()} or
 * {@link #remove()} invokes {@link #throwInterruptedException()} so that the subclass can raise the exception of its
 * choice.
 *
 * @param <E> the type of the elements returned by this iteration
 * @author Arjohn Kampman
 */
public abstract class TimeLimitIteration<E> extends IterationWrapper<E> {

    /** Daemon timer shared by all instances; it runs the scheduled interrupt tasks. */
    private static final Timer timer = new Timer("TimeLimitIteration", true);

    /** Per-instance logger, named after the concrete runtime class. */
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** The task scheduled for this iteration; cancelled again in {@link #handleClose()}. */
    private final InterruptTask<E> interruptTask;

    /** Set to {@code true} by {@link #interrupt()} and polled by {@link #checkInterrupted()}. */
    private final AtomicBoolean isInterrupted = new AtomicBoolean(false);

    /**
     * Wraps the supplied iteration and schedules its interrupt task.
     *
     * @param iter      the iteration to wrap
     * @param timeLimit delay, in milliseconds, after which the interrupt task is run by the timer; expected to be
     *                  positive (only verified when assertions are enabled)
     */
    protected TimeLimitIteration(CloseableIteration<? extends E> iter, long timeLimit) {
        super(iter);
        assert timeLimit > 0 : "time limit must be a positive number, is: " + timeLimit;

        // NOTE(review): 'this' is handed to the task before subclass constructors have finished.
        interruptTask = new InterruptTask<>(this);
        timer.schedule(interruptTask, timeLimit);
    }

    /**
     * Checks for the interrupt flag, then asks the wrapped iteration whether more elements are available.
     *
     * @return {@code false} if this iteration is closed, otherwise the answer of the wrapped iteration
     */
    @Override
    public boolean hasNext() {
        checkInterrupted();
        if (isClosed()) {
            return false;
        }
        checkInterrupted();

        try {
            boolean hasMore = super.hasNext();
            // The delegate call may have taken a while, so look at the flag once more before answering.
            checkInterrupted();
            return hasMore;
        } catch (NoSuchElementException ex) {
            throw closeAfterFailure(ex);
        }
    }

    /**
     * Checks for the interrupt flag, then returns the next element of the wrapped iteration.
     *
     * @return the next element
     * @throws NoSuchElementException if this iteration is closed, or if the wrapped iteration throws it
     */
    @Override
    public E next() {
        checkInterrupted();
        if (isClosed()) {
            throw new NoSuchElementException("The iteration has been closed.");
        }
        checkInterrupted();

        try {
            return super.next();
        } catch (NoSuchElementException ex) {
            throw closeAfterFailure(ex);
        }
    }

    /**
     * Checks for the interrupt flag, then forwards the removal to the wrapped iteration.
     *
     * @throws IllegalStateException if this iteration is closed, or if the wrapped iteration throws it
     */
    @Override
    public void remove() {
        checkInterrupted();
        if (isClosed()) {
            throw new IllegalStateException("The iteration has been closed.");
        }
        checkInterrupted();

        try {
            super.remove();
        } catch (IllegalStateException ex) {
            throw closeAfterFailure(ex);
        }
    }

    /**
     * Cancels the scheduled interrupt task and then, regardless of the outcome of the cancellation, delegates to the
     * superclass.
     */
    @Override
    protected void handleClose() {
        try {
            interruptTask.cancel();
        } finally {
            super.handleClose();
        }
    }

    /**
     * Shared tail of the delegate-failure handlers: re-checks the interrupt flag, closes this iteration and hands the
     * original exception back so the caller can rethrow it. If the flag check or the close throws, that exception
     * propagates instead.
     *
     * @param failure the exception thrown by the wrapped iteration
     * @return {@code failure}, unchanged
     */
    private <X extends RuntimeException> X closeAfterFailure(X failure) {
        checkInterrupted();
        close();
        return failure;
    }

    /**
     * Does nothing unless the interrupt flag is set. When it is set, {@link #throwInterruptedException()} is invoked
     * and, whether or not that call throws, this iteration is closed afterwards.
     */
    private void checkInterrupted() {
        if (!isInterrupted.get()) {
            return;
        }
        try {
            throwInterruptedException();
        } finally {
            closeLoggingFailure();
        }
    }

    /**
     * Closes this iteration without letting an {@link Exception} escape: a failure is logged at warn level, and the
     * current thread's interrupt status is restored if the failure is an {@link InterruptedException}.
     */
    private void closeLoggingFailure() {
        try {
            close();
        } catch (Exception closeFailure) {
            if (closeFailure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            logger.warn("TimeLimitIteration timed out and failed to close successfully: ", closeFailure);
        }
    }

    /**
     * If the iteration is interrupted by its time limit, this method is called to generate and throw the appropriate
     * exception.
     */
    protected abstract void throwInterruptedException();

    /**
     * Requests that the execution be interrupted at the next available point. The running method is not interrupted
     * immediately; instead the interrupt flag is set and close() is called, so that the request is picked up as soon
     * as possible and resources are cleaned up.
     * <p>
     * Note, this method does not generate {@link InterruptedException}s that would occur if
     * {@link Thread#interrupt()} were called on this thread.
     */
    void interrupt() {
        isInterrupted.set(true);
        closeLoggingFailure();
    }
}