BackgroundGraphResult.java

/*******************************************************************************
 * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.query.impl;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

import org.eclipse.rdf4j.common.iteration.IterationWrapper;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.query.GraphQueryResult;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.rio.RDFHandler;
import org.eclipse.rdf4j.rio.RDFHandlerException;
import org.eclipse.rdf4j.rio.RDFParser;

/**
 * Provides concurrent access to statements as they are being parsed when instances of this class are run as Threads.
 *
 * @author James Leigh
 */
public class BackgroundGraphResult extends IterationWrapper<Statement>
		implements GraphQueryResult, Runnable, RDFHandler {

	private final RDFParser parser;

	private final Charset charset;

	private final InputStream in;

	private final String baseURI;

	private final CountDownLatch namespacesReady = new CountDownLatch(1);

	private final CountDownLatch finishedParsing = new CountDownLatch(1);

	private final Map<String, String> namespaces = new ConcurrentHashMap<>();

	private final QueueCursor<Statement> queue;

	public BackgroundGraphResult(RDFParser parser, InputStream in, Charset charset, String baseURI) {
		this(new QueueCursor<>(10), parser, in, charset, baseURI);
	}

	public BackgroundGraphResult(QueueCursor<Statement> queue, RDFParser parser, InputStream in, Charset charset,
			String baseURI) {
		super(queue);
		this.queue = queue;
		this.parser = parser;
		this.in = in;
		this.charset = charset;
		this.baseURI = baseURI;
	}

	@Override
	protected void handleClose() throws QueryEvaluationException {
		try {
			super.handleClose();
		} finally {
			queue.done();
		}
		try {
			finishedParsing.await();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		} finally {
			queue.checkException();
		}
	}

	@Override
	public void run() {
		try {
			try {
				parser.setRDFHandler(this);
				if (charset == null) {
					parser.parse(in, baseURI);
				} else {
					parser.parse(new InputStreamReader(in, charset), baseURI);
				}
			} finally {
				in.close();
			}
		} catch (Exception e) {
			queue.toss(e);
		} finally {
			queue.done();
			namespacesReady.countDown();
			finishedParsing.countDown();
		}
	}

	@Override
	public void startRDF() throws RDFHandlerException {
		// no-op
	}

	@Override
	public Map<String, String> getNamespaces() {
		try {
			namespacesReady.await();
			// Show the user an unmodifiable view on the map but we can still change it here
			return Collections.unmodifiableMap(namespaces);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return Collections.emptyMap();
		} finally {
			queue.checkException();
		}
	}

	@Override
	public void handleComment(String comment) throws RDFHandlerException {
		// ignore
	}

	@Override
	public void handleNamespace(String prefix, String uri) throws RDFHandlerException {
		namespaces.put(prefix, uri);
	}

	@Override
	public void handleStatement(Statement st) throws RDFHandlerException {
		namespacesReady.countDown();
		try {
			queue.put(st);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			queue.toss(e);
			queue.done();
		}
	}

	@Override
	public void endRDF() throws RDFHandlerException {
		namespacesReady.countDown();
	}

}