// WriteTest.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.federated.write;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.rdf4j.common.iteration.Iterations;
import org.eclipse.rdf4j.federated.FederationContext;
import org.eclipse.rdf4j.federated.SPARQLBaseTest;
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
import org.eclipse.rdf4j.federated.endpoint.EndpointBase;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.FOAF;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.query.QueryLanguage;
import org.eclipse.rdf4j.query.Update;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class WriteTest extends SPARQLBaseTest {

	/**
	 * Runs before each test and delegates to {@code assumeNativeStore()}, which is not defined in this class and is
	 * therefore inherited.
	 * <p>
	 * NOTE(review): presumably this skips the tests unless the federation members are native stores - confirm against
	 * the base class.
	 */
	@BeforeEach
	public void nativeStoreOnly() {
		assumeNativeStore();
	}

	/**
	 * Marks the first available endpoint as writable, adds a single statement through the federation and verifies
	 * that the statement is visible via the federation, present in the first endpoint and absent from the second one.
	 */
	@Test
	public void testSimpleWrite() throws Exception {
		prepareTest(Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl"));

		Iterator<Endpoint> endpoints = federationContext().getEndpointManager().getAvailableEndpoints().iterator();
		EndpointBase writableEndpoint = (EndpointBase) endpoints.next();
		writableEndpoint.setWritable(true);
		Endpoint otherEndpoint = endpoints.next();

		Statement expected = simpleStatement();

		// write through the federation and read the statement back from it
		try (RepositoryConnection fedConn = fedxRule.getRepository().getConnection()) {
			fedConn.add(expected);

			List<Statement> viaFederation = Iterations.asList(fedConn.getStatements(null, null, null, true));
			Assertions.assertEquals(1, viaFederation.size());
			Assertions.assertEquals(expected, viaFederation.get(0));
		}

		// the statement must have been written to the writable endpoint
		try (RepositoryConnection writableConn = writableEndpoint.getConnection()) {
			List<Statement> inWritable = Iterations.asList(writableConn.getStatements(null, null, null, true));
			Assertions.assertEquals(1, inWritable.size());
			Assertions.assertEquals(expected, inWritable.get(0));
		}

		// the other endpoint must still be empty
		try (RepositoryConnection otherConn = otherEndpoint.getConnection()) {
			List<Statement> inOther = Iterations.asList(otherConn.getStatements(null, null, null, true));
			Assertions.assertEquals(0, inOther.size());
		}
	}

	/**
	 * Verifies the read-only behavior of the federation when no endpoint is marked as writable in the test: the
	 * federation repository must report itself as not writable, and adding a statement must fail with an
	 * {@link UnsupportedOperationException}.
	 */
	@Test
	public void testReadOnlyFederation() throws Exception {

		prepareTest(Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl"));

		// assertFalse states the intent directly and yields a descriptive failure message
		Assertions.assertFalse(fedxRule.getRepository().isWritable(),
				"federation without a writable endpoint must not be writable");

		Assertions.assertThrows(UnsupportedOperationException.class, () -> {
			Statement st = simpleStatement();
			try (RepositoryConnection conn = fedxRule.getRepository().getConnection()) {
				try {
					conn.add(st);
				} catch (RuntimeException e) {
					// rollback to avoid a stack trace in the output, then let assertThrows inspect the failure
					conn.rollback();
					throw e;
				}
			}
		});

	}

	/**
	 * Marks the first available endpoint as writable, executes a SPARQL {@code INSERT ... WHERE} update through the
	 * federation and verifies that exactly one {@code rdf:type} statement can be read back from the federation.
	 */
	@Test
	public void testSimpleUpdateQuery() throws Exception {

		prepareTest(Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl"));

		Iterator<Endpoint> endpoints = federationContext().getEndpointManager().getAvailableEndpoints().iterator();
		EndpointBase writableEndpoint = (EndpointBase) endpoints.next();
		writableEndpoint.setWritable(true);

		String insertQuery = "PREFIX : <http://example.org/> INSERT { :subject a :Person } WHERE { }";

		try (RepositoryConnection fedConn = fedxRule.getRepository().getConnection()) {
			Update insert = fedConn.prepareUpdate(QueryLanguage.SPARQL, insertQuery);
			insert.execute();

			// the inserted statement must be returned by the federation
			List<Statement> inserted = Iterations.asList(fedConn.getStatements(null, null, null, true));
			Assertions.assertEquals(1, inserted.size());
			Assertions.assertEquals(RDF.TYPE, inserted.get(0).getPredicate());
		}
	}

	/**
	 * Marks the first available endpoint as writable, executes a SPARQL {@code INSERT DATA} update through the
	 * federation and verifies that exactly one {@code rdf:type} statement can be read back from the federation.
	 */
	@Test
	public void testSimpleUpdateQuery_insertData() throws Exception {

		prepareTest(Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl"));

		Iterator<Endpoint> endpoints = federationContext().getEndpointManager().getAvailableEndpoints().iterator();
		EndpointBase writableEndpoint = (EndpointBase) endpoints.next();
		writableEndpoint.setWritable(true);

		String insertDataQuery = "PREFIX ex: <http://example.org/> INSERT DATA { ex:subject a ex:Person } ";

		try (RepositoryConnection fedConn = fedxRule.getRepository().getConnection()) {
			Update insertData = fedConn.prepareUpdate(QueryLanguage.SPARQL, insertDataQuery);
			insertData.execute();

			// the inserted statement must be returned by the federation
			List<Statement> inserted = Iterations.asList(fedConn.getStatements(null, null, null, true));
			Assertions.assertEquals(1, inserted.size());
			Assertions.assertEquals(RDF.TYPE, inserted.get(0).getPredicate());
		}
	}

	/**
	 * Adds a statement directly to the repository of the first (writable) endpoint, checks that the federation
	 * returns it, removes all statements of its subject through the federation and verifies that the federation is
	 * empty afterwards.
	 */
	@Test
	public void testSimpleRemove() throws Exception {
		prepareTest(Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl"));

		Iterator<Endpoint> endpoints = federationContext().getEndpointManager().getAvailableEndpoints().iterator();
		EndpointBase writableEndpoint = (EndpointBase) endpoints.next();
		writableEndpoint.setWritable(true);

		Statement seeded = simpleStatement();

		// seed the endpoint directly, bypassing the federation
		try (RepositoryConnection endpointConn = writableEndpoint.getRepository().getConnection()) {
			endpointConn.add(seeded);
		}

		try (RepositoryConnection fedConn = fedxRule.getRepository().getConnection()) {

			// the seeded statement must be visible through the federation
			List<Statement> beforeRemoval = Iterations.asList(fedConn.getStatements(null, null, null, true));
			Assertions.assertEquals(1, beforeRemoval.size());
			Assertions.assertEquals(seeded, beforeRemoval.get(0));

			// remove everything about the subject via the federation
			Resource subject = seeded.getSubject();
			fedConn.remove(subject, null, null);

			Assertions.assertEquals(0, fedConn.size());
		}
	}

	/**
	 * Installs a {@link TestWriteStrategyFactory} on the federation, executes a SPARQL insert through the federation
	 * and verifies that one {@code rdf:type} statement is returned and that the custom strategy recorded exactly one
	 * write operation.
	 */
	@Test
	public void testCustomWriteStrategy() throws Exception {

		prepareTest(Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl"));

		// route writes through the counting test strategy
		TestWriteStrategyFactory countingFactory = new TestWriteStrategyFactory();
		fedxRule.getFederationContext().getFederation().setWriteStrategyFactory(countingFactory);

		String insertQuery = "PREFIX : <http://example.org/> INSERT { :subject a :Person } WHERE { }";

		try (RepositoryConnection fedConn = fedxRule.getRepository().getConnection()) {
			Update insert = fedConn.prepareUpdate(QueryLanguage.SPARQL, insertQuery);
			insert.execute();

			// the inserted statement must be returned by the federation
			List<Statement> inserted = Iterations.asList(fedConn.getStatements(null, null, null, true));
			Assertions.assertEquals(1, inserted.size());
			Assertions.assertEquals(RDF.TYPE, inserted.get(0).getPredicate());

			// the custom strategy must have seen exactly one addStatement call
			Assertions.assertEquals(1, writeOperations.get());
		}
	}

	/**
	 * Builds the fixture statement used by the tests: {@code <http://example.org/person1> rdf:type foaf:Person}.
	 *
	 * @return a newly created statement without context
	 */
	protected Statement simpleStatement() {
		ValueFactory factory = SimpleValueFactory.getInstance();
		return factory.createStatement(factory.createIRI("http://example.org/person1"), RDF.TYPE, FOAF.PERSON);
	}

	/**
	 * Resets the shared {@code writeOperations} counter to zero before each test, so that counts recorded by one test
	 * do not leak into the next.
	 */
	@BeforeEach
	public void clearWriteOperations() {
		writeOperations.set(0);
	}

	// Number of addStatement calls seen by TestWriteStrategy; reset to zero before each test
	// by clearWriteOperations(). Static so that the static nested strategy class can update it.
	static final AtomicInteger writeOperations = new AtomicInteger(0);

	/**
	 * {@link WriteStrategyFactory} installed by {@code WriteTest#testCustomWriteStrategy()}: it creates a
	 * {@link TestWriteStrategy} that writes to the repository of the first federation member.
	 */
	public static class TestWriteStrategyFactory implements WriteStrategyFactory {

		@Override
		public WriteStrategy create(List<Endpoint> members, FederationContext federationContext) {
			Endpoint firstMember = members.get(0);
			Repository writeTarget = firstMember.getRepository();
			return new TestWriteStrategy(writeTarget);
		}
	}

	static class TestWriteStrategy extends RepositoryWriteStrategy {

		public TestWriteStrategy(Repository writeRepository) {
			super(writeRepository);
		}

		@Override
		public void addStatement(Resource subj, IRI pred, Value obj, Resource... contexts) throws RepositoryException {
			writeOperations.incrementAndGet();
			super.addStatement(subj, pred, obj, contexts);
		}

	}
}