LongMultithreadedTransactions.java
/*******************************************************************************
* Copyright (c) 2019 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf;
import java.io.File;
import java.util.Random;
import java.util.stream.IntStream;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.RDFS;
import org.eclipse.rdf4j.sail.NotifyingSail;
import org.eclipse.rdf4j.sail.NotifyingSailConnection;
import org.eclipse.rdf4j.sail.SailConflictException;
import org.eclipse.rdf4j.sail.SailConnection;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
public class LongMultithreadedTransactions {
@TempDir
File tmpDir;
NotifyingSail getBaseSail() {
return new NativeStore(tmpDir);
}
@Test
@Disabled
public void test() {
ValueFactory vf = SimpleValueFactory.getInstance();
NotifyingSail baseSail = getBaseSail();
Random random = new Random(6424235);
IntStream.range(0, 10000).parallel().forEach(i -> {
try (SailConnection connection = baseSail.getConnection()) {
executeATransaction(vf, random, i, connection);
executeATransaction(vf, random, i, connection);
executeATransaction(vf, random, i, connection);
executeATransaction(vf, random, i, connection);
executeATransaction(vf, random, i, connection);
executeATransaction(vf, random, i, connection);
executeATransaction(vf, random, i, connection);
executeATransaction(vf, random, i, connection);
executeATransaction(vf, random, i, connection);
executeATransaction(vf, random, i, connection);
}
});
}
@Test
@Disabled
public void test1() {
ValueFactory vf = SimpleValueFactory.getInstance();
NotifyingSail baseSail = getBaseSail();
try (NotifyingSailConnection connection0 = baseSail.getConnection()) {
try (NotifyingSailConnection connection1 = baseSail.getConnection()) {
try (NotifyingSailConnection connection2 = baseSail.getConnection()) {
try (NotifyingSailConnection connection3 = baseSail.getConnection()) {
connection0.begin(IsolationLevels.SERIALIZABLE);
connection1.begin(IsolationLevels.SERIALIZABLE);
connection2.begin(IsolationLevels.SERIALIZABLE);
connection3.begin(IsolationLevels.SERIALIZABLE);
boolean b1 = connection0.hasStatement(null, null, null, true);
boolean b2 = connection1.hasStatement(null, null, null, true);
boolean b3 = connection2.hasStatement(null, null, null, true);
boolean b4 = connection3.hasStatement(null, null, null, true);
System.out.println(b1);
System.out.println(b2);
System.out.println(b3);
System.out.println(b4);
connection0.addStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral("a"));
connection1.addStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral("a"));
connection2.addStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral("a"));
connection3.addStatement(vf.createBNode(), RDFS.LABEL, vf.createLiteral("a"));
IntStream.range(0, 4).parallel().forEach(i -> {
if (i == 0) {
try {
connection0.prepare();
} catch (Exception e) {
e.printStackTrace();
connection0.rollback();
throw e;
}
}
if (i == 1) {
connection1.addStatement(vf.createIRI("http://example.com/" + i), RDFS.LABEL,
vf.createLiteral("a"));
}
if (i == 2) {
try {
connection2.prepare();
} catch (Exception e) {
e.printStackTrace();
connection2.rollback();
throw e;
}
}
if (i == 3) {
connection3.addStatement(vf.createIRI("http://example.com/" + i), RDFS.LABEL,
vf.createLiteral("a"));
}
});
connection1.prepare();
connection3.prepare();
connection0.commit();
connection1.commit();
connection2.commit();
connection3.commit();
}
}
}
}
}
private void executeATransaction(ValueFactory vf, Random r, int i, SailConnection connection) {
connection.begin(IsolationLevels.SERIALIZABLE);
boolean b = connection.hasStatement(null, null, null, true);
if (i % 10 == 0) {
connection.removeStatements(null, null, null);
connection.flush();
}
{
int function = r.nextInt(9);
IRI iri = vf.createIRI("http://example.com/" + r.nextInt(10));
int i1 = r.nextInt(i);
for (int k = 0; k < 1000; k++) {
switch (function) {
case 0:
connection.addStatement(iri, RDFS.LABEL, vf.createLiteral(k + "_" + i1));
break;
case 1:
connection.hasStatement(iri, RDFS.LABEL, vf.createLiteral(k + "_" + i1), true);
break;
case 2:
connection.hasStatement(null, RDFS.LABEL, vf.createLiteral(k + "_" + i1), true);
break;
case 3:
connection.hasStatement(iri, null, vf.createLiteral(k + "_" + i1), true);
break;
case 4:
connection.hasStatement(iri, RDFS.LABEL, null, true);
break;
case 5:
connection.hasStatement(null, null, vf.createLiteral(k + "_" + i1), true);
break;
case 6:
connection.hasStatement(iri, null, null, true);
break;
case 7:
connection.hasStatement(null, RDFS.LABEL, null, true);
break;
case 8:
connection.removeStatements(iri, RDFS.LABEL, vf.createLiteral(k + "_" + i1));
break;
}
Thread.yield();
connection.flush();
}
}
{
int function = r.nextInt(9);
IRI iri = vf.createIRI("http://example.com/" + r.nextInt(10));
int i1 = r.nextInt(i);
for (int k = 0; k < 1000; k++) {
switch (function) {
case 0:
connection.addStatement(iri, RDFS.LABEL, vf.createLiteral(k + "_" + i1));
break;
case 1:
connection.hasStatement(iri, RDFS.LABEL, vf.createLiteral(k + "_" + i1), true);
break;
case 2:
connection.hasStatement(null, RDFS.LABEL, vf.createLiteral(k + "_" + i1), true);
break;
case 3:
connection.hasStatement(iri, null, vf.createLiteral(k + "_" + i1), true);
break;
case 4:
connection.hasStatement(iri, RDFS.LABEL, null, true);
break;
case 5:
connection.hasStatement(null, null, vf.createLiteral(k + "_" + i1), true);
break;
case 6:
connection.hasStatement(iri, null, null, true);
break;
case 7:
connection.hasStatement(null, RDFS.LABEL, null, true);
break;
case 8:
connection.removeStatements(iri, RDFS.LABEL, vf.createLiteral(k + "_" + i1));
break;
}
Thread.yield();
}
}
{
int function = r.nextInt(9);
IRI iri = vf.createIRI("http://example.com/" + r.nextInt(10));
int i1 = r.nextInt(i);
for (int k = 0; k < 1000; k++) {
switch (function) {
case 0:
connection.addStatement(iri, RDFS.LABEL, vf.createLiteral(k + "_" + i1));
break;
case 1:
connection.hasStatement(iri, RDFS.LABEL, vf.createLiteral(k + "_" + i1), true);
break;
case 2:
connection.hasStatement(null, RDFS.LABEL, vf.createLiteral(k + "_" + i1), true);
break;
case 3:
connection.hasStatement(iri, null, vf.createLiteral(k + "_" + i1), true);
break;
case 4:
connection.hasStatement(iri, RDFS.LABEL, null, true);
break;
case 5:
connection.hasStatement(null, null, vf.createLiteral(k + "_" + i1), true);
break;
case 6:
connection.hasStatement(iri, null, null, true);
break;
case 7:
connection.hasStatement(null, RDFS.LABEL, null, true);
break;
case 8:
connection.removeStatements(iri, RDFS.LABEL, vf.createLiteral(k + "_" + i1));
break;
}
Thread.yield();
}
}
try {
Thread.yield();
connection.prepare();
Thread.yield();
connection.commit();
System.out.println(b);
} catch (SailConflictException ignore) {
connection.rollback();
executeATransaction(vf, r, i, connection);
}
}
}