MemStatementList.java
/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.memory.model;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
/**
* A dedicated data structure for storing MemStatement objects, offering operations optimized for their use in the
* memory Sail.
*/
public class MemStatementList {
private static final MemStatement[] EMPTY_ARRAY = {};
// statements will be null when the array is growing
private volatile MemStatement[] statements = EMPTY_ARRAY;
private static final VarHandle STATEMENTS;
static final VarHandle STATEMENTS_ARRAY;
private volatile int size;
private static final VarHandle SIZE;
// When inserting a new statement into the statements array we need to iterate through the array looking for a free
// spot. We keep track of where we last inserted a statement by storting that index in previouslyInsertedIndex. This
// doesn't guarantee that previouslyInsertedIndex+1 wil be free, but it gives us a decent hint as to where to start
// looking. When multiple threads are inserting at the same time the previouslyInsertedIndex should be considered
// "best effort".
private volatile int previouslyInsertedIndex = -1;
private static final VarHandle PREVIOUSLY_INSERTED_INDEX;
private volatile int guaranteedLastIndexInUse = -1;
private static final VarHandle GUARANTEED_LAST_INDEX_IN_USE;
private volatile boolean prioritiseCleanup;
private static final VarHandle PRIORITISE_CLEANUP;
private final AtomicReference<Thread> prioritisedThread = new AtomicReference<>();
public MemStatementList() {
}
public MemStatementList(int capacity) {
statements = new MemStatement[capacity];
}
public int size() {
return ((int) SIZE.getAcquire(this));
}
public boolean isEmpty() {
return ((int) SIZE.getAcquire(this)) == 0;
}
public void add(MemStatement st) throws InterruptedException {
if (((boolean) PRIORITISE_CLEANUP.getOpaque(this))) {
long start = System.currentTimeMillis();
long stop = start + TimeUnit.SECONDS.toMillis(30);
while (stop > System.currentTimeMillis() && ((boolean) PRIORITISE_CLEANUP.getVolatile(this))) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
}
}
do {
MemStatement[] statements = getStatements();
int length = statements.length;
boolean shouldGrowArray = true;
if (length > (int) SIZE.getAcquire(this)) {
int previouslyInsertedIndex = (int) PREVIOUSLY_INSERTED_INDEX.getOpaque(this);
if (previouslyInsertedIndex >= length) {
continue;
}
int i = previouslyInsertedIndex + 1 >= length ? 0 : previouslyInsertedIndex + 1;
for (; i != previouslyInsertedIndex; i = (i + 1 >= length ? 0 : i + 1)) {
if (statements[i] == null) {
boolean success = STATEMENTS_ARRAY.compareAndSet(statements, i, null, st);
if (success) {
shouldGrowArray = false;
// check if the statements array has been swapped out (because it has grown) while we were
// inserting into it
MemStatement[] statementsAfterInsert = getStatementsWithoutInterrupt();
if (statementsAfterInsert != statements
&& STATEMENTS_ARRAY.getAcquire(statementsAfterInsert, i) != st) {
// We wrote into an array while it was growing and our write was lost.
break;
}
PREVIOUSLY_INSERTED_INDEX.setRelease(this, i);
SIZE.getAndAdd(this, 1);
updateGuaranteedLastIndexInUse(i);
return;
}
} else if (previouslyInsertedIndex < 0 && i == length - 1) {
// The array is full but no threads have made it to the code line where the
// PREVIOUSLY_INSERTED_INDEX is updated. Don't grow the array just yet, it is better to wait
// until PREVIOUSLY_INSERTED_INDEX is updated.
shouldGrowArray = false;
break;
}
}
}
if (shouldGrowArray && STATEMENTS.compareAndSet(this, statements, null)) {
// Grow array
MemStatement[] newArray = new MemStatement[Math.max(4, length * 2)];
if (statements != EMPTY_ARRAY) {
System.arraycopy(statements, 0, newArray, 0, length);
}
STATEMENTS.setRelease(this, newArray);
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
} while (true);
}
private void updateGuaranteedLastIndexInUse(int newValue) {
int guaranteedLastIndexInUse = (int) GUARANTEED_LAST_INDEX_IN_USE.getAcquire(this);
if (guaranteedLastIndexInUse < newValue) {
while (guaranteedLastIndexInUse < newValue
&& !GUARANTEED_LAST_INDEX_IN_USE.compareAndSet(this, guaranteedLastIndexInUse, newValue)) {
guaranteedLastIndexInUse = (int) GUARANTEED_LAST_INDEX_IN_USE.getAcquire(this);
}
}
}
public boolean optimisticRemove(MemStatement st) throws InterruptedException {
MemStatement[] statements = getStatements();
for (int i = 0; i < statements.length; i++) {
if (statements[i] == st) {
return optimisticInnerRemove(st, statements, i);
}
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
return false;
}
public boolean optimisticRemove(MemStatement st, int index) throws InterruptedException {
MemStatement[] statements = getStatements();
if (statements[index] == st && optimisticInnerRemove(st, statements, index)) {
return true;
} else {
return optimisticRemove(st);
}
}
private boolean optimisticInnerRemove(MemStatement toRemove, MemStatement[] statements, int i) {
boolean success = STATEMENTS_ARRAY.weakCompareAndSet(statements, i, toRemove, null);
if (success) {
MemStatement[] statementsAfterRemoval = getStatementsWithoutInterrupt();
if (statementsAfterRemoval != statements) {
// We don't know if the statement was removed because the STATEMENTS_ARRAY has changed (because it
// grew). Since it can never shrink we know that if we managed to remove the statement then the index
// should either be null or a different statement
if (STATEMENTS_ARRAY.getAcquire(statementsAfterRemoval, i) == toRemove) {
return false;
}
}
SIZE.getAndAdd(this, -1);
return true;
} else {
return false;
}
}
public void clear() {
statements = EMPTY_ARRAY;
size = 0;
previouslyInsertedIndex = -1;
guaranteedLastIndexInUse = -10;
prioritiseCleanup = false;
}
public void cleanSnapshots(int currentSnapshot) throws InterruptedException {
boolean error;
do {
MemStatement[] statements = getStatements();
// reset the error flag
error = false;
for (int i = 0; i < statements.length; i++) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
MemStatement statement = statements[i];
if (statement != null && statement.getTillSnapshot() <= currentSnapshot) {
boolean success = optimisticInnerRemove(statement, statements, i);
if (!success) {
error = true;
break;
}
}
}
if (!error) {
// make sure that the statements list didn't grow while we were cleaning it
error = !STATEMENTS.compareAndSet(this, statements, statements);
}
} while (error);
}
/**
* Iterates through this list and returns the statement that exactly matches the provided arguments. The subject,
* predicate and object should not be null. If the context is null it will match statements with null as their
* context.
*
* @param subject
* @param predicate
* @param object
* @param context
* @param snapshot
* @return
*/
public MemStatement getExact(MemResource subject, MemIRI predicate, MemValue object, MemResource context,
int snapshot) throws InterruptedException {
MemStatement[] statements = getStatements();
int lastIndexToCheck = getGuaranteedLastIndexInUse();
for (int i = 0; i <= lastIndexToCheck; i++) {
MemStatement memStatement = statements[i];
if (memStatement != null && memStatement.exactMatch(subject, predicate, object, context)
&& memStatement.isInSnapshot(snapshot)) {
return memStatement;
}
}
return null;
}
/**
* An internal method to retrieve the inner array that stores the statements. Useful to reduce the number of
* volatile reads.
*
* @return the underlying array og MemStatements
*/
public MemStatement[] getStatements() throws InterruptedException {
MemStatement[] statements = (MemStatement[]) STATEMENTS.getAcquire(this);
while (statements == null) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
Thread.onSpinWait();
statements = (MemStatement[]) STATEMENTS.getAcquire(this);
}
return statements;
}
private MemStatement[] getStatementsWithoutInterrupt() {
MemStatement[] statements = (MemStatement[]) STATEMENTS.getAcquire(this);
while (statements == null) {
Thread.onSpinWait();
statements = (MemStatement[]) STATEMENTS.getAcquire(this);
}
return statements;
}
public int getGuaranteedLastIndexInUse() {
return ((int) GUARANTEED_LAST_INDEX_IN_USE.getAcquire(this));
}
public void setPrioritiseCleanup(boolean prioritiseCleanup) {
if (!prioritiseCleanup) {
if (prioritisedThread.compareAndSet(Thread.currentThread(), null)) {
PRIORITISE_CLEANUP.setVolatile(this, false);
} else {
assert !((boolean) PRIORITISE_CLEANUP.getVolatile(this));
}
} else {
if (prioritisedThread.compareAndSet(null, Thread.currentThread())) {
Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
PRIORITISE_CLEANUP.setVolatile(this, true);
} else {
throw new IllegalStateException("A cleanup thread is already prioritised: " + prioritisedThread.get());
}
}
}
static {
try {
SIZE = MethodHandles.lookup()
.in(MemStatementList.class)
.findVarHandle(MemStatementList.class, "size", int.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
static {
try {
PREVIOUSLY_INSERTED_INDEX = MethodHandles.lookup()
.in(MemStatementList.class)
.findVarHandle(MemStatementList.class, "previouslyInsertedIndex", int.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
static {
try {
GUARANTEED_LAST_INDEX_IN_USE = MethodHandles.lookup()
.in(MemStatementList.class)
.findVarHandle(MemStatementList.class, "guaranteedLastIndexInUse", int.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
static {
try {
PRIORITISE_CLEANUP = MethodHandles.lookup()
.in(MemStatementList.class)
.findVarHandle(MemStatementList.class, "prioritiseCleanup", boolean.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
static {
try {
STATEMENTS = MethodHandles.lookup()
.in(MemStatementList.class)
.findVarHandle(MemStatementList.class, "statements", MemStatement[].class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
static {
STATEMENTS_ARRAY = MethodHandles.arrayElementVarHandle(MemStatement[].class);
}
boolean verifySizeForTesting() {
MemStatement[] statements1 = getStatementsWithoutInterrupt();
int size = 0;
for (int i = 0; i < statements1.length; i++) {
if (statements1[i] != null) {
size++;
}
}
return size == size();
}
int getRealSizeForTesting() {
MemStatement[] statements1 = getStatementsWithoutInterrupt();
int size = 0;
for (int i = 0; i < statements1.length; i++) {
if (statements1[i] != null) {
size++;
}
}
return size;
}
}