// BulkedExternalLeftOuterJoin.java
/*******************************************************************************
* Copyright (c) 2020 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.shacl.ast.planNodes;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import org.apache.commons.text.StringEscapeUtils;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.Dataset;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.PeekMarkIterator;
import org.eclipse.rdf4j.sail.SailConnection;
import org.eclipse.rdf4j.sail.shacl.ast.SparqlFragment;
import org.eclipse.rdf4j.sail.shacl.ast.StatementMatcher;
import org.eclipse.rdf4j.sail.shacl.wrapper.data.ConnectionsGroup;
/**
 * @author Håvard Ottestad
* <p>
* External means that this plan node can join the iterator from a plan node with an external source (Repository
* or SailConnection) based on a query or a predicate.
*/
public class BulkedExternalLeftOuterJoin extends AbstractBulkJoinPlanNode {

	// Right-hand side of the join: fetched in bulk from this connection with the generated SPARQL query.
	private final SailConnection connection;
	// Left-hand side of the join: tuples produced by an upstream plan node (wrapped for sorting in the constructor).
	private final PlanNode leftNode;
	// Dataset view derived from dataGraph, passed along when the bulk query is evaluated.
	private final Dataset dataset;
	private final Resource[] dataGraph;
	// Lazily parsed form of `query`; populated on first use in calculateNext() and reused for every batch.
	private TupleExpr parsedQuery;
	// Full SPARQL text (namespace declarations + normalized fragment) used to fetch the right-hand side.
	private final String query;
	// Guards against emitting this node twice in getPlanAsGraphvizDot().
	private boolean printed = false;

	/**
	 * Creates a left outer join between the tuples produced by {@code leftNode} and the results of evaluating
	 * {@code query} against {@code connection}, batched to reduce the number of queries issued.
	 *
	 * @param leftNode         upstream plan node providing the left-hand tuples
	 * @param connection       connection used to evaluate the bulk query; must not be null (asserted)
	 * @param dataGraph        graphs the query is scoped to
	 * @param query            SPARQL fragment for the right-hand side
	 * @param mapper           maps each query {@link BindingSet} to a {@link ValidationTuple}
	 * @param connectionsGroup used to decide how the left node's output is sorted
	 * @param vars             variables shared with the query, forwarded to the superclass
	 */
	public BulkedExternalLeftOuterJoin(PlanNode leftNode, SailConnection connection, Resource[] dataGraph,
			SparqlFragment query,
			Function<BindingSet, ValidationTuple> mapper, ConnectionsGroup connectionsGroup,
			List<StatementMatcher.Variable> vars) {
		super(vars);
		// Wrap the left node so its output arrives in the order this join expects.
		leftNode = PlanNodeHelper.handleSorting(this, leftNode, connectionsGroup);
		this.leftNode = leftNode;
		// Normalize the stable random variable names so equivalent fragments yield the same query string.
		this.query = query.getNamespacesForSparql()
				+ StatementMatcher.StableRandomVariableProvider.normalize(query.getFragment(), List.of(), List.of());
		this.connection = connection;
		assert this.connection != null;
		this.mapper = mapper;
		this.dataset = PlanNodeHelper.asDefaultGraphDataset(dataGraph);
		this.dataGraph = dataGraph;
	}

	/**
	 * Returns an iteration performing the join in bulk: up to BULK_SIZE left tuples are buffered, one query is run
	 * for the whole batch (filling {@code right}), and the two buffers are then merged on matching targets. Left
	 * tuples with no right-hand match are emitted on their own (left outer join semantics).
	 */
	@Override
	public CloseableIteration<? extends ValidationTuple> iterator() {
		return new LoggingCloseableIteration(this, validationExecutionLogger) {

			// Buffered batch of left-hand tuples; filled via addFirst, consumed from the tail (peekLast/removeLast).
			ArrayDeque<ValidationTuple> left;
			// Right-hand tuples matching the current batch, produced by runQuery(); consumed from the tail as well.
			ArrayDeque<ValidationTuple> right;

			private PeekMarkIterator<? extends ValidationTuple> leftNodeIterator;

			@Override
			protected void init() {
				left = new ArrayDeque<>(BULK_SIZE);
				right = new ArrayDeque<>(BULK_SIZE);
				leftNodeIterator = new PeekMarkIterator<>(leftNode.iterator());
			}

			// Refills the batch and runs the bulk query, but only once the previous batch is fully consumed.
			private void calculateNext() {
				if (!left.isEmpty()) {
					return;
				}

				// Buffer up to BULK_SIZE left tuples. Because of addFirst, left.getFirst() is always the
				// most recently buffered tuple — the one the peeked tuple must be compared against.
				while (left.size() < BULK_SIZE && leftNodeIterator.hasNext()) {
					if (!left.isEmpty()) {
						ValidationTuple peek = leftNodeIterator.peek();
						if (peek.sameTargetAs(left.getFirst())) {
							// stop if we detect a duplicate target since we only support distinct targets on the left
							// side of the join
							assert false : "Current and next left target is the same: " + peek + " " + left.getFirst();
							break;
						}
					}
					left.addFirst(leftNodeIterator.next());
				}

				if (left.isEmpty()) {
					return;
				}

				// Parse the query text once; the parsed form is reused for all subsequent batches.
				if (parsedQuery == null) {
					parsedQuery = parseQuery(query);
				}

				// Fills `right` with the matching right-hand tuples for the buffered batch
				// (implemented in AbstractBulkJoinPlanNode).
				runQuery(left, right, connection, parsedQuery, dataset, dataGraph, false, null);
			}

			@Override
			public void localClose() {
				if (leftNodeIterator != null) {
					leftNodeIterator.close();
				}
			}

			@Override
			protected boolean localHasNext() {
				calculateNext();
				return !left.isEmpty();
			}

			@Override
			protected ValidationTuple loggingNext() {
				calculateNext();

				if (!left.isEmpty()) {
					// Oldest buffered left tuple (consumption happens from the tail, preserving upstream order).
					ValidationTuple leftPeek = left.peekLast();

					ValidationTuple joined = null;

					if (!right.isEmpty()) {
						ValidationTuple rightPeek = right.peekLast();

						if (rightPeek.sameTargetAs(leftPeek)) {
							// we have a join !
							joined = ValidationTupleHelper.join(leftPeek, rightPeek);
							right.removeLast();

							// Keep the left tuple while further right tuples share its target, so each match
							// produces its own joined result.
							ValidationTuple rightPeek2 = right.peekLast();

							if (rightPeek2 == null || !rightPeek2.sameTargetAs(leftPeek)) {
								// no more to join from right, pop left so we don't print it again.
								left.removeLast();
							}
						}
					}

					if (joined != null) {
						return joined;
					} else {
						// Left outer join semantics: no right-hand match, emit the left tuple on its own.
						left.removeLast();
						return leftPeek;
					}
				}

				return null;
			}

		};
	}

	@Override
	public int depth() {
		return leftNode.depth() + 1;
	}

	@Override
	public void getPlanAsGraphvizDot(StringBuilder stringBuilder) {
		if (printed) {
			return;
		}
		printed = true;
		stringBuilder.append(getId() + " [label=\"" + StringEscapeUtils.escapeJava(this.toString()) + "\"];")
				.append("\n");

		leftNode.getPlanAsGraphvizDot(stringBuilder);

		// added/removed connections are always newly minted per plan node, so we instead need to compare the underlying
		// sail
		// if (connection instanceof MemoryStoreConnection) {
		// stringBuilder.append(System.identityHashCode(((MemoryStoreConnection) connection).getSail()) + " -> "
		// + getId() + " [label=\"right\"]").append("\n");
		// } else {
		stringBuilder.append(System.identityHashCode(connection) + " -> " + getId() + " [label=\"right\"]")
				.append("\n");
		// }

		stringBuilder.append(leftNode.getId() + " -> " + getId() + " [label=\"left\"]").append("\n");
	}

	@Override
	public String toString() {
		return "BulkedExternalLeftOuterJoin{" + "query=" + query.replace("\n", " ")
				+ '}';
	}

	@Override
	public String getId() {
		return System.identityHashCode(this) + "";
	}

	@Override
	public void receiveLogger(ValidationExecutionLogger validationExecutionLogger) {
		this.validationExecutionLogger = validationExecutionLogger;
		leftNode.receiveLogger(validationExecutionLogger);
	}

	// NOTE(review): dataGraph itself is not part of equals/hashCode — presumably covered by the derived
	// `dataset` field; confirm this is intentional.
	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}
		if (!super.equals(o)) {
			return false;
		}
		BulkedExternalLeftOuterJoin that = (BulkedExternalLeftOuterJoin) o;
		return Objects.equals(connection, that.connection)
				&& leftNode.equals(that.leftNode)
				&& Objects.equals(dataset, that.dataset)
				&& query.equals(that.query);
	}

	@Override
	public int hashCode() {
		return Objects.hash(super.hashCode(), connection, dataset, leftNode, query);
	}
}