RepositoryFederatedServiceIntegrationTest.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.repository.sparql.federation;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.eclipse.rdf4j.common.iteration.Iterations;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.RDFS;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.TupleQueryResult;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedService;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedServiceResolver;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.sail.memory.MemoryStore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.google.common.collect.Lists;

/**
 * Integration tests for {@link RepositoryFederatedService}
 *
 * @author Andreas Schwarte
 */
public class RepositoryFederatedServiceIntegrationTest {

	private static final ValueFactory vf = SimpleValueFactory.getInstance();

	private SailRepository serviceRepo;
	private SailRepository localRepo;
	private RepositoryFederatedService federatedService;

	/**
	 * Sets up the fixture for each test: an in-memory repository wrapped by the
	 * {@link RepositoryFederatedService} under test, and a second in-memory repository whose SERVICE clauses are
	 * all resolved to that service.
	 */
	@BeforeEach
	public void before() {
		serviceRepo = new SailRepository(new MemoryStore());
		serviceRepo.init();
		federatedService = new RepositoryFederatedService(serviceRepo);

		// the requested service URL is ignored: every SERVICE clause is answered by the service under test
		FederatedServiceResolver resolver = serviceUrl -> federatedService;

		localRepo = new SailRepository(new MemoryStore());
		localRepo.setFederatedServiceResolver(resolver);
		localRepo.init();
	}

	/**
	 * Tears down the fixture: shuts down the federated service and both repositories and switches the repository
	 * debug property (enabled by {@code test9_connectionHandling}) back to {@code "false"}.
	 * <p>
	 * The steps are chained with {@code finally} so that a failure while releasing one resource neither leaks the
	 * remaining ones nor leaves the debug property enabled for subsequent tests.
	 */
	@AfterEach
	public void after() {
		try {
			federatedService.shutdown();
		} finally {
			try {
				localRepo.shutDown();
			} finally {
				try {
					serviceRepo.shutDown();
				} finally {
					System.setProperty("org.eclipse.rdf4j.repository.debug", "false");
				}
			}
		}
	}

	/**
	 * Joins a two-value VALUES clause with a SERVICE pattern. The service repository only holds 'val1', so that
	 * is the single expected binding for {@code ?var}.
	 */
	@Test
	public void test() {
		List<Statement> serviceData = Lists.newArrayList(vf.createStatement(iri("s1"), RDFS.LABEL, l("val1")));
		addData(serviceRepo, serviceData);

		String query = "SELECT ?var WHERE { VALUES ?var { 'val1' 'val2' } . SERVICE <urn:dummy> { ?s ?p ?var  } }";
		List<BindingSet> result = evaluateQuery(query);

		List<Value> expected = Lists.newArrayList(l("val1"));
		assertResultEquals(result, "var", expected);
	}

	/**
	 * Joins a two-value VALUES clause with a SERVICE pattern over a service repository holding 'val1' to 'val3';
	 * only the two values listed in the VALUES clause are expected.
	 */
	@Test
	public void test2() {
		List<Statement> serviceData = Lists.newArrayList();
		for (int i = 1; i <= 3; i++) {
			serviceData.add(vf.createStatement(iri("s" + i), RDFS.LABEL, l("val" + i)));
		}
		addData(serviceRepo, serviceData);

		String query = "SELECT ?var WHERE { VALUES ?var { 'val1' 'val2' } . SERVICE <urn:dummy> { ?s ?p ?var  } }";
		List<BindingSet> result = evaluateQuery(query);

		List<Value> expected = Lists.newArrayList(l("val1"), l("val2"));
		assertResultEquals(result, "var", expected);
	}

	/**
	 * Same data and VALUES clause as {@code test2}, but the SERVICE clause wraps its pattern in a sub-select with
	 * a LIMIT, and the outer query orders by {@code ?var}.
	 */
	@Test
	public void test3() {
		List<Statement> serviceData = Lists.newArrayList();
		serviceData.add(vf.createStatement(iri("s1"), RDFS.LABEL, l("val1")));
		serviceData.add(vf.createStatement(iri("s2"), RDFS.LABEL, l("val2")));
		serviceData.add(vf.createStatement(iri("s3"), RDFS.LABEL, l("val3")));
		addData(serviceRepo, serviceData);

		String query = "SELECT ?var WHERE { VALUES ?var { 'val1' 'val2' } . SERVICE <urn:dummy> { SELECT ?var { ?s ?p ?var } LIMIT 1000  } } order by ?var";
		List<BindingSet> result = evaluateQuery(query);

		List<Value> expected = Lists.newArrayList(l("val1"), l("val2"));
		assertResultEquals(result, "var", expected);
	}

	/**
	 * Wraps the SERVICE clause in an OPTIONAL. The sub-select inside the service is filtered to 'val2', so
	 * {@code ?s} is expected to be unbound for 'val1' and bound to s2 for 'val2'.
	 */
	@Test
	public void test3a() {
		List<Statement> serviceData = Lists.newArrayList();
		for (int i = 1; i <= 3; i++) {
			serviceData.add(vf.createStatement(iri("s" + i), RDFS.LABEL, l("val" + i)));
		}
		addData(serviceRepo, serviceData);

		String query = "SELECT ?s ?var WHERE { VALUES ?var { 'val1' 'val2' } . OPTIONAL { SERVICE <urn:dummy> { SELECT ?s ?var { ?s ?p ?var . FILTER (?var='val2') } LIMIT 1  } } }";
		List<BindingSet> result = evaluateQuery(query);

		List<Value> expectedVars = Lists.newArrayList(l("val1"), l("val2"));
		assertResultEquals(result, "var", expectedVars);

		// a null entry stands for the unbound ?s of the 'val1' row
		List<Value> expectedSubjects = Lists.newArrayList(null, iri("s2"));
		assertResultEquals(result, "s", expectedSubjects);
	}

	/**
	 * Joins two SERVICE clauses that use the same triple pattern; all three values of the service repository are
	 * expected, ordered by {@code ?var}.
	 */
	@Test
	public void test4() {
		List<Statement> serviceData = Lists.newArrayList();
		serviceData.add(vf.createStatement(iri("s1"), RDFS.LABEL, l("val1")));
		serviceData.add(vf.createStatement(iri("s2"), RDFS.LABEL, l("val2")));
		serviceData.add(vf.createStatement(iri("s3"), RDFS.LABEL, l("val3")));
		addData(serviceRepo, serviceData);

		String query = "SELECT ?var WHERE { SERVICE <urn:dummy> { ?s ?p ?var } . SERVICE <urn:dummy> {  ?s ?p ?var  } } order by ?var";
		List<BindingSet> result = evaluateQuery(query);

		List<Value> expected = Lists.newArrayList(l("val1"), l("val2"), l("val3"));
		assertResultEquals(result, "var", expected);
	}

	/**
	 * Joins two SERVICE clauses that each contain a sub-select with LIMIT 3; the second sub-select explicitly
	 * projects {@code ?__rowIdx}.
	 */
	@Test
	public void test4a() {
		List<Statement> serviceData = Lists.newArrayList();
		for (int i = 1; i <= 3; i++) {
			serviceData.add(vf.createStatement(iri("s" + i), RDFS.LABEL, l("val" + i)));
		}
		addData(serviceRepo, serviceData);

		// Note: the explicit projection of "?__rowIdx" in the second sub-select is a deliberate workaround
		String query = "SELECT ?var WHERE { SERVICE <urn:dummy> { SELECT ?var { ?s ?p ?var } LIMIT 3 } . SERVICE <urn:dummy> { SELECT ?s ?var ?__rowIdx { ?s ?p ?var } LIMIT 3  } } order by ?var";
		List<BindingSet> result = evaluateQuery(query);

		List<Value> expected = Lists.newArrayList(l("val1"), l("val2"), l("val3"));
		assertResultEquals(result, "var", expected);
	}

	/**
	 * Variant of {@code test4a} in which the second sub-select does not project {@code ?__rowIdx} explicitly; the
	 * expected result is the same.
	 */
	@Test
	public void test4b() {
		List<Statement> serviceData = Lists.newArrayList();
		serviceData.add(vf.createStatement(iri("s1"), RDFS.LABEL, l("val1")));
		serviceData.add(vf.createStatement(iri("s2"), RDFS.LABEL, l("val2")));
		serviceData.add(vf.createStatement(iri("s3"), RDFS.LABEL, l("val3")));
		addData(serviceRepo, serviceData);

		String query = "SELECT ?var WHERE { SERVICE <urn:dummy> { SELECT ?var { ?s ?p ?var } LIMIT 3 } . SERVICE <urn:dummy> { SELECT ?s ?var { ?s ?p ?var } LIMIT 3  } } order by ?var";
		List<BindingSet> result = evaluateQuery(query);

		List<Value> expected = Lists.newArrayList(l("val1"), l("val2"), l("val3"));
		assertResultEquals(result, "var", expected);
	}

	/**
	 * The SERVICE clause consists only of a BIND that derives {@code ?output} from the {@code ?var} values of the
	 * outer VALUES clause; one processed value per input value is expected.
	 */
	@Test
	public void test5() {
		List<Statement> serviceData = Lists.newArrayList(vf.createStatement(iri("s1"), RDFS.LABEL, l("val1")));
		addData(serviceRepo, serviceData);

		String query = "SELECT ?var ?output WHERE { VALUES ?var { 'val1' 'val2' } . SERVICE <urn:dummy> { BIND(CONCAT(?var, '_processed') AS ?output) } }";
		List<BindingSet> result = evaluateQuery(query);

		List<Value> expectedVars = Lists.newArrayList(l("val1"), l("val2"));
		assertResultEquals(result, "var", expectedVars);

		List<Value> expectedOutputs = Lists.newArrayList(l("val1_processed"), l("val2_processed"));
		assertResultEquals(result, "output", expectedOutputs);
	}

	/**
	 * The first SERVICE clause supplies {@code ?var} from the service data; the second one computes
	 * {@code ?output} in a sub-select that explicitly projects {@code ?__rowIdx}.
	 */
	@Test
	public void test5a() {
		// each statement is added through its own addData call
		for (int i = 1; i <= 2; i++) {
			addData(serviceRepo, Lists.newArrayList(vf.createStatement(iri("s" + i), RDFS.LABEL, l("val" + i))));
		}

		// Note: the explicit projection of "?__rowIdx" in the second sub-select is a deliberate workaround
		String query = "SELECT ?var ?output WHERE { SERVICE <urn:dummy> { SELECT ?var { ?s ?p ?var } LIMIT 3 }  . SERVICE <urn:dummy> { SELECT (CONCAT(?var, '_processed') AS ?output) ?__rowIdx WHERE { } } }";
		List<BindingSet> result = evaluateQuery(query);

		List<Value> expectedVars = Lists.newArrayList(l("val1"), l("val2"));
		assertResultEquals(result, "var", expectedVars);

		List<Value> expectedOutputs = Lists.newArrayList(l("val1_processed"), l("val2_processed"));
		assertResultEquals(result, "output", expectedOutputs);
	}

	/**
	 * Variant of {@code test5a} in which the second sub-select does not project {@code ?__rowIdx} explicitly; the
	 * expected result is the same.
	 */
	@Test
	public void test5b() {
		// each statement is added through its own addData call
		for (int i = 1; i <= 2; i++) {
			addData(serviceRepo, Lists.newArrayList(vf.createStatement(iri("s" + i), RDFS.LABEL, l("val" + i))));
		}

		String query = "SELECT ?var ?output WHERE { SERVICE <urn:dummy> { SELECT ?var { ?s ?p ?var } LIMIT 3 }  . SERVICE <urn:dummy> { SELECT (CONCAT(?var, '_processed') AS ?output) WHERE { } } }";
		List<BindingSet> result = evaluateQuery(query);

		List<Value> expectedVars = Lists.newArrayList(l("val1"), l("val2"));
		assertResultEquals(result, "var", expectedVars);

		List<Value> expectedOutputs = Lists.newArrayList(l("val1_processed"), l("val2_processed"));
		assertResultEquals(result, "output", expectedOutputs);
	}

	/**
	 * The first SERVICE clause yields two {@code ?var} values (LIMIT 2); the second one counts the matching
	 * statements per value in an aggregating sub-select. Two rows are expected, and the first row is checked
	 * for 'val1' with a count of 1.
	 */
	@Test
	public void test6() {
		// each statement is added through its own addData call
		for (int i = 1; i <= 3; i++) {
			addData(serviceRepo, Lists.newArrayList(vf.createStatement(iri("s" + i), RDFS.LABEL, l("val" + i))));
		}

		String query = "SELECT ?var ?cnt WHERE { SERVICE <urn:dummy> { SELECT ?var { ?s ?p ?var } LIMIT 2 }  . SERVICE <urn:dummy> { SELECT ?var ?cnt ?__rowIdx WHERE { SELECT (COUNT(?s2) AS ?cnt) WHERE { ?s2 ?p2 ?var  } } } }";
		List<BindingSet> rows = evaluateQuery(query);
		assertEquals(2, rows.size());

		BindingSet firstRow = rows.get(0);
		assertEquals(l("val1"), firstRow.getValue("var"));

		Literal count = (Literal) firstRow.getValue("cnt");
		assertEquals(1, count.intValue());
	}

	/**
	 * Variant of {@code test6} where the first SERVICE clause is limited to a single row; exactly one result row
	 * with 'val1' and a count of 1 is expected.
	 */
	@Test
	public void test6b() {
		// each statement is added through its own addData call
		for (int i = 1; i <= 3; i++) {
			addData(serviceRepo, Lists.newArrayList(vf.createStatement(iri("s" + i), RDFS.LABEL, l("val" + i))));
		}

		String query = "SELECT ?var ?cnt WHERE { SERVICE <urn:dummy> { SELECT ?var { ?s ?p ?var } LIMIT 1 }  . SERVICE <urn:dummy> { SELECT ?var ?cnt ?__rowIdx WHERE { SELECT (COUNT(?s2) AS ?cnt) WHERE { ?s2 ?p2 ?var  } } } }";
		List<BindingSet> rows = evaluateQuery(query);
		assertEquals(1, rows.size());

		BindingSet onlyRow = rows.get(0);
		assertEquals(l("val1"), onlyRow.getValue("var"));

		Literal count = (Literal) onlyRow.getValue("cnt");
		assertEquals(1, count.intValue());
	}

	/**
	 * The VALUES clause and the SERVICE pattern share no variable, so every VALUES row is expected to be combined
	 * with the single statement of the service repository.
	 */
	@Test
	public void test7_CrossProduct() {
		List<Statement> serviceData = Lists
				.newArrayList(vf.createStatement(iri("s1"), RDFS.LABEL, l("serviceVal")));
		addData(serviceRepo, serviceData);

		String query = "SELECT ?var ?o WHERE { VALUES ?var { 'val1' 'val2' } . SERVICE <urn:dummy> { ?s ?p ?o  } }";
		List<BindingSet> result = evaluateQuery(query);

		List<Value> expectedVars = Lists.newArrayList(l("val1"), l("val2"));
		assertResultEquals(result, "var", expectedVars);

		List<Value> expectedObjects = Lists.newArrayList(l("serviceVal"), l("serviceVal"));
		assertResultEquals(result, "o", expectedObjects);
	}

	/**
	 * The second SERVICE clause uses a {@code SELECT *} sub-select; only 'val1', the value present in the service
	 * repository, is expected.
	 */
	@Test
	public void test8_subSelectAll() {
		List<Statement> serviceData = Lists.newArrayList(vf.createStatement(iri("s1"), RDFS.LABEL, l("val1")));
		addData(serviceRepo, serviceData);

		String query = "SELECT ?var WHERE { SERVICE <urn:dummy> { SELECT ?var WHERE { VALUES ?var { 'val1' 'val2' } } } . SERVICE <urn:dummy> { SELECT * WHERE { ?s ?p ?var }  } }";
		List<BindingSet> result = evaluateQuery(query);

		List<Value> expected = Lists.newArrayList(l("val1"));
		assertResultEquals(result, "var", expected);
	}

	/**
	 * Variant of {@code test8_subSelectAll} that writes the sub-select as {@code SELECT   *} with several spaces
	 * between the keyword and the asterisk; the expected result is the same.
	 */
	@Test
	public void test8a_subSelectAll() {
		List<Statement> serviceData = Lists.newArrayList(vf.createStatement(iri("s1"), RDFS.LABEL, l("val1")));
		addData(serviceRepo, serviceData);

		// the query has multiple whitespaces in "SELECT   *", thus "?__rowIdx" is not inserted and the fallback
		// evaluation is used
		String query = "SELECT ?var WHERE { SERVICE <urn:dummy> { SELECT ?var WHERE { VALUES ?var { 'val1' 'val2' } } } . SERVICE <urn:dummy> { SELECT   * WHERE { ?s ?p ?var }  } }";
		List<BindingSet> result = evaluateQuery(query);

		List<Value> expected = Lists.newArrayList(l("val1"));
		assertResultEquals(result, "var", expected);
	}

	/**
	 * Simulates concurrent access to the {@link RepositoryFederatedService} in order to demonstrate correct
	 * connection handling. In particular, the test should terminate properly and there should not be any hanging
	 * connections waiting for the shutdown.
	 * <p>
	 * Five workers evaluate the same SERVICE query in parallel. The outcome of every worker is collected through
	 * its {@link Future}, so that a failed assertion or an exception raised on a worker thread fails the test
	 * instead of being silently discarded by the executor.
	 *
	 * @throws Exception if a worker fails, does not finish within the timeout, or the test thread is interrupted
	 */
	@Test
	public void test9_connectionHandling() throws Exception {

		// enables the repository debug mode; the flag is reset to "false" in after()
		System.setProperty("org.eclipse.rdf4j.repository.debug", "true");

		List<Value> values = Lists.newArrayList();
		for (int i = 0; i < 10; i++) {
			values.add(l("value" + i));
		}
		addData(serviceRepo,
				values.stream()
						.map(value -> vf.createStatement(iri("s1"), RDFS.LABEL, value))
						.collect(Collectors.toList()));

		String query = "SELECT ?var WHERE { SERVICE <urn:dummy> { ?s ?p ?var  } }";

		ExecutorService executor = Executors.newFixedThreadPool(5);
		List<Future<?>> pending = Lists.newArrayList();
		try {
			for (int i = 0; i < 5; i++) {
				pending.add(executor.submit(() -> {
					assertResultEquals(evaluateQuery(query), "var", values);
				}));
			}

			// Future#get rethrows a worker failure (wrapped in an ExecutionException) and raises a
			// TimeoutException for a hanging worker; either one propagates and fails the test
			for (Future<?> task : pending) {
				task.get(10, TimeUnit.SECONDS);
			}
		} finally {
			executor.shutdown();
			executor.awaitTermination(10, TimeUnit.SECONDS);
		}
	}

	/**
	 * Reads only the first two rows of a ten-row SERVICE result and then closes the result and the connection.
	 */
	@Test
	public void test10_consumePartially() {

		/*
		 * The purpose of this test is to validate that connections are closed properly, even if a result is
		 * consumed only partially. If it wasn't working we would see a hanging JUnit test waiting for connections
		 * to close.
		 */

		List<Statement> statements = Lists.newArrayList();
		for (int i = 0; i < 10; i++) {
			statements.add(vf.createStatement(iri("s1"), RDFS.LABEL, l("value" + i)));
		}
		addData(serviceRepo, statements);

		String query = "SELECT ?var WHERE { SERVICE <urn:dummy> { ?s ?p ?var  } }";
		try (RepositoryConnection conn = localRepo.getConnection();
				TupleQueryResult tqr = conn.prepareTupleQuery(query).evaluate()) {

			// consume only two items; the remaining rows are deliberately left unread
			tqr.next();
			tqr.next();
		}
	}

	/**
	 * Adds the given statements to the repository using a connection that is opened and closed by this method.
	 *
	 * @param repo the repository to add the statements to
	 * @param m    the statements to add
	 */
	private void addData(Repository repo, Collection<? extends Statement> m) {
		try (RepositoryConnection connection = repo.getConnection()) {
			connection.add(m);
		}
	}

	/**
	 * Evaluates the given tuple query against {@code localRepo} and returns all of its rows.
	 *
	 * @param query the query string to evaluate
	 * @return the binding sets of the result, collected into a list before the result and connection are closed
	 */
	private List<BindingSet> evaluateQuery(String query) {
		try (RepositoryConnection connection = localRepo.getConnection();
				TupleQueryResult queryResult = connection.prepareTupleQuery(query).evaluate()) {
			List<BindingSet> rows = Iterations.asList(queryResult);
			return rows;
		}
	}

	/**
	 * Creates an IRI in the {@code http://example.org/resource/} namespace.
	 *
	 * @param localName the local name to combine with the namespace
	 * @return the IRI created by the shared value factory
	 */
	private IRI iri(String localName) {
		final String namespace = "http://example.org/resource/";
		return vf.createIRI(namespace, localName);
	}

	/**
	 * Creates a literal for the given label using the shared value factory.
	 *
	 * @param literal the label of the literal
	 * @return the created literal
	 */
	private Literal l(String literal) {
		Literal created = vf.createLiteral(literal);
		return created;
	}

	/**
	 * Asserts that the values bound to {@code bindingName}, taken row by row in result order, equal the expected
	 * list. A row without a value for the binding contributes {@code null}.
	 *
	 * @param res         the query result rows
	 * @param bindingName the name of the binding to extract from every row
	 * @param expected    the expected values, in result order
	 */
	private void assertResultEquals(List<BindingSet> res, String bindingName, List<Value> expected) {
		List<Value> actual = Lists.newArrayList();
		for (BindingSet row : res) {
			actual.add(row.getValue(bindingName));
		}
		assertEquals(expected, actual);
	}
}