ParallelServiceExecutor.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.Iterations;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.federated.FederationContext;
import org.eclipse.rdf4j.federated.algebra.FedXService;
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
import org.eclipse.rdf4j.federated.structures.QueryInfo;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.QueryInterruptedException;
import org.eclipse.rdf4j.query.algebra.Service;
import org.eclipse.rdf4j.repository.sparql.federation.CollectionIteration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Parallel executor for {@link FedXService} nodes, which wrap SERVICE expressions.
 *
 * Uses the union scheduler to execute the task
 *
 * @author Andreas Schwarte
 */
public class ParallelServiceExecutor extends LookAheadIteration<BindingSet>
		implements ParallelExecutor<BindingSet> {

	/*
	 * IMPLEMENTATION NOTE
	 *
	 * This class explicitly does not extend ParallelServiceExecutor: here the execution of the #run() is non blocking,
	 * i.g. blocking is done a consumption time of the iterator
	 */

	protected static final Logger log = LoggerFactory.getLogger(ParallelServiceExecutor.class);

	protected final FedXService service;
	protected final FederationEvalStrategy strategy;
	protected final BindingSet bindings;
	protected final FederationContext federationContext;

	protected CloseableIteration<BindingSet> rightIter = null;
	protected boolean finished = false;
	protected Exception error = null;

	private CountDownLatch latch = null;

	/**
	 * @param service
	 * @param strategy
	 * @param bindings
	 * @param federationContext
	 */
	public ParallelServiceExecutor(FedXService service,
			FederationEvalStrategy strategy, BindingSet bindings, FederationContext federationContext) {
		super();
		this.service = service;
		this.strategy = strategy;
		this.bindings = bindings;
		this.federationContext = federationContext;
	}

	@Override
	public void run() {

		latch = new CountDownLatch(1);
		ControlledWorkerScheduler<BindingSet> scheduler = federationContext.getManager().getUnionScheduler();
		scheduler.schedule(new ParallelServiceTask());
	}

	@Override
	public void addResult(CloseableIteration<BindingSet> res) {

		rightIter = res;
		latch.countDown();
	}

	@Override
	public void toss(Exception e) {

		error = e;
		latch.countDown();
	}

	@Override
	public void done() {
		// no-op
	}

	@Override
	public boolean isFinished() {
		synchronized (this) {
			return finished;
		}
	}

	@Override
	public QueryInfo getQueryInfo() {
		return service.getQueryInfo();
	}

	@Override
	protected BindingSet getNextElement() throws QueryEvaluationException {

		// error resulting from TOSS
		if (error != null) {
			if (error instanceof QueryEvaluationException) {
				throw (QueryEvaluationException) error;
			}
			throw new QueryEvaluationException(error);
		}

		if (rightIter == null) {
			// block if not evaluated
			try {
				boolean completed = latch.await(getQueryInfo().getMaxRemainingTimeMS(), TimeUnit.MILLISECONDS);
				if (!completed) {
					throw new QueryInterruptedException("Timeout during service evaluation");
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				log.debug("Error while evaluating service expression. Thread got interrupted.");
				error = e;
			}
		}

		// check again for error
		if (error != null) {
			if (error instanceof QueryEvaluationException) {
				throw (QueryEvaluationException) error;
			}
			throw new QueryEvaluationException(error);
		}

		if (rightIter.hasNext()) {
			return rightIter.next();
		}

		return null;
	}

	@Override
	protected void handleClose() {

	}

	/**
	 * Task for evaluating service requests
	 *
	 * @author Andreas Schwarte
	 */
	private class ParallelServiceTask extends ParallelTaskBase<BindingSet> {

		@Override
		protected CloseableIteration<BindingSet> performTaskInternal() throws Exception {

			// Note: in order two avoid deadlocks we consume the SERVICE result.
			// This is basically required to avoid processing background tuple
			// request (i.e. HTTP slots) in the correct order.
			Service service1 = service.getService();
			try (CloseableIteration<BindingSet> evaluate = strategy.precompile(service1).evaluate(bindings)) {
				return new CollectionIteration<>(Iterations.asList(evaluate));
			}
		}

		@Override
		public ParallelExecutor<BindingSet> getControl() {
			return ParallelServiceExecutor.this;
		}

	}
}