SPARQLServerBaseTest.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.federated;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

import org.eclipse.rdf4j.federated.endpoint.Endpoint;
import org.eclipse.rdf4j.federated.monitoring.MonitoringService;
import org.eclipse.rdf4j.federated.repository.RepositorySettings;
import org.eclipse.rdf4j.federated.server.NativeStoreServer;
import org.eclipse.rdf4j.federated.server.SPARQLEmbeddedServer;
import org.eclipse.rdf4j.federated.server.Server;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.rio.RDFParseException;
import org.eclipse.rdf4j.rio.Rio;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for any federation test. This class is self-contained with regard to testing when run in a distinct JVM.
 *
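 * <p>
 * A minimal sketch of a concrete test (the subclass name, dataset paths and query execution are illustrative and not
 * part of this class):
 *
 * <pre>{@code
 * public class MyFederationTest extends SPARQLServerBaseTest {
 *
 * 	public void testSimpleQuery() throws Exception {
 * 		// load two datasets into endpoint1 and endpoint2 and register them with the federation
 * 		prepareTest(Arrays.asList("/tests/data/data1.ttl", "/tests/data/data2.ttl"));
 * 		// ... evaluate queries against the federation ...
 * 	}
 * }
 * }</pre>
 *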
 * @author as
 *
 */
public abstract class SPARQLServerBaseTest extends FedXBaseTest {

	/**
	 * The repository type used for testing. Can be selected via the {@code repositoryType} system property (see
	 * {@link #initTest()}).
	 */
	public enum REPOSITORY_TYPE {
		SPARQLREPOSITORY,
		REMOTEREPOSITORY,
		NATIVE
	}

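	/**
	 * The maximal number of endpoint repositories provided by the embedded server
	 */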
	protected static final int MAX_ENDPOINTS = 4;

	public static Logger log;

	/**
	 * The server, e.g. {@link SPARQLEmbeddedServer} or {@link NativeStoreServer}
	 */
	protected static Server server;

	@TempDir
	static Path tempDir;

	private static REPOSITORY_TYPE repositoryType = REPOSITORY_TYPE.SPARQLREPOSITORY;

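	/**
	 * Initialize the backing server for all tests of this class. The backend is selected via the
	 * {@code repositoryType} system property (e.g. {@code -DrepositoryType=NATIVE}) and defaults to
	 * {@link REPOSITORY_TYPE#SPARQLREPOSITORY}.
	 */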
	@BeforeAll
	public static void initTest() throws Exception {
		System.setProperty("org.eclipse.rdf4j.repository.debug", "true");

		log = LoggerFactory.getLogger(SPARQLServerBaseTest.class);

		if (System.getProperty("repositoryType") != null) {
			repositoryType = REPOSITORY_TYPE.valueOf(System.getProperty("repositoryType"));
		}

		switch (repositoryType) {
		case NATIVE:
			initializeLocalNativeStores();
			break;
		case REMOTEREPOSITORY:
		case SPARQLREPOSITORY:
		default:
			initializeServer();
		}
	}

	@AfterAll
	public static void afterTest() throws Exception {
		if (server != null) {
			server.shutdown();
		}
		System.setProperty("org.eclipse.rdf4j.repository.debug", "false");
	}

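	/**
	 * Reset the operations counter of all endpoint repositories and disable failure simulation
	 * ({@code setFailAfter(-1)}) before each test.
	 */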
	@BeforeEach
	public void beforeEachTest() {
		// reset operations counter and fail after
		for (int i = 1; i <= MAX_ENDPOINTS; i++) {
			RepositorySettings repoSettings = repoSettings(i);
			repoSettings.resetOperationsCounter();
			repoSettings.setFailAfter(-1);
		}
	}

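	/**
	 * @return whether the tests run against the embedded {@link SPARQLEmbeddedServer}
	 */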
	public boolean isSPARQLServer() {
		return server instanceof SPARQLEmbeddedServer;
	}

	/**
	 * Initialization of the embedded web server hosting the endpoint repositories. Used for the remote repository
	 * and SPARQL repository settings.
	 *
	 * @throws Exception
	 */
	private static void initializeServer() throws Exception {

		// set up the server: the maximal number of endpoints must be known
		List<String> repositoryIds = new ArrayList<>(MAX_ENDPOINTS);
		for (int i = 1; i <= MAX_ENDPOINTS; i++) {
			repositoryIds.add("endpoint" + i);
		}
		File dataDir = new File(tempDir.toFile(), "datadir");
		server = new SPARQLEmbeddedServer(dataDir, repositoryIds, repositoryType == REPOSITORY_TYPE.REMOTEREPOSITORY);

		server.initialize(MAX_ENDPOINTS);
	}

	/**
	 * Initialization of the local native stores. Used for the native repository setting.
	 *
	 * @throws Exception
	 */
	private static void initializeLocalNativeStores() throws Exception {

		File dataDir = new File(tempDir.toFile(), "datadir");
		server = new NativeStoreServer(dataDir);
		server.initialize(MAX_ENDPOINTS);
	}

	/**
	 * Get the repository with the given index. Initialized repositories are named
	 *
	 * endpoint1, endpoint2, ..., endpoint%MAX_ENDPOINTS%
	 *
	 * @param i the index of the repository, starting with 1
	 * @return the repository with the given index
	 */
	protected static Repository getRepository(int i) {
		return server.getRepository(i);
	}

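	/**
	 * Prepare the test federation: clear the current federation, load the given datasets into the endpoint
	 * repositories (endpoint1, endpoint2, ...) and register the corresponding endpoints with the federation
	 * manager.
	 *
	 * @param sparqlEndpointData the dataset resources to load, one per endpoint (at most {@link #MAX_ENDPOINTS})
	 * @return the endpoints registered with the federation
	 * @throws Exception
	 */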
	protected List<Endpoint> prepareTest(List<String> sparqlEndpointData) throws Exception {

		// clear federation
		federationContext().getManager().removeAll();

		// prepare the test endpoints (i.e. load data)
		if (sparqlEndpointData.size() > MAX_ENDPOINTS) {
			throw new RuntimeException("MAX_ENDPOINTs to low, " + sparqlEndpointData.size()
					+ " repositories needed. Adjust configuration");
		}

		int i = 1; // endpoint id, start with 1
		for (String s : sparqlEndpointData) {
			loadDataSet(server.getRepository(i++), s);
		}

		// configure federation
		List<Endpoint> endpoints = new ArrayList<>();
		for (i = 1; i <= sparqlEndpointData.size(); i++) {
			Endpoint e = server.loadEndpoint(i);
			endpoints.add(e);
			federationContext().getManager().addEndpoint(e, true);
		}
		return endpoints;
	}

	/**
	 * Load a dataset into the given repository. Note: the repository is cleared before loading the data.
	 *
	 * @param rep         the repository to load the dataset into
	 * @param datasetFile the dataset file, resolved as a classpath resource relative to this class
	 * @throws RDFParseException
	 * @throws RepositoryException
	 * @throws IOException
	 */
	protected void loadDataSet(Repository rep, String datasetFile)
			throws RDFParseException, RepositoryException, IOException {
		log.debug("loading dataset...");
		InputStream dataset = SPARQLServerBaseTest.class.getResourceAsStream(datasetFile);
		if (dataset == null) {
			throw new IOException("Dataset resource not found on classpath: " + datasetFile);
		}

		boolean needToShutdown = false;
		if (!rep.isInitialized()) {
			rep.init();
			needToShutdown = true;
		}
		RepositoryConnection con = rep.getConnection();
		try {
			con.clear();
			con.add(dataset, "", Rio.getParserFormatForFileName(datasetFile).get());
		} finally {
			dataset.close();
			con.close();
			if (needToShutdown) {
				rep.shutDown();
			}
		}
		log.debug("dataset loaded.");
	}

	protected void ignoreForNativeStore() {
		// ignore these tests for native store
		Assumptions.assumeTrue(isSPARQLServer(), "Test is ignored for native store federation");
	}

	protected void assumeNativeStore() {
		Assumptions.assumeTrue(server instanceof NativeStoreServer,
				"Test can be executed with native store federation only.");
	}

	protected void assumeSparqlEndpoint() {
		Assumptions.assumeTrue(repositoryType == REPOSITORY_TYPE.SPARQLREPOSITORY,
				"Test can be executed for SPARQL Repository only.");
	}

	/**
	 * Return the {@link RepositorySettings} for configuring the repository of the given endpoint, e.g. for failure
	 * simulation.
	 *
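	 * A usage sketch (the failure-simulation semantics are inferred from {@link #beforeEachTest()}, which resets
	 * {@code setFailAfter(-1)} before each test):
	 *
	 * <pre>{@code
	 * // let the repository of endpoint 1 start failing after 5 operations
	 * repoSettings(1).setFailAfter(5);
	 * }</pre>
	 *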
	 * @param endpoint the endpoint index, starting with 1
	 * @return the {@link RepositorySettings} of the endpoint's repository
	 */
	protected RepositorySettings repoSettings(int endpoint) {
		return server.getRepository(endpoint);
	}

	/**
	 * Helper method to check the number of requests sent to the respective endpoint. The check is skipped in
	 * non-SPARQL-server environments, where requests are not counted.
	 *
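	 * A usage sketch (endpoint name and expected count are illustrative):
	 *
	 * <pre>{@code
	 * // after evaluating a federated query, check that endpoint1 received exactly two requests
	 * assertNumberOfRequests("endpoint1", 2);
	 * }</pre>
	 *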
	 * @param memberName       the member name, typically "endpointN", where N >= 1
	 * @param expectedRequests the expected number of requests sent to the endpoint
	 */
	protected void assertNumberOfRequests(String memberName, int expectedRequests) {
		if (!isSPARQLServer()) {
			return; // ignore for non-SPARQL server environments where requests are not counted
		}
		var fedxContext = federationContext();
		if (!fedxContext.getConfig().isEnableMonitoring()) {
			Assertions.fail("monitoring is not enabled in the current federation.");
		}
		MonitoringService monitoringService = (MonitoringService) fedxContext.getMonitoringService();

		// obtain the monitoring information for the given member
		// Note: the name matching is simplified, a member matches by id or name, with or without the "http://" prefix
		var monitoringInformation = monitoringService.getAllMonitoringInformation()
				.stream()
				.filter(m -> {
					var endpoint = m.getE();
					return endpoint.getId().equals(memberName)
							|| endpoint.getId().equals("http://" + memberName)
							|| endpoint.getName().equals(memberName)
							|| endpoint.getName().equals("http://" + memberName);
				})
				.findFirst()
				.orElse(null);

		Assertions.assertEquals(expectedRequests,
				(monitoringInformation != null ? monitoringInformation.getNumberOfRequests() : 0));
	}

}