AbstractBulkJoinPlanNode.java
/*******************************************************************************
* Copyright (c) 2020 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/

package org.eclipse.rdf4j.sail.shacl.ast.planNodes;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.Dataset;
import org.eclipse.rdf4j.query.algebra.BindingSetAssignment;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.helpers.AbstractSimpleQueryModelVisitor;
import org.eclipse.rdf4j.query.impl.EmptyBindingSet;
import org.eclipse.rdf4j.sail.SailConnection;
import org.eclipse.rdf4j.sail.shacl.ast.SparqlQueryParserCache;
import org.eclipse.rdf4j.sail.shacl.ast.StatementMatcher;
import org.eclipse.rdf4j.sail.shacl.ast.constraintcomponents.AbstractConstraintComponent;
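
/**
 * Base class for plan nodes that join a stream of validation tuples against the underlying data by injecting the
 * tuples' active targets as VALUES bindings into a single SPARQL query and evaluating that query in bulk, rather than
 * issuing one query per tuple.
 */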
public abstract class AbstractBulkJoinPlanNode implements PlanNode {
	public static final List<StatementMatcher.Variable> DEFAULT_VARS = List.of(new StatementMatcher.Variable("a"),
			new StatementMatcher.Variable("c"));

	public static final String BINDING_NAME = "a";

	protected static final int BULK_SIZE = 1000;

	private final List<StatementMatcher.Variable> vars;
	private final String varsQueryString;

	StackTraceElement[] stackTrace;
	protected Function<BindingSet, ValidationTuple> mapper;
	ValidationExecutionLogger validationExecutionLogger;

	public AbstractBulkJoinPlanNode(List<StatementMatcher.Variable> vars) {
		this.vars = vars;
		this.varsQueryString = vars.stream()
				.map(StatementMatcher.Variable::asSparqlVariable)
				.reduce((a, b) -> a + " " + b)
				.orElseThrow();
		this.stackTrace = Thread.currentThread().getStackTrace();
	}
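
	/**
	 * Wraps the provided query fragment in a {@code select distinct} projection over this node's variables and parses
	 * it through the {@link SparqlQueryParserCache}. Every VALUES_INJECTION_POINT annotation in the fragment, as well
	 * as the start of the WHERE clause, receives an empty {@code VALUES (?a) {}} placeholder that is later filled in
	 * by {@link #runQuery}.
	 * <p>
	 * As a rough illustration (the fragment below is hypothetical, not taken from a real shape), a fragment such as
	 * {@code ?a ex:knows ?c .} combined with the default variables would produce:
	 *
	 * <pre>
	 * select distinct ?a ?c where {
	 * VALUES (?a) {}
	 * ?a ex:knows ?c .
	 * }
	 * </pre>
	 */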
	TupleExpr parseQuery(String query) {
		// #VALUES_INJECTION_POINT# is an annotation in the query where there is a "new scope" due to the bottom-up
		// semantics of SPARQL, but where we don't actually want a new scope.
		query = query.replace(AbstractConstraintComponent.VALUES_INJECTION_POINT, "\nVALUES (?a) {}\n");
		String completeQuery = "select distinct " + varsQueryString + " where {\nVALUES (?a) {}\n" + query + "\n}";
		return SparqlQueryParserCache.get(completeQuery);
	}
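
	/**
	 * Joins the left-hand validation tuples against the data in {@code connection}: the active target of each tuple is
	 * turned into a binding for {@code ?a}, the bindings are injected into the empty VALUES clauses of
	 * {@code parsedQuery}, and the query results are mapped to {@link ValidationTuple}s and pushed onto {@code right}.
	 * When {@code skipBasedOnPreviousConnection} is set, tuples whose targets are not present in
	 * {@code previousStateConnection} are filtered out before the query is run.
	 */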
	void runQuery(Collection<ValidationTuple> left, ArrayDeque<ValidationTuple> right, SailConnection connection,
			TupleExpr parsedQuery, Dataset dataset, Resource[] dataGraph, boolean skipBasedOnPreviousConnection,
			SailConnection previousStateConnection) {
		List<BindingSet> newBindingSets = buildBindingSets(left, connection, skipBasedOnPreviousConnection,
				previousStateConnection, dataGraph);

		if (!newBindingSets.isEmpty()) {
			updateQuery(parsedQuery, newBindingSets);
			executeQuery(right, connection, dataset, parsedQuery);
		}
	}
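
	/**
	 * Evaluates the prepared query, maps each result binding set to a {@link ValidationTuple}, sorts the tuples by
	 * active target and pushes them onto the front of the {@code right} deque.
	 */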
	private void executeQuery(ArrayDeque<ValidationTuple> right, SailConnection connection,
			Dataset dataset, TupleExpr parsedQuery) {
		try (Stream<? extends BindingSet> stream = connection
				.evaluate(parsedQuery, dataset, EmptyBindingSet.getInstance(), true)
				.stream()) {
			stream
					.map(mapper)
					.sorted(ValidationTuple::compareActiveTarget)
					.forEachOrdered(right::addFirst);
		}
	}
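
	/**
	 * Replaces the empty {@code VALUES (?a) {}} placeholders in the parsed query with the freshly built binding sets.
	 * Only {@link BindingSetAssignment} nodes that bind exactly {@code BINDING_NAME} are touched, so any other VALUES
	 * clauses in the query are left alone.
	 */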
	private void updateQuery(TupleExpr parsedQuery, List<BindingSet> newBindingSets) {
		parsedQuery
				.visit(new AbstractSimpleQueryModelVisitor<>(false) {

					@Override
					public void meet(BindingSetAssignment node) {
						Set<String> bindingNames = node.getBindingNames();
						if (bindingNames.size() == 1 && bindingNames.contains(BINDING_NAME)) {
							node.setBindingSets(newBindingSets);
						}
						super.meet(node);
					}
				});
	}
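
	/**
	 * Builds one singleton binding set per left-hand tuple, binding {@code ?a} to the tuple's active target. If
	 * {@code skipBasedOnPreviousConnection} is set, targets that do not occur as subject or object in
	 * {@code previousStateConnection} are skipped, and the skip is logged when validation logging is enabled.
	 */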
	private List<BindingSet> buildBindingSets(Collection<ValidationTuple> left, SailConnection connection,
			boolean skipBasedOnPreviousConnection, SailConnection previousStateConnection, Resource[] dataGraph) {
		return left.stream()
				.filter(tuple -> {
					if (!skipBasedOnPreviousConnection) {
						return true;
					}

					boolean hasStatement;

					if (!(tuple.getActiveTarget().isResource())) {
						hasStatement = previousStateConnection.hasStatement(null, null, tuple.getActiveTarget(),
								true, dataGraph);
					} else {
						hasStatement = previousStateConnection.hasStatement(((Resource) tuple.getActiveTarget()),
								null, null, true, dataGraph) ||
								previousStateConnection.hasStatement(null, null, tuple.getActiveTarget(), true,
										dataGraph);
					}

					if (!hasStatement && validationExecutionLogger.isEnabled()) {
						validationExecutionLogger.log(depth(),
								this.getClass().getSimpleName() + ":IgnoredDueToPreviousStateConnection", tuple, this,
								getId(), null);
					}
					return hasStatement;
				})
				.map(ValidationTuple::getActiveTarget)
				.map(r -> new SingletonBindingSet(BINDING_NAME, r))
				.collect(Collectors.toList());
	}

	@Override
	public boolean producesSorted() {
		return true;
	}

	@Override
	public boolean requiresSorted() {
		return true;
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}
		AbstractBulkJoinPlanNode that = (AbstractBulkJoinPlanNode) o;
		return mapper.equals(that.mapper);
	}

	@Override
	public int hashCode() {
		return Objects.hash(mapper);
	}

}