ShaclSail.java
/*******************************************************************************
* Copyright (c) 2018 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.shacl;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.eclipse.rdf4j.common.annotation.Experimental;
import org.eclipse.rdf4j.common.annotation.InternalUseOnly;
import org.eclipse.rdf4j.common.concurrent.locks.ReadPrefReadWriteLockManager;
import org.eclipse.rdf4j.common.concurrent.locks.StampedLockManager;
import org.eclipse.rdf4j.common.concurrent.locks.diagnostics.ConcurrentCleaner;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.common.transaction.TransactionSetting;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.vocabulary.DASH;
import org.eclipse.rdf4j.model.vocabulary.RDF4J;
import org.eclipse.rdf4j.model.vocabulary.RSX;
import org.eclipse.rdf4j.model.vocabulary.SESAME;
import org.eclipse.rdf4j.model.vocabulary.SHACL;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
import org.eclipse.rdf4j.sail.NotifyingSail;
import org.eclipse.rdf4j.sail.NotifyingSailConnection;
import org.eclipse.rdf4j.sail.Sail;
import org.eclipse.rdf4j.sail.SailConnection;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.memory.MemoryStore;
import org.eclipse.rdf4j.sail.memory.MemoryStoreConnection;
import org.eclipse.rdf4j.sail.shacl.ast.ContextWithShape;
import org.eclipse.rdf4j.sail.shacl.ast.Shape;
import org.eclipse.rdf4j.sail.shacl.wrapper.shape.CombinedShapeSource;
import org.eclipse.rdf4j.sail.shacl.wrapper.shape.Rdf4jShaclShapeGraphShapeSource;
import org.eclipse.rdf4j.sail.shacl.wrapper.shape.ShapeSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
//@formatter:off
/**
* A {@link Sail} implementation that adds support for the Shapes Constraint Language (SHACL).
* <p>
* The ShaclSail looks for SHACL shape data in a special named graph {@link RDF4J#SHACL_SHAPE_GRAPH}.
* <h4>Working example</h4>
* <p>
*
* <pre>
* import java.io.IOException;
* import java.io.StringReader;
*
* import org.eclipse.rdf4j.common.exception.ValidationException;
* import org.eclipse.rdf4j.model.Model;
* import org.eclipse.rdf4j.model.vocabulary.RDF4J;
* import org.eclipse.rdf4j.repository.RepositoryException;
* import org.eclipse.rdf4j.repository.sail.SailRepository;
* import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
* import org.eclipse.rdf4j.rio.RDFFormat;
* import org.eclipse.rdf4j.rio.Rio;
* import org.eclipse.rdf4j.rio.WriterConfig;
* import org.eclipse.rdf4j.rio.helpers.BasicWriterSettings;
* import org.eclipse.rdf4j.sail.memory.MemoryStore;
* import org.eclipse.rdf4j.sail.shacl.ShaclSail;
*
* public class ShaclSampleCode {
*
* public static void main(String[] args) throws IOException {
*
* ShaclSail shaclSail = new ShaclSail(new MemoryStore());
*
* SailRepository sailRepository = new SailRepository(shaclSail);
* sailRepository.init();
*
* try (SailRepositoryConnection connection = sailRepository.getConnection()) {
*
* connection.begin();
*
* StringReader shaclRules = new StringReader(String.join("\n", "",
* "@prefix ex: <http://example.com/ns#> .",
* "@prefix sh: <http://www.w3.org/ns/shacl#> .",
* "@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .",
* "@prefix foaf: <http://xmlns.com/foaf/0.1/>.",
*
* "ex:PersonShape",
* " a sh:NodeShape ;",
* " sh:targetClass foaf:Person ;",
* " sh:property ex:PersonShapeProperty .",
*
* "ex:PersonShapeProperty ",
* " sh:path foaf:age ;",
* " sh:datatype xsd:int ;",
* " sh:maxCount 1 ;",
* " sh:minCount 1 ."
* ));
*
* connection.add(shaclRules, "", RDFFormat.TURTLE, RDF4J.SHACL_SHAPE_GRAPH);
* connection.commit();
*
* connection.begin();
*
* StringReader invalidSampleData = new StringReader(String.join("\n", "",
* "@prefix ex: <http://example.com/ns#> .",
* "@prefix foaf: <http://xmlns.com/foaf/0.1/>.",
* "@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .",
*
* "ex:peter a foaf:Person ;",
* " foaf:age 20, \"30\"^^xsd:int ."
*
* ));
*
* connection.add(invalidSampleData, "", RDFFormat.TURTLE);
* try {
* connection.commit();
* } catch (RepositoryException exception) {
* Throwable cause = exception.getCause();
* if (cause instanceof ValidationException) {
*
* // use the validationReportModel to understand validation violations
* Model validationReportModel = ((ValidationException) cause).validationReportAsModel();
*
* // Pretty print the validation report
* WriterConfig writerConfig = new WriterConfig()
* .set(BasicWriterSettings.PRETTY_PRINT, true)
* .set(BasicWriterSettings.INLINE_BLANK_NODES, true);
*
* Rio.write(validationReportModel, System.out, RDFFormat.TURTLE, writerConfig);
* System.out.println();
* }
*
* throw exception;
* }
* }
* }
* }
* </pre>
*
* @author Heshan Jayasinghe
* @author H��vard Ottestad
* @see <a href="https://www.w3.org/TR/shacl/">SHACL W3C Recommendation</a>
*/
//@formatter:on
public class ShaclSail extends ShaclSailBaseConfiguration {
private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
private static final Logger logger = LoggerFactory.getLogger(ShaclSail.class);
private static final ConcurrentCleaner cleaner = new ConcurrentCleaner();
/**
* an initialized {@link Repository} for storing/retrieving Shapes data
*/
private SailRepository shapesRepo;
// lockManager used for read/write locks used to synchronize validation so that SNAPSHOT isolation is sufficient to
// achieve SERIALIZABLE isolation wrt. validation
final ReadPrefReadWriteLockManager serializableValidationLock = new ReadPrefReadWriteLockManager(
"ShaclSail_SerializableValidation");
// shapesCacheLockManager used to keep track of changes to the cache
private StampedLockManager.Cache<List<ContextWithShape>> cachedShapes;
// true if the base sail supports IsolationLevels.SNAPSHOT
private boolean supportsSnapshotIsolation;
// This is used to keep track of the current connection, if the opening and closing of connections is done serially.
// If it is done in parallel, then this will catch that and the multipleConcurrentConnections == true.
private final transient AtomicLong singleConnectionCounter = new AtomicLong();
final Object singleConnectionMonitor = new Object();
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final RevivableExecutorService executorService;
@InternalUseOnly
StampedLockManager.Cache<List<ContextWithShape>>.WritableState getCachedShapesForWriting()
throws InterruptedException {
return cachedShapes.getWriteState();
}
@InternalUseOnly
public StampedLockManager.Cache<List<ContextWithShape>>.ReadableState getCachedShapes()
throws InterruptedException {
return cachedShapes.getReadState();
}
static class CleanableState implements Runnable {
private final AtomicBoolean initialized;
private final ExecutorService executorService;
CleanableState(AtomicBoolean initialized, ExecutorService executorService) {
this.initialized = initialized;
this.executorService = executorService;
}
public void run() {
if (initialized.get()) {
logger.error("ShaclSail was garbage collected without shutdown() having been called first.");
}
executorService.shutdownNow();
}
}
public ShaclSail(NotifyingSail baseSail) {
super(baseSail);
executorService = getExecutorService();
cleaner.register(this, new CleanableState(initialized, executorService));
this.supportsSnapshotIsolation = baseSail.getSupportedIsolationLevels().contains(IsolationLevels.SNAPSHOT);
}
public ShaclSail() {
super();
executorService = getExecutorService();
cleaner.register(this, new CleanableState(initialized, executorService));
}
@Override
public void setBaseSail(Sail baseSail) {
super.setBaseSail(baseSail);
this.supportsSnapshotIsolation = baseSail.getSupportedIsolationLevels().contains(IsolationLevels.SNAPSHOT);
}
/**
* @return
* @implNote This is an extension point for configuring a different executor service for parallel validation. The
* code is marked as experimental because it may change from one minor release to another.
*/
@Experimental
protected RevivableExecutorService getExecutorService() {
return new RevivableExecutorService(
() -> Executors.newFixedThreadPool(AVAILABLE_PROCESSORS,
r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
// this thread pool does not need to stick around if the all other threads are done, because
// it is only used for SHACL validation and if all other threads have ended then there would
// be no thread to receive the validation results.
t.setDaemon(true);
t.setName("ShaclSail validation thread " + t.getId());
return t;
}));
}
void closeConnection() {
// closing a connection will set the to zero if there are currently no other connections open
singleConnectionCounter.compareAndSet(1, 0);
}
boolean usesSingleConnection() {
// return false; // if this method returns false, then the connection will always use the new serializable validation
assert singleConnectionCounter.get() != 0;
return singleConnectionCounter.get() == 1;
}
/**
* Lists the predicates that have been implemented in the ShaclSail. All of these, and all combinations,
* <i>should</i> work, please report any bugs. For sh:path, only single predicate paths, or single predicate inverse
* paths are supported. DASH and RSX features may need to be enabled.
*
* @return List of IRIs (SHACL predicates)
*/
public static List<IRI> getSupportedShaclPredicates() {
return Arrays.asList(
SHACL.TARGET_CLASS,
SHACL.PATH,
SHACL.PROPERTY,
SHACL.OR,
SHACL.AND,
SHACL.MIN_COUNT,
SHACL.MAX_COUNT,
SHACL.MIN_LENGTH,
SHACL.MAX_LENGTH,
SHACL.PATTERN,
SHACL.FLAGS,
SHACL.NODE_KIND_PROP,
SHACL.LANGUAGE_IN,
SHACL.DATATYPE,
SHACL.MIN_EXCLUSIVE,
SHACL.MIN_INCLUSIVE,
SHACL.MAX_EXCLUSIVE,
SHACL.MAX_INCLUSIVE,
SHACL.CLASS,
SHACL.TARGET_NODE,
SHACL.DEACTIVATED,
SHACL.TARGET_SUBJECTS_OF,
SHACL.IN,
SHACL.UNIQUE_LANG,
SHACL.NOT,
SHACL.TARGET_OBJECTS_OF,
SHACL.HAS_VALUE,
SHACL.TARGET_PROP,
SHACL.INVERSE_PATH,
SHACL.ALTERNATIVE_PATH,
// SHACL.ONE_OR_MORE_PATH,
// SHACL.ZERO_OR_MORE_PATH,
SHACL.NODE,
SHACL.QUALIFIED_MAX_COUNT,
SHACL.QUALIFIED_MIN_COUNT,
SHACL.QUALIFIED_VALUE_SHAPE,
SHACL.SHAPES_GRAPH,
SHACL.MESSAGE,
SHACL.NAME,
SHACL.DESCRIPTION,
SHACL.DEFAULT_VALUE,
SHACL.ORDER,
SHACL.GROUP,
SHACL.DECLARE,
SHACL.SPARQL,
SHACL.SELECT,
SHACL.PREFIXES,
SHACL.PREFIX_PROP,
SHACL.NAMESPACE_PROP,
SHACL.SEVERITY_PROP,
SHACL.EQUALS,
SHACL.LESS_THAN,
SHACL.LESS_THAN_OR_EQUALS,
SHACL.CLOSED,
SHACL.IGNORED_PROPERTIES,
DASH.hasValueIn,
RSX.targetShape,
RSX.dataGraph,
RSX.shapesGraph
);
}
@Override
public void init() throws SailException {
if (!initialized.compareAndSet(false, true)) {
// already initialized
return;
}
super.init();
executorService.init();
if (shapesRepo != null) {
shapesRepo.shutDown();
shapesRepo = null;
}
if (super.getBaseSail().getDataDir() != null) {
String path = super.getBaseSail().getDataDir().getPath();
if (path.endsWith("/")) {
path = path.substring(0, path.length() - 1);
}
path = path + "/shapes-graph/";
logger.info("Shapes will be persisted in: " + path);
MemoryStore sail = new MemoryStore(new File(path));
sail.setConnectionTimeOut(1000);
shapesRepo = new SailRepository(sail);
} else {
MemoryStore sail = new MemoryStore();
sail.setConnectionTimeOut(1000);
shapesRepo = new SailRepository(sail);
}
shapesRepo.init();
try (SailRepositoryConnection shapesRepoConnection = shapesRepo.getConnection()) {
shapesRepoConnection.begin(IsolationLevels.NONE);
shapesRepoConnection.commit();
}
cachedShapes = new StampedLockManager.Cache<>(new StampedLockManager(), () -> {
IRI[] shapesGraphs = getShapesGraphs().stream()
.map(g -> {
if (g.equals(RDF4J.NIL)) {
return null;
}
if (g.equals(SESAME.NIL)) {
return null;
}
return g;
})
.toArray(IRI[]::new);
boolean onlyRdf4jShaclShapeGraph = shapesGraphs.length == 1
&& RDF4J.SHACL_SHAPE_GRAPH.equals(shapesGraphs[0]);
return getShapes(shapesGraphs, onlyRdf4jShaclShapeGraph);
});
try {
cachedShapes.warmUp();
} catch (InterruptedException e) {
throw convertToSailException(e);
}
}
@InternalUseOnly
public List<ContextWithShape> getShapes(RepositoryConnection shapesRepoConnection, SailConnection sailConnection,
IRI[] shapesGraphs) throws SailException {
try (ShapeSource shapeSource = new CombinedShapeSource(shapesRepoConnection, sailConnection)
.withContext(shapesGraphs)) {
return Shape.Factory.getShapes(shapeSource,
new Shape.ParseSettings(isEclipseRdf4jShaclExtensions(), isDashDataShapes()));
}
}
@InternalUseOnly
public List<ContextWithShape> getShapes(RepositoryConnection shapesRepoConnection, IRI[] shapesGraphs)
throws SailException {
try (ShapeSource shapeSource = new Rdf4jShaclShapeGraphShapeSource(shapesRepoConnection)
.withContext(shapesGraphs)) {
return Shape.Factory.getShapes(shapeSource,
new Shape.ParseSettings(isEclipseRdf4jShaclExtensions(), isDashDataShapes()));
}
}
@Override
public synchronized void shutDown() throws SailException {
if (shapesRepo != null) {
shapesRepo.shutDown();
shapesRepo = null;
}
cachedShapes = null;
boolean terminated = shutdownExecutorService(false);
initialized.set(false);
super.shutDown();
if (!terminated) {
shutdownExecutorService(true);
}
}
private boolean shutdownExecutorService(boolean forced) {
boolean terminated = false;
executorService.shutdown();
try {
terminated = executorService.awaitTermination(200, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
if (forced && !terminated) {
executorService.shutdownNow();
logger.error("Shutdown ShaclSail while validation is still running.");
terminated = true;
}
return terminated;
}
<T> Future<T> submitToExecutorService(Callable<T> runnable) {
return executorService.submit(runnable);
}
@Override
public NotifyingSailConnection getConnection() throws SailException {
init();
synchronized (singleConnectionMonitor) {
singleConnectionCounter.incrementAndGet();
}
try {
NotifyingSailConnection connection = super.getConnection();
if (connection instanceof MemoryStoreConnection) {
if (isSerializableValidation()) {
return new ShaclSailConnection(this, connection, super.getConnection(), shapesRepo.getConnection(),
super.getConnection());
} else {
return new ShaclSailConnection(this, connection, super.getConnection(), shapesRepo.getConnection());
}
} else if (isSerializableValidation()) {
return new ShaclSailConnection(this, connection, shapesRepo.getConnection(), super.getConnection());
} else {
return new ShaclSailConnection(this, connection, shapesRepo.getConnection());
}
} catch (Throwable t) {
singleConnectionCounter.decrementAndGet();
throw t;
}
}
@InternalUseOnly
public List<ContextWithShape> getShapes(IRI[] shapesGraphs, boolean onlyRdf4jShaclShapeGraph) {
try (SailRepositoryConnection shapesRepoConnection = shapesRepo.getConnection()) {
shapesRepoConnection.begin(IsolationLevels.READ_COMMITTED);
try {
if (onlyRdf4jShaclShapeGraph) {
return getShapes(shapesRepoConnection, shapesGraphs);
} else {
try (NotifyingSailConnection sailConnection = getBaseSail().getConnection()) {
sailConnection.begin(IsolationLevels.READ_COMMITTED);
try {
return getShapes(shapesRepoConnection, sailConnection, shapesGraphs);
} finally {
sailConnection.rollback();
}
}
}
} finally {
shapesRepoConnection.rollback();
}
}
}
public void setShapesGraphs(Set<IRI> shapesGraphs) {
if (initialized.get()) {
try {
try (StampedLockManager.Cache<List<ContextWithShape>>.WritableState writeState = cachedShapes
.getWriteState()) {
super.setShapesGraphs(shapesGraphs);
writeState.purge();
}
} catch (InterruptedException e) {
throw convertToSailException(e);
}
} else {
super.setShapesGraphs(shapesGraphs);
}
}
@Override
public boolean isSerializableValidation() {
if (!supportsSnapshotIsolation && super.isSerializableValidation()) {
if (logger.isDebugEnabled()) {
logger.debug(
"Serializable validation is enabled but can not be used because the base sail does not support IsolationLevels.SNAPSHOT!");
}
}
return supportsSnapshotIsolation && super.isSerializableValidation();
}
static SailException convertToSailException(InterruptedException e) {
Thread.currentThread().interrupt();
return new SailException(e);
}
public static class TransactionSettings {
@Experimental
public enum PerformanceHint implements TransactionSetting {
/**
* Run validation is parallel (multithreaded).
*/
ParallelValidation("ParallelValidation"),
/**
* Run validation serially (single threaded)
*/
SerialValidation("SerialValidation"),
/**
* Cache intermediate results. Uses more memory but can reduce validation time.
*/
CacheEnabled("CacheEnabled"),
/**
* Do not cache intermediate results.
*/
CacheDisabled("CacheDisabled");
private final String value;
PerformanceHint(String value) {
this.value = value;
}
@Override
public String getName() {
return PerformanceHint.class.getCanonicalName();
}
@Override
public String getValue() {
return value;
}
}
public enum ValidationApproach implements TransactionSetting {
/**
* Do not run any validation. This could potentially lead to your database becoming invalid.
*/
Disabled("Disabled", 0),
/**
* Use a validation approach that is optimized for bulk operations such as adding or removing large amounts
* of data. This will automatically disable parallel validation and turn off caching. Add performance hints
* to enable parallel validation or caching if you have enough resources (RAM).
*/
Bulk("Bulk", 1),
/**
* Let the SHACL engine decide on the best approach for validating. This typically means that it will use
* transactional validation except when changing the SHACL Shape.
*/
Auto("Auto", 2);
private final String value;
// lowest priority wins
private final int priority;
ValidationApproach(String value, int priority) {
this.value = value;
this.priority = priority;
}
@Override
public String getName() {
return ValidationApproach.class.getCanonicalName();
}
@Override
public String getValue() {
return value;
}
public static ValidationApproach getHighestPriority(ValidationApproach v1, ValidationApproach v2) {
assert v1 != null || v2 != null;
if (v1 == null) {
return v2;
}
if (v2 == null) {
return v1;
}
if (v1.priority < v2.priority) {
return v1;
}
return v2;
}
}
private final String value;
TransactionSettings(String value) {
this.value = value;
}
public String getValue() {
return value;
}
}
/**
* @implNote This is an extension point for configuring a different executor service for parallel validation. The
* code is marked as experimental because it may change from one minor release to another.
* @return
*/
@Experimental
@SuppressWarnings("NullableProblems")
protected static class RevivableExecutorService implements ExecutorService {
private final Supplier<ExecutorService> supplier;
ExecutorService delegate;
boolean initialized = false;
public RevivableExecutorService(Supplier<ExecutorService> supplier) {
this.supplier = supplier;
}
public void init() {
assert delegate == null || delegate.isTerminated();
delegate = supplier.get();
initialized = true;
}
@Override
public void shutdown() {
if (initialized) {
delegate.shutdown();
}
}
@Override
public List<Runnable> shutdownNow() {
if (initialized) {
return delegate.shutdownNow();
} else {
return Collections.emptyList();
}
}
@Override
public boolean isShutdown() {
return !initialized || delegate.isShutdown();
}
@Override
public boolean isTerminated() {
return !initialized || delegate.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return !initialized || delegate.awaitTermination(timeout, unit);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
assert initialized;
return delegate.submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
assert initialized;
return delegate.submit(task, result);
}
@Override
public Future<?> submit(Runnable task) {
assert initialized;
return delegate.submit(task);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
assert initialized;
return delegate.invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
assert initialized;
return delegate.invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
assert initialized;
return delegate.invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
assert initialized;
return delegate.invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
assert initialized;
delegate.execute(command);
}
}
}