// EagerReadCache.java
/*******************************************************************************
* Copyright (c) 2023 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
******************************************************************************/
package org.eclipse.rdf4j.sail.extensiblestore;
import java.util.Comparator;
import java.util.Iterator;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.impl.LinkedHashModel;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A read cache that keeps a hot, in-memory copy of the underlying data structure. The cached copy is invalidated on
 * every write operation and is only (re)built when the JVM has sufficient free heap memory.
 */
public class EagerReadCache implements DataStructureInterface {

    private static final Logger logger = LoggerFactory.getLogger(EagerReadCache.class);

    private static final Runtime RUNTIME = Runtime.getRuntime();

    // We will not cache anything if there is less than 32 MB of "free" memory
    private static final long MIN_AVAILABLE_MEM = 32 * 1024 * 1024;

    // We will always cache up to 100 000 statements
    private static final int MIN_CACHE_SIZE = 100_000;

    // The authoritative underlying store: all writes and all cache misses go here.
    private final DataStructureInterface delegate;

    // Hot in-memory copy of the delegate's statements; null means "no cache available".
    // Volatile so that unsynchronized readers observe publications from fillCache() and clearCache().
    private volatile LinkedHashModel cache = null;

    public EagerReadCache(DataStructureInterface delegate) {
        this.delegate = delegate;
    }

    /** Forwards the write to the delegate, then drops the now-stale cache. */
    @Override
    public void addStatement(ExtensibleStatement statement) {
        delegate.addStatement(statement);
        clearCache();
    }

    /** Forwards the removal to the delegate, then drops the now-stale cache. */
    @Override
    public void removeStatement(ExtensibleStatement statement) {
        delegate.removeStatement(statement);
        clearCache();
    }

    /**
     * Answers the query from the in-memory cache when one is available (building it on demand), otherwise falls
     * back to the delegate.
     *
     * @param subject   subject to match, or null as wildcard
     * @param predicate predicate to match, or null as wildcard
     * @param object    object to match, or null as wildcard
     * @param inferred  whether to match inferred statements
     * @param context   contexts to match; empty means all contexts
     * @return a closeable iteration over the matching statements
     */
    @Override
    public CloseableIteration<? extends ExtensibleStatement> getStatements(Resource subject,
            IRI predicate, Value object, boolean inferred, Resource... context) {
        // Snapshot the volatile field once so a concurrent clearCache() can't null it out mid-method.
        Model cache = this.cache;
        if (cache == null) {
            if (subject != null && predicate != null && object != null) {
                // The NotifyingSail will typically trigger this method argument pattern when it checks if a statement
                // already exists before adding it. If we triggered fillCache() for that use case we would get a lot of
                // cache thrashing because the subsequent write operation would clear the cache.
                return delegate.getStatements(subject, predicate, object, inferred, context);
            }
            cache = fillCache();
        }
        if (cache == null) {
            // if memory is low then fillCache() can return null
            return delegate.getStatements(subject, predicate, object, inferred, context);
        }
        // The Model API filters on subject/predicate/object/context but has no notion of "inferred";
        // NOTE(review): FilteringIteration is presumably what re-applies the inferred flag — confirm against
        // its implementation.
        Iterable<Statement> statements = cache.getStatements(subject, predicate, object, context);
        return new FilteringIteration<>(new LookAheadIteration<>() {
            final Iterator<Statement> iterator = statements.iterator();

            @Override
            protected ExtensibleStatement getNextElement() throws SailException {
                if (iterator.hasNext()) {
                    Statement next = iterator.next();
                    // Unwrap LinkedHashModel's internal wrapper to recover the original ExtensibleStatement
                    // that fillCache() stored.
                    return (ExtensibleStatement) ((LinkedHashModel.ModelStatement) next).getStatement();
                }
                return null;
            }

            @Override
            protected void handleClose() {
                // Nothing to release: the iteration is backed only by the in-memory model.
            }
        }, subject, predicate, object, inferred, context);
    }

    // Number of times fillCache() has aborted due to low memory. Only read and written inside the
    // synchronized fillCache() method, so no additional synchronization is needed.
    private int lowMemCounter = 0;

    /**
     * Builds a fresh LinkedHashModel containing every statement (inferred and explicit) from the delegate and
     * publishes it as the cache. Synchronized so at most one thread fills the cache at a time.
     *
     * @return the filled cache, or {@code null} when filling was abandoned because memory is low
     */
    private synchronized Model fillCache() {
        // Double-check under the monitor: another thread may have filled the cache while we waited for the lock.
        LinkedHashModel cache = this.cache;
        if (cache != null) {
            return cache;
        }
        logger.debug("Filling cache");
        if (lowMemCounter > 100) {
            // Since we seem to be chronically low on memory we can skip checking how much memory is free for a while
            // Backoff scheme: only every 100th attempt (counter divisible by 100) actually probes memory; a probe
            // that finds memory available resets the counter, otherwise this attempt is skipped.
            if (lowMemCounter++ % 100 == 0 && !isLowOnMemory()) {
                lowMemCounter = 0;
            } else {
                logger.debug("Canceled filling cache due to low memory");
                return null;
            }
        }
        cache = new LinkedHashModel();
        int i = 0;
        // First pass: inferred statements.
        try (var statements = delegate.getStatements(null, null, null, true)) {
            while (statements.hasNext()) {
                // Past MIN_CACHE_SIZE statements, re-check memory headroom every 1000 statements and bail out if low.
                if (i++ > MIN_CACHE_SIZE && i % 1000 == 0 && isLowOnMemory()) {
                    logger.debug("Canceled filling cache due to low memory");
                    lowMemCounter++;
                    return null;
                }
                cache.add(statements.next());
            }
        }
        // Second pass: explicit (non-inferred) statements; i carries over so the memory check keeps applying.
        try (var statements = delegate.getStatements(null, null, null, false)) {
            while (statements.hasNext()) {
                if (i++ > MIN_CACHE_SIZE && i % 1000 == 0 && isLowOnMemory()) {
                    logger.debug("Canceled filling cache due to low memory");
                    lowMemCounter++;
                    return null;
                }
                cache.add(statements.next());
            }
        }
        // Publish the fully built model; the volatile write makes it visible to unsynchronized readers.
        // NOTE(review): a write that lands while we iterate the delegate calls clearCache(), but this assignment
        // overwrites that invalidation with a model that may miss the write — presumably higher-level isolation
        // prevents concurrent writes during reads; confirm.
        this.cache = cache;
        logger.debug("Cache filled");
        return cache;
    }

    /**
     * Returns true when less than MIN_AVAILABLE_MEM of allocatable heap remains, after giving the garbage
     * collector two chances to reclaim space first.
     *
     * @throws SailException if the thread is interrupted while waiting for the GC
     */
    private boolean isLowOnMemory() {
        if (getFreeToAllocateMemory() < MIN_AVAILABLE_MEM) {
            // Attempt to force the JVM to free up memory
            System.gc();
            try {
                // Brief pause to give the collector a chance to run (System.gc() is only a hint).
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // Restore the interrupt flag before surfacing the interruption.
                Thread.currentThread().interrupt();
                throw new SailException(e);
            }
            System.gc();
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new SailException(e);
            }
            // Re-measure after the GC attempts; still below the threshold means we really are low.
            return getFreeToAllocateMemory() < MIN_AVAILABLE_MEM;
        }
        return false;
    }

    /**
     * Estimates how much heap the JVM could still allocate: the max heap size minus the memory currently in use.
     */
    private static long getFreeToAllocateMemory() {
        // maximum heap size the JVM can allocate
        long maxMemory = RUNTIME.maxMemory();
        // total currently allocated JVM memory
        long totalMemory = RUNTIME.totalMemory();
        // amount of memory free in the currently allocated JVM memory
        long freeMemory = RUNTIME.freeMemory();
        // estimated memory used
        long used = totalMemory - freeMemory;
        // amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
        long freeToAllocateMemory = maxMemory - used;
        return freeToAllocateMemory;
    }

    @Override
    public void flushForReading() {
        delegate.flushForReading();
    }

    @Override
    public void init() {
        delegate.init();
    }

    /** Clears the requested statements in the delegate, then drops the now-stale cache. */
    @Override
    public void clear(boolean inferred, Resource[] contexts) {
        delegate.clear(inferred, contexts);
        clearCache();
    }

    /** Flushes the delegate, then drops the cache since the flush may have made pending writes visible. */
    @Override
    public void flushForCommit() {
        delegate.flushForCommit();
        clearCache();
    }

    /** Removes matching statements in the delegate, then drops the now-stale cache. */
    @Override
    public boolean removeStatementsByQuery(Resource subj, IRI pred, Value obj, boolean inferred, Resource[] contexts) {
        boolean removed = delegate.removeStatementsByQuery(subj, pred, obj, inferred, contexts);
        clearCache();
        return removed;
    }

    /** Drops the cached model so the next read rebuilds it from the delegate. Safe to call from any thread. */
    public void clearCache() {
        if (cache != null) {
            logger.debug("Clearing cache");
        }
        cache = null;
    }

    /**
     * Returns the exact statement count when the cache is populated, otherwise the delegate's estimate.
     */
    @Override
    public long getEstimatedSize() {
        // Snapshot the volatile field so a concurrent clearCache() can't null it between the check and the use.
        Model cache = this.cache;
        if (cache != null) {
            return cache.size();
        }
        return delegate.getEstimatedSize();
    }

    @Override
    public Comparator<Value> getComparator() {
        // NOTE(review): returning null presumably signals that this structure provides no sort order for its
        // statements — confirm against the DataStructureInterface contract.
        return null;
    }
}