ShaclSailConnection.java
/*******************************************************************************
* Copyright (c) 2018 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.shacl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.common.concurrent.locks.StampedLockManager;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.transaction.IsolationLevel;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.common.transaction.TransactionSetting;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.vocabulary.RDF4J;
import org.eclipse.rdf4j.model.vocabulary.SESAME;
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
import org.eclipse.rdf4j.sail.NotifyingSailConnection;
import org.eclipse.rdf4j.sail.Sail;
import org.eclipse.rdf4j.sail.SailConnection;
import org.eclipse.rdf4j.sail.SailConnectionListener;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.UpdateContext;
import org.eclipse.rdf4j.sail.helpers.AbstractSailConnection;
import org.eclipse.rdf4j.sail.helpers.NotifyingSailConnectionWrapper;
import org.eclipse.rdf4j.sail.memory.MemoryStore;
import org.eclipse.rdf4j.sail.shacl.ShaclSail.TransactionSettings.ValidationApproach;
import org.eclipse.rdf4j.sail.shacl.ast.ContextWithShape;
import org.eclipse.rdf4j.sail.shacl.results.ValidationReport;
import org.eclipse.rdf4j.sail.shacl.results.lazy.LazyValidationReport;
import org.eclipse.rdf4j.sail.shacl.results.lazy.ValidationResultIterator;
import org.eclipse.rdf4j.sail.shacl.wrapper.data.ConnectionsGroup;
import org.eclipse.rdf4j.sail.shacl.wrapper.data.RdfsSubClassOfReasoner;
import org.eclipse.rdf4j.sail.shacl.wrapper.data.VerySimpleRdfsBackwardsChainingConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Heshan Jayasinghe
* @author H��vard Ottestad
*/
public class ShaclSailConnection extends NotifyingSailConnectionWrapper implements SailConnectionListener {
private static final Logger logger = LoggerFactory.getLogger(ShaclSailConnection.class);
private final SailConnection previousStateConnection;
private final SailConnection serializableConnection;
private final boolean useDefaultShapesGraph;
private IRI[] shapesGraphs;
Sail addedStatements;
Sail removedStatements;
private final HashSet<Statement> addedStatementsSet = new HashSet<>();
private final HashSet<Statement> removedStatementsSet = new HashSet<>();
private boolean shapeRefreshNeeded = false;
private boolean shapesModifiedInCurrentTransaction = false;
public final ShaclSail sail;
private Stats stats;
RdfsSubClassOfReasoner rdfsSubClassOfReasoner;
private boolean prepareHasBeenCalled = false;
private Lock exclusiveSerializableValidationLock;
private Lock nonExclusiveSerializableValidationLock;
private StampedLockManager.Cache<List<ContextWithShape>>.WritableState writableShapesCache;
private StampedLockManager.Cache<List<ContextWithShape>>.ReadableState readableShapesCache;
private final SailRepositoryConnection shapesRepoConnection;
// used to determine if we are currently registered as a connection listener (getting added/removed notifications)
private boolean connectionListenerActive = false;
private IsolationLevel currentIsolationLevel = null;
private Settings transactionSettings;
private TransactionSetting[] transactionSettingsRaw = new TransactionSetting[0];
private volatile boolean closed;
ShaclSailConnection(ShaclSail sail, NotifyingSailConnection connection, SailConnection previousStateConnection,
SailRepositoryConnection shapesRepoConnection, SailConnection serializableConnection) {
super(connection);
this.previousStateConnection = previousStateConnection;
this.shapesRepoConnection = shapesRepoConnection;
this.serializableConnection = serializableConnection;
this.sail = sail;
this.transactionSettings = getDefaultSettings(sail);
this.useDefaultShapesGraph = sail.getShapesGraphs().contains(RDF4J.SHACL_SHAPE_GRAPH);
}
ShaclSailConnection(ShaclSail sail, NotifyingSailConnection connection, SailConnection previousStateConnection,
SailRepositoryConnection shapesRepoConnection) {
super(connection);
this.previousStateConnection = previousStateConnection;
this.shapesRepoConnection = shapesRepoConnection;
this.serializableConnection = null;
this.sail = sail;
this.transactionSettings = getDefaultSettings(sail);
this.useDefaultShapesGraph = sail.getShapesGraphs().contains(RDF4J.SHACL_SHAPE_GRAPH);
}
ShaclSailConnection(ShaclSail sail, NotifyingSailConnection connection,
SailRepositoryConnection shapesRepoConnection, SailConnection serializableConnection) {
super(connection);
this.previousStateConnection = null;
this.shapesRepoConnection = shapesRepoConnection;
this.serializableConnection = serializableConnection;
this.sail = sail;
this.transactionSettings = getDefaultSettings(sail);
this.useDefaultShapesGraph = sail.getShapesGraphs().contains(RDF4J.SHACL_SHAPE_GRAPH);
}
ShaclSailConnection(ShaclSail sail, NotifyingSailConnection connection,
SailRepositoryConnection shapesRepoConnection) {
super(connection);
this.previousStateConnection = null;
this.serializableConnection = null;
this.shapesRepoConnection = shapesRepoConnection;
this.sail = sail;
this.transactionSettings = getDefaultSettings(sail);
this.useDefaultShapesGraph = sail.getShapesGraphs().contains(RDF4J.SHACL_SHAPE_GRAPH);
}
private Settings getDefaultSettings(ShaclSail sail) {
return new Settings(sail.isCacheSelectNodes(), sail.isValidationEnabled(), sail.isParallelValidation(),
currentIsolationLevel);
}
@Override
public void setTransactionSettings(TransactionSetting... settings) {
super.setTransactionSettings(settings);
this.transactionSettingsRaw = settings;
}
@Override
public void begin() throws SailException {
begin(sail.getDefaultIsolationLevel());
}
@Override
public void begin(IsolationLevel level) throws SailException {
if (closed) {
throw new SailException("Connection is closed");
}
currentIsolationLevel = level;
assert addedStatements == null;
assert removedStatements == null;
assert readableShapesCache == null;
assert writableShapesCache == null;
assert nonExclusiveSerializableValidationLock == null;
assert exclusiveSerializableValidationLock == null;
assert shapesGraphs == null;
shapesGraphs = sail.getShapesGraphs().stream().map(g -> {
if (g.equals(RDF4J.NIL)) {
return null;
}
if (g.equals(SESAME.NIL)) {
return null;
}
return g;
}).toArray(IRI[]::new);
stats = new Stats();
// start two transactions, synchronize on underlying sail so that we get two transactions immediately
// successively
synchronized (sail) {
super.begin(level);
hasStatement(null, null, null, false); // actually force a transaction to start
shapesRepoConnection.begin(currentIsolationLevel);
if (previousStateConnection != null) {
previousStateConnection.begin(currentIsolationLevel);
previousStateConnection.hasStatement(null, null, null, false); // actually force a transaction to start
}
}
stats.setEmptyBeforeTransaction(ConnectionHelper.isEmpty(this));
transactionSettings = getDefaultSettings(sail);
if (stats.wasEmptyBeforeTransaction() && !shouldUseSerializableValidation()) {
transactionSettings.switchToBulkValidation();
}
transactionSettings.applyTransactionSettings(getLocalTransactionSettings());
assert transactionSettings.parallelValidation != null;
assert transactionSettings.cacheSelectedNodes != null;
assert transactionSettings.validationApproach != null;
if (isBulkValidation() || !isValidationEnabled()) {
removeConnectionListener(this);
} else {
addConnectionListener(this);
}
}
/**
* @return the transaction settings that are based purely on the settings based down through the begin(...) method
* without considering any sail level settings for things like caching or parallel validation.
*/
private Settings getLocalTransactionSettings() {
return new Settings(this);
}
@Override
public void addConnectionListener(SailConnectionListener listener) {
if (!connectionListenerActive && isValidationEnabled()) {
super.addConnectionListener(this);
connectionListenerActive = true;
}
}
boolean isValidationEnabled() {
return transactionSettings.getValidationApproach() != ValidationApproach.Disabled;
}
@Override
public void removeConnectionListener(SailConnectionListener listener) {
super.removeConnectionListener(listener);
connectionListenerActive = false;
}
private Sail getNewMemorySail() {
MemoryStore sail = new MemoryStore();
sail.setDefaultIsolationLevel(IsolationLevels.NONE);
sail.init();
return sail;
}
@Override
public void commit() throws SailException {
if (closed) {
throw new SailException("Connection is closed");
}
if (!prepareHasBeenCalled) {
prepare();
}
try {
long before = getTimeStamp();
if (previousStateConnection != null) {
previousStateConnection.commit();
}
super.commit();
shapesRepoConnection.commit();
if (sail.isPerformanceLogging()) {
logger.info("commit() excluding validation and cleanup took {} ms", getTimeStamp() - before);
}
} finally {
cleanup();
}
}
@Override
public void addStatement(UpdateContext modify, Resource subj, IRI pred, Value obj, Resource... contexts)
throws SailException {
if (useDefaultShapesGraph && contexts.length == 1 && RDF4J.SHACL_SHAPE_GRAPH.equals(contexts[0])) {
shapesRepoConnection.add(subj, pred, obj, contexts);
shapeRefreshNeeded = true;
} else {
super.addStatement(modify, subj, pred, obj, contexts);
}
}
@Override
public void removeStatement(UpdateContext modify, Resource subj, IRI pred, Value obj, Resource... contexts)
throws SailException {
if (useDefaultShapesGraph && contexts.length == 1 && RDF4J.SHACL_SHAPE_GRAPH.equals(contexts[0])) {
shapesRepoConnection.remove(subj, pred, obj, contexts);
shapeRefreshNeeded = true;
} else {
super.removeStatement(modify, subj, pred, obj, contexts);
}
}
@Override
public void addStatement(Resource subj, IRI pred, Value obj, Resource... contexts) throws SailException {
if (useDefaultShapesGraph && contexts.length == 1 && RDF4J.SHACL_SHAPE_GRAPH.equals(contexts[0])) {
shapesRepoConnection.add(subj, pred, obj, contexts);
shapeRefreshNeeded = true;
} else {
super.addStatement(subj, pred, obj, contexts);
}
}
@Override
public void removeStatements(Resource subj, IRI pred, Value obj, Resource... contexts) throws SailException {
if (useDefaultShapesGraph && contexts.length == 1 && RDF4J.SHACL_SHAPE_GRAPH.equals(contexts[0])) {
shapesRepoConnection.remove(subj, pred, obj, contexts);
shapeRefreshNeeded = true;
} else {
super.removeStatements(subj, pred, obj, contexts);
}
}
@Override
public void clear(Resource... contexts) throws SailException {
if (Arrays.asList(contexts).contains(RDF4J.SHACL_SHAPE_GRAPH)) {
shapesRepoConnection.clear();
shapeRefreshNeeded = true;
}
super.clear(contexts);
}
@Override
public void rollback() throws SailException {
if (closed) {
throw new SailException("Connection is closed");
}
try {
if (readableShapesCache != null) {
readableShapesCache.close();
readableShapesCache = null;
}
if (writableShapesCache != null) {
writableShapesCache.purge();
writableShapesCache.close();
writableShapesCache = null;
}
if (previousStateConnection != null && previousStateConnection.isActive()) {
previousStateConnection.rollback();
}
} finally {
try {
if (shapesRepoConnection.isActive()) {
shapesRepoConnection.rollback();
}
} finally {
try {
if (isActive()) {
super.rollback();
}
} finally {
cleanup();
}
}
}
}
private void cleanup() {
long before = 0;
try {
if (sail.isPerformanceLogging()) {
before = System.currentTimeMillis();
}
logger.debug("Cleanup");
if (addedStatements != null) {
if (addedStatements != sail.getBaseSail()) {
addedStatements.shutDown();
}
addedStatements = null;
}
if (removedStatements != null) {
removedStatements.shutDown();
removedStatements = null;
}
addedStatementsSet.clear();
removedStatementsSet.clear();
stats = null;
prepareHasBeenCalled = false;
shapeRefreshNeeded = false;
shapesModifiedInCurrentTransaction = false;
currentIsolationLevel = null;
shapesGraphs = null;
} finally {
try {
cleanupShapesReadWriteLock();
} finally {
cleanupReadWriteLock();
}
if (sail.isPerformanceLogging()) {
logger.info("cleanup() took {} ms", System.currentTimeMillis() - before);
}
}
}
private void cleanupShapesReadWriteLock() {
try {
if (writableShapesCache != null) {
try {
// we need to refresh the shapes cache!
writableShapesCache.purge();
} finally {
writableShapesCache.close();
}
}
} finally {
if (readableShapesCache != null) {
readableShapesCache.close();
}
}
writableShapesCache = null;
readableShapesCache = null;
}
private void cleanupReadWriteLock() {
try {
if (exclusiveSerializableValidationLock != null) {
exclusiveSerializableValidationLock.release();
}
} finally {
if (nonExclusiveSerializableValidationLock != null) {
nonExclusiveSerializableValidationLock.release();
}
}
exclusiveSerializableValidationLock = null;
nonExclusiveSerializableValidationLock = null;
}
private ValidationReport validate(List<ContextWithShape> shapes, boolean validateEntireBaseSail)
throws InterruptedException {
assert isValidationEnabled();
try {
try (ConnectionsGroup connectionsGroup = getConnectionsGroup()) {
return performValidation(shapes, validateEntireBaseSail, connectionsGroup);
}
} finally {
rdfsSubClassOfReasoner = null;
}
}
void prepareValidation(ValidationSettings validationSettings) throws InterruptedException {
assert isValidationEnabled();
if (sail.isRdfsSubClassReasoning()) {
rdfsSubClassOfReasoner = RdfsSubClassOfReasoner.createReasoner(this, validationSettings);
}
if (!isBulkValidation()) {
fillAddedAndRemovedStatementRepositories();
}
}
ConnectionsGroup getConnectionsGroup() {
return new ConnectionsGroup(new VerySimpleRdfsBackwardsChainingConnection(this, rdfsSubClassOfReasoner),
previousStateConnection, addedStatements, removedStatements, stats,
this::getRdfsSubClassOfReasoner, transactionSettings, sail.sparqlValidation);
}
private ValidationReport performValidation(List<ContextWithShape> shapes, boolean validateEntireBaseSail,
ConnectionsGroup connectionsGroup) throws InterruptedException {
long beforeValidation = 0;
if (sail.isPerformanceLogging()) {
beforeValidation = System.currentTimeMillis();
}
try {
int numberOfShapes = shapes.size();
Stream<Callable<ValidationResultIterator>> callableStream = shapes
.stream()
.map(contextWithShapes -> new ShapeValidationContainer(
contextWithShapes.getShape(),
() -> contextWithShapes.getShape()
.generatePlans(connectionsGroup,
new ValidationSettings(contextWithShapes.getDataGraph(),
sail.isLogValidationPlans(), validateEntireBaseSail,
sail.isPerformanceLogging())),
sail.isGlobalLogValidationExecution(), sail.isLogValidationViolations(),
sail.getEffectiveValidationResultsLimitPerConstraint(), sail.isPerformanceLogging(),
sail.isLogValidationPlans(),
logger,
connectionsGroup))
.filter(ShapeValidationContainer::hasPlanNode)
.map(validationContainer -> validationContainer::performValidation);
List<ValidationResultIterator> validationResultIterators = new ArrayList<>(numberOfShapes);
List<Future<ValidationResultIterator>> futures = Collections.emptyList();
boolean parallelValidation = numberOfShapes > 1 && isParallelValidation();
try {
futures = callableStream
.map(callable -> {
if (Thread.currentThread().isInterrupted()) {
return null;
}
if (parallelValidation) {
return sail.submitToExecutorService(callable);
} else {
FutureTask<ValidationResultIterator> futureTask = new FutureTask<>(callable);
futureTask.run();
return futureTask;
}
})
.collect(Collectors.toList());
for (Future<ValidationResultIterator> future : futures) {
assert future != null;
try {
if (!Thread.currentThread().isInterrupted()) {
validationResultIterators.add(future.get());
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof InterruptedException) {
throw new InterruptedException();
} else if (cause instanceof RuntimeException) {
throw ((RuntimeException) cause);
} else if (cause instanceof Error) {
throw ((Error) cause);
} else {
// this should only happen if we throw a checked exception from the Callable that
// isn't handled in the if/elseif above
assert false;
throw new IllegalStateException(cause);
}
}
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
} finally {
for (Future<ValidationResultIterator> future : futures) {
future.cancel(true);
}
}
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
return new LazyValidationReport(validationResultIterators, sail.getValidationResultsLimitTotal());
} finally {
if (sail.isPerformanceLogging()) {
logger.info("Actual validation and generating plans took {} ms",
System.currentTimeMillis() - beforeValidation);
}
}
}
private boolean isParallelValidation() {
assert !(transactionSettings.isParallelValidation() && !supportsConcurrentReads());
assert !(getIsolationLevel() == IsolationLevels.SERIALIZABLE && transactionSettings
.isParallelValidation()) : "Concurrent reads is buggy for SERIALIZABLE transactions.";
return transactionSettings.isParallelValidation();
}
void fillAddedAndRemovedStatementRepositories() throws InterruptedException {
assert !isBulkValidation();
assert isValidationEnabled();
long before = 0;
if (sail.isPerformanceLogging()) {
before = System.currentTimeMillis();
}
List<Future<Object>> futures = Collections.emptyList();
boolean parallelValidation = isParallelValidation() && !addedStatementsSet.isEmpty()
&& !removedStatementsSet.isEmpty();
try {
futures = Stream.of(addedStatementsSet, removedStatementsSet)
.map(set -> (Callable<Object>) () -> {
Set<Statement> otherSet;
Sail repository;
if (set == addedStatementsSet) {
otherSet = removedStatementsSet;
if (addedStatements != null && addedStatements != sail.getBaseSail()) {
addedStatements.shutDown();
}
addedStatements = getNewMemorySail();
repository = addedStatements;
set.forEach(stats::added);
} else {
otherSet = addedStatementsSet;
if (removedStatements != null) {
removedStatements.shutDown();
removedStatements = null;
}
removedStatements = getNewMemorySail();
repository = removedStatements;
set.forEach(stats::removed);
}
try (SailConnection connection = repository.getConnection()) {
connection.begin(IsolationLevels.NONE);
set.stream()
.filter(statement -> !otherSet.contains(statement))
.flatMap(statement -> rdfsSubClassOfReasoner == null ? Stream.of(statement)
: rdfsSubClassOfReasoner.forwardChain(statement))
.forEach(statement -> {
if (!Thread.currentThread().isInterrupted()) {
connection.addStatement(statement.getSubject(),
statement.getPredicate(), statement.getObject(),
statement.getContext());
}
});
if (Thread.interrupted()) {
throw new InterruptedException();
}
connection.commit();
}
return null;
})
.map(callable -> {
if (Thread.currentThread().isInterrupted()) {
return null;
}
if (parallelValidation) {
return sail.submitToExecutorService(callable);
} else {
FutureTask<Object> objectFutureTask = new FutureTask<>(callable);
objectFutureTask.run();
return objectFutureTask;
}
})
.collect(Collectors.toList());
for (Future<Object> future : futures) {
try {
if (!Thread.currentThread().isInterrupted()) {
future.get();
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof InterruptedException) {
throw ((InterruptedException) cause);
} else if (cause instanceof RuntimeException) {
throw ((RuntimeException) cause);
} else if (cause instanceof Error) {
throw ((Error) cause);
} else {
// this should only happen if we throw a checked exception from the Callable that isn't handled
// in the if/elseif above
throw new IllegalStateException(cause);
}
}
}
} finally {
for (Future<Object> future : futures) {
future.cancel(true);
}
}
if (sail.isPerformanceLogging()) {
logger.info("fillAddedAndRemovedStatementRepositories() took {} ms", System.currentTimeMillis() - before);
}
}
private IsolationLevel getIsolationLevel() {
return currentIsolationLevel;
}
@Override
synchronized public void close() throws SailException {
if (closed) {
return;
}
if (getWrappedConnection() instanceof AbstractSailConnection) {
AbstractSailConnection abstractSailConnection = (AbstractSailConnection) getWrappedConnection();
abstractSailConnection.waitForOtherOperations(true);
}
try {
if (isActive()) {
rollback();
}
} finally {
try {
shapesRepoConnection.close();
} finally {
try {
if (previousStateConnection != null) {
previousStateConnection.close();
}
} finally {
try {
if (serializableConnection != null) {
serializableConnection.close();
}
} finally {
try {
super.close();
} finally {
try {
sail.closeConnection();
} finally {
try {
cleanupShapesReadWriteLock();
} finally {
try {
cleanupReadWriteLock();
} finally {
closed = true;
}
}
}
}
}
}
}
}
}
@Override
public void prepare() throws SailException {
if (closed) {
throw new SailException("Connection is closed");
}
prepareHasBeenCalled = true;
long before = 0;
flush();
try {
if (sail.isPerformanceLogging()) {
before = System.currentTimeMillis();
}
boolean useSerializableValidation = shouldUseSerializableValidation() && !isBulkValidation();
if (sail.isSerializableValidation()) {
if (useSerializableValidation) {
exclusiveSerializableValidationLock = sail.serializableValidationLock.getWriteLock();
} else {
nonExclusiveSerializableValidationLock = sail.serializableValidationLock.getReadLock();
}
} else {
assert !useSerializableValidation
: "ShaclSail does not have serializable validation enabled but ShaclSailConnection still attempted to use serializable validation!";
}
if (!isValidationEnabled()) {
logger.debug("Validation skipped because validation was disabled");
if (shapeRefreshNeeded || !connectionListenerActive) {
// getting the shapes write lock will ensure that the shapes cache is refreshed when cleanup() is
// called after commit/rollback
writableShapesCache = sail.getCachedShapesForWriting();
}
return;
}
assert !shapeRefreshNeeded
|| !shapesModifiedInCurrentTransaction
: "isShapeRefreshNeeded should trigger shapesModifiedInCurrentTransaction once we have loaded the modified shapes, but shapesModifiedInCurrentTransaction should be null until then";
if (!shapeRefreshNeeded && !isBulkValidation() && addedStatementsSet.isEmpty()
&& removedStatementsSet.isEmpty()) {
logger.debug("Nothing has changed, nothing to validate.");
return;
}
List<ContextWithShape> currentShapes = null;
List<ContextWithShape> shapesAfterRefresh = null;
if (shapeRefreshNeeded || !connectionListenerActive || isBulkValidation()) {
if (writableShapesCache == null) {
writableShapesCache = sail.getCachedShapesForWriting();
}
shapesModifiedInCurrentTransaction = shapeRefreshNeeded;
shapeRefreshNeeded = false;
shapesAfterRefresh = sail.getShapes(shapesRepoConnection, this, shapesGraphs);
} else {
if (readableShapesCache == null) {
readableShapesCache = sail.getCachedShapes();
}
}
if (readableShapesCache != null) {
currentShapes = readableShapesCache.getData();
}
assert currentShapes != null || shapesAfterRefresh != null;
assert !(currentShapes != null && shapesAfterRefresh != null);
if (isEmpty(currentShapes) && isEmpty(shapesAfterRefresh)) {
logger.debug("Validation skipped because there are no shapes to validate");
return;
}
stats.setEmptyIncludingCurrentTransaction(ConnectionHelper.isEmpty(this));
prepareValidation(
new ValidationSettings(null, sail.isLogValidationPlans(), false, sail.isPerformanceLogging()));
ValidationReport invalidTuples = null;
if (useSerializableValidation) {
synchronized (sail.singleConnectionMonitor) {
if (!sail.usesSingleConnection()) {
invalidTuples = serializableValidation(
shapesAfterRefresh != null ? shapesAfterRefresh : currentShapes);
}
}
}
if (invalidTuples == null) {
invalidTuples = validate(
shapesAfterRefresh != null ? shapesAfterRefresh : currentShapes,
shapesModifiedInCurrentTransaction || isBulkValidation());
}
boolean valid = invalidTuples.conforms();
if (!valid) {
throw new ShaclSailValidationException(invalidTuples);
}
} catch (InterruptedException e) {
throw ShaclSail.convertToSailException(e);
} finally {
if (sail.isPerformanceLogging()) {
logger.info("prepare() including validation (excluding flushing and super.prepare()) took {} ms",
System.currentTimeMillis() - before);
}
// if the thread has been interrupted we should try to return quickly
if (!Thread.currentThread().isInterrupted()) {
shapesRepoConnection.prepare();
if (previousStateConnection != null) {
previousStateConnection.prepare();
}
super.prepare();
}
}
}
private boolean isEmpty(List<ContextWithShape> shapesList) {
if (shapesList == null) {
return true;
}
for (ContextWithShape shapesWithContext : shapesList) {
if (shapesWithContext.hasShape()) {
return false;
}
}
return true;
}
private boolean shouldUseSerializableValidation() {
return serializableConnection != null && sail.isSerializableValidation()
&& currentIsolationLevel == IsolationLevels.SNAPSHOT;
}
private boolean isBulkValidation() {
return transactionSettings.getValidationApproach() == ValidationApproach.Bulk;
}
private ValidationReport serializableValidation(List<ContextWithShape> shapesAfterRefresh)
throws InterruptedException {
try {
try (ConnectionsGroup connectionsGroup = new ConnectionsGroup(
new VerySimpleRdfsBackwardsChainingConnection(serializableConnection, rdfsSubClassOfReasoner), null,
addedStatements, removedStatements, stats, this::getRdfsSubClassOfReasoner, transactionSettings,
sail.sparqlValidation)) {
connectionsGroup.getBaseConnection().begin(IsolationLevels.SNAPSHOT);
// actually force a transaction to start
connectionsGroup.getBaseConnection().hasStatement(null, null, null, false);
stats.setEmptyBeforeTransaction(ConnectionHelper.isEmpty(connectionsGroup.getBaseConnection()));
try (SailConnection connection = addedStatements.getConnection()) {
SailConnection baseConnection = connectionsGroup.getBaseConnection();
ConnectionHelper.transferStatements(connection, baseConnection::addStatement);
}
try (SailConnection connection = removedStatements.getConnection()) {
SailConnection baseConnection = connectionsGroup.getBaseConnection();
ConnectionHelper.transferStatements(connection, baseConnection::removeStatements);
}
serializableConnection.flush();
return performValidation(shapesAfterRefresh, shapesModifiedInCurrentTransaction || isBulkValidation(),
connectionsGroup);
} finally {
serializableConnection.rollback();
}
} finally {
rdfsSubClassOfReasoner = null;
}
}
@Override
public void statementAdded(Statement statement) {
if (prepareHasBeenCalled) {
throw new IllegalStateException("Detected changes after prepare() has been called.");
}
checkIfShapesRefreshIsNeeded(statement);
boolean add = addedStatementsSet.add(statement);
if (!add) {
removedStatementsSet.remove(statement);
}
checkTransactionalValidationLimit();
}
@Override
public void statementRemoved(Statement statement) {
if (prepareHasBeenCalled) {
throw new IllegalStateException("Detected changes after prepare() has been called.");
}
checkIfShapesRefreshIsNeeded(statement);
boolean add = removedStatementsSet.add(statement);
if (!add) {
addedStatementsSet.remove(statement);
}
checkTransactionalValidationLimit();
}
private void checkIfShapesRefreshIsNeeded(Statement statement) {
if (!shapeRefreshNeeded) {
for (IRI shapesGraph : shapesGraphs) {
if (Objects.equals(statement.getContext(), shapesGraph)) {
shapeRefreshNeeded = true;
break;
}
}
}
}
private void checkTransactionalValidationLimit() {
if ((addedStatementsSet.size() + removedStatementsSet.size()) > sail.getTransactionalValidationLimit()) {
if (shouldUseSerializableValidation()) {
logger.debug(
"Transaction size limit exceeded, could not switch to bulk validation because serializable validation is enabled.");
} else {
logger.debug("Transaction size limit exceeded, reverting to bulk validation.");
removeConnectionListener(this);
Settings bulkValidation = getLocalTransactionSettings();
bulkValidation.setValidationApproach(ShaclSail.TransactionSettings.ValidationApproach.Bulk);
getTransactionSettings().applyTransactionSettings(bulkValidation);
removedStatementsSet.clear();
addedStatementsSet.clear();
}
}
}
public RdfsSubClassOfReasoner getRdfsSubClassOfReasoner() {
return rdfsSubClassOfReasoner;
}
@Override
public CloseableIteration<? extends Statement> getStatements(Resource subj, IRI pred, Value obj,
boolean includeInferred, Resource... contexts) throws SailException {
if (useDefaultShapesGraph && contexts.length == 1 && RDF4J.SHACL_SHAPE_GRAPH.equals(contexts[0])) {
return ConnectionHelper
.getCloseableIteration(shapesRepoConnection.getStatements(subj, pred, obj, includeInferred));
}
return super.getStatements(subj, pred, obj, includeInferred, contexts);
}
@Override
public boolean hasStatement(Resource subj, IRI pred, Value obj, boolean includeInferred, Resource... contexts)
throws SailException {
if (useDefaultShapesGraph && contexts.length == 1 && RDF4J.SHACL_SHAPE_GRAPH.equals(contexts[0])) {
return shapesRepoConnection.hasStatement(subj, pred, obj, includeInferred);
}
return super.hasStatement(subj, pred, obj, includeInferred, contexts);
}
public ValidationReport revalidate() {
if (!isActive()) {
throw new IllegalStateException("No active transaction!");
}
try {
return validate(sail.getShapes(shapesRepoConnection, this, shapesGraphs), true);
} catch (InterruptedException e) {
throw ShaclSail.convertToSailException(e);
}
}
Settings getTransactionSettings() {
return transactionSettings;
}
private long getTimeStamp() {
if (sail.isPerformanceLogging()) {
return System.currentTimeMillis();
}
return 0;
}
public static class Settings {
private ValidationApproach validationApproach;
private Boolean cacheSelectedNodes;
private Boolean parallelValidation;
private IsolationLevel isolationLevel;
transient private Settings previous = null;
public Settings(boolean cacheSelectNodes, boolean validationEnabled, boolean parallelValidation,
IsolationLevel isolationLevel) {
this.cacheSelectedNodes = cacheSelectNodes;
if (!validationEnabled) {
validationApproach = ValidationApproach.Disabled;
} else {
this.validationApproach = ValidationApproach.Auto;
}
this.parallelValidation = parallelValidation;
this.isolationLevel = isolationLevel;
}
public Settings(ShaclSailConnection connection) {
TransactionSetting[] transactionSettingsRaw = connection.transactionSettingsRaw;
assert transactionSettingsRaw != null;
ValidationApproach validationApproach = null;
Boolean cacheSelectedNodes = null;
Boolean parallelValidation = null;
for (TransactionSetting transactionSetting : transactionSettingsRaw) {
if (transactionSetting instanceof ValidationApproach) {
validationApproach = (ValidationApproach) transactionSetting;
} else if (transactionSetting instanceof ShaclSail.TransactionSettings.PerformanceHint) {
switch (((ShaclSail.TransactionSettings.PerformanceHint) transactionSetting)) {
case ParallelValidation:
parallelValidation = true;
break;
case SerialValidation:
parallelValidation = false;
break;
case CacheDisabled:
cacheSelectedNodes = false;
break;
case CacheEnabled:
cacheSelectedNodes = true;
break;
}
}
}
this.validationApproach = validationApproach;
this.cacheSelectedNodes = cacheSelectedNodes;
if (!connection.supportsConcurrentReads()) {
this.parallelValidation = false;
} else {
this.parallelValidation = parallelValidation;
}
}
private Settings(Settings settings) {
this.validationApproach = settings.validationApproach;
this.cacheSelectedNodes = settings.cacheSelectedNodes;
this.parallelValidation = settings.parallelValidation;
this.isolationLevel = settings.isolationLevel;
this.previous = settings.previous;
}
public ValidationApproach getValidationApproach() {
return validationApproach;
}
public boolean isCacheSelectNodes() {
return cacheSelectedNodes;
}
public boolean isParallelValidation() {
return parallelValidation;
}
public IsolationLevel getIsolationLevel() {
return isolationLevel;
}
static ValidationApproach getMostSignificantValidationApproach(
ValidationApproach base,
ValidationApproach overriding) {
if (base == null && overriding == null) {
return ValidationApproach.Auto;
}
return ValidationApproach.getHighestPriority(base, overriding);
}
void applyTransactionSettings(Settings transactionSettingsLocal) {
previous = new Settings(this);
// get the most significant validation approach first (eg. if validation is disabled on the sail level, then
// validation can not be enabled on the transaction level
validationApproach = getMostSignificantValidationApproach(validationApproach,
transactionSettingsLocal.validationApproach);
// apply restrictions first
if (validationApproach == ValidationApproach.Bulk) {
cacheSelectedNodes = false;
parallelValidation = false;
}
// override settings
if (transactionSettingsLocal.parallelValidation != null) {
parallelValidation = transactionSettingsLocal.parallelValidation;
}
if (transactionSettingsLocal.cacheSelectedNodes != null) {
cacheSelectedNodes = transactionSettingsLocal.cacheSelectedNodes;
}
assert transactionSettingsLocal.isolationLevel == null;
}
@Override
public String toString() {
return "Settings{" +
"validationApproach=" + validationApproach +
", cacheSelectedNodes=" + cacheSelectedNodes +
", parallelValidation=" + parallelValidation +
", isolationLevel=" + isolationLevel +
'}';
}
public void switchToBulkValidation() {
ValidationApproach newValidationApproach = getMostSignificantValidationApproach(validationApproach,
ValidationApproach.Bulk);
if (newValidationApproach != this.validationApproach) {
this.validationApproach = newValidationApproach;
parallelValidation = false;
cacheSelectedNodes = false;
}
}
private void setValidationApproach(ValidationApproach validationApproach) {
this.validationApproach = validationApproach;
}
private void setCacheSelectedNodes(Boolean cacheSelectedNodes) {
this.cacheSelectedNodes = cacheSelectedNodes;
}
private void setParallelValidation(Boolean parallelValidation) {
this.parallelValidation = parallelValidation;
}
private void setIsolationLevel(IsolationLevel isolationLevel) {
this.isolationLevel = isolationLevel;
}
}
}