ServiceTests.java
/*******************************************************************************
* Copyright (c) 2019 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.Iterations;
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
import org.eclipse.rdf4j.federated.repository.FedXRepository;
import org.eclipse.rdf4j.http.client.HttpClientSessionManager;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.QueryResults;
import org.eclipse.rdf4j.query.TupleQueryResult;
import org.eclipse.rdf4j.query.algebra.Service;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedService;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedServiceResolver;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.repository.sparql.federation.RepositoryFederatedService;
import org.eclipse.rdf4j.repository.sparql.federation.SPARQLFederatedService;
import org.eclipse.rdf4j.repository.sparql.federation.SPARQLServiceResolver;
import org.eclipse.rdf4j.repository.util.Repositories;
import org.eclipse.rdf4j.sail.memory.MemoryStore;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import com.google.common.collect.Sets;
public class ServiceTests extends SPARQLBaseTest {

	/** Data files loaded into the four test endpoints by (almost) every test in this class. */
	private static final List<String> DEFAULT_DATA_FILES = Arrays.asList("/tests/data/data1.ttl",
			"/tests/data/data2.ttl", "/tests/data/data3.ttl", "/tests/data/data4.ttl");

	/** HTTP URL under which endpoint1 is reachable as a plain SPARQL endpoint. */
	private static final String ENDPOINT1_SERVICE_URL = "http://localhost:18080/repositories/endpoint1";

	/** Name of endpoint1 inside the federation. */
	private static final String ENDPOINT1_NAME = "http://endpoint1";

	@Test
	public void test1() throws Exception {
		assumeSparqlEndpoint();
		/* test select query retrieving all persons from endpoint 1 (SERVICE) */
		prepareTest(DEFAULT_DATA_FILES);
		evaluateQueryPlan("/tests/service/query01.rq", "/tests/service/query01.qp");
		execute("/tests/service/query01.rq", "/tests/service/query01.srx", false, true);
	}

	@Test
	public void test1a_byName() throws Exception {
		/* test select query retrieving all persons from endpoint 1 (SERVICE) by name */
		prepareTest(DEFAULT_DATA_FILES);
		evaluateQueryPlan("/tests/service/query01a.rq", "/tests/service/query01.qp");
		execute("/tests/service/query01a.rq", "/tests/service/query01.srx", false, true);
	}

	@Test
	public void test2() throws Exception {
		assumeSparqlEndpoint();
		/* test select query retrieving all persons from endpoint 1 (SERVICE) + exclusive statement => group */
		prepareTest(DEFAULT_DATA_FILES);
		evaluateQueryPlan("/tests/service/query02.rq", "/tests/service/query02.qp");
		execute("/tests/service/query02.rq", "/tests/service/query02.srx", false, true);
	}

	@Test
	public void test2_differentOrder() throws Exception {
		assumeSparqlEndpoint();
		/*
		 * test select query retrieving all persons from endpoint 1 (SERVICE) + exclusive statement => group. In
		 * contrast to test2, the order of the query parts is different.
		 */
		prepareTest(DEFAULT_DATA_FILES);
		evaluateQueryPlan("/tests/service/query02a.rq", "/tests/service/query02a.qp");
		execute("/tests/service/query02a.rq", "/tests/service/query02.srx", false, true);
	}

	@Test
	public void test3() throws Exception {
		assumeSparqlEndpoint();
		/*
		 * test select query retrieving all persons from endpoint 1 (SERVICE), endpoint not part of federation =>
		 * evaluate using RDF4J
		 */
		prepareTest(DEFAULT_DATA_FILES);
		removeEndpoint1FromFederation();
		execute("/tests/service/query03.rq", "/tests/service/query03.srx", false, true);
	}

	@Test
	public void test4() throws Exception {
		// evaluates by sparql endpoint URL, cannot be done with native store
		assumeSparqlEndpoint();
		/* two service which form exclusive groups */
		prepareTest(DEFAULT_DATA_FILES);
		evaluateQueryPlan("/tests/service/query04.rq", "/tests/service/query04.qp");
		execute("/tests/service/query04.rq", "/tests/service/query04.srx", false, true);
	}

	@Test
	public void test4a() throws Exception {
		/* two service which form exclusive groups (resolving by name) */
		prepareTest(DEFAULT_DATA_FILES);
		evaluateQueryPlan("/tests/service/query04a.rq", "/tests/service/query04.qp");
		execute("/tests/service/query04a.rq", "/tests/service/query04a.srx", false, true);
	}

	@Test
	public void test5() throws Exception {
		assumeSparqlEndpoint();
		/* two services, one becomes exclusive group, the other is evaluated as service (filter) */
		prepareTest(DEFAULT_DATA_FILES);
		execute("/tests/service/query05.rq", "/tests/service/query05.srx", false, true);
	}

	@Test
	public void test6() throws Exception {
		/*
		 * two services, one becomes exclusive group, the other is evaluated as service (filter), uses name of
		 * federation member in SERVICE
		 */
		prepareTest(DEFAULT_DATA_FILES);
		execute("/tests/service/query06.rq", "/tests/service/query06.srx", false, true);
	}

	@Test
	public void test7() throws Exception {
		// evaluates by sparql endpoint URL, cannot be done with native store
		assumeSparqlEndpoint();
		/* two services, both evaluated as SERVICE (FILTER), uses name of federation member in SERVICE */
		prepareTest(DEFAULT_DATA_FILES);
		execute("/tests/service/query07.rq", "/tests/service/query07.srx", false, true);
	}

	@Test
	public void test8() throws Exception {
		assumeSparqlEndpoint();
		/*
		 * test select query retrieving all persons from endpoint 1 (SERVICE) + exclusive statement => group
		 */
		prepareTest(DEFAULT_DATA_FILES);
		evaluateQueryPlan("/tests/service/query08.rq", "/tests/service/query08.qp");
		execute("/tests/service/query08.rq", "/tests/service/query08.srx", false, true);
	}

	@Test
	public void test9() throws Exception {
		assumeSparqlEndpoint();
		SPARQLServiceResolver serviceResolver = createCountingServiceResolver();
		// the custom resolver is not owned by the federation, so this test must shut it down itself
		try {
			installServiceResolver(serviceResolver);
			/*
			 * test select query retrieving all persons from endpoint 1 (SERVICE), endpoint not part of federation =>
			 * evaluate using the externally provided service resolver. endpoint1 is reachable via HTTP as
			 * http://localhost:18080/repositories/endpoint1
			 */
			prepareTest(DEFAULT_DATA_FILES);
			removeEndpoint1FromFederation();
			execute("/tests/service/query03.rq", "/tests/service/query03.srx", false, false);
			Assertions.assertEquals(1, countingService(serviceResolver).serviceRequestCount.get());
		} finally {
			serviceResolver.shutDown();
		}
	}

	@Test
	public void test10_serviceBoundJoin() throws Exception {
		assumeSparqlEndpoint();
		SPARQLServiceResolver serviceResolver = createCountingServiceResolver();
		try {
			installServiceResolver(serviceResolver);
			fedxRule.getFederationContext().getConfig().withBoundJoinBlockSize(5);
			/*
			 * SERVICE query over 50 VALUES inputs, endpoint1 not part of federation => evaluate using the externally
			 * provided service resolver (endpoint1 is reachable via HTTP as
			 * http://localhost:18080/repositories/endpoint1)
			 */
			prepareTest(DEFAULT_DATA_FILES);
			removeEndpoint1FromFederation();
			evaluateAndAssertProcessedInputs(50);
			// all requests are executed in bind-join with constant size
			// for this test bind join size is set to 5, hence we see 10 bind join requests
			Assertions.assertEquals(10, countingService(serviceResolver).boundJoinRequestCount.get());
		} finally {
			serviceResolver.shutDown();
		}
	}

	@Test
	public void test10_serviceSimpleEvaluation() throws Exception {
		assumeSparqlEndpoint();
		fedxRule.setConfig(c -> c.withEnableServiceAsBoundJoin(false));
		SPARQLServiceResolver serviceResolver = createCountingServiceResolver();
		try {
			installServiceResolver(serviceResolver);
			/*
			 * SERVICE query over 50 VALUES inputs with bound-join evaluation of SERVICE disabled, endpoint1 not part
			 * of federation => evaluate using the externally provided service resolver (endpoint1 is reachable via
			 * HTTP as http://localhost:18080/repositories/endpoint1)
			 */
			prepareTest(DEFAULT_DATA_FILES);
			removeEndpoint1FromFederation();
			evaluateAndAssertProcessedInputs(50);
			// all input bindings are evaluated as simple join
			TestSparqlFederatedService tfs = countingService(serviceResolver);
			Assertions.assertEquals(50, tfs.serviceRequestCount.get());
			Assertions.assertEquals(0, tfs.boundJoinRequestCount.get());
		} finally {
			serviceResolver.shutDown();
		}
	}

	@Test
	public void test10_serviceSilent() throws Exception {
		assumeSparqlEndpoint();
		Repository localStore = new SailRepository(new MemoryStore());
		SPARQLServiceResolver serviceResolver = new SPARQLServiceResolver() {
			@Override
			protected FederatedService createService(String serviceUrl) throws QueryEvaluationException {
				if (serviceUrl.equals("urn:memStore")) {
					// 'true': the local store is shut down together with this service
					return new RepositoryFederatedService(localStore, true);
				}
				return new TestSparqlFederatedService(serviceUrl, getHttpClientSessionManager());
			}
		};
		try {
			installServiceResolver(serviceResolver);
			prepareTest(DEFAULT_DATA_FILES);
			List<BindingSet> bs = Repositories.tupleQueryNoTransaction(fedxRule.repository,
					"SELECT * WHERE { VALUES ?input { 'input1' } . SERVICE SILENT <urn:memStore> { BIND (CONCAT(?input, '_processed') AS ?output) } }",
					iter -> QueryResults.asList(iter));
			assertContainsAll(bs, "output", Sets.newHashSet(l("input1_processed")));
		} finally {
			// also runs when the query or assertion fails, so the resolver is never leaked
			serviceResolver.shutDown();
		}
	}

	@Test
	@Disabled("test is flaky - see https://github.com/eclipse/rdf4j/issues/3160")
	public void test11_errorHandling() throws Exception {
		assumeSparqlEndpoint();
		/*
		 * test select query where SERVICE is not part of federation and produces error
		 */
		prepareTest(DEFAULT_DATA_FILES);
		removeEndpoint1FromFederation();

		// run a simple SERVICE query
		repoSettings(1).resetOperationsCounter();
		repoSettings(1).setFailAfter(0);
		String query_a = readQueryString("/tests/service/query11_error_a.rq");
		Assertions.assertThrows(QueryEvaluationException.class, () -> {
			Repositories.tupleQueryNoTransaction(fedxRule.repository, query_a,
					iter -> QueryResults.asList(iter));
		});

		// run query where service does not produce errors
		String query_b = readQueryString("/tests/service/query11_error_b.rq");
		repoSettings(1).setFailAfter(-1);
		List<BindingSet> bs = Repositories.tupleQueryNoTransaction(fedxRule.repository, query_b,
				iter -> QueryResults.asList(iter));
		Assertions
				.assertEquals(Sets.newHashSet("Person2", "Person5"),
						bs.stream()
								.map(b -> b.getValue("name").stringValue())
								.collect(Collectors.toSet()));

		// re-run, but now simulate errors
		repoSettings(1).resetOperationsCounter();
		repoSettings(1).setFailAfter(1);
		Assertions.assertThrows(QueryEvaluationException.class, () -> {
			Repositories.tupleQueryNoTransaction(fedxRule.repository, query_b,
					iter -> QueryResults.asList(iter));
		});
	}

	/**
	 * Creates a service resolver whose services are {@link TestSparqlFederatedService} instances, so tests can
	 * count the requests sent to a SERVICE endpoint. The caller is responsible for shutting the resolver down.
	 */
	private static SPARQLServiceResolver createCountingServiceResolver() {
		return new SPARQLServiceResolver() {
			@Override
			protected FederatedService createService(String serviceUrl) throws QueryEvaluationException {
				return new TestSparqlFederatedService(serviceUrl, getHttpClientSessionManager());
			}
		};
	}

	/**
	 * Installs {@code resolver} as the federated service resolver of the test's FedX repository.
	 * <p>
	 * Workaround for the tests: the repository is shut down and re-initialized in order to set a custom federated
	 * service resolver.
	 */
	private void installServiceResolver(FederatedServiceResolver resolver) throws Exception {
		FedXRepository repo = fedxRule.getRepository();
		repo.shutDown();
		repo.setFederatedServiceResolver(resolver);
		repo.init();
	}

	/** Removes the member named {@code http://endpoint1} from the federation. */
	private void removeEndpoint1FromFederation() throws Exception {
		Endpoint endpoint1 = federationContext().getEndpointManager().getEndpointByName(ENDPOINT1_NAME);
		fedxRule.removeEndpoint(endpoint1);
	}

	/** Returns the counting service that {@code resolver} created for endpoint1's HTTP URL. */
	private static TestSparqlFederatedService countingService(FederatedServiceResolver resolver) {
		return (TestSparqlFederatedService) resolver.getService(ENDPOINT1_SERVICE_URL);
	}

	/**
	 * Builds a query that binds {@code ?input} to the literals "input0" .. "input{n-1}" via VALUES and has endpoint1
	 * compute {@code ?output = CONCAT(?input, '_processed')} inside a SERVICE clause.
	 */
	private static String buildProcessInputsQuery(int numberOfInputs) {
		StringBuilder query = new StringBuilder();
		query.append("SELECT * WHERE { VALUES ?input { ");
		for (int i = 0; i < numberOfInputs; i++) {
			query.append(" \"input").append(i).append("\" ");
		}
		query.append(" }");
		query.append(" SERVICE <" + ENDPOINT1_SERVICE_URL
				+ "> { BIND (CONCAT(?input, '_processed') AS ?output) } ");
		query.append(" }");
		return query.toString();
	}

	/**
	 * Evaluates {@link #buildProcessInputsQuery(int)} through the federation. Asserts that there is exactly one
	 * result per input and that the {@code ?output} values are exactly "input{i}_processed".
	 */
	private void evaluateAndAssertProcessedInputs(int numberOfInputs) throws Exception {
		String query = buildProcessInputsQuery(numberOfInputs);
		try (TupleQueryResult tqr = queryManager().prepareTupleQuery(query).evaluate()) {
			List<BindingSet> res = Iterations.asList(tqr);
			Assertions.assertEquals(numberOfInputs, res.size());
			Set<Value> expected = Sets.newHashSet();
			for (int i = 0; i < numberOfInputs; i++) {
				expected.add(SimpleValueFactory.getInstance().createLiteral("input" + i + "_processed"));
			}
			Assertions.assertEquals(expected,
					res.stream().map(b -> b.getValue("output")).collect(Collectors.toSet()));
		}
	}

	/**
	 * {@link SPARQLFederatedService} that counts how often each evaluation entry point is used before delegating to
	 * the default implementation.
	 */
	static class TestSparqlFederatedService extends SPARQLFederatedService {

		/** Number of calls to {@link #select}. */
		final AtomicInteger serviceRequestCount = new AtomicInteger(0);

		/** Number of calls to {@link #evaluate} with an iteration of bindings. */
		final AtomicInteger boundJoinRequestCount = new AtomicInteger(0);

		public TestSparqlFederatedService(String serviceUrl, HttpClientSessionManager client) {
			super(serviceUrl, client);
		}

		@Override
		public CloseableIteration<BindingSet> select(Service service,
				Set<String> projectionVars, BindingSet bindings, String baseUri) throws QueryEvaluationException {
			serviceRequestCount.incrementAndGet();
			return super.select(service, projectionVars, bindings, baseUri);
		}

		@Override
		public CloseableIteration<BindingSet> evaluate(Service service,
				CloseableIteration<BindingSet> bindings, String baseUri)
				throws QueryEvaluationException {
			boundJoinRequestCount.incrementAndGet();
			return super.evaluate(service, bindings, baseUri);
		}
	}
}