ElasticsearchStoreTransactionsIT.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.elasticsearchstore;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.apache.commons.lang3.time.StopWatch;
import org.eclipse.rdf4j.common.iteration.Iterations;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.BNode;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.model.vocabulary.RDFS;
import org.eclipse.rdf4j.model.vocabulary.SHACL;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.TupleQuery;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.RepositoryResult;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.sail.NotifyingSailConnection;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticsearchStoreTransactionsIT extends AbstractElasticsearchStoreIT {

	private static final Logger logger = LoggerFactory.getLogger(ElasticsearchStoreTransactionsIT.class);
	private static final SimpleValueFactory vf = SimpleValueFactory.getInstance();
	private static ElasticsearchStore elasticsearchStore;

	@BeforeEach
	public void before() {
		elasticsearchStore = new ElasticsearchStore("localhost", TestHelpers.PORT, TestHelpers.CLUSTER, "testindex");
		elasticsearchStore.setElasticsearchScrollTimeout(60000);

		try (NotifyingSailConnection connection = elasticsearchStore.getConnection()) {
			connection.begin(IsolationLevels.NONE);
			connection.removeStatements(null, null, null);
			connection.commit();
		}
	}

	public static void logTime(StopWatch stopWatch, String message, TimeUnit timeUnit) {

		if (timeUnit == TimeUnit.SECONDS) {
			logger.info("`{}` took {} seconds", message, stopWatch.getTime(TimeUnit.MILLISECONDS) / 1000.0);

		} else if (timeUnit == TimeUnit.MILLISECONDS) {
			logger.info("`{}` took {} ms", message, stopWatch.getTime(TimeUnit.MILLISECONDS));

		} else {
			throw new RuntimeException("Unknow time unit: " + timeUnit);
		}

	}

	@Test
	public void testAddData() {
		try (NotifyingSailConnection connection = elasticsearchStore.getConnection()) {
			connection.begin(IsolationLevels.NONE);
			connection.addStatement(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);
			connection.commit();
		}
	}

	@Test
	public void testGetData() {
		try (NotifyingSailConnection connection = elasticsearchStore.getConnection()) {
			connection.begin(IsolationLevels.NONE);
			connection.addStatement(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);
			connection.flush();
			List<? extends Statement> statements = Iterations.asList(connection.getStatements(null, null, null, true));

			connection.commit();

			System.out.println(Arrays.toString(statements.toArray()));

			assertEquals(1, statements.size());
			assertEquals(SimpleValueFactory.getInstance().createStatement(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE),
					statements.get(0));
		}

	}

	@Test
	public void testGetDataSailRepository() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {
			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);

			List<? extends Statement> statements = Iterations.asList(connection.getStatements(null, null, null, true));

			System.out.println(Arrays.toString(statements.toArray()));

			assertEquals(1, statements.size());
			assertEquals(SimpleValueFactory.getInstance().createStatement(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE),
					statements.get(0));
		}

	}

	@Test
	public void testGetDataSailRepositorySpecificStatement() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {
			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);

			List<? extends Statement> statements = Iterations
					.asList(connection.getStatements(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE, true));

			System.out.println(Arrays.toString(statements.toArray()));

			assertEquals(1, statements.size());
			assertEquals(SimpleValueFactory.getInstance().createStatement(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE),
					statements.get(0));
		}

	}

	@Test
	public void testGetDataSailRepositoryBNodeSubject() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			BNode bNode = vf.createBNode();
			connection.add(bNode, RDF.TYPE, RDFS.RESOURCE);

			List<? extends Statement> statements = Iterations
					.asList(connection.getStatements(bNode, RDF.TYPE, RDFS.RESOURCE, true));

			System.out.println(Arrays.toString(statements.toArray()));

			assertEquals(1, statements.size());

			assertEquals(bNode, statements.get(0).getSubject());

		}

	}

	@Test
	public void testGetDataSailRepositoryBNodeObject() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			BNode bNode = vf.createBNode();
			connection.add(bNode, RDF.TYPE, bNode);

			List<? extends Statement> statements = Iterations
					.asList(connection.getStatements(bNode, RDF.TYPE, bNode, true));

			System.out.println(Arrays.toString(statements.toArray()));

			assertEquals(1, statements.size());

			assertEquals(bNode, statements.get(0).getSubject());
			assertEquals(bNode, statements.get(0).getObject());

		}

	}

	@Test
	public void testGetDataSailRepositoryStringObject() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			Literal label = vf.createLiteral("label1");
			connection.add(RDF.TYPE, RDFS.LABEL, label);

			List<? extends Statement> statements = Iterations
					.asList(connection.getStatements(null, null, label, true));

			System.out.println(Arrays.toString(statements.toArray()));

			assertEquals(1, statements.size());

			assertEquals(label, statements.get(0).getObject());

		}

	}

	@Test
	public void testGetDataSailRepositoryStringObjectWhitespace() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			Literal label = vf.createLiteral("rdf:type label \n jfelwkfjl \r fjklwejf \t ������");
			connection.add(RDF.TYPE, RDFS.LABEL, label);

			List<? extends Statement> statements = Iterations
					.asList(connection.getStatements(null, null, label, true));

			System.out.println(Arrays.toString(statements.toArray()));

			assertEquals(1, statements.size());

			assertEquals(label, statements.get(0).getObject());

		}

	}

	@Test
	public void testGetDataSailRepositoryLongString() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			StringBuilder sb = new StringBuilder();
			IntStream.range(0, 100000).forEach(i -> sb.append(i + ""));

			Literal label = vf.createLiteral(sb.toString());
			connection.add(RDF.TYPE, RDFS.LABEL, label);

			List<? extends Statement> statements = Iterations
					.asList(connection.getStatements(null, null, label, true));

			System.out.println(Arrays.toString(statements.toArray()));

			assertEquals(1, statements.size());

			assertEquals(label, statements.get(0).getObject());

		}

	}

	@Test
	public void testGetDataSailRepositoryDate() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			Literal label = vf.createLiteral(new Date());
			connection.add(RDF.TYPE, RDFS.LABEL, label);

			List<? extends Statement> statements = Iterations
					.asList(connection.getStatements(null, null, label, true));

			System.out.println(Arrays.toString(statements.toArray()));

			assertEquals(1, statements.size());

			assertEquals(label, statements.get(0).getObject());

		}

	}

	@Test
	public void testGetDataSailRepositoryLang() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			Literal label = vf.createLiteral("norsk bokm��l", "nb");
			connection.add(RDF.TYPE, RDFS.LABEL, label);

			List<? extends Statement> statements = Iterations
					.asList(connection.getStatements(null, null, label, true));

			System.out.println(Arrays.toString(statements.toArray()));

			assertEquals(1, statements.size());

			assertEquals(label, statements.get(0).getObject());

		}

	}

	@Test
	public void testGetDataSailRepositoryIRISubjectWhitespace() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			IRI iri = vf.createIRI("http://example.com/ space /test");
			connection.add(iri, RDF.TYPE, RDFS.RESOURCE);

			StopWatch stopWatch = StopWatch.createStarted();

			List<? extends Statement> statements = Iterations
					.asList(connection.getStatements(iri, RDF.TYPE, RDFS.RESOURCE, true));

			stopWatch.stop();
			logTime(stopWatch, "Query", TimeUnit.MILLISECONDS);

			System.out.println(Arrays.toString(statements.toArray()));

			assertEquals(1, statements.size());

			assertEquals(iri, statements.get(0).getSubject());

		}

	}

	@Test
	public void testGetDataSailRepositoryContextIRI() throws IOException {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			Resource context1 = vf.createIRI("http://example.com/context1");
			Resource context2 = vf.createBNode();
			Resource context3 = vf.createIRI("http://example.com/context3");

			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE, context1);
			connection.add(RDF.TYPE, RDF.TYPE, RDF.PROPERTY, context2);
			connection.add(RDF.TYPE, RDF.TYPE, RDF.PREDICATE, context3);
			connection.add(RDF.TYPE, RDF.TYPE, vf.createLiteral("no context"));

			printAllDocs();

			StopWatch stopWatch = StopWatch.createStarted();
			Set<Statement> context1Statements = Iterations
					.asSet(connection.getStatements(null, RDF.TYPE, null, context1));

			stopWatch.stop();
			logTime(stopWatch, "Query", TimeUnit.MILLISECONDS);

			stopWatch = StopWatch.createStarted();

			Set<Statement> context2Statements = Iterations
					.asSet(connection.getStatements(null, RDF.TYPE, null, context2));

			stopWatch.stop();
			logTime(stopWatch, "Query", TimeUnit.MILLISECONDS);

			stopWatch = StopWatch.createStarted();

			Set<Statement> context3Statements = Iterations
					.asSet(connection.getStatements(null, RDF.TYPE, null, context3));

			stopWatch.stop();
			logTime(stopWatch, "Query", TimeUnit.MILLISECONDS);

			stopWatch = StopWatch.createStarted();

			Set<Statement> contextNoneStatements = Iterations
					.asSet(connection.getStatements(null, RDF.TYPE, null, true, (Resource) null));

			stopWatch.stop();
			logTime(stopWatch, "Query", TimeUnit.MILLISECONDS);

			stopWatch = StopWatch.createStarted();

			Set<Statement> contextAllStatements = Iterations
					.asSet(connection.getStatements(null, RDF.TYPE, null));

			stopWatch.stop();
			logTime(stopWatch, "Query", TimeUnit.MILLISECONDS);

			stopWatch = StopWatch.createStarted();

			Set<Statement> contextContext1And2Statements = Iterations
					.asSet(connection.getStatements(null, RDF.TYPE, null, context1, context2));

			stopWatch.stop();
			logTime(stopWatch, "Query", TimeUnit.MILLISECONDS);

			Statement statementContext1 = vf.createStatement(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE, context1);
			Statement statementContext2 = vf.createStatement(RDF.TYPE, RDF.TYPE, RDF.PROPERTY, context2);
			Statement statementContext3 = vf.createStatement(RDF.TYPE, RDF.TYPE, RDF.PREDICATE, context3);
			Statement statementContextNone = vf.createStatement(RDF.TYPE, RDF.TYPE, vf.createLiteral("no context"));

			assertEquals(asSet(statementContext1), context1Statements);
			assertEquals(asSet(statementContext2), context2Statements);
			assertEquals(asSet(statementContext3), context3Statements);
			assertEquals(asSet(statementContext1, statementContext2), contextContext1And2Statements);
			assertEquals(asSet(statementContextNone), contextNoneStatements);
			assertEquals(asSet(statementContext1, statementContext2, statementContext3, statementContextNone),
					contextAllStatements);

		}

	}

	@Test
	public void sparqlTest() throws IOException {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {
			connection.begin(IsolationLevels.NONE);
			connection.add(ElasticsearchStoreTransactionsIT.class.getClassLoader().getResourceAsStream("testFile.ttl"),
					"",
					RDFFormat.TURTLE);
			connection.commit();

			StopWatch stopwatch = StopWatch.createStarted();

			TupleQuery tupleQuery = connection.prepareTupleQuery(String.join("\n", "",
					"PREFIX sh: <http://www.w3.org/ns/shacl#>",
					"select * where {",
					"	?a a sh:NodeShape ;",
					"		sh:property ?property .",
					"",
					"	?property sh:path ?path;",
					"				 sh:minCount ?minCount.",
					"}"));

			List<BindingSet> bindingSets = Iterations.asList(tupleQuery.evaluate());

			stopwatch.stop();
			logTime(stopwatch, "query", TimeUnit.MILLISECONDS);

			assertEquals(1, bindingSets.size());
			assertEquals("http://example.com/ns#PersonShape", bindingSets.get(0).getValue("a").stringValue());
			assertEquals("http://www.w3.org/2000/01/rdf-schema#label",
					bindingSets.get(0).getValue("path").stringValue());
			assertEquals("1", bindingSets.get(0).getValue("minCount").stringValue());

		}

	}

	@Test
	public void testAddDelete() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			connection.begin(IsolationLevels.NONE);
			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);
			connection.commit();

			assertEquals(1, connection.size());

			connection.begin(IsolationLevels.NONE);
			connection.remove(RDF.TYPE, null, null);
			connection.commit();

			assertEquals(0, connection.size());
		}

	}

	@Test
	public void testRollback() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			connection.begin(IsolationLevels.READ_COMMITTED);
			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);
			connection.rollback();

			assertEquals(0, connection.size());
		}

	}

	@Test
	public void testRollback2() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			connection.begin(IsolationLevels.READ_COMMITTED);
			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);
			assertEquals(1, connection.size());
			connection.rollback();

			assertEquals(0, connection.size());
		}

	}

	@Test
	public void testRollback3() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			connection.begin(IsolationLevels.READ_COMMITTED);
			assertEquals(0, connection.size());
			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);
			assertEquals(1, connection.size());
			connection.rollback();

			assertEquals(0, connection.size());
		}

	}

	@Test
	public void testRollbackClear() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			BNode context = vf.createBNode();

			connection.begin(IsolationLevels.READ_COMMITTED);
			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);
			connection.add(RDF.TYPE, RDF.TYPE, RDF.PROPERTY, context);
			connection.commit();
			assertEquals(2, connection.size());

			connection.begin();
			connection.clear(context);
			connection.commit();
			assertEquals(1, connection.size());

			connection.begin();
			connection.clear();
			assertEquals(0, connection.size());

			connection.rollback();

			assertEquals(1, connection.size());
		}

	}

	@Test
	public void testRollbackClearSimple() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			BNode context = vf.createBNode();

			connection.begin(IsolationLevels.READ_COMMITTED);
			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);
			connection.add(RDF.TYPE, RDF.TYPE, RDF.PROPERTY, context);
			connection.commit();

			connection.begin();
			connection.clear();
			assertEquals(0, connection.size());
			connection.rollback();

			assertEquals(2, connection.size());

		}

	}

	@Test
	public void testHashCollision() {
		String fb = "FB";
		String ea = "Ea";

		assertEquals(fb.hashCode(), ea.hashCode());

		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {
			Literal fbLiteral = vf.createLiteral(fb);
			Literal eaLiteral = vf.createLiteral(ea);

			assertEquals(fbLiteral.stringValue().hashCode(), eaLiteral.stringValue().hashCode());

			connection.begin(IsolationLevels.NONE);
			connection.add(RDF.TYPE, RDFS.LABEL, fbLiteral);
			connection.add(RDF.TYPE, RDFS.LABEL, eaLiteral);
			connection.commit();

			assertEquals(2, connection.size());

			List<Statement> fbLiteralList = Iterations.asList(connection.getStatements(null, null, fbLiteral));
			assertEquals(1, fbLiteralList.size());

			List<Statement> eaLiteralList = Iterations.asList(connection.getStatements(null, null, eaLiteral));
			assertEquals(1, eaLiteralList.size());

		}
	}

	// TODO: this throws a SearchPhaseExecutionException, even thought it should have gotten wrapped at some point in a
	// RepositoryException or something like that
	@Disabled("slow test")
	@Test
	public void testScrollTimeout() {
		assertThrows(RepositoryException.class, () -> {
			SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
			this.elasticsearchStore.setElasticsearchScrollTimeout(1);

			try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

				connection.begin(IsolationLevels.NONE);
				for (int i = 0; i < 2000; i++) {
					connection.add(RDF.TYPE, RDF.TYPE, vf.createLiteral(i));

				}
				connection.commit();

				try (RepositoryResult<Statement> statements = connection.getStatements(null, null, null, false)) {
					int count = 0;
					while (statements.hasNext()) {
						if (count++ % 1000 == 999) {
							Thread.sleep(60000);
						}
						statements.next();
					}
				}

			}
		});
	}

	@Test
	public void testAddSameStatement() {

		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);

		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			connection.begin(IsolationLevels.NONE);
			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);
			connection.commit();

			connection.begin(IsolationLevels.NONE);
			for (int i = 0; i < 2000; i++) {
				connection.add(RDF.TYPE, RDFS.LABEL, vf.createLiteral(i + ""));
			}
			connection.commit();

			List<Statement> typeStatements = Iterations.asList(connection.getStatements(null, RDF.TYPE, null));
			assertEquals(1, typeStatements.size());

			connection.begin(IsolationLevels.NONE);
			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);
			connection.commit();

			typeStatements = Iterations.asList(connection.getStatements(null, RDF.TYPE, null));
			assertEquals(1, typeStatements.size());

		}
	}

	@Test
	public void testAddSameStatement2() {

		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);

		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			connection.begin(IsolationLevels.NONE);
			for (int i = 0; i < 2000; i++) {
				connection.add(RDF.TYPE, RDFS.LABEL, vf.createLiteral(i + ""));
			}
			connection.commit();

			connection.begin(IsolationLevels.NONE);
			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);
			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);

			List<Statement> typeStatements = Iterations.asList(connection.getStatements(null, RDF.TYPE, null));
			assertEquals(1, typeStatements.size());

			connection.commit();

			typeStatements = Iterations.asList(connection.getStatements(null, RDF.TYPE, null));
			assertEquals(1, typeStatements.size());

		}
	}

	@Test
	public void testNamespace() {

		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);

		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {
			connection.begin();
			connection.setNamespace(SHACL.PREFIX, SHACL.NAMESPACE);
			connection.commit();
		}

		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {
			String namespace = connection.getNamespace(SHACL.PREFIX);
			assertEquals(SHACL.NAMESPACE, namespace);
		}
	}

	@Test
	public void testClear() {
		SailRepository elasticsearchStore = new SailRepository(this.elasticsearchStore);
		try (SailRepositoryConnection connection = elasticsearchStore.getConnection()) {

			BNode context = vf.createBNode();

			connection.begin(IsolationLevels.READ_COMMITTED);
			connection.add(RDF.TYPE, RDF.TYPE, RDFS.RESOURCE);
			connection.add(RDF.TYPE, RDF.TYPE, RDF.PROPERTY, context);
			connection.commit();

			connection.begin(IsolationLevels.NONE);
			connection.clear();
			connection.commit();

		}

	}

	private Set<Statement> asSet(Statement... statements) {
		Set<Statement> set = new TreeSet<>(Comparator.comparing(Object::toString));
		set.addAll(Arrays.asList(statements));
		return set;
	}

}