MemorySailStore.java
/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.memory;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.lang.ref.Cleaner;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.time.StopWatch;
import org.eclipse.rdf4j.common.concurrent.locks.ExclusiveReentrantLockManager;
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.common.concurrent.locks.diagnostics.ConcurrentCleaner;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.CloseableIteratorIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.transaction.IsolationLevel;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Namespace;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.LinkedHashModel;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.Var;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.EvaluationStatistics;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.evaluationsteps.StatementPatternQueryEvaluationStep;
import org.eclipse.rdf4j.sail.SailConflictException;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.base.BackingSailSource;
import org.eclipse.rdf4j.sail.base.SailDataset;
import org.eclipse.rdf4j.sail.base.SailSink;
import org.eclipse.rdf4j.sail.base.SailSource;
import org.eclipse.rdf4j.sail.base.SailStore;
import org.eclipse.rdf4j.sail.memory.model.MemBNode;
import org.eclipse.rdf4j.sail.memory.model.MemIRI;
import org.eclipse.rdf4j.sail.memory.model.MemResource;
import org.eclipse.rdf4j.sail.memory.model.MemStatement;
import org.eclipse.rdf4j.sail.memory.model.MemStatementIterator;
import org.eclipse.rdf4j.sail.memory.model.MemStatementIteratorCache;
import org.eclipse.rdf4j.sail.memory.model.MemStatementList;
import org.eclipse.rdf4j.sail.memory.model.MemTriple;
import org.eclipse.rdf4j.sail.memory.model.MemTripleIterator;
import org.eclipse.rdf4j.sail.memory.model.MemValue;
import org.eclipse.rdf4j.sail.memory.model.MemValueFactory;
import org.eclipse.rdf4j.sail.memory.model.WeakObjectRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of {@link SailStore} that keeps committed statements in a {@link MemStatementList}.
*
* @author James Leigh
*/
class MemorySailStore implements SailStore {
private final static Logger logger = LoggerFactory.getLogger(MemorySailStore.class);
private static final Runtime RUNTIME = Runtime.getRuntime();
// Maximum that can be allocated.
private static final long MAX_MEMORY = RUNTIME.maxMemory();
// Small heaps (small values for MAX_MEMORY) would trigger the cleanup priority too often. This is the threshold for
// running the code that checks for low memory and priorities cleanup.
private static final int CLEANUP_MAX_MEMORY_THRESHOLD = 256 * 1024 * 1024;
// A constant for the absolute lowest amount of free memory before we prioritise cleanup.
private static final int CLEANUP_MINIMUM_FREE_MEMORY = 64 * 1024 * 1024;
// A ratio of how much free memory there is before we prioritise cleanup. For a 1 GB heap a ratio of 1/8 means that
// we prioritise cleanup if there is less than 128 MB of free memory.
private static final double CLEANUP_MINIMUM_FREE_MEMORY_RATIO = 1.0 / 8;
public static final EmptyIteration<MemStatement> EMPTY_ITERATION = (EmptyIteration<MemStatement>) StatementPatternQueryEvaluationStep.EMPTY_ITERATION;
public static final EmptyIteration<MemTriple> EMPTY_TRIPLE_ITERATION = new EmptyIteration<>();
public static final MemResource[] EMPTY_CONTEXT = {};
public static final MemResource[] NULL_CONTEXT = { null };
private final MemStatementIteratorCache iteratorCache = new MemStatementIteratorCache(10);
/**
* Factory/cache for MemValue objects.
*/
private final MemValueFactory valueFactory = new MemValueFactory();
/**
* List containing all available statements.
*/
private final MemStatementList statements = new MemStatementList(256);
/**
* This gets set to `true` when we add our first inferred statement. If the value is `false` we guarantee that there
* are no inferred statements in the MemorySailStore. If it is `true` then an inferred statement was added at some
* point, but we make no guarantees regarding if there still are inferred statements or if they are in the current
* snapshot.
* <p>
* The purpose of this variable is to optimize read operations that only read inferred statements when there are no
* inferred statements.
*/
private volatile boolean mayHaveInferred = false;
/**
* Identifies the current snapshot.
*/
private volatile int currentSnapshot;
final SnapshotMonitor snapshotMonitor;
/**
* Store for namespace prefix info.
*/
private final MemNamespaceStore namespaceStore = new MemNamespaceStore();
/**
* Lock manager used to give the snapshot cleanup thread exclusive access to the statement list.
*/
/**
* Lock manager used to prevent concurrent writes.
*/
private final ExclusiveReentrantLockManager txnLockManager = new ExclusiveReentrantLockManager();
/**
* Cleanup thread that removes deprecated statements when no other threads are accessing this list. Seee
* {@link #scheduleSnapshotCleanup()}.
*/
private volatile Thread snapshotCleanupThread;
/**
* Lock object used to synchronize concurrent access to {@link #snapshotCleanupThread}.
*/
private final Object snapshotCleanupThreadLockObject = new Object();
public MemorySailStore(boolean debug) {
snapshotMonitor = new SnapshotMonitor(debug);
}
@Override
public ValueFactory getValueFactory() {
return valueFactory;
}
@Override
public void close() {
synchronized (snapshotCleanupThreadLockObject) {
if (snapshotCleanupThread != null) {
snapshotCleanupThread.interrupt();
snapshotCleanupThread = null;
}
}
valueFactory.clear();
statements.clear();
namespaceStore.clear();
invalidateCache();
}
private void invalidateCache() {
iteratorCache.invalidateCache();
}
@Override
public EvaluationStatistics getEvaluationStatistics() {
return new MemEvaluationStatistics(valueFactory, statements);
}
@Override
public SailSource getExplicitSailSource() {
return new MemorySailSource(true);
}
@Override
public SailSource getInferredSailSource() {
return new MemorySailSource(false);
}
/**
* Creates a StatementIterator that contains the statements matching the specified pattern of subject, predicate,
* object, context. Inferred statements are excluded when <var>explicitOnly</var> is set to <var>true</var> .
* Statements from the null context are excluded when <var>namedContextsOnly</var> is set to <var>true</var>. The
* returned StatementIterator will assume the specified read mode.
*/
private CloseableIteration<MemStatement> createStatementIterator(Resource subj, IRI pred, Value obj,
Boolean explicit, int snapshot, Resource... contexts) throws InterruptedException {
// Perform look-ups for value-equivalents of the specified values
if (explicit != null && !explicit && !mayHaveInferred && snapshot >= 0) {
return EMPTY_ITERATION;
}
if (statements.isEmpty()) {
return EMPTY_ITERATION;
}
MemResource memSubj = valueFactory.getMemResource(subj);
if (subj != null && memSubj == null) {
// non-existent subject
return EMPTY_ITERATION;
}
MemIRI memPred = valueFactory.getMemURI(pred);
if (pred != null && memPred == null) {
// non-existent predicate
return EMPTY_ITERATION;
}
MemValue memObj = valueFactory.getMemValue(obj);
if (obj != null && memObj == null) {
// non-existent object
return EMPTY_ITERATION;
}
MemResource[] memContexts;
MemStatementList smallestList;
if (contexts.length == 0) {
memContexts = EMPTY_CONTEXT;
smallestList = statements;
} else if (contexts.length == 1 && contexts[0] == null) {
memContexts = NULL_CONTEXT;
smallestList = statements;
} else if (contexts.length == 1) {
MemResource memContext = valueFactory.getMemResource(contexts[0]);
if (memContext == null) {
// non-existent context
return EMPTY_ITERATION;
}
memContexts = new MemResource[] { memContext };
smallestList = memContext.getContextStatementList();
if (smallestList.isEmpty()) {
return EMPTY_ITERATION;
}
} else {
Set<MemResource> contextSet = new LinkedHashSet<>(2 * contexts.length);
for (Resource context : contexts) {
MemResource memContext = valueFactory.getMemResource(context);
if (context == null || memContext != null) {
contextSet.add(memContext);
}
}
if (contextSet.isEmpty()) {
// no known contexts specified
return EMPTY_ITERATION;
}
memContexts = contextSet.toArray(new MemResource[contextSet.size()]);
smallestList = statements;
}
return getMemStatementIterator(memSubj, memPred, memObj, explicit, snapshot, memContexts, smallestList);
}
private CloseableIteration<MemStatement> getMemStatementIterator(MemResource subj, MemIRI pred,
MemValue obj, Boolean explicit, int snapshot, MemResource[] memContexts, MemStatementList statementList)
throws InterruptedException {
if (explicit != null && !explicit) {
// we are looking for inferred statements
if (!mayHaveInferred && snapshot >= 0) {
return EMPTY_ITERATION;
}
}
MemStatementList smallestList = getSmallestStatementList(subj, pred, obj);
if (smallestList == null) {
smallestList = statementList;
} else if (smallestList.isEmpty()) {
return EMPTY_ITERATION;
} else if (smallestList.size() > statementList.size()) {
smallestList = statementList;
}
return MemStatementIterator.cacheAwareInstance(smallestList, subj, pred, obj, explicit, snapshot, memContexts,
iteratorCache);
}
private MemStatementList getSmallestStatementList(MemResource subj, MemIRI pred, MemValue obj) {
MemStatementList smallestList = null;
if (subj != null) {
smallestList = subj.getSubjectStatementList();
if (smallestList.isEmpty()) {
return smallestList;
}
}
if (pred != null) {
MemStatementList l = pred.getPredicateStatementList();
if (smallestList == null) {
smallestList = l;
if (smallestList.isEmpty()) {
return smallestList;
}
} else if (l.size() < smallestList.size()) {
smallestList = l;
if (smallestList.isEmpty()) {
return smallestList;
}
}
}
if (obj != null) {
MemStatementList l = obj.getObjectStatementList();
if (smallestList == null) {
smallestList = l;
} else if (l.size() < smallestList.size()) {
smallestList = l;
}
}
return smallestList;
}
/**
* Creates a TripleIterator that contains the triples matching the specified pattern of subject, predicate, object,
* context.
*/
private CloseableIteration<MemTriple> createTripleIterator(Resource subj, IRI pred, Value obj,
int snapshot) throws InterruptedException {
// Perform look-ups for value-equivalents of the specified values
MemResource memSubj = valueFactory.getMemResource(subj);
if (subj != null && memSubj == null) {
// non-existent subject
return EMPTY_TRIPLE_ITERATION;
}
MemIRI memPred = valueFactory.getMemURI(pred);
if (pred != null && memPred == null) {
// non-existent predicate
return EMPTY_TRIPLE_ITERATION;
}
MemValue memObj = valueFactory.getMemValue(obj);
if (obj != null && memObj == null) {
// non-existent object
return EMPTY_TRIPLE_ITERATION;
}
// TODO there is no separate index for Triples, so for now we iterate over all statements to find matches.
return new MemTripleIterator<>(statements, memSubj, memPred, memObj, snapshot);
}
/**
* Removes statements from old snapshots from the main statement list and resets the snapshot to 1 for the rest of
* the statements.
*
* @throws InterruptedException
*/
protected void cleanSnapshots() throws InterruptedException {
int currentSnapshot = this.currentSnapshot;
int highestUnusedTillSnapshot = snapshotMonitor.getFirstUnusedOrElse(currentSnapshot - 1);
if (highestUnusedTillSnapshot >= currentSnapshot) {
logger.debug("No old snapshot versions are currently unused, {} >= {} (currentSnapshot).",
highestUnusedTillSnapshot, currentSnapshot);
}
try {
boolean prioritiseCleaning = false;
StopWatch stopWatch = null;
if (logger.isDebugEnabled()) {
stopWatch = StopWatch.createStarted();
logger.debug("Started cleaning snapshots.");
}
prioritiseCleaning = prioritiseSnapshotCleaningIfLowOnMemory(prioritiseCleaning);
// Sets used to keep track of which lists have already been processed
HashSet<MemValue> processedSubjects = new HashSet<>();
HashSet<MemValue> processedPredicates = new HashSet<>();
HashSet<MemValue> processedObjects = new HashSet<>();
HashSet<MemValue> processedContexts = new HashSet<>();
MemStatement[] statements = this.statements.getStatements();
/*
* The order of the statement list won't change from lastStmtPos down while we don't have the write lock (it
* might shrink or grow) as (1) new statements are always appended last, (2) we are the only process that
* removes statements, (3) this list is cleared on close.
*/
for (int i = statements.length - 1; i >= 0; i--) {
if (Thread.currentThread().isInterrupted()) {
break;
}
MemStatement st = statements[i];
if (st == null) {
continue;
}
if (st.getTillSnapshot() <= highestUnusedTillSnapshot) {
MemResource subj = st.getSubject();
if (processedSubjects.add(subj)) {
subj.cleanSnapshotsFromSubjectStatements(highestUnusedTillSnapshot);
}
MemIRI pred = st.getPredicate();
if (processedPredicates.add(pred)) {
pred.cleanSnapshotsFromPredicateStatements(highestUnusedTillSnapshot);
}
MemValue obj = st.getObject();
if (processedObjects.add(obj)) {
obj.cleanSnapshotsFromObjectStatements(highestUnusedTillSnapshot);
}
MemResource context = st.getContext();
if (context != null && processedContexts.add(context)) {
context.cleanSnapshotsFromContextStatements(highestUnusedTillSnapshot);
}
// stale statement
this.statements.optimisticRemove(st, i);
prioritiseCleaning = prioritiseSnapshotCleaningIfLowOnMemory(prioritiseCleaning);
}
if (i % 100_000 == 0) {
if (getFreeToAllocateMemory() < CLEANUP_MINIMUM_FREE_MEMORY / 2) {
prioritiseCleaning = prioritiseSnapshotCleaningIfLowOnMemory(prioritiseCleaning);
processedSubjects = new HashSet<>();
processedPredicates = new HashSet<>();
processedObjects = new HashSet<>();
processedContexts = new HashSet<>();
System.gc();
}
}
}
processedSubjects.clear();
processedPredicates.clear();
processedObjects.clear();
processedContexts.clear();
if (logger.isDebugEnabled() && stopWatch != null) {
stopWatch.stop();
logger.debug("Cleaning snapshots took {} seconds.", stopWatch.getTime(TimeUnit.SECONDS));
}
} finally {
statements.setPrioritiseCleanup(false);
}
}
private boolean prioritiseSnapshotCleaningIfLowOnMemory(boolean prioritiseCleaning) {
if (!prioritiseCleaning && MAX_MEMORY >= CLEANUP_MAX_MEMORY_THRESHOLD) {
long freeToAllocateMemory = getFreeToAllocateMemory();
if (memoryIsLow(freeToAllocateMemory)) {
logger.debug(
"Low free memory ({} MB)! Prioritising cleaning of removed statements from the MemoryStore.",
freeToAllocateMemory / 1024 / 1024);
prioritiseCleaning = true;
this.statements.setPrioritiseCleanup(true);
}
}
return prioritiseCleaning;
}
private static boolean memoryIsLow(long freeToAllocateMemory) {
return freeToAllocateMemory < CLEANUP_MINIMUM_FREE_MEMORY
|| (freeToAllocateMemory + 0.0) / MAX_MEMORY < CLEANUP_MINIMUM_FREE_MEMORY_RATIO;
}
private long getFreeToAllocateMemory() {
// total currently allocated JVM memory
long totalMemory = RUNTIME.totalMemory();
// amount of memory free in the currently allocated JVM memory
long freeMemory = RUNTIME.freeMemory();
// estimated memory used
long used = totalMemory - freeMemory;
// amount of memory the JVM can still allocate from the OS (upper boundary is the max heap)
return MAX_MEMORY - used;
}
protected void scheduleSnapshotCleanup() {
// we don't schedule snapshot cleanup on small memory stores
if (statements.size() < 1000) {
return;
}
Thread toCheckSnapshotCleanupThread = snapshotCleanupThread;
if (toCheckSnapshotCleanupThread == null || !toCheckSnapshotCleanupThread.isAlive()) {
synchronized (snapshotCleanupThreadLockObject) {
toCheckSnapshotCleanupThread = snapshotCleanupThread;
if (toCheckSnapshotCleanupThread == null || !toCheckSnapshotCleanupThread.isAlive()) {
Runnable runnable = () -> {
try {
// sleep for up to 5 seconds unless we are low on memory
for (int i = 0; i < 100 * 5 && !memoryIsLow(getFreeToAllocateMemory() * 2); i++) {
Thread.sleep(10);
}
cleanSnapshots();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info("snapshot cleanup interrupted");
}
};
toCheckSnapshotCleanupThread = snapshotCleanupThread = new Thread(runnable,
"MemoryStore snapshot cleanup");
toCheckSnapshotCleanupThread.setDaemon(true);
toCheckSnapshotCleanupThread.start();
Thread.yield();
}
}
}
}
private final class MemorySailSource extends BackingSailSource {
private final boolean explicit;
public MemorySailSource(boolean explicit) {
this.explicit = explicit;
}
@Override
public SailSink sink(IsolationLevel level) throws SailException {
return new MemorySailSink(explicit, level.isCompatibleWith(IsolationLevels.SERIALIZABLE));
}
@Override
public MemorySailDataset dataset(IsolationLevel level) throws SailException {
if (level.isCompatibleWith(IsolationLevels.SNAPSHOT_READ)) {
return new MemorySailDataset(explicit, currentSnapshot);
} else {
return new MemorySailDataset(explicit);
}
}
}
private final class MemorySailSink implements SailSink {
private volatile boolean closed = false;
private final boolean explicit;
private final int serializable;
private final SnapshotMonitor.ReservedSnapshot reservedSnapshot;
private int nextSnapshot;
private Set<StatementPattern> observations;
private volatile Lock txnLock;
private boolean requireCleanup;
public MemorySailSink(boolean explicit, boolean serializable) throws SailException {
this.explicit = explicit;
if (serializable) {
this.serializable = currentSnapshot;
this.reservedSnapshot = snapshotMonitor.reserve(this.serializable, this);
} else {
this.serializable = Integer.MAX_VALUE;
this.reservedSnapshot = null;
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
if (explicit) {
sb.append("explicit ");
} else {
sb.append("inferred ");
}
if (txnLock != null) {
sb.append("snapshot ").append(nextSnapshot);
} else {
sb.append(super.toString());
}
return sb.toString();
}
@Override
public synchronized void prepare() throws SailException {
acquireExclusiveTransactionLock();
if (observations != null) {
for (StatementPattern p : observations) {
Resource subj = (Resource) p.getSubjectVar().getValue();
IRI pred = (IRI) p.getPredicateVar().getValue();
Value obj = p.getObjectVar().getValue();
Var ctxVar = p.getContextVar();
Resource[] contexts;
if (ctxVar == null) {
contexts = new Resource[0];
} else {
contexts = new Resource[] { (Resource) ctxVar.getValue() };
}
try (CloseableIteration<MemStatement> iter = createStatementIterator(subj, pred, obj,
null, -1, contexts)) {
while (iter.hasNext()) {
MemStatement st = iter.next();
int since = st.getSinceSnapshot();
int till = st.getTillSnapshot();
if (serializable < since && since < nextSnapshot
|| serializable < till && till < nextSnapshot) {
throw new SailConflictException("Observed State has Changed");
}
}
} catch (InterruptedException e) {
throw convertToSailException(e);
}
}
}
}
@Override
public synchronized void flush() throws SailException {
if (txnLock != null && txnLock.isActive()) {
invalidateCache();
currentSnapshot = Math.max(currentSnapshot, nextSnapshot);
if (requireCleanup) {
scheduleSnapshotCleanup();
}
}
}
@Override
public void close() {
if (!closed) {
closed = true;
try {
if (reservedSnapshot != null) {
reservedSnapshot.release();
}
} finally {
try {
releaseLock();
} finally {
observations = null;
}
}
}
}
synchronized private void releaseLock() {
if (txnLock != null) {
assert txnLock.isActive();
txnLock.release();
txnLock = null;
}
}
@Override
public synchronized void setNamespace(String prefix, String name) {
acquireExclusiveTransactionLock();
namespaceStore.setNamespace(prefix, name);
}
@Override
public synchronized void removeNamespace(String prefix) {
acquireExclusiveTransactionLock();
namespaceStore.removeNamespace(prefix);
}
@Override
public synchronized void clearNamespaces() {
acquireExclusiveTransactionLock();
namespaceStore.clear();
}
@Override
public synchronized void observe(Resource subj, IRI pred, Value obj, Resource... contexts)
throws SailException {
if (observations == null) {
observations = new HashSet<>();
}
if (contexts == null) {
observations.add(new StatementPattern(new Var("s", subj), new Var("p", pred), new Var("o", obj),
new Var("g", null)));
} else if (contexts.length == 0) {
observations.add(new StatementPattern(new Var("s", subj), new Var("p", pred), new Var("o", obj)));
} else {
for (Resource ctx : contexts) {
observations.add(new StatementPattern(new Var("s", subj), new Var("p", pred), new Var("o", obj),
new Var("g", ctx)));
}
}
}
@Override
public synchronized void clear(Resource... contexts) {
acquireExclusiveTransactionLock();
invalidateCache();
requireCleanup = true;
try (CloseableIteration<MemStatement> iter = createStatementIterator(null, null, null,
explicit, nextSnapshot, contexts)) {
while (iter.hasNext()) {
MemStatement st = iter.next();
st.setTillSnapshot(nextSnapshot);
}
} catch (InterruptedException e) {
throw convertToSailException(e);
}
}
@Override
public synchronized void approve(Resource subj, IRI pred, Value obj, Resource ctx) {
acquireExclusiveTransactionLock();
invalidateCache();
try {
addStatement(subj, pred, obj, ctx, explicit);
} catch (InterruptedException e) {
throw convertToSailException(e);
}
}
@Override
public synchronized void approveAll(Set<Statement> approved, Set<Resource> approvedContexts) {
acquireExclusiveTransactionLock();
invalidateCache();
try {
for (Statement statement : approved) {
addStatement(statement.getSubject(), statement.getPredicate(), statement.getObject(),
statement.getContext(), explicit);
}
} catch (InterruptedException e) {
throw convertToSailException(e);
}
}
@Override
public synchronized void deprecateAll(Set<Statement> deprecated) {
acquireExclusiveTransactionLock();
invalidateCache();
requireCleanup = true;
int nextSnapshot = this.nextSnapshot;
for (Statement statement : deprecated) {
innerDeprecate(statement, nextSnapshot);
}
}
@Override
public synchronized void deprecate(Statement statement) throws SailException {
acquireExclusiveTransactionLock();
invalidateCache();
requireCleanup = true;
innerDeprecate(statement, nextSnapshot);
}
private void innerDeprecate(Statement statement, int nextSnapshot) {
if (statement instanceof MemStatement) {
MemStatement toDeprecate = (MemStatement) statement;
if ((nextSnapshot < 0 || toDeprecate.isInSnapshot(nextSnapshot))
&& toDeprecate.isExplicit() == explicit) {
toDeprecate.setTillSnapshot(nextSnapshot);
}
} else if (statement instanceof LinkedHashModel.ModelStatement
&& ((LinkedHashModel.ModelStatement) statement).getStatement() instanceof MemStatement) {
// The Changeset uses a LinkedHashModel to store it's changes. It still keeps a reference to the
// original statement that can be retrieved here.
MemStatement toDeprecate = (MemStatement) ((LinkedHashModel.ModelStatement) statement).getStatement();
if ((nextSnapshot < 0 || toDeprecate.isInSnapshot(nextSnapshot))
&& toDeprecate.isExplicit() == explicit) {
toDeprecate.setTillSnapshot(nextSnapshot);
}
} else {
try (CloseableIteration<MemStatement> iter = createStatementIterator(
statement.getSubject(), statement.getPredicate(), statement.getObject(), explicit, nextSnapshot,
statement.getContext())) {
while (iter.hasNext()) {
MemStatement st = iter.next();
st.setTillSnapshot(nextSnapshot);
}
} catch (InterruptedException e) {
throw convertToSailException(e);
}
}
}
private void acquireExclusiveTransactionLock() throws SailException {
if (txnLock == null) {
synchronized (this) {
if (txnLock == null) {
try {
txnLock = txnLockManager.getExclusiveLock();
nextSnapshot = currentSnapshot + 1;
} catch (InterruptedException e) {
throw convertToSailException(e);
}
}
}
}
}
private MemStatement addStatement(Resource subj, IRI pred, Value obj, Resource context, boolean explicit)
throws SailException, InterruptedException {
if (!explicit) {
mayHaveInferred = true;
}
// Get or create MemValues for the operands
MemResource memSubj = valueFactory.getOrCreateMemResource(subj);
MemIRI memPred = valueFactory.getOrCreateMemURI(pred);
MemValue memObj = valueFactory.getOrCreateMemValue(obj);
MemResource memContext = context == null ? null : valueFactory.getOrCreateMemResource(context);
if (memSubj.hasSubjectStatements() && memPred.hasPredicateStatements() && memObj.hasObjectStatements()
&& (memContext == null || memContext.hasContextStatements())) {
// All values are used in at least one statement. Possibly, the
// statement is already present. Check this.
if (statementAlreadyExists(explicit, memSubj, memPred, memObj, memContext, nextSnapshot)) {
return null;
}
}
// completely new statement
MemStatement st = new MemStatement(memSubj, memPred, memObj, memContext, explicit, nextSnapshot);
statements.add(st);
st.addToComponentLists();
invalidateCache();
return st;
}
private boolean statementAlreadyExists(boolean explicit, MemResource memSubj, MemIRI memPred, MemValue memObj,
MemResource memContext, int nextSnapshot) throws InterruptedException {
MemStatementList statementList = getSmallestMemStatementList(memSubj, memPred, memObj, memContext);
MemStatement memStatement = statementList.getExact(memSubj, memPred, memObj, memContext,
nextSnapshot);
if (memStatement != null) {
if (!memStatement.isExplicit() && explicit) {
// Implicit statement is now added explicitly
memStatement.setTillSnapshot(this.nextSnapshot);
} else {
// statement already exists
return true;
}
}
return false;
}
private MemStatementList getSmallestMemStatementList(MemResource memSubj, MemIRI memPred, MemValue memObj,
MemResource memContext) {
MemStatementList statementList = memSubj.getSubjectStatementList();
if (statementList.size() <= 1) {
return statementList;
}
if (memPred.getPredicateStatementCount() < statementList.size()) {
statementList = memPred.getPredicateStatementList();
if (statementList.size() <= 1) {
return statementList;
}
}
if (memObj.getObjectStatementCount() < statementList.size()) {
statementList = memObj.getObjectStatementList();
if (statementList.size() <= 1) {
return statementList;
}
}
if (memContext != null && memContext.getContextStatementCount() < statementList.size()) {
statementList = memContext.getContextStatementList();
}
return statementList;
}
@Override
public boolean deprecateByQuery(Resource subj, IRI pred, Value obj, Resource[] contexts) {
acquireExclusiveTransactionLock();
boolean deprecated = false;
requireCleanup = true;
invalidateCache();
try (CloseableIteration<MemStatement> iter = createStatementIterator(subj, pred, obj,
explicit, nextSnapshot, contexts)) {
while (iter.hasNext()) {
deprecated = true;
MemStatement st = iter.next();
st.setTillSnapshot(nextSnapshot);
}
} catch (InterruptedException e) {
throw convertToSailException(e);
}
invalidateCache();
return deprecated;
}
}
/**
* @author James Leigh
*/
private final class MemorySailDataset implements SailDataset {
private final boolean explicit;
private final int snapshot;
private final SnapshotMonitor.ReservedSnapshot reservedSnapshot;
private volatile boolean closed;
public MemorySailDataset(boolean explicit) throws SailException {
this.explicit = explicit;
this.snapshot = -1;
this.reservedSnapshot = null;
}
public MemorySailDataset(boolean explicit, int snapshot) throws SailException {
this.explicit = explicit;
this.snapshot = snapshot;
this.reservedSnapshot = snapshotMonitor.reserve(snapshot, this);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
if (explicit) {
sb.append("explicit ");
} else {
sb.append("inferred ");
}
if (snapshot >= 0) {
sb.append("snapshot ").append(snapshot);
} else {
sb.append(super.toString());
}
return sb.toString();
}
@Override
public void close() {
if (closed) {
return;
}
closed = true;
if (reservedSnapshot != null) {
reservedSnapshot.release();
}
}
@Override
public String getNamespace(String prefix) throws SailException {
return namespaceStore.getNamespace(prefix);
}
@Override
public CloseableIteration<? extends Namespace> getNamespaces() {
return new CloseableIteratorIteration<>(namespaceStore.iterator());
}
@Override
public CloseableIteration<? extends Resource> getContextIDs() throws SailException {
// Note: we can't do this in a streaming fashion due to concurrency
// issues; iterating over the set of IRIs or bnodes while another
// thread
// adds statements with new resources would result in
// ConcurrentModificationException's (issue SES-544).
// Create a list of all resources that are used as contexts
ArrayList<MemResource> contextIDs = new ArrayList<>(32);
int snapshot = getCurrentSnapshot();
try (WeakObjectRegistry.AutoCloseableIterator<MemIRI> memIRIsIterator = valueFactory
.getMemIRIsIterator()) {
while (memIRIsIterator.hasNext()) {
MemResource memResource = memIRIsIterator.next();
if (isContextResource(memResource, snapshot)) {
contextIDs.add(memResource);
}
}
} catch (InterruptedException e) {
throw convertToSailException(e);
}
try (WeakObjectRegistry.AutoCloseableIterator<MemBNode> memBNodesIterator = valueFactory
.getMemBNodesIterator()) {
while (memBNodesIterator.hasNext()) {
MemResource memResource = memBNodesIterator.next();
if (isContextResource(memResource, snapshot)) {
contextIDs.add(memResource);
}
}
} catch (InterruptedException e) {
throw convertToSailException(e);
}
return new CloseableIteratorIteration<>(contextIDs.iterator());
}
@Override
public CloseableIteration<MemStatement> getStatements(Resource subj, IRI pred, Value obj,
Resource... contexts) throws SailException {
try {
return createStatementIterator(subj, pred, obj, explicit, getCurrentSnapshot(), contexts);
} catch (InterruptedException e) {
throw convertToSailException(e);
}
}
@Override
public CloseableIteration<MemTriple> getTriples(Resource subj, IRI pred, Value obj)
throws SailException {
try {
return createTripleIterator(subj, pred, obj, getCurrentSnapshot());
} catch (InterruptedException e) {
throw convertToSailException(e);
}
}
private int getCurrentSnapshot() {
if (snapshot >= 0) {
return snapshot;
} else {
return currentSnapshot;
}
}
private boolean isContextResource(MemResource memResource, int snapshot)
throws SailException, InterruptedException {
MemStatementList contextStatements = memResource.getContextStatementList();
// Filter resources that are not used as context identifier
if (contextStatements.isEmpty()) {
return false;
}
// Filter more thoroughly by considering snapshot and read-mode
// parameters
try (MemStatementIterator iter = new MemStatementIterator(contextStatements, null, null,
null, null, snapshot, null)) {
return iter.hasNext();
}
}
}
private SailException convertToSailException(InterruptedException e) {
Thread.currentThread().interrupt();
return new SailException(e);
}
/**
* SnapshotMonitor is used to keep track of which snapshot version are no longer is use (read or write) so that we
* can safely clean that snapshot version.
*/
static class SnapshotMonitor {
private static final ConcurrentCleaner cleaner = new ConcurrentCleaner();
private final ConcurrentHashMap<Integer, LongAdder> activeSnapshots = new ConcurrentHashMap<>();
private final boolean debug;
// The LongAdder used to track the number of reservations (uses) for a snapshot version is kept in the
// activeSnapshots map. When all reservations are released and the LongAdder.sum() == 0 we should be able to
// safely remove it, this can however cause race conditions if the LongAdder can still be incremented (e.g. when
// the snapshot version is the current snapshot). By assuming that there will never be any new reservations for
// an "old" snapshot version, we can then safely remove the LongAdder if the snapshot version that it is
// tracking is lower than the highestEverReservedSnapshot.
private final AtomicInteger highestEverReservedSnapshot = new AtomicInteger(-1);
public SnapshotMonitor(boolean debug) {
this.debug = debug;
}
public int getFirstUnusedOrElse(int currentSnapshot) {
int maximum = this.highestEverReservedSnapshot.getAcquire();
int min = Integer.MAX_VALUE;
for (Map.Entry<Integer, LongAdder> entry : activeSnapshots.entrySet()) {
if (entry.getKey() <= min) {
if (entry.getKey() < maximum && entry.getValue().sum() == 0) {
activeSnapshots.computeIfPresent(entry.getKey(), (k, v) -> {
if (v.sum() == 0) {
return null;
}
return v;
});
} else {
min = entry.getKey() - 1;
}
}
}
if (min == Integer.MAX_VALUE) {
return currentSnapshot - 1;
} else {
return min;
}
}
public ReservedSnapshot reserve(int snapshot, Object reservedBy) {
int highestEverReservedSnapshot = this.highestEverReservedSnapshot.getAcquire();
while (snapshot > highestEverReservedSnapshot) {
if (this.highestEverReservedSnapshot.compareAndSet(highestEverReservedSnapshot, snapshot)) {
highestEverReservedSnapshot = snapshot;
} else {
highestEverReservedSnapshot = this.highestEverReservedSnapshot.getAcquire();
}
}
LongAdder longAdder = activeSnapshots.computeIfAbsent(snapshot, (k) -> new LongAdder());
longAdder.increment();
return new ReservedSnapshot(snapshot, reservedBy, debug, longAdder, activeSnapshots,
this.highestEverReservedSnapshot);
}
static class ReservedSnapshot {
private static final int SNAPSHOT_RELEASED = -1;
private final ConcurrentHashMap<Integer, LongAdder> activeSnapshots;
private final LongAdder frequency;
private final AtomicInteger highestEverReservedSnapshot;
private Cleaner.Cleanable cleanable;
private final Throwable stackTraceForDebugging;
@SuppressWarnings("FieldMayBeFinal")
private volatile int snapshot;
private final static VarHandle SNAPSHOT;
static {
try {
SNAPSHOT = MethodHandles.lookup()
.in(ReservedSnapshot.class)
.findVarHandle(ReservedSnapshot.class, "snapshot", int.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
public ReservedSnapshot(int snapshot, Object reservedBy, boolean debug,
LongAdder frequency, ConcurrentHashMap<Integer, LongAdder> activeSnapshots,
AtomicInteger highestEverReservedSnapshot) {
this.snapshot = snapshot;
if (debug) {
stackTraceForDebugging = new Throwable("Unreleased snapshot version");
} else {
stackTraceForDebugging = null;
}
this.activeSnapshots = activeSnapshots;
this.frequency = frequency;
this.highestEverReservedSnapshot = highestEverReservedSnapshot;
cleanable = cleaner.register(reservedBy, () -> {
int tempSnapshot = ((int) SNAPSHOT.getVolatile(this));
if (tempSnapshot != SNAPSHOT_RELEASED) {
String message = "Releasing MemorySailStore snapshot {} which was reserved and never released (possibly unclosed MemorySailDataset or MemorySailSink).";
if (stackTraceForDebugging != null) {
logger.warn(message, tempSnapshot, stackTraceForDebugging);
} else {
logger.warn(message, tempSnapshot);
}
release();
}
});
}
public void release() {
int snapshot = (int) SNAPSHOT.getAcquire(this);
if (snapshot != SNAPSHOT_RELEASED) {
if (SNAPSHOT.compareAndSet(this, snapshot, SNAPSHOT_RELEASED)) {
frequency.decrement();
assert frequency.sum() >= 0;
if (snapshot < highestEverReservedSnapshot.getAcquire() && frequency.sum() == 0) {
activeSnapshots.computeIfPresent(snapshot, (k, v) -> {
if (v.sum() == 0) {
return null;
}
return v;
});
}
}
}
Cleaner.Cleanable cleanable = this.cleanable;
if (cleanable != null) {
this.cleanable = null;
cleanable.clean();
}
}
}
}
}