ExtensibleSailStore.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.extensiblestore;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.EvaluationStatistics;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.base.SailSource;
import org.eclipse.rdf4j.sail.base.SailStore;
import org.eclipse.rdf4j.sail.extensiblestore.evaluationstatistics.DynamicStatistics;
import org.eclipse.rdf4j.sail.extensiblestore.evaluationstatistics.EvaluationStatisticsEnum;
import org.eclipse.rdf4j.sail.extensiblestore.evaluationstatistics.EvaluationStatisticsWrapper;
import org.eclipse.rdf4j.sail.extensiblestore.evaluationstatistics.ExtensibleEvaluationStatistics;
import org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatement;
import org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatementHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SailStore} that serves its explicit and inferred statements from a single
 * {@link DataStructureInterface}.
 * <p>
 * When the configured {@link EvaluationStatisticsEnum} yields {@link DynamicStatistics}, the data structure is wrapped
 * in an {@link EvaluationStatisticsWrapper} and a daemon thread periodically checks how stale the statistics are,
 * rebuilding them from the stored statements when needed. The maintainer stops rescheduling itself once the store has
 * been {@link #close() closed}.
 *
 * @author Håvard Mikkelsen Ottestad
 */
public class ExtensibleSailStore implements SailStore {

	private static final Logger logger = LoggerFactory.getLogger(ExtensibleSailStore.class);

	/**
	 * Time, in milliseconds, the maintainer thread sleeps between two staleness checks.
	 */
	public static final int EVALUATION_STATISTICS_STALENESS_CHECK_INTERVAL = 1000 * 60;

	private final ExtensibleSailSource sailSource;
	private final ExtensibleSailSource sailSourceInferred;
	private final EvaluationStatisticsEnum evaluationStatisticsEnum;
	// Replaced by the maintainer thread and read by callers on other threads, hence volatile.
	private volatile ExtensibleEvaluationStatistics evaluationStatistics;
	// Guarded by the monitor of this store (see the synchronized start/close methods).
	private Thread evaluationStatisticsMaintainerThread;
	private final DataStructureInterface dataStructure;
	private volatile boolean closed;

	/**
	 * Creates the store and, if the chosen statistics implementation is {@link DynamicStatistics}, starts the
	 * background statistics maintainer.
	 *
	 * @param dataStructure             the backing storage for explicit and inferred statements
	 * @param namespaceStore            the namespace storage handed to both sail sources
	 * @param evaluationStatisticsEnum  factory used to create (and later re-create) the evaluation statistics
	 * @param extensibleStatementHelper helper handed to both sail sources
	 */
	public ExtensibleSailStore(DataStructureInterface dataStructure,
			NamespaceStoreInterface namespaceStore, EvaluationStatisticsEnum evaluationStatisticsEnum,
			ExtensibleStatementHelper extensibleStatementHelper) {

		this.evaluationStatisticsEnum = evaluationStatisticsEnum;
		this.evaluationStatistics = evaluationStatisticsEnum.getInstance(this);

		boolean dynamicStatistics = evaluationStatistics instanceof DynamicStatistics;
		if (dynamicStatistics) {
			dataStructure = new EvaluationStatisticsWrapper(dataStructure, (DynamicStatistics) evaluationStatistics);
		}

		this.dataStructure = dataStructure;
		sailSource = new ExtensibleSailSource(dataStructure, namespaceStore, false, extensibleStatementHelper);
		sailSourceInferred = new ExtensibleSailSource(dataStructure, namespaceStore, true, extensibleStatementHelper);

		// Start the background thread only after every field has been assigned, so that it can never observe a
		// partially constructed store.
		if (dynamicStatistics) {
			startEvaluationStatisticsMaintainerThread();
		}

	}

	/**
	 * Starts a new daemon thread that performs one delayed staleness check. Does nothing once the store is closed.
	 */
	synchronized private void startEvaluationStatisticsMaintainerThread() {
		if (closed) {
			return;
		}
		evaluationStatisticsMaintainerThread = new Thread(new EvaluationStatisticsThread());
		evaluationStatisticsMaintainerThread.setDaemon(true);
		evaluationStatisticsMaintainerThread.start();
	}

	/**
	 * Marks the store as closed, interrupts the statistics maintainer (if any) and closes both sail sources.
	 *
	 * @throws SailException if closing one of the sail sources fails
	 */
	@Override
	synchronized public void close() throws SailException {
		closed = true;
		if (evaluationStatisticsMaintainerThread != null) {
			evaluationStatisticsMaintainerThread.interrupt();
		}
		try {
			sailSource.close();
		} finally {
			// Close the inferred source even if closing the explicit one failed.
			sailSourceInferred.close();
		}
	}

	/**
	 * @return the shared {@link SimpleValueFactory} instance
	 */
	@Override
	public ValueFactory getValueFactory() {
		return SimpleValueFactory.getInstance();
	}

	/**
	 * @return the statistics currently in use; the instance may be swapped by the background maintainer
	 */
	@Override
	public EvaluationStatistics getEvaluationStatistics() {
		return evaluationStatistics;
	}

	/**
	 * @return the sail source for explicit statements
	 */
	@Override
	public SailSource getExplicitSailSource() {
		return sailSource;
	}

	/**
	 * @return the sail source for inferred statements
	 */
	@Override
	public SailSource getInferredSailSource() {
		return sailSourceInferred;
	}

	/**
	 * Initializes the explicit and the inferred sail source.
	 */
	public void init() {
		sailSource.init();
		sailSourceInferred.init();
	}

	/**
	 * Builds a fresh statistics instance from all explicit and inferred statements and installs it, both in the
	 * wrapping data structure and as the statistics returned by {@link #getEvaluationStatistics()}. If the scan is
	 * cut short because the store was closed or the thread interrupted, the partially filled instance is discarded.
	 * Only called from the maintainer thread, which exists only when the data structure is an
	 * {@link EvaluationStatisticsWrapper}.
	 */
	private void startRecalculateStatistics() {

		logger.info("Recalculating stats: started");
		DynamicStatistics instance = (DynamicStatistics) evaluationStatisticsEnum.getInstance(this);

		// Short-circuit: do not even open the inferred iteration when the explicit scan was aborted.
		boolean completed = addToStats(instance, dataStructure.getStatements(null, null, null, false))
				&& addToStats(instance, dataStructure.getStatements(null, null, null, true));

		if (!completed) {
			logger.info("Recalculating stats: aborted");
			return;
		}

		((EvaluationStatisticsWrapper) dataStructure).setEvaluationStatistics(instance);

		evaluationStatistics = (ExtensibleEvaluationStatistics) instance;
		logger.info("Recalculating stats: complete");

	}

	/**
	 * Feeds every statement of the iteration into the given statistics instance and always closes the iteration.
	 * Progress is logged every 100000 statements.
	 *
	 * @param instance   the statistics being rebuilt
	 * @param statements the statements to add; closed by this method
	 * @return {@code true} if all statements were consumed, {@code false} if the scan stopped early because the store
	 *         was closed or the current thread was interrupted
	 */
	private boolean addToStats(DynamicStatistics instance,
			CloseableIteration<? extends ExtensibleStatement> statements) {

		try (CloseableIteration<? extends ExtensibleStatement> iteration = statements) {
			long estimatedSize = dataStructure.getEstimatedSize();

			long counter = 0;
			while (iteration.hasNext()) {
				ExtensibleStatement next = iteration.next();
				instance.add(next);

				// isInterrupted() leaves the interrupt flag set so the caller can still see it.
				if (closed || Thread.currentThread().isInterrupted()) {
					return false;
				}

				if (++counter % 100000 == 0) {
					logger.info("Recalculating stats: {}%", Math.round(100.0 / estimatedSize * counter));
				}
			}
			return true;
		}

	}

	/**
	 * One round of statistics maintenance: sleeps for {@link #EVALUATION_STATISTICS_STALENESS_CHECK_INTERVAL}, then
	 * recalculates the statistics if the store holds more than 1000 (estimated) statements and the reported staleness
	 * exceeds 0.2. On exit it schedules the next round by starting a new maintainer thread, unless the thread was
	 * interrupted or the store is closed.
	 */
	class EvaluationStatisticsThread implements Runnable {

		@Override
		public void run() {

			try {
				try {
					Thread.sleep(EVALUATION_STATISTICS_STALENESS_CHECK_INTERVAL);
				} catch (InterruptedException e) {
					// Interrupted by close(); the finally block below will not reschedule because closed is set.
					return;
				}

				if (closed) {
					return;
				}

				long estimatedSize = dataStructure.getEstimatedSize();

				if (estimatedSize > 1000) {
					double staleness = ((DynamicStatistics) evaluationStatistics).staleness(estimatedSize);

					if (staleness > 0.2) {
						long formattedStaleness = Math.round(staleness * 100);
						logger.info("Evaluation statistics is stale ({}%) and needs to be recalculated",
								formattedStaleness);
						startRecalculateStatistics();
					}

				}

			} catch (Exception e) {
				// Failures during shutdown are expected and ignored; anything else is propagated.
				if (!(closed || Thread.interrupted())) {
					throw new RuntimeException(e);
				}

			} finally {
				// Schedule the next round; the start method itself refuses to start anything once closed.
				if (!Thread.interrupted()) {
					startEvaluationStatisticsMaintainerThread();
				}
			}

		}
	}

}