FedXQueueCursor.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import java.lang.ref.WeakReference;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.impl.QueueCursor;
import org.eclipse.rdf4j.sail.SailException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Specialized variants of {@link QueueCursor} which avoids converting any exception if it is already of
 * type{@link QueryEvaluationException}.
 *
 * @param <T>
 * @author Andreas Schwarte
 */
public class FedXQueueCursor<T> extends QueueCursor<CloseableIteration<T>> {

	private static final Logger log = LoggerFactory.getLogger(FedXQueueCursor.class);

	/**
	 * Creates a new cursor with the given queue capacity. Delegates to {@link #create(int)}.
	 *
	 * @param <T>             the element type of the queued iterations
	 * @param capacity        the capacity of the backing queue
	 * @param callerReference expected to be <code>null</code>; this is only verified by an <code>assert</code> and the
	 *                        value is not used otherwise
	 * @return the new cursor
	 */
	public static <T> FedXQueueCursor<T> create(int capacity, WeakReference<?> callerReference) {
		assert callerReference == null;
		return create(capacity);
	}

	/**
	 * Creates a new cursor backed by a non-fair {@link ArrayBlockingQueue} with the given capacity.
	 *
	 * @param <T>      the element type of the queued iterations
	 * @param capacity the capacity of the backing queue
	 * @return the new cursor
	 */
	public static <T> FedXQueueCursor<T> create(int capacity) {
		BlockingQueue<CloseableIteration<T>> queue = new ArrayBlockingQueue<>(capacity,
				false);
		return new FedXQueueCursor<>(queue);
	}

	/**
	 * Reference to the queue such that we can access it in {@link #handleClose()}. This is required to make sure that
	 * we can close the non-consumed iterations from the queue. Note that the private queue of the super class is not
	 * accessible.
	 */
	private final BlockingQueue<CloseableIteration<T>> queueRef;

	/**
	 * Passes the queue to the super class and additionally keeps a reference to it for use in
	 * {@link #handleClose()}.
	 *
	 * @param queue the queue backing this cursor
	 */
	private FedXQueueCursor(BlockingQueue<CloseableIteration<T>> queue) {
		super(queue);
		this.queueRef = queue;
	}

	/**
	 * Returns the given exception as is if it already is a {@link QueryEvaluationException}, otherwise delegates the
	 * conversion to the super class. For an {@link InterruptedException} the interrupt flag of the current thread is
	 * set again before delegating.
	 *
	 * @param e the exception to convert
	 * @return the exception as {@link QueryEvaluationException}
	 */
	@Override
	protected QueryEvaluationException convert(Exception e) {
		if (e instanceof QueryEvaluationException) {
			return (QueryEvaluationException) e;
		}
		if (e instanceof InterruptedException) {
			Thread.currentThread().interrupt();
		}
		return super.convert(e);
	}

	/**
	 * Closes all iterations that are still in the queue, then calls {@link #done()} and finally (in all cases) the
	 * close handling of the super class.
	 * <p>
	 * Failures while closing a queued iteration do not stop the draining. The most recent failure is rethrown after
	 * the queue has been drained, with the earlier failures attached as suppressed throwables.
	 *
	 * @throws QueryEvaluationException declared by the overridden method
	 * @throws SailException            if the collected failure is neither a {@link RuntimeException} nor an
	 *                                  {@link Error}
	 */
	@Override
	public void handleClose() throws QueryEvaluationException {

		try {
			Throwable throwable = null;

			// consume all remaining elements from the queue and make sure to close them
			// Note: unfortunately we cannot access "afterLast" of the super class
			// => thus have to make a check whether the polled object is actually a
			// closable iteration
			while (!queueRef.isEmpty()) {
				try {
					Object take = queueRef.poll();
					if (take instanceof CloseableIteration) {
						((CloseableIteration<?>) take).close();
					}
				} catch (Throwable t) {
					if (t instanceof InterruptedException) {
						Thread.currentThread().interrupt();
					}
					// Throwable#addSuppressed rejects self-suppression with an IllegalArgumentException. Thrown
					// from within this catch block it would abort the loop before the remaining queued
					// iterations are closed here, hence skip it if the very same instance is seen again.
					if (throwable != null && throwable != t) {
						t.addSuppressed(throwable);
					}
					throwable = t;
				}
			}
			done(); // re-add after-last

			if (throwable != null) {
				if (throwable instanceof RuntimeException) {
					throw ((RuntimeException) throwable);
				}
				if (throwable instanceof Error) {
					throw ((Error) throwable);
				}
				// any other (checked) throwable is wrapped into an unchecked exception
				throw new SailException(throwable);
			}
		} finally {
			super.handleClose();
		}
	}

}