// BackgroundResultExecutor.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.http.client;

import java.io.InputStream;
import java.lang.ref.WeakReference;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

import org.eclipse.rdf4j.common.concurrent.locks.diagnostics.CleanerGraphQueryResult;
import org.eclipse.rdf4j.common.concurrent.locks.diagnostics.CleanerTupleQueryResult;
import org.eclipse.rdf4j.common.concurrent.locks.diagnostics.ConcurrentCleaner;
import org.eclipse.rdf4j.query.GraphQueryResult;
import org.eclipse.rdf4j.query.QueryResult;
import org.eclipse.rdf4j.query.TupleQueryResult;
import org.eclipse.rdf4j.query.impl.BackgroundGraphResult;
import org.eclipse.rdf4j.query.resultio.TupleQueryResultParser;
import org.eclipse.rdf4j.query.resultio.helpers.BackgroundTupleResult;
import org.eclipse.rdf4j.rio.RDFParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs background query-result parsers on a supplied {@link ExecutorService} and keeps track of the results that are
 * still being parsed, so that they can all be force-closed through {@link #close()}.
 * <p>
 * Access to the set of executing results is guarded by synchronizing on that set, so {@code parse} and {@code close}
 * may be invoked from different threads.
 */
public class BackgroundResultExecutor implements AutoCloseable {

	// Shared by every result handed out by any instance of this class.
	private static final ConcurrentCleaner cleaner = new ConcurrentCleaner();

	private static final Logger logger = LoggerFactory.getLogger(BackgroundResultExecutor.class);

	private final ExecutorService executor;

	// Results whose parser task has been submitted and has not yet finished. Guarded by synchronized (executing).
	private final HashSet<QueryResult<?>> executing = new HashSet<>();

	/**
	 * Creates an executor that submits its parsing tasks to the given service.
	 *
	 * @param executor the service used to run the background parsers; must not be {@code null}
	 * @throws NullPointerException if {@code executor} is {@code null}
	 */
	public BackgroundResultExecutor(ExecutorService executor) {
		this.executor = Objects.requireNonNull(executor, "Executor service was null");
	}

	/**
	 * Starts parsing a tuple query result from the given stream in the background.
	 *
	 * @param parser          the parser used to read the stream
	 * @param in              the stream containing the serialized result
	 * @param callerReference currently not used by this method
	 * @return the background result, wrapped in a {@link CleanerTupleQueryResult} bound to the shared cleaner
	 */
	public TupleQueryResult parse(TupleQueryResultParser parser, InputStream in, WeakReference<?> callerReference) {
		BackgroundTupleResult result = new BackgroundTupleResult(parser, in);
		autoCloseRunnable(result, result);
		return new CleanerTupleQueryResult(result, cleaner);
	}

	/**
	 * Starts parsing a graph query result from the given stream in the background.
	 *
	 * @param parser          the RDF parser used to read the stream
	 * @param in              the stream containing the serialized result
	 * @param charset         passed through to the {@link BackgroundGraphResult}
	 * @param baseURI         passed through to the {@link BackgroundGraphResult}
	 * @param callerReference currently not used by this method
	 * @return the background result, wrapped in a {@link CleanerGraphQueryResult} bound to the shared cleaner
	 */
	public GraphQueryResult parse(RDFParser parser, InputStream in, Charset charset, String baseURI,
			WeakReference<?> callerReference) {
		BackgroundGraphResult result = new BackgroundGraphResult(parser, in, charset, baseURI);
		autoCloseRunnable(result, result);
		return new CleanerGraphQueryResult(result, cleaner);
	}

	/**
	 * Force close any executing background result parsers.
	 * <p>
	 * Failures while closing an individual result are logged and do not prevent the remaining results from being
	 * closed. The executor service itself is not shut down by this method.
	 */
	@Override
	public void close() {
		// Take a snapshot so the individual close() calls run without holding the monitor: every finishing worker
		// needs that monitor to deregister itself, and foreign code should not run while it is held.
		final HashSet<QueryResult<?>> snapshot;
		synchronized (executing) {
			snapshot = new HashSet<>(executing);
		}
		for (AutoCloseable onclose : snapshot) {
			try {
				onclose.close();
			} catch (Exception e) {
				if (e instanceof InterruptedException) {
					// Preserve the interrupt status for code further up the stack.
					Thread.currentThread().interrupt();
				}
				logger.error(e.toString(), e);
			}
		}
	}

	/**
	 * Registers {@code result} as executing, submits {@code runner} to the executor and deregisters the result once
	 * the runner has finished, whether normally or exceptionally.
	 *
	 * @param result the result to track while its parser is running
	 * @param runner the task that performs the parsing
	 */
	private void autoCloseRunnable(QueryResult<?> result, Runnable runner) {
		synchronized (executing) {
			executing.add(result);
		}
		try {
			executor.execute(() -> {
				try {
					runner.run();
				} finally {
					synchronized (executing) {
						executing.remove(result);
					}
				}
			});
		} catch (RuntimeException | Error e) {
			// The task may never have been accepted (e.g. the executor rejected it), in which case the cleanup
			// in the lambda will never run. Deregister here so the entry does not leak; removing twice is harmless.
			// NOTE(review): the result itself is not closed here — confirm whether callers expect that.
			synchronized (executing) {
				executing.remove(result);
			}
			throw e;
		}
	}

}