
/*******************************************************************************
 * Copyright (c) 2020 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.extensiblestore.evaluationstatistics;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import org.eclipse.rdf4j.common.annotation.Experimental;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.Var;
import org.eclipse.rdf4j.sail.extensiblestore.ExtensibleSailStore;
import org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

import net.agkn.hll.HLL;

/**
 * <p>
 * ExtensibleDynamicEvaluationStatistics aims to keep an internal estimate of the cardinality of various statement
 * patterns.
 * </p>
 *
 * <p>
 * It supports getting the overall size, the cardinality of any single bound dimension (e.g. ?a rdf:type ?b, where
 * only the predicate is bound) and also two two-dimensional patterns (:Peter rdf:type ?b and ?a rdf:type
 * foaf:Person).
 * </p>
 *
 * <p>
 * Since evaluation statistics are best-effort, we use HyperLogLog (HLL) sketches as sets to estimate the number of
 * statements matching each pattern we support. HLL is a very memory-efficient, approximate set implementation.
 * Furthermore, each pattern is hashed into a fixed number of buckets: 1024 for single-dimension patterns and 64 per
 * dimension for two-dimensional patterns.
 * </p>
 *
 * <p>
 * This means that adding ':peter rdf:type foaf:Person' and ':lisa rdf:type foaf:Person' could potentially return
 * getCardinality(:peter, ?b, ?c) = 2 if both :peter and :lisa hash to the same one of the 1024 buckets in the
 * subject index.
 * </p>
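 *
 * <p>
 * A minimal sketch of the bucketing scheme, for illustration only (the real indexes are private; the bucket is
 * chosen from the value's hash, and the statement's hash is added to the bucket's HLL):
 * </p>
 *
 * <pre>{@code
 * int subjectBucket = Math.abs(subject.hashCode() % 1024); // single-dimension index: 1024 buckets
 * int row = Math.abs(subject.hashCode() % 64); // two-dimensional index: 64x64 grid
 * int col = Math.abs(predicate.hashCode() % 64);
 * }</pre>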
 *
 * <p>
 * HLL does not support "remove" operations, so there are two sets of every index: one for all added statements and
 * one for all removed statements. If the user adds, removes and then re-adds the same statement, the cardinality for
 * that statement will be incorrect. We call this effect "staleness". To prevent staleness from affecting the
 * returned cardinalities, this class needs to be monitored by calling the staleness(...) method; the
 * ExtensibleSailStore does this automatically every 60 seconds.
 * </p>
 */
@Experimental
public class ExtensibleDynamicEvaluationStatistics extends ExtensibleEvaluationStatistics implements DynamicStatistics {
	private static final Logger logger = LoggerFactory.getLogger(ExtensibleDynamicEvaluationStatistics.class);
	private static final int QUEUE_LIMIT = 128;
	private static final int SINGLE_DIMENSION_INDEX_SIZE = 1024;

	ConcurrentLinkedQueue<StatementQueueItem> queue = new ConcurrentLinkedQueue<>();

	private final Object monitor = new Object();

	AtomicInteger queueSize = new AtomicInteger();

	private final HashFunction HASH_FUNCTION = Hashing.murmur3_128();

	private final HLL EMPTY_HLL = getHLL();

	private final HLL size = getHLL();
	private final HLL size_removed = getHLL();

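	// One HLL per hash bucket: single-dimension indexes use 1024 buckets, while the two-dimensional
	// subject-predicate and predicate-object indexes use a 64x64 grid. Every index comes in an "added"
	// and a "_removed" variant because HLL cannot remove elements.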
	private final Map<Integer, HLL> subjectIndex = new HashMap<>();
	private final Map<Integer, HLL> predicateIndex = new HashMap<>();
	private final Map<Integer, HLL> objectIndex = new HashMap<>();
	private final Map<Integer, HLL> contextIndex = new HashMap<>();
	private final HLL defaultContext = getHLL();

	private final HLL[][] subjectPredicateIndex = new HLL[64][64];
	private final HLL[][] predicateObjectIndex = new HLL[64][64];

	private final Map<Integer, HLL> subjectIndex_removed = new HashMap<>();
	private final Map<Integer, HLL> predicateIndex_removed = new HashMap<>();
	private final Map<Integer, HLL> objectIndex_removed = new HashMap<>();
	private final Map<Integer, HLL> contextIndex_removed = new HashMap<>();
	private final HLL defaultContext_removed = getHLL();

	private final HLL[][] subjectPredicateIndex_removed = new HLL[64][64];
	private final HLL[][] predicateObjectIndex_removed = new HLL[64][64];

	private volatile Thread queueConsumingThread;

	public ExtensibleDynamicEvaluationStatistics(ExtensibleSailStore extensibleSailStore) {
		super(extensibleSailStore);

		Stream.of(subjectPredicateIndex, predicateObjectIndex, subjectPredicateIndex_removed,
				predicateObjectIndex_removed).forEach(index -> {
					for (int i = 0; i < index.length; i++) {
						for (int j = 0; j < index[i].length; j++) {
							index[i][j] = getHLL();
						}
					}
				});

	}

	@Override
	protected CardinalityCalculator createCardinalityCalculator() {
		return new ExtensibleDynamicEvaluationStatisticsCardinalityCalculator();
	}

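	/**
	 * Compares the estimated size (added minus removed cardinality) with the expected size and returns the relative
	 * difference. Worked example: with an estimated size of 1000 and an expected size of 2000, both are first padded
	 * with the 500-statement margin of error, giving |1500 - 2500| / 2500 = 0.4.
	 */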
	@Override
	public double staleness(long expectedSize) {
		synchronized (monitor) {
			double estimatedSize = size.cardinality() - size_removed.cardinality();

			// add 500 because this is our minimum margin of error
			estimatedSize += 500;
			expectedSize += 500;

			double diff = Math.abs(estimatedSize - expectedSize);

			double staleness;

			if (estimatedSize + expectedSize == 0 || diff == 0) {
				staleness = 0;
			} else {
				if (expectedSize > estimatedSize) {
					staleness = diff / expectedSize;
				} else {
					staleness = diff / Math.max(0, estimatedSize);
				}
			}

			logger.debug("expected size {}; estimated size: {}; staleness: {}", expectedSize, estimatedSize, staleness);

			return staleness;
		}
	}

	class ExtensibleDynamicEvaluationStatisticsCardinalityCalculator extends CardinalityCalculator {

		@Override
		protected double getCardinality(StatementPattern sp) {
			synchronized (monitor) {

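				// Bucket collisions can only inflate an estimate, so each index yields an upper bound for the
				// pattern (modulo HLL error); take the minimum across all indexes that apply.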
				double min = size.cardinality() - size_removed.cardinality();

				min = Math.min(min, getSubjectCardinality(sp.getSubjectVar()));
				min = Math.min(min, getPredicateCardinality(sp.getPredicateVar()));
				min = Math.min(min, getObjectCardinality(sp.getObjectVar()));

				// skip more complex evaluations if min is unlikely to get lower
				if (min < 2) {
					return min;
				}

				if (sp.getSubjectVar().getValue() != null && sp.getPredicateVar().getValue() != null) {
					min = Math.min(min,
							getHllCardinality(
									subjectPredicateIndex,
									subjectPredicateIndex_removed,
									sp.getSubjectVar().getValue(),
									sp.getPredicateVar().getValue()));
				}

				if (sp.getPredicateVar().getValue() != null && sp.getObjectVar().getValue() != null) {
					min = Math.min(min,
							getHllCardinality(
									predicateObjectIndex,
									predicateObjectIndex_removed,
									sp.getPredicateVar().getValue(),
									sp.getObjectVar().getValue()));
				}

				return min;
			}

		}

		@Override
		protected double getSubjectCardinality(Var var) {
			synchronized (monitor) {
				if (var.getValue() == null) {
					return size.cardinality();
				} else {
					return getHllCardinality(subjectIndex, subjectIndex_removed, var.getValue());
				}
			}

		}

		@Override
		protected double getPredicateCardinality(Var var) {
			synchronized (monitor) {
				if (var.getValue() == null) {
					return size.cardinality();
				} else {
					return getHllCardinality(predicateIndex, predicateIndex_removed, var.getValue());
				}

			}
		}

		@Override
		protected double getObjectCardinality(Var var) {
			synchronized (monitor) {
				if (var.getValue() == null) {
					return size.cardinality();
				} else {
					return getHllCardinality(objectIndex, objectIndex_removed, var.getValue());
				}
			}
		}

		@Override
		protected double getContextCardinality(Var var) {
			synchronized (monitor) {
				if (var.getValue() == null) {
					return defaultContext.cardinality() - defaultContext_removed.cardinality();
				} else {
					return getHllCardinality(contextIndex, contextIndex_removed, var.getValue());
				}
			}
		}
	}

	private double getHllCardinality(HLL[][] index, HLL[][] index_removed,
			Value value1, Value value2) {

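		// The estimate is |added| - |removed| for the bucket pair. A re-added statement still counts in the
		// removed sketch, which is the "staleness" effect described in the class javadoc.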
		int value1IndexIntoAdded = Math.abs(value1.hashCode() % index.length);
		int value2IndexIntoAdded = Math.abs(value2.hashCode() % index.length);
		double cardinalityAdded = index[value1IndexIntoAdded][value2IndexIntoAdded].cardinality();

		int value1IndexIntoRemoved = Math.abs(value1.hashCode() % index_removed.length);
		int value2IndexIntoRemoved = Math.abs(value2.hashCode() % index_removed.length);
		double removedStatements = index_removed[value1IndexIntoRemoved][value2IndexIntoRemoved].cardinality();

		return cardinalityAdded - removedStatements;
	}

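	// Single-dimension variant. Buckets are created lazily by indexOneValue, so unseen buckets fall back to
	// the shared empty HLL.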
	private double getHllCardinality(Map<Integer, HLL> index,
			Map<Integer, HLL> index_removed, Value value) {

		int indexIntoMap = Math.abs(value.hashCode() % SINGLE_DIMENSION_INDEX_SIZE);

		double cardinalityAdded = index.getOrDefault(indexIntoMap, EMPTY_HLL).cardinality();
		double cardinalityRemoved = index_removed.getOrDefault(indexIntoMap, EMPTY_HLL).cardinality();

		return cardinalityAdded - cardinalityRemoved;
	}

	@Override
	public void add(ExtensibleStatement statement) {

		queue.add(new StatementQueueItem(statement, StatementQueueItem.Type.added));

		int currentQueueSize = queueSize.incrementAndGet();
		if (currentQueueSize > QUEUE_LIMIT && queueConsumingThread == null) {
			startQueueConsumingThread();
		}
	}

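	// Starts the single consumer thread that drains the queue into the indexes. Double-checked under the
	// monitor so that at most one consumer runs at a time; the thread unregisters itself once the queue
	// stays empty.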
	private void startQueueConsumingThread() {
		synchronized (monitor) {
			if (queueConsumingThread == null) {
				queueConsumingThread = new Thread(() -> {
					try {
						while (!queue.isEmpty()) {
							StatementQueueItem poll = queue.poll();
							queueSize.decrementAndGet();
							Statement statement = poll.statement;
							long statementHash = HASH_FUNCTION
									.hashString(statement.toString(), StandardCharsets.UTF_8)
									.asLong();

							if (poll.type == StatementQueueItem.Type.added) {

								handleStatement(statement, statementHash, size, subjectIndex, predicateIndex,
										objectIndex, subjectPredicateIndex, predicateObjectIndex, defaultContext,
										contextIndex);

							} else { // removed

								assert poll.type == StatementQueueItem.Type.removed;

								handleStatement(statement, statementHash, size_removed, subjectIndex_removed,
										predicateIndex_removed, objectIndex_removed, subjectPredicateIndex_removed,
										predicateObjectIndex_removed, defaultContext_removed, contextIndex_removed);

							}

							if (queue.isEmpty()) {
								// Grace period: wait briefly for stragglers before the while condition
								// re-checks and the thread exits.
								try {
									Thread.sleep(2);
								} catch (InterruptedException e) {
									// restore the interrupt flag instead of swallowing it
									Thread.currentThread().interrupt();
								}
							}
						}
					} finally {
						queueConsumingThread = null;
					}

				});

				queueConsumingThread.setDaemon(true);
				queueConsumingThread.start();

			}
		}
	}

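	// Updates one family of indexes, either the "added" family or the "_removed" family, depending on the
	// structures passed in by the caller.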
	private void handleStatement(Statement statement, long statementHash, HLL size,
			Map<Integer, HLL> subjectIndex, Map<Integer, HLL> predicateIndex,
			Map<Integer, HLL> objectIndex, HLL[][] subjectPredicateIndex,
			HLL[][] predicateObjectIndex, HLL defaultContext,
			Map<Integer, HLL> contextIndex) {
		synchronized (monitor) {
			size.addRaw(statementHash);

			int subjectHash = statement.getSubject().hashCode();
			int predicateHash = statement.getPredicate().hashCode();
			int objectHash = statement.getObject().hashCode();

			indexOneValue(statementHash, subjectIndex, subjectHash);
			indexOneValue(statementHash, predicateIndex, predicateHash);
			indexOneValue(statementHash, objectIndex, objectHash);

			indexTwoValues(statementHash, subjectPredicateIndex, subjectHash, predicateHash);
			indexTwoValues(statementHash, predicateObjectIndex, predicateHash, objectHash);

			if (statement.getContext() == null) {
				defaultContext.addRaw(statementHash);
			} else {
				indexOneValue(statementHash, contextIndex, statement.getContext().hashCode());
			}
		}
	}

	static class StatementQueueItem {
		final ExtensibleStatement statement;
		final Type type;

		public StatementQueueItem(ExtensibleStatement statement, Type type) {
			this.statement = statement;
			this.type = type;
		}

		enum Type {
			added,
			removed
		}
	}

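	// Hash both values into the 64x64 grid and add the statement hash to that bucket's HLL.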
	private void indexTwoValues(long statementHash, HLL[][] index, int indexHash, int indexHash2) {
		index[Math.abs(indexHash % index.length)][Math.abs(indexHash2 % index.length)].addRaw(statementHash);
	}

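	// Hash the value into one of the 1024 buckets, creating the bucket's HLL on first use.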
	private void indexOneValue(long statementHash, Map<Integer, HLL> index, int indexHash) {
		index.compute(Math.abs(indexHash % SINGLE_DIMENSION_INDEX_SIZE), (key, val) -> {
			if (val == null) {
				val = getHLL();
			}
			val.addRaw(statementHash);
			return val;
		});
	}

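	// log2m = 13 gives 2^13 = 8192 registers at 5 bits each (about 5 KB per sketch); the standard HLL
	// error for that register count is roughly 1.04 / sqrt(8192), i.e. around 1.1%.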
	private HLL getHLL() {
		return new HLL(13/* log2m */, 5/* registerWidth */);
	}

	@Override
	public void remove(ExtensibleStatement statement) {

		queue.add(new StatementQueueItem(statement, StatementQueueItem.Type.removed));

		int currentQueueSize = queueSize.incrementAndGet();
		if (currentQueueSize > QUEUE_LIMIT && queueConsumingThread == null) {
			startQueueConsumingThread();
		}
	}

	@Override
	public void removeByQuery(Resource subj, IRI pred, Value obj, boolean inferred, Resource... contexts) {
		// Not implemented yet. We should be able to handle removals where up to two dimensions are bound.
	}

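	/**
	 * Blocks until the current queue-consuming thread, if any, has drained the queue and exited. Note that queued
	 * statements below QUEUE_LIMIT with no consumer running are not waited for.
	 */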
	public void waitForQueue() throws InterruptedException {
		// Read the volatile field into a local so the null check and the join() cannot race with the
		// consumer thread clearing the field as it exits.
		Thread thread = queueConsumingThread;
		while (thread != null) {
			thread.join();
			thread = queueConsumingThread;
		}
	}
}