TupleQueryResultTest.java

/*******************************************************************************
 * Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.testsuite.repository;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.time.StopWatch;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryLanguage;
import org.eclipse.rdf4j.query.TupleQuery;
import org.eclipse.rdf4j.query.TupleQueryResult;
import org.eclipse.rdf4j.query.resultio.QueryResultIO;
import org.eclipse.rdf4j.query.resultio.TupleQueryResultFormat;
import org.eclipse.rdf4j.query.resultio.TupleQueryResultWriter;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.RDFParseException;
import org.eclipse.rdf4j.rio.UnsupportedRDFormatException;
import org.eclipse.rdf4j.sail.SailException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;

public abstract class TupleQueryResultTest {

	// Logger used by the tests for progress messages.
	private final Logger logger = LoggerFactory.getLogger(TupleQueryResultTest.class);

	// Repository under test; assigned in setUp() and shut down in tearDown().
	private Repository rep;

	// Main connection to the repository; opened in setUp() and closed in tearDown().
	private RepositoryConnection con;

	// SPARQL query text assigned by buildQueries(); used by testIsEmpty().
	private String emptyResultQuery;

	// SPARQL query text assigned by buildQueries(); used by testGetBindingNames() and testIterator().
	private String multipleResultQuery;

	// Fixed seed, so the pseudo-random sequence drawn by the tests is the same on every run.
	private final Random random = new Random(43252333);

	// Weak references to the results opened (and never closed) by evaluateQueryWithoutClosing().
	private List<WeakReference<TupleQueryResult>> unclosedQueryResults;

	/**
	 * Prepares the fixture for a single test: switches the repository debug property on, restricts logging of
	 * {@code AbstractSailConnection} to errors, creates the repository and its main connection, builds the test
	 * queries, loads the test data and resets the list of tracked unclosed results.
	 *
	 * @throws Exception if the repository cannot be created or the test data cannot be loaded
	 */
	@BeforeEach
	public void setUp() throws Exception {
		System.setProperty("org.eclipse.rdf4j.repository.debug", "true");

		// The slf4j logger is cast to its logback implementation because only that type exposes setLevel().
		Logger sailConnectionLogger = LoggerFactory
				.getLogger("org.eclipse.rdf4j.sail.helpers.AbstractSailConnection");
		((ch.qos.logback.classic.Logger) sailConnectionLogger).setLevel(Level.ERROR);

		rep = createRepository();
		con = rep.getConnection();

		buildQueries();
		addData();
		unclosedQueryResults = new ArrayList<>();
	}

	/**
	 * Releases the fixture after a single test: switches the repository debug property off, restores the
	 * {@code AbstractSailConnection} log level to WARN, closes the main connection and shuts the repository down.
	 * <p>
	 * The connection and the repository are only released when they were actually created, so that a failure
	 * part-way through {@link #setUp()} is not masked by a {@link NullPointerException} raised here. The repository
	 * is shut down even when closing the connection or the short pause fails.
	 *
	 * @throws Exception if closing the connection or shutting down the repository fails
	 */
	@AfterEach
	public void tearDown() throws Exception {
		System.setProperty("org.eclipse.rdf4j.repository.debug", "false");
		ch.qos.logback.classic.Logger root = (ch.qos.logback.classic.Logger) LoggerFactory
				.getLogger("org.eclipse.rdf4j.sail.helpers.AbstractSailConnection");
		root.setLevel(Level.WARN);
		try {
			if (con != null) {
				con.close();
				con = null;
			}
		} finally {
			try {
				// Request a GC cycle and pause briefly before shutting the repository down.
				System.gc();
				Thread.sleep(1);
			} finally {
				// Runs even if the sleep above is interrupted, so the repository is never left open.
				if (rep != null) {
					rep.shutDown();
					rep = null;
				}
			}
		}
	}

	/**
	 * Creates the repository used by a test and empties it.
	 * <p>
	 * The repository is obtained from {@link #newRepository()}; a short-lived connection is then used to clear its
	 * contents and its namespaces before it is handed back.
	 *
	 * @return the freshly created, cleared repository
	 */
	protected Repository createRepository() {
		Repository freshRepository = newRepository();
		// A dedicated, differently named connection avoids shadowing the "con" field of the test.
		try (RepositoryConnection cleanupConnection = freshRepository.getConnection()) {
			cleanupConnection.clear();
			cleanupConnection.clearNamespaces();
		}
		return freshRepository;
	}

	/**
	 * Supplies the repository implementation under test.
	 * <p>
	 * Called by {@link #createRepository()}, which immediately opens a connection on the returned repository and
	 * clears its contents and namespaces.
	 *
	 * @return the repository to run the tests against
	 */
	protected abstract Repository newRepository();

	/**
	 * Builds the SPARQL queries used to test the result set object and stores them in {@code emptyResultQuery} and
	 * {@code multipleResultQuery}.
	 */
	private void buildQueries() {
		// Comparing ?X with itself using != is expected to reject every solution, so no rows come back.
		emptyResultQuery = "SELECT * "
				+ "WHERE { ?X ?P ?Y . FILTER (?X != ?X) }";

		// Selects the distinct publisher/date pairs; the projection order (?P, ?D) is what
		// testGetBindingNames() checks.
		multipleResultQuery = "PREFIX dc: <http://purl.org/dc/elements/1.1/>\n"
				+ "SELECT DISTINCT ?P ?D \n"
				+ "WHERE { [] dc:publisher ?P;\n"
				+ "        dc:date ?D. }\n";
	}

	/**
	 * Loads the Turtle test data from the classpath resource {@code /testcases/default-graph-1.ttl} into the
	 * repository through the main connection.
	 *
	 * @throws IOException                  if the resource cannot be read
	 * @throws UnsupportedRDFormatException if no Turtle parser is available
	 * @throws RDFParseException            if the resource is not valid Turtle
	 * @throws RepositoryException          if the data cannot be added to the repository
	 * @throws NullPointerException         if the resource is not present on the classpath
	 */
	private void addData() throws IOException, UnsupportedRDFormatException, RDFParseException, RepositoryException {
		try (InputStream defaultGraph = TupleQueryResultTest.class
				.getResourceAsStream("/testcases/default-graph-1.ttl")) {
			// getResourceAsStream returns null for a missing resource; report that explicitly instead of
			// handing a null stream to the connection.
			Objects.requireNonNull(defaultGraph,
					"Test resource /testcases/default-graph-1.ttl not found on the classpath");
			con.add(defaultGraph, "", RDFFormat.TURTLE);
		}
	}

	/**
	 * Checks that the binding names reported by the result of {@code multipleResultQuery} start with "P" followed
	 * by "D".
	 */
	@Test
	public void testGetBindingNames() {
		TupleQuery query = con.prepareTupleQuery(multipleResultQuery);
		try (TupleQueryResult result = query.evaluate()) {
			List<String> bindingNames = result.getBindingNames();

			String firstName = bindingNames.get(0);
			assertThat(firstName).as("first header element").isEqualTo("P");

			String secondName = bindingNames.get(1);
			assertThat(secondName).as("second header element").isEqualTo("D");
		}
	}

	/**
	 * Iterates over the complete result of {@code multipleResultQuery} and checks that more than one row was
	 * returned.
	 */
	@Test
	public void testIterator() {
		TupleQuery query = con.prepareTupleQuery(multipleResultQuery);
		try (TupleQueryResult result = query.evaluate()) {
			int rows = 0;
			// Drain the result; the counter is advanced once per consumed row.
			for (; result.hasNext(); rows++) {
				result.next();
			}

			assertTrue(rows > 1, "query should have multiple results.");
		}
	}

	/**
	 * Checks that evaluating {@code emptyResultQuery} yields a result without any rows.
	 */
	@Test
	public void testIsEmpty() {
		TupleQuery query = con.prepareTupleQuery(emptyResultQuery);
		try (TupleQueryResult result = query.evaluate()) {
			boolean hasAnyRow = result.hasNext();
			Assertions.assertFalse(hasAnyRow, "Query result should be empty");
		}
	}

	/**
	 * Checks that a query selecting all statements returns exactly as many rows as the connection reports via
	 * {@code size()}: one non-null row per statement and nothing afterwards.
	 */
	@Test
	public void testCountMatchesAllSelect() {
		TupleQuery selectAll = con.prepareTupleQuery("SELECT * WHERE {?s ?p ?o}");
		try (TupleQueryResult result = selectAll.evaluate()) {
			final long expectedRows = con.size();
			int seen = 0;
			while (seen < expectedRows) {
				assertTrue(result.hasNext());
				BindingSet row = result.next();
				Assertions.assertNotNull(row);
				seen++;
			}
			// Once the expected number of rows has been consumed the result must be exhausted.
			Assertions.assertFalse(result.hasNext(), "Query result should be empty");
		}
	}

	/**
	 * Adds 1000 generated statements and then, 1000 times over, opens a new connection and streams the result of a
	 * select-all query (without inferred statements) to a SPARQL results writer backed by an in-memory buffer.
	 *
	 * @throws Exception if closing the buffer or the connection fails
	 */
	@Test
	public void testStreaming() throws Exception {
		addRandomStatements(1000);

		for (int evaluateCount = 0; evaluateCount < 1000; evaluateCount++) {
			try (ByteArrayOutputStream stream = new ByteArrayOutputStream(191226);
					RepositoryConnection nextCon = rep.getConnection()) {
				TupleQueryResultWriter sparqlWriter = QueryResultIO.createTupleWriter(TupleQueryResultFormat.SPARQL,
						stream);
				TupleQuery tupleQuery = nextCon.prepareTupleQuery(QueryLanguage.SPARQL,
						"SELECT ?s ?p ?o WHERE { ?s ?p ?o . }");
				tupleQuery.setIncludeInferred(false);
				tupleQuery.evaluate(sparqlWriter);
			}
		}
	}

	/**
	 * Adds 1000 generated statements and then, 100 times over, opens a connection, evaluates a query on it without
	 * closing the result and lets the connection be closed by try-with-resources.
	 * <p>
	 * A {@link SailException} raised in an iteration is tolerated, but only when its text starts with the
	 * "Connection closed before all iterations were closed" prefix asserted below.
	 */
	@Test
	public void testNotClosingResult() {
		addRandomStatements(1000);

		logger.info("Open lots of TupleQueryResults without closing them");
		for (int i = 0; i < 100; i++) {
			try (RepositoryConnection repCon = rep.getConnection()) {
				evaluateQueryWithoutClosing(repCon);
			} catch (SailException e) {
				assertTrue(e.toString()
						.startsWith(
								"org.eclipse.rdf4j.sail.SailException: Connection closed before all iterations were closed: org.eclipse.rdf4j.sail.helpers.SailBaseIteration@"));
			}
		}
	}

	/**
	 * Adds {@code statementCount} statements of the form {@code urn:test:<n>} to the repository through the main
	 * connection, inside a single transaction.
	 * <p>
	 * Subject, predicate and object numbering starts at 0, 100 and 1000. After every added statement three values
	 * are drawn from {@code random}, in subject, predicate, object order, and each index is advanced when its draw
	 * rounds to 1.
	 *
	 * @param statementCount number of statements to add
	 */
	private void addRandomStatements(int statementCount) {
		ValueFactory vf = con.getValueFactory();
		int subjectIndex = 0;
		int predicateIndex = 100;
		int objectIndex = 1000;

		con.begin();
		for (int added = 0; added < statementCount; added++) {
			con.add(vf.createIRI("urn:test:" + subjectIndex), vf.createIRI("urn:test:" + predicateIndex),
					vf.createIRI("urn:test:" + objectIndex));
			// The draw order below must not change: it determines the generated data set.
			if (Math.round(random.nextDouble()) > 0) {
				subjectIndex++;
			}
			if (Math.round(random.nextDouble()) > 0) {
				predicateIndex++;
			}
			if (Math.round(random.nextDouble()) > 0) {
				objectIndex++;
			}
		}
		con.commit();
	}

	/**
	 * With the repository debug property switched off, adds a single statement and then, 100 times over, opens a
	 * connection, evaluates a query on it without closing the result and waits until every weak reference in
	 * {@code unclosedQueryResults} has been cleared.
	 * <p>
	 * Presumably this verifies that results which are never closed become collectable and do not make closing the
	 * connection fail - confirm against the repository implementation.
	 *
	 * @throws InterruptedException if the thread is interrupted while pausing between garbage collection requests
	 */
	@Test
	public void testNotClosingResultWithoutDebug() throws InterruptedException {
		System.setProperty("org.eclipse.rdf4j.repository.debug", "false");

		// Started once for the whole test: the 60 second limit below covers all iterations together.
		StopWatch stopWatch = StopWatch.createStarted();

		con.begin();
		con.add(RDF.TYPE, RDF.TYPE, RDF.PROPERTY);
		con.commit();

		for (int i = 0; i < 100; i++) {
			try (RepositoryConnection repCon = rep.getConnection()) {
				evaluateQueryWithoutClosing(repCon);
				System.gc();
				Thread.sleep(1);
				// Keep requesting garbage collection until no tracked result is reachable any more.
				while (unclosedQueryResults.stream().map(Reference::get).anyMatch(Objects::nonNull)) {
					System.gc();
					Thread.sleep(100);
					assertTrue(stopWatch.getTime(TimeUnit.SECONDS) < 60, "Test timed out after 60 seconds");

				}
			}
		}

	}

	/**
	 * Evaluates a select-all query on the given connection and deliberately leaves the result open.
	 * <p>
	 * Only a weak reference to the result is recorded in {@code unclosedQueryResults}, so the result itself is no
	 * longer strongly referenced by this class once the method returns.
	 *
	 * @param repCon connection to evaluate the query on
	 */
	private void evaluateQueryWithoutClosing(RepositoryConnection repCon) {
		TupleQuery selectAll = repCon.prepareTupleQuery(QueryLanguage.SPARQL, "select * where {?s ?p ?o}");

		// Checks whether an open result hangs the test.
		// DO NOT CLOSE THIS RESULT.
		TupleQueryResult openResult = selectAll.evaluate();
		WeakReference<TupleQueryResult> tracker = new WeakReference<>(openResult);
		unclosedQueryResults.add(tracker);
		Assertions.assertNotNull(openResult);
	}

	/**
	 * With the repository debug property switched off, adds a single statement and checks that a
	 * {@link SailException} is raised when a connection is closed while a result evaluated on it is still open and
	 * the result is closed only afterwards.
	 */
	@Test
	public void testNotClosingResultThrowsException() {
		System.setProperty("org.eclipse.rdf4j.repository.debug", "false");

		con.begin();
		con.add(RDF.TYPE, RDF.TYPE, RDF.PROPERTY);
		con.commit();

		Assertions.assertThrows(SailException.class, () -> {
			TupleQueryResult evaluate = null;
			try (RepositoryConnection repCon = rep.getConnection()) {
				String queryString = "select * where {?s ?p ?o}";
				TupleQuery tupleQuery = repCon.prepareTupleQuery(QueryLanguage.SPARQL, queryString);

				evaluate = tupleQuery.evaluate();
			} finally {
				// The result is null when opening the connection or evaluating the query failed; closing
				// it unconditionally would replace that failure with a NullPointerException.
				if (evaluate != null) {
					evaluate.close();
				}
			}
		});

	}

	/**
	 * Loads a small Turtle data set of occupations (linked via {@code wdt:P106}) and their labels, then evaluates a
	 * query that counts persons per occupation in a sub-select, binds "lv" to {@code ?lang}, optionally matches a
	 * label in that language, keeps only occupations for which no such label was bound and requires an English
	 * label. The test passes when the query returns at least one row.
	 *
	 * @throws IOException if the Turtle data cannot be read
	 */
	@Test
	public void testLeftJoinWithJoinCondition() throws IOException {

		// NOTE(review): the "@ru" label and the "sz..frazsett" labels below contain replacement characters and
		// look mis-encoded in this copy of the file - confirm against the upstream source.
		String data = "@prefix wdt: <http://www.wikidata.org/prop/direct/> .\n" +
				"@prefix wd: <http://www.wikidata.org/entity/> .\n" +
				"@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .\n" +
				"@prefix wikibase: <http://wikiba.se/ontology#> .\n" +

				"\n" +
				"wd:other1 wdt:P106 wd:Q27532438 .\n" +
				"wd:other2 wdt:P106 wd:Q27532438 .\n" +
				"wd:other3 wdt:P106 wd:Q27532438 .\n" +

				"wd:Q27532438 a wikibase:Item;\n" +
				"  rdfs:label \"suffragist\"@en, \"sufrageto\"@eo, \"����������������������\"@ru, \"suffragetti\"@fi, \"sz��frazsett\"@lv.\n"
				+

				"wd:Q38203 wdt:P106 wd:Q27532437 .\n" +
				"\n" +
				"wd:Q27532437 a wikibase:Item;\n" +
				"  rdfs:label \"suffragist\"@en, \"sufrageto\"@eo, \"����������������������\"@ru, \"suffragetti\"@fi, \"sz��frazsett\"@hu.\n"
				+
				"\n" +

				"wd:other9 wdt:P106 wd:Q27532439 .\n" +

				"wd:Q27532438 a wikibase:Item;\n" +
				"  rdfs:label \"suffragist\"@en, \"sufrageto\"@eo, \"����������������������\"@ru, \"suffragetti\"@fi, \"sz��frazsett\"@no-nb.\n"
				+

				"wd:Q89166696 wdt:P106 wd:Q27532437 .\n";

		con.add(new StringReader(data), "", RDFFormat.TURTLE);

		// In the data above only wd:Q27532438 carries an "@lv" label, while wd:Q27532437 has an "@en" label
		// but no "@lv" one.
		TupleQuery tupleQuery = con.prepareTupleQuery("PREFIX wdt: <http://www.wikidata.org/prop/direct/>\n" +
				"PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>\n" +
				"SELECT ?occup ?occupLabel ?count\n" +
				"WHERE\n" +
				"{\n" +
				"    {\n" +
				"        SELECT ?occup (COUNT(?person) as ?count)\n" +
				"        WHERE\n" +
				"        {\n" +
				"            ?person wdt:P106 ?occup\n" +
				"        }\n" +
				"        GROUP BY ?occup\n" +
				"        ORDER BY DESC(?count)\n" +
				"        LIMIT 1000\n" +
				"    }\n" +
				"BIND(\"lv\" AS ?lang)\n" +
				"      OPTIONAL {" +
				"			?occup rdfs:label ?label1.     \n" +
				"			filter(lang(?label1) = ?lang)\n" +
				"}\n" +
				"    FILTER(!BOUND(?label1))\n" +
				"?occup rdfs:label ?occupLabel FILTER (LANG(?occupLabel)=\"en\") .\n" +
				"\n" +
				"}\n" +
				"ORDER BY DESC(?count)\n" +
				"LIMIT 50");
		// The query is evaluated inside an explicit transaction on the main connection.
		con.begin();
		try (TupleQueryResult evaluate = tupleQuery.evaluate()) {
			assertTrue(evaluate.hasNext());
		}
		con.commit();

	}

}