SerializableTest.java
/*******************************************************************************
* Copyright (c) 2019 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.shacl;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.io.IOUtils;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.model.vocabulary.RDF4J;
import org.eclipse.rdf4j.model.vocabulary.RDFS;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
import org.eclipse.rdf4j.sail.shacl.results.ValidationReport;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Isolated;
@Isolated
public class SerializableTest {
@Test
public void testMaxCountSnapshot() throws IOException, InterruptedException {
for (int i = 0; i < 10; i++) {
SailRepository repo = Utils.getInitializedShaclRepository("shaclMax.trig");
// ((ShaclSail) repo.getSail()).setGlobalLogValidationExecution(true);
multithreadedMaxCountViolation(IsolationLevels.SNAPSHOT, repo);
try (SailRepositoryConnection connection = repo.getConnection()) {
connection.begin();
ValidationReport revalidate = ((ShaclSailConnection) connection.getSailConnection()).revalidate();
// Rio.write(revalidate.asModel(), System.out, RDFFormat.TRIG);
Assertions.assertTrue(revalidate.conforms());
connection.commit();
}
repo.shutDown();
}
}
@Test
public void testMaxCountSerializable() throws IOException, InterruptedException {
SailRepository repo = Utils.getInitializedShaclRepository("shaclMax.trig");
multithreadedMaxCountViolation(IsolationLevels.SERIALIZABLE, repo);
try (SailRepositoryConnection connection = repo.getConnection()) {
connection.begin();
ValidationReport revalidate = ((ShaclSailConnection) connection.getSailConnection()).revalidate();
// Rio.write(revalidate.asModel(), System.out, RDFFormat.TRIG);
Assertions.assertTrue(revalidate.conforms());
connection.commit();
}
repo.shutDown();
}
@Test
public void testMaxCount2Serializable() throws IOException, InterruptedException {
SailRepository repo = Utils.getInitializedShaclRepository("shaclMax.trig");
multithreadedMaxCount2Violation(IsolationLevels.SERIALIZABLE, repo);
try (SailRepositoryConnection connection = repo.getConnection()) {
connection.begin();
ValidationReport revalidate = ((ShaclSailConnection) connection.getSailConnection()).revalidate();
// Rio.write(revalidate.asModel(), System.out, RDFFormat.TRIG);
Assertions.assertTrue(revalidate.conforms());
connection.commit();
}
repo.shutDown();
}
@Test
public void testMaxCount2Snapshot() throws IOException, InterruptedException {
SailRepository repo = Utils.getInitializedShaclRepository("shaclMax.trig");
multithreadedMaxCount2Violation(IsolationLevels.SNAPSHOT, repo);
try (SailRepositoryConnection connection = repo.getConnection()) {
connection.begin();
ValidationReport revalidate = ((ShaclSailConnection) connection.getSailConnection()).revalidate();
// Rio.write(revalidate.asModel(), System.out, RDFFormat.TRIG);
Assertions.assertTrue(revalidate.conforms());
connection.commit();
}
repo.shutDown();
}
@Test
public void testMaxCount3Snapshot() throws IOException, InterruptedException {
SailRepository repo = Utils.getInitializedShaclRepository("shaclMax.trig");
multithreadedMaxCount3Violation(IsolationLevels.SNAPSHOT, repo);
try (SailRepositoryConnection connection = repo.getConnection()) {
connection.begin();
ValidationReport revalidate = ((ShaclSailConnection) connection.getSailConnection()).revalidate();
// Rio.write(revalidate.asModel(), System.out, RDFFormat.TRIG);
Assertions.assertTrue(revalidate.conforms());
connection.commit();
}
repo.shutDown();
}
@Test
public void serializableParallelValidation() throws Throwable {
SailRepository repo = Utils
.getInitializedShaclRepository("test-cases/complex/targetShapeAndQualifiedShape/shacl.trig");
ShaclSail sail = (ShaclSail) repo.getSail();
sail.setShapesGraphs(Set.of(RDF4J.NIL));
sail.setParallelValidation(true);
sail.setEclipseRdf4jShaclExtensions(true);
try (SailRepositoryConnection connection = repo.getConnection()) {
connection.begin(IsolationLevels.SERIALIZABLE);
connection.prepareUpdate(IOUtils.toString(
Objects.requireNonNull(SerializableTest.class.getClassLoader()
.getResource("test-cases/complex/targetShapeAndQualifiedShape/invalid/case1/query1.rq")),
StandardCharsets.UTF_8)).execute();
assertThrows(ShaclSailValidationException.class, () -> {
try {
connection.commit();
} catch (RepositoryException e) {
throw e.getCause();
}
});
} finally {
repo.shutDown();
}
}
private void multithreadedMaxCountViolation(IsolationLevels isolationLevel, SailRepository repo)
throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
ValueFactory vf = SimpleValueFactory.getInstance();
IRI iri = vf.createIRI("http://example.com/resouce1");
Runnable runnable1 = () -> {
try (SailRepositoryConnection connection = repo.getConnection()) {
connection.begin(isolationLevel);
countDownLatch.countDown();
connection.add(iri, RDF.TYPE, RDFS.RESOURCE);
connection.add(iri, RDFS.LABEL, vf.createLiteral("a"));
connection.add(iri, RDFS.LABEL, vf.createLiteral("b"));
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
connection.commit();
} catch (Exception ignored) {
}
}
};
Runnable runnable2 = () -> {
try (SailRepositoryConnection connection = repo.getConnection()) {
connection.begin(isolationLevel);
countDownLatch.countDown();
connection.add(iri, RDF.TYPE, RDFS.RESOURCE);
connection.add(iri, RDFS.LABEL, vf.createLiteral("c"));
connection.add(iri, RDFS.LABEL, vf.createLiteral("d"));
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
connection.commit();
} catch (Exception ignored) {
}
}
};
Thread thread1 = new Thread(runnable1);
Thread thread2 = new Thread(runnable2);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
private void multithreadedMaxCount2Violation(IsolationLevels isolationLevel, SailRepository repo)
throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
ValueFactory vf = SimpleValueFactory.getInstance();
IRI iri = vf.createIRI("http://example.com/resouce1");
Runnable runnable1 = () -> {
try (SailRepositoryConnection connection = repo.getConnection()) {
connection.begin(isolationLevel);
connection.add(iri, RDF.TYPE, RDFS.RESOURCE);
countDownLatch.countDown();
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
connection.commit();
} catch (Exception ignored) {
}
}
};
Runnable runnable2 = () -> {
try (SailRepositoryConnection connection = repo.getConnection()) {
connection.begin(isolationLevel);
connection.add(iri, RDFS.LABEL, vf.createLiteral("a"));
connection.add(iri, RDFS.LABEL, vf.createLiteral("b"));
connection.add(iri, RDFS.LABEL, vf.createLiteral("c"));
countDownLatch.countDown();
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
connection.commit();
} catch (Exception ignored) {
}
}
};
Thread thread1 = new Thread(runnable1);
Thread thread2 = new Thread(runnable2);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
}
private void multithreadedMaxCount3Violation(IsolationLevels isolationLevel, SailRepository repo)
throws InterruptedException {
CountDownLatch syncPoint1 = new CountDownLatch(1);
CountDownLatch syncPoint2 = new CountDownLatch(1);
CountDownLatch syncPoint3 = new CountDownLatch(1);
CountDownLatch syncPoint4 = new CountDownLatch(2);
ValueFactory vf = SimpleValueFactory.getInstance();
IRI resource1 = vf.createIRI("http://example.com/resource1");
IRI resource2 = vf.createIRI("http://example.com/resource2");
Runnable runnable1 = () -> {
try (SailRepositoryConnection connection = repo.getConnection()) {
syncPoint1.countDown();
connection.begin(isolationLevel);
connection.add(resource1, RDF.TYPE, RDFS.RESOURCE);
syncPoint2.await();
syncPoint4.await();
connection.commit();
} catch (Exception e) {
System.out.println("runnable1: " + e.getMessage());
}
};
Runnable runnable2 = () -> {
try {
syncPoint1.await();
try (SailRepositoryConnection connection = repo.getConnection()) {
connection.begin(isolationLevel);
connection.add(resource1, RDFS.LABEL, vf.createLiteral("a"));
connection.add(resource1, RDFS.LABEL, vf.createLiteral("b"));
connection.add(resource1, RDFS.LABEL, vf.createLiteral("c"));
connection.add(resource2, RDF.TYPE, RDFS.RESOURCE);
syncPoint3.countDown();
syncPoint4.await();
syncPoint2.countDown();
connection.commit();
}
} catch (Exception e) {
System.out.println("runnable2: " + e.getMessage());
}
};
Runnable runnable3 = () -> {
try {
syncPoint3.await();
try (SailRepositoryConnection connection = repo.getConnection()) {
connection.begin(IsolationLevels.READ_COMMITTED);
connection.add(resource2, RDFS.LABEL, vf.createLiteral("d"));
connection.add(resource2, RDFS.LABEL, vf.createLiteral("e"));
connection.add(resource2, RDFS.LABEL, vf.createLiteral("f"));
syncPoint4.countDown();
syncPoint2.await();
connection.commit();
}
} catch (Exception e) {
System.out.println("runnable3: " + e.getMessage());
}
};
Runnable runnable4 = () -> {
try {
syncPoint3.await();
try (SailRepositoryConnection connection = repo.getConnection()) {
connection.begin(IsolationLevels.READ_COMMITTED);
connection.add(resource2, RDFS.LABEL, vf.createLiteral("d"));
connection.add(resource2, RDFS.LABEL, vf.createLiteral("e"));
connection.add(resource2, RDFS.LABEL, vf.createLiteral("f"));
syncPoint4.countDown();
syncPoint2.await();
connection.commit();
}
} catch (Exception e) {
System.out.println("runnable4: " + e.getMessage());
}
};
Thread[] threads = {
new Thread(runnable1),
new Thread(runnable2),
new Thread(runnable3),
new Thread(runnable4)
};
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
}
}