LazyReadCache.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.extensiblestore;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class LazyReadCache implements DataStructureInterface {

	private static final Logger logger = LoggerFactory.getLogger(LazyReadCache.class);

	DataStructureInterface delegate;

	final int STATEMENTS_PER_CACHE_ITEM_LIMIT = 100000;

	// A soft values-based cache of unlimited size. We currently handle retention and removal manually.
	Cache<PartialStatement, List<ExtensibleStatement>> cache = CacheBuilder.newBuilder().softValues().build();

	// The cache ticket is incremented every time the cache is cleared. A getStatements operation retrieves the current
	// cacheTicket when the iteration is opened. When the iteration is closed it checks the retrieved ticket against the
	// currentTicket, if they are the same then the statements can be cached. If the tickets don't match it means that
	// there was a write operation at some point after the iteration was opened, and the statements accumulated in the
	// iteration are stale and can not be cached.
	private volatile long cacheTicket = Long.MIN_VALUE;

	public LazyReadCache(DataStructureInterface delegate) {
		this.delegate = delegate;
	}

	@Override
	public void addStatement(ExtensibleStatement statement) {
		delegate.addStatement(statement);
		clearCache();
	}

	@Override
	public void removeStatement(ExtensibleStatement statement) {
		delegate.removeStatement(statement);
		clearCache();
	}

	@Override
	public CloseableIteration<? extends ExtensibleStatement> getStatements(Resource subject,
			IRI predicate,
			Value object, boolean inferred, Resource... context) {

		PartialStatement partialStatement = new PartialStatement(subject, predicate, object, inferred, context);
		CloseableIteration<? extends ExtensibleStatement> cached = getCached(partialStatement);
		if (cached != null) {
			logger.trace("cache hit");
			return cached;
		}

		long localCacheTicket = cacheTicket;

		return new CloseableIteration<>() {

			final CloseableIteration<? extends ExtensibleStatement> statements = delegate.getStatements(
					subject,
					predicate, object, inferred, context);
			List<ExtensibleStatement> cache = new ArrayList<>();

			@Override
			public boolean hasNext() throws SailException {
				return statements.hasNext();
			}

			@Override
			public ExtensibleStatement next() throws SailException {

				ExtensibleStatement next = statements.next();

				if (cache != null) {
					cache.add(next);
					if (cache.size() > STATEMENTS_PER_CACHE_ITEM_LIMIT) {
						cache = null;
						logger.trace("cache limit");
					}
				}

				return next;
			}

			@Override
			public void remove() throws SailException {

			}

			@Override
			public void close() throws SailException {
				if (!statements.hasNext()) {
					submitToCache(localCacheTicket, partialStatement, cache);
				} else {
					logger.trace("iteration was not fully consumed before being closed and could not be cached");
				}
				statements.close();

			}
		};

	}

	synchronized private CloseableIteration<? extends ExtensibleStatement> getCached(
			PartialStatement partialStatement) {
		List<ExtensibleStatement> statements = cache.getIfPresent(partialStatement);

		if (statements != null) {

			return new LookAheadIteration<>() {

				final Iterator<ExtensibleStatement> iterator = statements.iterator();

				@Override
				protected ExtensibleStatement getNextElement() throws SailException {
					if (iterator.hasNext()) {
						return iterator.next();
					}
					return null;
				}

				@Override
				protected void handleClose() {

				}
			};

		}

		return null;
	}

	@Override
	public void flushForReading() {
		delegate.flushForReading();
	}

	@Override
	public void init() {
		delegate.init();
	}

	@Override
	public void clear(boolean inferred, Resource[] contexts) {
		delegate.clear(inferred, contexts);
		clearCache();
	}

	@Override
	public void flushForCommit() {
		delegate.flushForCommit();
		clearCache();
	}

	@Override
	public boolean removeStatementsByQuery(Resource subj, IRI pred, Value obj, boolean inferred, Resource[] contexts) {
		boolean removed = delegate.removeStatementsByQuery(subj, pred, obj, inferred, contexts);
		clearCache();
		return removed;
	}

	synchronized public void clearCache() {
		cache.invalidateAll();
		// overflow is not a problem since we use == to compare in submitToCache
		cacheTicket++;
	}

	synchronized public void submitToCache(Long localCacheTicket, PartialStatement partialStatement,
			List<ExtensibleStatement> statements) {
		if (localCacheTicket == cacheTicket && statements != null) {
			cache.put(partialStatement, statements);
		}

	}

	@Override
	public Comparator<Value> getComparator() {
		return delegate.getComparator();
	}

	@Override
	public long getEstimatedSize() {
		return delegate.getEstimatedSize();
	}
}