QueueIteration.java

/*******************************************************************************
 * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.common.iteration;

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Makes working with a queue easier by adding the methods {@link #done()} and {@link #toss(Exception)}. Tossed
 * exceptions are rethrown to the consumer after being converted to the required type using
 * {@link #convert(Exception)}.
 * <p>
 * Producers add items with {@link #put(Object)} and signal completion with {@link #done()}; the consumer reads
 * items through {@link #getNextElement()}. A private sentinel object is placed on the queue to mark the end of the
 * data.
 *
 * @param <E> the type of the elements held in the queue
 * @param <T> the unchecked exception type that tossed exceptions are converted to
 * @author James Leigh
 */
public abstract class QueueIteration<E, T extends RuntimeException> extends LookAheadIteration<E> {

	/** Set once the producer has called {@link #done()} or the iteration has been closed. */
	private final AtomicBoolean done = new AtomicBoolean(false);

	/** Backing queue holding the produced items and, eventually, the {@link #afterLast} sentinel. */
	private final BlockingQueue<E> queue;

	/** Sentinel marking the end of the data; compared by identity only, never handed out to callers. */
	private final E afterLast = createAfterLast();

	/** Exceptions registered through {@link #toss(Exception)} that still have to be reported. */
	private final Queue<Exception> exceptions = new ConcurrentLinkedQueue<>();

	/**
	 * Creates a <var>QueueIteration</var> with the given (fixed) capacity and default access policy.
	 *
	 * @param capacity the capacity of this queue
	 */
	protected QueueIteration(int capacity) {
		this(capacity, false);
	}

	/**
	 * Creates a <var>QueueIteration</var> with the given (fixed) capacity and the specified access policy.
	 *
	 * @param capacity the capacity of this queue
	 * @param fair     if <var>true</var> then queue accesses for threads blocked on insertion or removal, are processed
	 *                 in FIFO order; if <var>false</var> the access order is unspecified.
	 */
	protected QueueIteration(int capacity, boolean fair) {
		super();
		this.queue = new ArrayBlockingQueue<>(capacity, fair);
	}

	/**
	 * Creates a <var>QueueIteration</var> with the given {@link BlockingQueue} as its backing queue.<br>
	 * It may not be threadsafe to modify or access the given {@link BlockingQueue} from other locations. This
	 * constructor only enables the default {@link ArrayBlockingQueue} to be overridden.
	 *
	 * @param queue A BlockingQueue that is not used in other locations, but will be used as the backing Queue
	 *              implementation for this cursor.
	 */
	protected QueueIteration(BlockingQueue<E> queue) {
		this.queue = queue;
	}

	/**
	 * Converts an exception from the underlying iteration to an exception of type <var>T</var>.
	 *
	 * @param e the exception to convert
	 * @return the exception to throw to the caller in place of <var>e</var>
	 */
	protected abstract T convert(Exception e);

	/**
	 * Registers an exception to be reported by the next call to {@link #checkException()}, which
	 * {@link #getNextElement()} and {@link #handleClose()} invoke. The exception is passed through
	 * {@link #convert(Exception)} before it is thrown.
	 *
	 * @param exception the exception to report to the consumer
	 */
	public void toss(Exception exception) {
		exceptions.add(exception);
	}

	/**
	 * Adds another item to the queue, blocking while the queue is full.
	 * <p>
	 * The item is silently dropped when this iteration is closed, {@link #done()} has been called, or the calling
	 * thread is interrupted before the item could be queued. In the latter two cases the iteration is closed.
	 *
	 * @param item the item to add
	 * @throws InterruptedException if the calling thread is interrupted while waiting for room in the queue; the
	 *                              iteration is closed before the exception is rethrown
	 * @throws T                    may be thrown when closing the iteration surfaces a previously tossed exception
	 */
	public void put(E item) throws InterruptedException, T {
		try {
			while (!isClosed() && !done.get() && !Thread.currentThread().isInterrupted()
					&& !queue.offer(item, 1, TimeUnit.SECONDS)) {
				// No body, just iterating regularly through the loop conditions to respond to state changes without a
				// full busy-wait loop
			}
			// Proactively close if interruption didn't propagate an exception to the catch clause below
			if (done.get() || Thread.currentThread().isInterrupted()) {
				close();
			}
		} catch (InterruptedException e) {
			close();
			throw e;
		}
	}

	/**
	 * Indicates the method {@link #put(Object)} will not be called in the queue anymore.
	 */
	public void done() {
		// Lazily set here, and then come back in handleClose and use set if necessary
		done.lazySet(true);
		// Best effort: the offer fails when the queue is full. In that case the sentinel is re-offered by
		// getNextElement() after it has taken an item, or forced onto the queue by handleClose().
		// TODO: Log inability to add sentinel at debug level
		queue.offer(afterLast);
	}

	/**
	 * Returns the next item in the queue, which may be <var>null</var>, or throws an exception.
	 *
	 * @return the next item, or <var>null</var> when the iteration is closed, the end of the data has been reached, or
	 *         the calling thread was interrupted while waiting
	 */
	@Override
	public E getNextElement() {
		if (isClosed()) {
			return null;
		}
		try {
			checkException();
			E take;
			if (done.get()) {
				// The producer is finished, so never block: an empty queue means there is nothing left
				take = queue.poll();
			} else {
				take = queue.take();
				if (done.get()) {
					done(); // in case the queue was full before
				}
			}
			if (isAfterLast(take)) {
				done(); // put afterLast back for others
				checkException();
				return null;
			}
			checkException();
			return take;
		} catch (InterruptedException e) {
			try {
				checkException();
			} finally {
				try {
					close();
				} finally {
					// Restore the interrupt status for the caller
					Thread.currentThread().interrupt();
				}
			}
			return null;
		}
	}

	/**
	 * Marks this iteration as done, discards all queued items, leaves the end-of-data sentinel on the queue and
	 * reports any pending tossed exception.
	 */
	@Override
	public void handleClose() {
		done.set(true);
		do {
			queue.clear(); // ensure extra room is available
		} while (!queue.offer(afterLast));
		checkException();
	}

	/**
	 * Closes this iteration and throws the oldest pending tossed exception, converted with
	 * {@link #convert(Exception)}. Does nothing when no exception is pending.
	 * <p>
	 * A tossed {@link InterruptedException}, or any tossed exception found while the calling thread is interrupted,
	 * is not thrown; instead the thread's interrupt status is set and the next pending exception is examined.
	 *
	 * @throws T the converted exception
	 */
	public void checkException() throws T {
		while (!exceptions.isEmpty()) {
			try {
				close();
				// poll() instead of remove(): another thread (or the close() call above) may have drained the queue
				// since the isEmpty() check, and remove() would then raise a NoSuchElementException that would be
				// converted and reported as if it had been tossed.
				Exception pending = exceptions.poll();
				if (pending != null) {
					throw pending;
				}
			} catch (Exception e) {
				if (e instanceof InterruptedException || Thread.interrupted()) {
					Thread.currentThread().interrupt();
				} else {
					throw convert(e);
				}
			}
		}

	}

	/**
	 * Tells whether the given queue entry marks the end of the data, i.e. is <var>null</var> or the sentinel.
	 */
	private boolean isAfterLast(E take) {
		return take == null || take == afterLast;
	}

	/**
	 * Creates the end-of-data sentinel. The cast is unchecked: the object is not really an <var>E</var> and is only
	 * ever compared by identity inside this class.
	 */
	@SuppressWarnings("unchecked")
	private E createAfterLast() {
		return (E) new Object();
	}

}