ControlledWorkerUnion.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.union;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
import org.eclipse.rdf4j.federated.structures.QueryInfo;
import org.eclipse.rdf4j.query.QueryEvaluationException;

/**
 * Execution of union tasks with {@link ControlledWorkerScheduler}. Tasks can be added using the provided functions.
 * Note that the union operation is to be executed with the {@link #run()} method (also threaded execution is possible).
 * Results are then contained in this iteration.
 *
 * @author Andreas Schwarte
 */
public class ControlledWorkerUnion<T> extends WorkerUnionBase<T> {

	public static int waitingCount = 0;
	public static int awakeCount = 0;

	protected final ControlledWorkerScheduler<T> scheduler;

	protected final Phaser phaser = new Phaser(1);

	public ControlledWorkerUnion(ControlledWorkerScheduler<T> scheduler,
			QueryInfo queryInfo) {
		super(queryInfo);
		this.scheduler = scheduler;
	}

	@Override
	protected void union() throws Exception {

		// schedule all tasks and inform about finish
		phaser.bulkRegister(tasks.size());
		scheduler.scheduleAll(tasks, this);

		// wait until all tasks are executed
		phaser.awaitAdvanceInterruptibly(phaser.arrive(), queryInfo.getMaxRemainingTimeMS(), TimeUnit.MILLISECONDS);
	}

	@Override
	public void done() {
		super.done();
		phaser.arriveAndDeregister();
	}

	@Override
	public void toss(Exception e) {
		super.toss(e);
		phaser.arriveAndDeregister();
	}

	@Override
	public void handleClose() throws QueryEvaluationException {
		try {
			super.handleClose();
		} finally {
			// signal the phaser to close (if currently being blocked)
			phaser.forceTermination();
		}
	}

}