ControlledWorkerScheduler.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.eclipse.rdf4j.common.annotation.Experimental;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion;
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
import org.eclipse.rdf4j.federated.exception.FedXRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ControlledWorkerScheduler is a task scheduler that uses a FIFO queue for managing its process. Each instance has a
 * pool with a fixed number of worker threads. Once notified, a worker picks the next task from the queue and executes
 * it. The result is then passed to the controlling instance ({@link ParallelExecutor}) obtained from the task.
 *
 * @author Andreas Schwarte
 * @see ControlledWorkerUnion
 * @see ControlledWorkerJoin
 * @see ControlledWorkerBindJoin
 */
public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAware {

	private static final Logger log = LoggerFactory.getLogger(ControlledWorkerScheduler.class);

	private final ExecutorService executor;

	// TODO: in the next major version of RDF4J this final field should be removed.
	// Initialization of the executor service should manage the details
	private final BlockingQueue<Runnable> _taskQueue;

	private final int nWorkers;
	private final String name;
	private TaskWrapper taskWrapper;

	/**
	 * Construct a new instance with the specified number of workers and the given name.
	 *
	 * <p>
	 * Note: the overridable hooks {@link #createBlockingQueue()} and {@link #createExecutorService(int, String)} are
	 * invoked from this constructor (in that order), i.e. before any subclass fields are initialized.
	 * </p>
	 *
	 * @param nWorkers the number of worker threads in the pool
	 * @param name     the name of this scheduler; passed to {@link #createExecutorService(int, String)} (the default
	 *                 implementation uses it as base name for the worker threads)
	 */
	public ControlledWorkerScheduler(int nWorkers, String name) {
		this.nWorkers = nWorkers;
		this.name = name;
		this._taskQueue = createBlockingQueue();
		this.executor = createExecutorService(nWorkers, name);
	}

	/**
	 * Schedule the specified parallel task.
	 *
	 * <p>
	 * The task is registered with its query info before it is submitted to the thread pool. If either the
	 * registration or the submission fails, the task is cancelled and the failure is propagated to the caller.
	 * </p>
	 *
	 * @param task the task to schedule
	 * @throws RejectedExecutionException if the underlying executor does not accept the task (e.g. because the
	 *                                    scheduler has been shut down or aborted)
	 */
	@Override
	public void schedule(ParallelTask<T> task) {
		assert !task.getControl().isFinished();
		Runnable runnable = new WorkerRunnable(task);

		// Note: for specific use-cases the runnable may be wrapped (e.g. to allow injection of thread-contexts). By
		// default the unmodified runnable is used
		if (taskWrapper != null) {
			runnable = taskWrapper.wrap(runnable);
		}

		try {
			task.getQueryInfo().registerScheduledTask(task);
		} catch (Throwable e) {
			task.cancel();
			throw e;
		}

		final Future<?> future;
		try {
			future = executor.submit(runnable);
		} catch (RejectedExecutionException e) {
			// the runnable will never run, hence the task's control would never be informed about it: cancel the
			// already registered task (consistent with the registration failure above) and propagate the rejection
			task.cancel();
			throw e;
		}

		// register the future to the task
		if (task instanceof ParallelTaskBase<?>) {
			((ParallelTaskBase<?>) task).setScheduledFuture(future);
		}
	}

	/**
	 * Schedule the given tasks one after the other by delegating each of them to {@link #schedule(ParallelTask)}.
	 *
	 * @param tasks   the tasks to schedule, in scheduling order
	 * @param control the controlling instance (currently not used by this implementation)
	 */
	public void scheduleAll(List<ParallelTask<T>> tasks, ParallelExecutor<T> control) {
		for (ParallelTask<T> task : tasks) {
			schedule(task);
		}
	}

	/**
	 * @return the number of worker threads this scheduler was constructed with
	 */
	public int getTotalNumberOfWorkers() {
		return nWorkers;
	}

	/**
	 * @return the number of tasks currently held in the task queue
	 * @deprecated currently unused and this class is internal
	 */
	@Deprecated(forRemoval = true, since = "5.1") // currently unused and this class is internal
	public int getNumberOfTasks() {
		return _taskQueue.size();
	}

	/**
	 * Create the {@link BlockingQueue} used for the thread pool. The default implementation creates a
	 * {@link LinkedBlockingQueue}.
	 *
	 * <p>
	 * Note: this method is invoked from the constructor, before {@link #createExecutorService(int, String)}.
	 * </p>
	 *
	 * @return the queue holding tasks that are waiting for a worker
	 */
	@Experimental
	protected BlockingQueue<Runnable> createBlockingQueue() {
		return new LinkedBlockingQueue<>();
	}

	/**
	 * Create the {@link ExecutorService} which is managing the individual {@link ParallelTask}s in a thread pool. The
	 * default implementation creates a fixed-size thread pool backed by the queue returned from
	 * {@link #createBlockingQueue()}.
	 *
	 * The thread pool should be configured to terminate idle threads after a period of time (default: 60s)
	 *
	 * @param nWorkers the number of workers in the thread pool
	 * @param name     the base name for threads in the pool
	 * @return the executor service used to run the scheduled tasks
	 */
	@Experimental
	protected ExecutorService createExecutorService(int nWorkers, String name) {

		ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue,
				new NamingThreadFactory(name));
		// let idle core threads die after the keep-alive time as well
		executor.allowCoreThreadTimeOut(true);
		return executor;
	}

	/**
	 * Abort all workers: running tasks are interrupted via {@link ExecutorService#shutdownNow()} and this method waits
	 * up to 30 seconds for the pool to terminate. A warning is logged if the pool does not terminate in time.
	 *
	 * @throws FedXRuntimeException if the calling thread is interrupted while waiting (the interrupt flag is
	 *                              restored)
	 */
	@Override
	public void abort() {
		if (!executor.isTerminated()) {
			log.info("Aborting workers of {}.", name);

			executor.shutdownNow();
			try {
				if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
					log.warn("Workers of {} did not terminate within 30 seconds after abort.", name);
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new FedXRuntimeException(e);
			}
		}
	}

	@Override
	public void done() {
		/* not needed here, implementations call informFinish(control) to notify done status */
	}

	/**
	 * Not supported by this scheduler: results are passed directly to the control instance of each task.
	 *
	 * @throws UnsupportedOperationException always
	 */
	@Override
	public void handleResult(CloseableIteration<T> res) {
		throw new UnsupportedOperationException("Unsupported Operation for this scheduler.");
	}

	/**
	 * Not supported by this scheduler; use {@link #informFinish(ParallelExecutor)} instead.
	 *
	 * @throws UnsupportedOperationException always
	 */
	@Override
	public void informFinish() {
		throw new UnsupportedOperationException("Unsupported Operation for this scheduler!");
	}

	/**
	 * Inform this scheduler that the specified control instance will no longer submit tasks.
	 *
	 * <p>
	 * Currently a no-op.
	 * </p>
	 *
	 * @param control the control instance that has finished submitting tasks
	 */
	public void informFinish(ParallelExecutor<T> control) {

		// TODO
	}

	/**
	 * Not supported by this scheduler: the running state can only be determined for a given control instance.
	 *
	 * @throws UnsupportedOperationException always
	 */
	@Override
	public boolean isRunning() {
		throw new UnsupportedOperationException("Unsupported Operation for this scheduler.");
	}

	/**
	 * Determine if there are still task running or queued for the specified control.
	 *
	 * <p>
	 * Currently not implemented: this method always returns {@code true}.
	 * </p>
	 *
	 * @param control the control instance to check
	 * @return true, if there are unfinished tasks, false otherwise
	 */
	public boolean isRunning(ParallelExecutor<T> control) {
		return true; // TODO
	}

	/**
	 * Not supported by this scheduler: exceptions are tossed directly to the controlling instance of each task.
	 *
	 * @throws UnsupportedOperationException always
	 */
	@Override
	public void toss(Exception e) {
		throw new UnsupportedOperationException("Unsupported Operation for this scheduler.");
	}

	/**
	 * Runnable executing a single {@link ParallelTask} on a worker thread and reporting its outcome (result or
	 * exception) to the task's control instance.
	 */
	class WorkerRunnable implements Runnable {

		private final ParallelTask<T> task;
		private final ParallelExecutor<T> taskControl;

		private volatile boolean aborted = false;

		public WorkerRunnable(ParallelTask<T> task) {
			this.task = task;
			this.taskControl = task.getControl();
		}

		@Override
		public void run() {
			CloseableIteration<T> res = null;

			try {
				// skip the task if it was aborted, the worker is interrupted, or the control has already finished
				if (aborted || Thread.currentThread().isInterrupted() || taskControl.isFinished()) {
					throw new InterruptedException();
				}

				log.trace("Performing task {} in {}", task, Thread.currentThread().getName());

				res = task.performTask();
				taskControl.addResult(res);
				if (aborted) {
					res.close();
				}
				taskControl.done();
			} catch (Throwable t) {
				try {
					if (t instanceof InterruptedException) {
						// restore the interrupt status that the InterruptedException represents
						Thread.currentThread().interrupt();
					}

					log.debug("Exception encountered while evaluating task ({}): {}", t.getClass().getSimpleName(),
							t.getMessage());
				} finally {
					try {
						taskControl.toss(ExceptionUtil.toException(t));
					} finally {
						try {
							// e.g. interrupted
							if (res != null) {
								res.close();
							}
						} finally {
							task.cancel();
						}
					}
				}
			}
		}

		/**
		 * Mark this runnable as aborted: if it has not started yet, the task is skipped; if a result was already
		 * produced, it is closed right after being handed to the control.
		 */
		public void abort() {
			this.aborted = true;
		}
	}

	/**
	 * Structure to maintain the status for a given control instance. Not used by this class itself.
	 *
	 * @author Andreas Schwarte
	 */
	protected class ControlStatus {
		public int waiting;
		public boolean done;

		public ControlStatus(int waiting, boolean done) {
			this.waiting = waiting;
			this.done = done;
		}
	}

	/**
	 * Shut down the thread pool gracefully: already submitted tasks are still executed, no new tasks are accepted.
	 * Waits up to 30 seconds for termination and logs a warning if the pool does not terminate in time.
	 *
	 * @throws FedXRuntimeException if the calling thread is interrupted while waiting (the interrupt flag is
	 *                              restored)
	 */
	@Override
	public void shutdown() {
		executor.shutdown();
		try {
			if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
				log.warn("Workers of {} did not terminate within 30 seconds after shutdown.", name);
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new FedXRuntimeException(e);
		}
	}

	/**
	 * Set the {@link TaskWrapper} applied to every runnable scheduled afterwards.
	 *
	 * @param taskWrapper the wrapper to use, or {@code null} to run the unmodified runnables
	 */
	@Override
	public void setTaskWrapper(TaskWrapper taskWrapper) {
		this.taskWrapper = taskWrapper;
	}
}