IsolationLevelTest.java
/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.testsuite.repository.optimistic;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.transaction.IsolationLevel;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.base.CoreDatatype;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.UnknownTransactionStateException;
import org.eclipse.rdf4j.testsuite.repository.OptimisticIsolationTest;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test that the Repository correctly supports claimed isolation levels.
*
* @author James Leigh
*/
public class IsolationLevelTest {
@BeforeClass
public static void setUpClass() {
System.setProperty("org.eclipse.rdf4j.repository.debug", "true");
}
@AfterClass
public static void afterClass() {
System.setProperty("org.eclipse.rdf4j.repository.debug", "false");
}
private final Logger logger = LoggerFactory.getLogger(IsolationLevelTest.class);
/*-----------*
* Variables *
*-----------*/
protected Repository store;
private String failedMessage;
private Throwable failed;
/*---------*
* Methods *
*---------*/
@Before
public void setUp() throws Exception {
store = OptimisticIsolationTest.getEmptyInitializedRepository(IsolationLevelTest.class);
failed = null;
}
@After
public void tearDown() {
store.shutDown();
}
protected boolean isSupported(IsolationLevels level) throws RepositoryException {
try (RepositoryConnection con = store.getConnection()) {
try {
con.begin(level);
return true;
} finally {
con.rollback();
}
} catch (UnknownTransactionStateException e) {
return false;
}
}
@Test
public void testNone() {
readPending(IsolationLevels.NONE);
}
@Test
public void testReadUncommitted() {
rollbackTriple(IsolationLevels.READ_UNCOMMITTED);
readPending(IsolationLevels.READ_UNCOMMITTED);
}
@Test
public void testReadCommitted() throws Exception {
readCommitted(IsolationLevels.READ_COMMITTED);
rollbackTriple(IsolationLevels.READ_COMMITTED);
readPending(IsolationLevels.READ_COMMITTED);
}
@Test
public void testSnapshotRead() throws Exception {
if (isSupported(IsolationLevels.SNAPSHOT_READ)) {
snapshotRead(IsolationLevels.SNAPSHOT_READ);
readCommitted(IsolationLevels.SNAPSHOT_READ);
rollbackTriple(IsolationLevels.SNAPSHOT_READ);
readPending(IsolationLevels.SNAPSHOT_READ);
} else {
logger.warn("{} does not support {}", store, IsolationLevels.SNAPSHOT_READ);
}
}
@Test
public void testSnapshot() throws Exception {
if (isSupported(IsolationLevels.SNAPSHOT)) {
snapshot(IsolationLevels.SNAPSHOT);
snapshotRead(IsolationLevels.SNAPSHOT);
repeatableRead(IsolationLevels.SNAPSHOT);
readCommitted(IsolationLevels.SNAPSHOT);
rollbackTriple(IsolationLevels.SNAPSHOT);
readPending(IsolationLevels.SNAPSHOT);
} else {
logger.warn("{} does not support {}", store, IsolationLevels.SNAPSHOT);
}
}
@Test
public void testSerializable() throws Exception {
if (isSupported(IsolationLevels.SERIALIZABLE)) {
serializable(IsolationLevels.SERIALIZABLE);
snapshot(IsolationLevels.SERIALIZABLE);
snapshotRead(IsolationLevels.SERIALIZABLE);
repeatableRead(IsolationLevels.SERIALIZABLE);
readCommitted(IsolationLevels.SERIALIZABLE);
rollbackTriple(IsolationLevels.SERIALIZABLE);
readPending(IsolationLevels.SERIALIZABLE);
} else {
logger.warn("{} does not support {}", store, IsolationLevels.SERIALIZABLE);
}
}
/**
* Every connection must support reading it own changes
*/
private void readPending(IsolationLevel level) throws RepositoryException {
clear(store);
try (RepositoryConnection con = store.getConnection()) {
con.begin(level);
con.add(RDF.NIL, RDF.TYPE, RDF.LIST);
assertEquals(1, count(con, RDF.NIL, RDF.TYPE, RDF.LIST, false));
con.remove(RDF.NIL, RDF.TYPE, RDF.LIST);
con.commit();
}
}
/**
* Supports rolling back added triples
*/
private void rollbackTriple(IsolationLevel level) throws RepositoryException {
clear(store);
try (RepositoryConnection con = store.getConnection()) {
con.begin(level);
con.add(RDF.NIL, RDF.TYPE, RDF.LIST);
con.rollback();
assertEquals(0, count(con, RDF.NIL, RDF.TYPE, RDF.LIST, false));
}
}
/**
* Read operations must not see uncommitted changes
*/
private void readCommitted(final IsolationLevel level) throws Exception {
clear(store);
final CountDownLatch start = new CountDownLatch(2);
final CountDownLatch begin = new CountDownLatch(1);
final CountDownLatch uncommitted = new CountDownLatch(1);
Thread writer = new Thread(() -> {
try (RepositoryConnection write = store.getConnection()) {
start.countDown();
start.await();
write.begin(level);
write.add(RDF.NIL, RDF.TYPE, RDF.LIST);
begin.countDown();
uncommitted.await(1, TimeUnit.SECONDS);
write.rollback();
} catch (Throwable e) {
fail("Writer failed", e);
}
});
Thread reader = new Thread(() -> {
try (RepositoryConnection read = store.getConnection()) {
start.countDown();
start.await();
begin.await();
read.begin(level);
// must not read uncommitted changes
long counted = count(read, RDF.NIL, RDF.TYPE, RDF.LIST, false);
uncommitted.countDown();
try {
read.commit();
} catch (RepositoryException e) {
// it is okay to abort after a dirty read
// e.printStackTrace();
return;
}
// not read if transaction is consistent
assertEquals(0, counted);
} catch (Throwable e) {
fail("Reader failed", e);
}
});
reader.start();
writer.start();
reader.join();
writer.join();
assertNotFailed();
}
/**
* Any statement read in a transaction must remain present until the transaction is over
*/
private void repeatableRead(final IsolationLevels level) throws Exception {
clear(store);
final CountDownLatch start = new CountDownLatch(2);
final CountDownLatch begin = new CountDownLatch(1);
final CountDownLatch observed = new CountDownLatch(1);
final CountDownLatch changed = new CountDownLatch(1);
Thread writer = new Thread(() -> {
try (RepositoryConnection write = store.getConnection()) {
start.countDown();
start.await();
write.begin(level);
write.add(RDF.NIL, RDF.TYPE, RDF.LIST);
write.commit();
begin.countDown();
observed.await();
write.begin(level);
write.remove(RDF.NIL, RDF.TYPE, RDF.LIST);
write.commit();
changed.countDown();
} catch (Throwable e) {
fail("Writer failed", e);
}
});
Thread reader = new Thread(() -> {
try (RepositoryConnection read = store.getConnection()) {
start.countDown();
start.await();
begin.await();
read.begin(level);
long first = count(read, RDF.NIL, RDF.TYPE, RDF.LIST, false);
assertEquals(1, first);
observed.countDown();
changed.await(1, TimeUnit.SECONDS);
// observed statements must continue to exist
long second = count(read, RDF.NIL, RDF.TYPE, RDF.LIST, false);
try {
read.commit();
} catch (RepositoryException e) {
// it is okay to abort on inconsistency
// e.printStackTrace();
read.rollback();
return;
}
// statement must continue to exist if transaction consistent
assertEquals(first, second);
} catch (Throwable e) {
fail("Reader failed", e);
}
});
reader.start();
writer.start();
reader.join();
writer.join();
assertNotFailed();
}
/**
* Query results must not include statements added after the first result is read
*/
private void snapshotRead(IsolationLevel level) throws RepositoryException {
clear(store);
try (RepositoryConnection con = store.getConnection()) {
con.begin(level);
int size = 1;
for (int i = 0; i < size; i++) {
insertTestStatement(con, i);
}
int counter = 0;
try (CloseableIteration<? extends Statement> stmts = con.getStatements(null, null,
null, false)) {
while (stmts.hasNext()) {
Statement st = stmts.next();
counter++;
if (counter < size) {
// remove observed statement to force new state
con.remove(st.getSubject(), st.getPredicate(), st.getObject(), st.getContext());
insertTestStatement(con, size + counter);
insertTestStatement(con, size + size + counter);
}
}
}
try {
con.commit();
} catch (RepositoryException e) {
// it is okay to abort after a dirty read
e.printStackTrace();
return;
}
assertEquals(size, counter);
}
}
/**
* Reader observes the complete state of the store and ensure that does not change
*/
private void snapshot(final IsolationLevels level) throws Exception {
clear(store);
final CountDownLatch start = new CountDownLatch(2);
final CountDownLatch begin = new CountDownLatch(1);
final CountDownLatch observed = new CountDownLatch(1);
final CountDownLatch changed = new CountDownLatch(1);
Thread writer = new Thread(() -> {
try {
try (RepositoryConnection write = store.getConnection()) {
start.countDown();
start.await();
write.begin(level);
insertTestStatement(write, 1);
write.commit();
begin.countDown();
observed.await(1, TimeUnit.SECONDS);
write.begin(level);
insertTestStatement(write, 2);
write.commit();
changed.countDown();
}
} catch (Throwable e) {
fail("Writer failed", e);
}
});
Thread reader = new Thread(() -> {
try (RepositoryConnection read = store.getConnection()) {
start.countDown();
start.await();
begin.await();
read.begin(level);
long first = count(read, null, null, null, false);
observed.countDown();
changed.await(1, TimeUnit.SECONDS);
// new statements must not be observed
long second = count(read, null, null, null, false);
try {
read.commit();
} catch (RepositoryException e) {
// it is okay to abort on inconsistency
// e.printStackTrace();
read.rollback();
return;
}
// store must not change if transaction consistent
assertEquals(first, second);
} catch (Throwable e) {
fail("Reader failed", e);
}
});
reader.start();
writer.start();
reader.join();
writer.join();
assertNotFailed();
}
/**
* Two transactions read a value and replace it
*/
private void serializable(final IsolationLevels level) throws Exception {
clear(store);
final ValueFactory vf = store.getValueFactory();
final IRI subj = vf.createIRI("http://test#s");
final IRI pred = vf.createIRI("http://test#p");
try (RepositoryConnection prep = store.getConnection()) {
prep.begin(level);
prep.add(subj, pred, vf.createLiteral(1));
prep.commit();
}
final CountDownLatch start = new CountDownLatch(2);
final CountDownLatch observed = new CountDownLatch(2);
Thread t1 = incrementBy(start, observed, level, vf, subj, pred, 3);
Thread t2 = incrementBy(start, observed, level, vf, subj, pred, 5);
t2.start();
t1.start();
t2.join();
t1.join();
assertNotFailed();
try (RepositoryConnection check = store.getConnection()) {
check.begin(level);
Literal lit = readLiteral(check, subj, pred);
int val = lit.intValue();
// val could be 4 or 6 if one transaction was aborted
if (val != 4 && val != 6) {
assertEquals(9, val);
}
check.commit();
}
}
protected Thread incrementBy(final CountDownLatch start, final CountDownLatch observed, final IsolationLevels level,
final ValueFactory vf, final IRI subj, final IRI pred, final int by) {
return new Thread(() -> {
try (RepositoryConnection con = store.getConnection()) {
start.countDown();
start.await();
con.begin(level);
Literal o1 = readLiteral(con, subj, pred);
observed.countDown();
observed.await(1, TimeUnit.SECONDS);
con.remove(subj, pred, o1);
con.add(subj, pred, vf.createLiteral(o1.intValue() + by));
try {
con.commit();
} catch (RepositoryException e) {
// it is okay to abort on conflict
// e.printStackTrace();
con.rollback();
}
} catch (Throwable e) {
fail("Increment " + by + " failed", e);
}
});
}
private void clear(Repository store) throws RepositoryException {
try (RepositoryConnection con = store.getConnection()) {
con.begin();
con.clear();
con.commit();
}
}
protected long count(RepositoryConnection con, Resource subj, IRI pred, Value obj, boolean includeInferred,
Resource... contexts) throws RepositoryException {
try (CloseableIteration<Statement> stmts = con.getStatements(subj, pred, obj,
includeInferred, contexts)) {
long counter = 0;
while (stmts.hasNext()) {
stmts.next();
counter++;
}
return counter;
}
}
protected Literal readLiteral(RepositoryConnection con, final IRI subj, final IRI pred) throws RepositoryException {
try (CloseableIteration<? extends Statement> stmts = con.getStatements(subj, pred, null,
false)) {
if (!stmts.hasNext()) {
return null;
}
Value obj = stmts.next().getObject();
if (stmts.hasNext()) {
org.junit.Assert.fail("multiple literals: " + obj + " and " + stmts.next());
}
return (Literal) obj;
}
}
protected void insertTestStatement(RepositoryConnection connection, int i) throws RepositoryException {
ValueFactory vf = connection.getValueFactory();
Literal lit = vf.createLiteral(Integer.toString(i), CoreDatatype.XSD.INTEGER);
connection.add(vf.createIRI("http://test#s" + i), vf.createIRI("http://test#p"), lit,
vf.createIRI("http://test#context_" + i));
}
protected synchronized void fail(String message, Throwable t) {
failedMessage = message;
failed = t;
}
protected synchronized void assertNotFailed() {
if (failed != null) {
throw (AssertionError) new AssertionError(failedMessage).initCause(failed);
}
}
}