MultithreadedTest.java

/*******************************************************************************
 * Copyright (c) 2019 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.shacl;

import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.RDF4J;
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.ParentReferenceChecker;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.Rio;
import org.eclipse.rdf4j.sail.NotifyingSail;
import org.eclipse.rdf4j.sail.Sail;
import org.eclipse.rdf4j.sail.SailConflictException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.parallel.Isolated;
import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;

@Isolated
public abstract class MultithreadedTest {
	// Value factory used by Transaction to re-create parsed statements inside an explicit named graph.
	SimpleValueFactory vf = SimpleValueFactory.getInstance();

	/**
	 * Sets the static flag {@link ParentReferenceChecker#skip} to {@code true} once, before any test of this class runs.
	 * <p>
	 * NOTE(review): presumably this switches off the optimizer's parent-reference check while these multithreaded tests
	 * execute - confirm against {@code ParentReferenceChecker}.
	 */
	@BeforeAll
	public static void beforeAll() {
		ParentReferenceChecker.skip = true;
	}

	/**
	 * Sets the static flag {@link ParentReferenceChecker#skip} to {@code false} once all tests of this class have run.
	 * <p>
	 * The flag is reset unconditionally; whatever value it held before this test class started is not restored.
	 */
	@AfterAll
	public static void afterAll() {
		ParentReferenceChecker.skip = false;
	}

	/**
	 * Generates six groups of 10 x 10 small transactions that add and remove both data and SHACL shapes, and hands all
	 * of them to {@code parallelTest} using {@link IsolationLevels#SNAPSHOT}.
	 * <p>
	 * The groups are, in generation order:
	 * <ol>
	 * <li>add an {@code ex:age} value to {@code ex:data_i_j} (default graph)</li>
	 * <li>add node shape {@code ex:shape_i_j} targeting {@code ex:Person<j>} to the SHACL shape graph</li>
	 * <li>add the typing of {@code ex:data_i_j} as {@code ex:Person<j>} (default graph)</li>
	 * <li>for even {@code i} add a further {@code ex:age<i>} value; for odd {@code i} the transaction stays empty</li>
	 * <li>remove the node shapes of group 2</li>
	 * <li>remove the data of group 3</li>
	 * </ol>
	 * The Turtle for groups 2/5 and 3/6 comes from shared helpers so that the "add" and "remove" variants cannot drift
	 * apart.
	 */
	@Test
	@Timeout(value = 30, unit = TimeUnit.MINUTES)
	public void testDataAndShapes() {
		System.out.println("testDataAndShapes");

		List<List<Transaction>> list = new ArrayList<>();

		// Group 1: plain ex:age values. Note that i, j and 100 are concatenated as text (e.g. i=3, j=4 gives 34100),
		// they are not summed.
		for (int j = 0; j < 10; j++) {
			List<Transaction> transactions = new ArrayList<>();
			for (int i = 0; i < 10; i++) {
				Transaction transaction = new Transaction();
				String join = String.join("\n", "",
						"ex:data_" + i + "_" + j,
						"  ex:age " + i + j + 100,
						"  .");
				transaction.add(join, null);
				transactions.add(transaction);
			}
			list.add(transactions);
		}

		// Group 2: add the shapes to the SHACL shape graph.
		for (int j = 0; j < 10; j++) {
			List<Transaction> transactions = new ArrayList<>();
			for (int i = 0; i < 10; i++) {
				Transaction transaction = new Transaction();
				transaction.add(nodeShapeTurtle(i, j), RDF4J.SHACL_SHAPE_GRAPH);
				transactions.add(transaction);
			}
			list.add(transactions);
		}

		// Group 3: add the typed person data.
		for (int j = 0; j < 10; j++) {
			List<Transaction> transactions = new ArrayList<>();
			for (int i = 0; i < 10; i++) {
				Transaction transaction = new Transaction();
				transaction.add(typedPersonTurtle(i, j), null);
				transactions.add(transaction);
			}
			list.add(transactions);
		}

		// Group 4: an additional ex:age<i> value for even i only; odd i contributes a deliberately empty transaction.
		for (int j = 0; j < 10; j++) {
			List<Transaction> transactions = new ArrayList<>();
			for (int i = 0; i < 10; i++) {
				Transaction transaction = new Transaction();
				if (i % 2 == 0) {
					String join = String.join("\n", "",
							"ex:data_" + i + "_" + j,
							"  ex:age" + i + " " + i + j + 100,
							"  .");
					transaction.add(join, null);
				}
				transactions.add(transaction);
			}
			list.add(transactions);
		}

		// Group 5: remove the shapes added by group 2.
		for (int j = 0; j < 10; j++) {
			List<Transaction> transactions = new ArrayList<>();
			for (int i = 0; i < 10; i++) {
				Transaction transaction = new Transaction();
				transaction.remove(nodeShapeTurtle(i, j), RDF4J.SHACL_SHAPE_GRAPH);
				transactions.add(transaction);
			}
			list.add(transactions);
		}

		// Group 6: remove the typed person data added by group 3.
		for (int j = 0; j < 10; j++) {
			List<Transaction> transactions = new ArrayList<>();
			for (int i = 0; i < 10; i++) {
				Transaction transaction = new Transaction();
				transaction.remove(typedPersonTurtle(i, j), null);
				transactions.add(transaction);
			}
			list.add(transactions);
		}

		parallelTest(list, IsolationLevels.SNAPSHOT);
	}

	/**
	 * Builds the Turtle snippet (without prefix declarations) for node shape {@code ex:shape_i_j}, which targets class
	 * {@code ex:Person<j>} and constrains {@code ex:age} to exactly one value.
	 * <p>
	 * NOTE(review): the snippet uses {@code sh:datatype sh:integer}; the XSD integer datatype would be
	 * {@code xsd:integer}. Left untouched because changing it would change what the test validates - confirm intent.
	 *
	 * @param i index of the shape within its group
	 * @param j index of the group, also used as the suffix of the targeted class
	 * @return the Turtle snippet, starting with a newline
	 */
	private static String nodeShapeTurtle(int i, int j) {
		return String.join("\n", "",
				"ex:shape_" + i + "_" + j,
				"        a sh:NodeShape  ;",
				"        sh:targetClass ex:Person" + j + " ;",
				"        sh:property [",
				"                sh:path ex:age ;",
				"                sh:datatype sh:integer ;",
				"                sh:minCount 1 ;",
				"                sh:maxCount 1 ;",
				"        ] .");
	}

	/**
	 * Builds the Turtle snippet (without prefix declarations) that types {@code ex:data_i_j} as {@code ex:Person<j>}.
	 * For even {@code i} the subject additionally gets an {@code ex:age<i>} value made of the digits of {@code i} and
	 * {@code j}.
	 *
	 * @param i index of the subject within its group
	 * @param j index of the group, also used as the suffix of the class
	 * @return the Turtle snippet, starting with a newline
	 */
	private static String typedPersonTurtle(int i, int j) {
		if (i % 2 == 0) {
			return String.join("\n", "",
					"ex:data_" + i + "_" + j + " a ex:Person" + j + "; ",
					"  ex:age" + i + " " + i + j,
					"  .");
		}
		return String.join("\n", "",
				"ex:data_" + i + "_" + j + " a ex:Person" + j + "; ",
				"  .");
	}

	/**
	 * Executes every transaction contained in {@code list} on a thread pool, three rounds in a row, against a freshly
	 * created {@link ShaclSail} repository, and finally revalidates the repository.
	 * <p>
	 * Commits that fail because of a {@link ShaclSailValidationException} or a {@link SailConflictException} are rolled
	 * back and tolerated; any other commit failure fails the test.
	 *
	 * @param list           groups of transactions; the grouping is flattened before execution
	 * @param isolationLevel isolation level used for every worker transaction
	 */
	private void parallelTest(List<List<Transaction>> list, IsolationLevels isolationLevel) {
		ShaclSail sail = new ShaclSail(getBaseSail());
		sail.setParallelValidation(true);
		sail.setLogValidationPlans(false);
		sail.setGlobalLogValidationExecution(false);
		sail.setLogValidationViolations(false);
		sail.setSerializableValidation(false);
		SailRepository repository = new SailRepository(sail);
		repository.init();

		// Shared by all worker threads. java.util.Random is thread-safe, but which task observes which value depends on
		// scheduling, so the fixed seed does not make a run reproducible.
		Random r = new Random(52465534);

		ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

		try {
			for (int i = 0; i < 3; i++) {
				list.stream()
						.flatMap(Collection::stream)
						// ordering by identity hash mixes the transactions of the different groups
						.sorted(Comparator.comparingInt(System::identityHashCode))
						.map(transaction -> (Runnable) () -> {
							try (SailRepositoryConnection connection = repository.getConnection()) {

								connection.begin(isolationLevel);
								// randomly apply the transaction either as written or inverted
								if (r.nextBoolean()) {
									connection.add(transaction.addedStatements);
									connection.remove(transaction.removedStatements);
								} else {
									connection.add(transaction.removedStatements);
									connection.remove(transaction.addedStatements);
								}
								try {
									connection.commit();
								} catch (RepositoryException e) {
									connection.rollback();
									if (!((e.getCause() instanceof ShaclSailValidationException)
											|| e.getCause() instanceof SailConflictException)) {
										throw e;
									}
								}

							}
						})
						.map(executorService::submit)
						.collect(Collectors.toList()) // this terminates lazy evaluation, so that we can submit all our
						// runnables before we start collecting them
						.forEach(f -> {
							try {
								f.get();
							} catch (InterruptedException e) {
								// keep the interrupt visible to whoever runs this test
								Thread.currentThread().interrupt();
								throw new RuntimeException(e);
							} catch (ExecutionException e) {
								throw new RuntimeException(e);
							}
						});

			}

		} finally {
			try {
				executorService.shutdown();
				try (SailRepositoryConnection connection = repository.getConnection()) {
					connection.begin();
					((ShaclSailConnection) connection.getSailConnection()).revalidate();
					connection.commit();
				}
			} finally {
				// must run even if the revalidation above throws, otherwise the repository is leaked
				repository.shutDown();
			}
		}
	}

	/**
	 * Supplies the underlying sail that the tests in this class wrap in a {@link ShaclSail}. Implemented by the concrete
	 * test subclasses.
	 *
	 * @return the base sail to be wrapped
	 */
	abstract NotifyingSail getBaseSail();

	/**
	 * A pair of statement lists describing one unit of work: statements to add and statements to remove. Both lists are
	 * filled by parsing Turtle/TriG snippets.
	 */
	class Transaction {
		List<Statement> addedStatements = new ArrayList<>();
		List<Statement> removedStatements = new ArrayList<>();

		/**
		 * Parses {@code turtle} and appends the resulting statements to {@link #addedStatements}.
		 *
		 * @param turtle snippet without prefix declarations; the {@code ex}, {@code sh}, {@code xsd} and {@code foaf}
		 *               prefixes are prepended automatically
		 * @param graph  graph to place every statement in, or {@code null} to keep the statements as parsed
		 */
		private void add(String turtle, IRI graph) {
			addedStatements.addAll(parseStatements(turtle, graph));
		}

		/**
		 * Parses {@code turtle} and appends the resulting statements to {@link #removedStatements}.
		 *
		 * @param turtle snippet without prefix declarations; the {@code ex}, {@code sh}, {@code xsd} and {@code foaf}
		 *               prefixes are prepended automatically
		 * @param graph  graph to place every statement in, or {@code null} to keep the statements as parsed
		 */
		private void remove(String turtle, IRI graph) {
			removedStatements.addAll(parseStatements(turtle, graph));
		}

		/**
		 * Shared implementation of {@link #add(String, IRI)} and {@link #remove(String, IRI)}: prepends the prefix
		 * declarations, parses the document as TriG and, when a graph is given, re-creates every statement in that
		 * graph.
		 *
		 * @throws RuntimeException wrapping the {@link IOException} if reading the snippet fails
		 */
		private List<Statement> parseStatements(String turtle, IRI graph) {
			String document = String.join("\n", "",
					"@prefix ex: <http://example.com/ns#> .",
					"@prefix sh: <http://www.w3.org/ns/shacl#> .",
					"@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .",
					"@prefix foaf: <http://xmlns.com/foaf/0.1/>.") + turtle;

			Model model;
			try {
				model = Rio.parse(new StringReader(document), "", RDFFormat.TRIG);
			} catch (IOException e) {
				throw new RuntimeException(e);
			}

			List<Statement> statements = new ArrayList<>(model.size());
			for (Statement statement : model) {
				if (graph != null) {
					statements.add(vf.createStatement(statement.getSubject(), statement.getPredicate(),
							statement.getObject(), graph));
				} else {
					statements.add(statement);
				}
			}
			return statements;
		}
	}

	/**
	 * Runs the validation-failure workload with {@link IsolationLevels#SNAPSHOT}, parallel validation switched on and
	 * serializable validation switched off, passing 100 as the number of runs.
	 */
	@Test
	@Timeout(value = 30, unit = TimeUnit.MINUTES)
	public void testLotsOfValidationFailuresSnapshot() throws IOException {
		System.out.println("testLotsOfValidationFailuresSnapshot");

		final ShaclSail shaclSail = new ShaclSail(getBaseSail());
		shaclSail.setParallelValidation(true);
		// all validation logging off
		shaclSail.setLogValidationPlans(false);
		shaclSail.setGlobalLogValidationExecution(false);
		shaclSail.setLogValidationViolations(false);
		shaclSail.setSerializableValidation(false);

		final int runs = 100;
		runValidationFailuresTest(shaclSail, IsolationLevels.SNAPSHOT, runs);
	}

	/**
	 * Runs the validation-failure workload with {@link IsolationLevels#SNAPSHOT} while the sail's serializable
	 * validation is switched on, passing 100 as the number of runs.
	 * <p>
	 * The logger named after {@code ShaclSailBaseConfiguration} is set to {@code ERROR} first; the level is not
	 * restored afterwards.
	 */
	@Test
	@Timeout(value = 30, unit = TimeUnit.MINUTES)
	public void testLotsOfValidationFailuresSerializableValidation() throws IOException {
		System.out.println("testLotsOfValidationFailuresSerializableValidation");

		((Logger) LoggerFactory.getLogger(ShaclSailBaseConfiguration.class.getName())).setLevel(Level.ERROR);

		final ShaclSail shaclSail = new ShaclSail(getBaseSail());
		shaclSail.setParallelValidation(true);
		// all validation logging off
		shaclSail.setLogValidationPlans(false);
		shaclSail.setGlobalLogValidationExecution(false);
		shaclSail.setLogValidationViolations(false);
		// the setting that distinguishes this test from the plain SNAPSHOT variant
		shaclSail.setSerializableValidation(true);

		final int runs = 100;
		runValidationFailuresTest(shaclSail, IsolationLevels.SNAPSHOT, runs);
	}

	/**
	 * Runs the validation-failure workload with {@link IsolationLevels#SERIALIZABLE}, parallel validation switched on
	 * and serializable validation switched off, passing 200 as the number of runs.
	 * <p>
	 * The logger named after {@code ShaclSailConnection} is set to {@code ERROR} first; the level is not restored
	 * afterwards.
	 */
	@Test
	@Timeout(value = 30, unit = TimeUnit.MINUTES)
	public void testLotsOfValidationFailuresSerializable() throws IOException {
		System.out.println("testLotsOfValidationFailuresSerializable");

		Logger connectionLogger = (Logger) LoggerFactory.getLogger(ShaclSailConnection.class.getName());
		connectionLogger.setLevel(Level.ERROR);

		final ShaclSail shaclSail = new ShaclSail(getBaseSail());
		shaclSail.setParallelValidation(true);
		// all validation logging off
		shaclSail.setLogValidationPlans(false);
		shaclSail.setGlobalLogValidationExecution(false);
		shaclSail.setLogValidationViolations(false);
		shaclSail.setSerializableValidation(false);

		final int runs = 200;
		runValidationFailuresTest(shaclSail, IsolationLevels.SERIALIZABLE, runs);
	}

	/**
	 * Runs the validation-failure workload with {@link IsolationLevels#READ_COMMITTED}, parallel validation switched on
	 * and serializable validation switched off, passing 100 as the number of runs.
	 */
	@Test
	@Timeout(value = 30, unit = TimeUnit.MINUTES)
	public void testLotsOfValidationFailuresReadCommitted() throws IOException {
		System.out.println("testLotsOfValidationFailuresReadCommitted");

		final ShaclSail shaclSail = new ShaclSail(getBaseSail());
		shaclSail.setParallelValidation(true);
		// all validation logging off
		shaclSail.setLogValidationPlans(false);
		shaclSail.setGlobalLogValidationExecution(false);
		shaclSail.setLogValidationViolations(false);
		shaclSail.setSerializableValidation(false);

		final int runs = 100;
		runValidationFailuresTest(shaclSail, IsolationLevels.READ_COMMITTED, runs);
	}

	/**
	 * Runs the validation-failure workload with {@link IsolationLevels#READ_UNCOMMITTED}, parallel validation switched
	 * on and serializable validation switched off, passing 100 as the number of runs.
	 */
	@Test
	@Timeout(value = 30, unit = TimeUnit.MINUTES)
	public void testLotsOfValidationFailuresReadUncommitted() throws IOException {
		System.out.println("testLotsOfValidationFailuresReadUncommitted");

		final ShaclSail shaclSail = new ShaclSail(getBaseSail());
		shaclSail.setParallelValidation(true);
		// all validation logging off
		shaclSail.setLogValidationPlans(false);
		shaclSail.setGlobalLogValidationExecution(false);
		shaclSail.setLogValidationViolations(false);
		shaclSail.setSerializableValidation(false);

		final int runs = 100;
		runValidationFailuresTest(shaclSail, IsolationLevels.READ_UNCOMMITTED, runs);
	}

	/**
	 * Hammers a repository built on {@code sail} with concurrent transactions, most of which are expected to fail SHACL
	 * validation or to conflict with each other.
	 * <p>
	 * After loading the shapes from {@code complexBenchmark/shacl.trig}, {@code numberOfRuns} tasks are submitted to a
	 * thread pool. Each task uses one connection to run four transactions in sequence: add {@code smallFileInvalid.ttl},
	 * add {@code smallFileInvalid2.ttl}, add {@code smallFile.ttl}, remove {@code smallFile.ttl}. A commit that fails
	 * with a {@link ShaclSailValidationException} or a {@link SailConflictException} is rolled back and tolerated; any
	 * other failure is printed to {@code System.err} and fails the test.
	 * <p>
	 * A daemon thread checks once, 20 seconds after the start, whether the JVM reports deadlocked threads and prints
	 * them to {@code System.err}.
	 *
	 * @param sail            the sail to wrap in a {@link SailRepository}; the repository is shut down before returning
	 * @param isolationLevels isolation level used for every transaction
	 * @param numberOfRuns    number of tasks to submit
	 * @throws IOException if a fixture cannot be read
	 */
	private void runValidationFailuresTest(Sail sail, IsolationLevels isolationLevels, int numberOfRuns)
			throws IOException {
		// Parse the fixtures before the repository is initialised, so that a missing or malformed file cannot leave an
		// initialised repository behind.
		List<Statement> parse = loadBenchmarkStatements("complexBenchmark/smallFileInvalid.ttl");
		List<Statement> parse2 = loadBenchmarkStatements("complexBenchmark/smallFileInvalid2.ttl");
		List<Statement> parse3 = loadBenchmarkStatements("complexBenchmark/smallFile.ttl");

		SailRepository repository = new SailRepository(sail);
		repository.init();

		ExecutorService executorService = null;
		Thread deadlockDetectionThread = null;
		boolean allTasksCompleted = false;
		try {

			deadlockDetectionThread = new Thread(() -> {
				try {
					Thread.sleep(TimeUnit.SECONDS.toMillis(20));
					ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
					long[] ids = threadMXBean.findDeadlockedThreads();
					if (ids != null) {
						ThreadInfo[] deadlockedThreads = threadMXBean.getThreadInfo(ids, true, true);
						StringBuilder sb = new StringBuilder();
						for (ThreadInfo deadlockedThread : deadlockedThreads) {
							sb.append("Deadlocked thread - ").append(deadlockedThread).append("\n");
						}
						String deadlockMessage = sb.toString();
						if (!deadlockMessage.isEmpty()) {
							System.err.println(deadlockMessage);
						}
					}

				} catch (InterruptedException ignored) {
					// interrupted by the finally block below once the test is over; nothing left to report
				}
			});
			deadlockDetectionThread.setDaemon(true);
			deadlockDetectionThread.start();

			executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

			Utils.loadShapeData(repository, "complexBenchmark/shacl.trig");

			// range starts at 0 so that exactly numberOfRuns tasks are submitted
			IntStream.range(0, numberOfRuns)

					.mapToObj(run -> (Runnable) () -> {

						try (SailRepositoryConnection connection = repository.getConnection()) {

							connection.begin(isolationLevels);
							connection.add(parse);
							commitToleratingExpectedFailures(connection);

							connection.begin(isolationLevels);
							connection.add(parse2);
							commitToleratingExpectedFailures(connection);

							connection.begin(isolationLevels);
							connection.add(parse3);
							commitToleratingExpectedFailures(connection);

							connection.begin(isolationLevels);
							connection.remove(parse3);
							commitToleratingExpectedFailures(connection);
						}
					})
					.map(executorService::submit)
					.collect(Collectors.toList()) // submit everything before waiting on the first result
					.forEach(f -> {
						try {
							f.get();
						} catch (Throwable e) {
							if (e instanceof InterruptedException) {
								// keep the interrupt visible to whoever runs this test
								Thread.currentThread().interrupt();
							}

							Throwable temp = e;
							while (temp != null) {
								System.err.println(
										"\n----------------------------------------------------------------------\nClass: "
												+ temp.getClass().getCanonicalName() + "\nMessage: "
												+ temp.getMessage());
								temp.printStackTrace();
								temp = temp.getCause();
							}

							System.err.println(
									"\n######################################################################");

							throw new RuntimeException(e);
						}
					});

			allTasksCompleted = true;
		} finally {
			try {
				if (deadlockDetectionThread != null) {
					deadlockDetectionThread.interrupt();
				}
				if (executorService != null) {
					List<Runnable> neverStarted = executorService.shutdownNow();
					// Sanity check for the success path only. After a failure, queued tasks are expected here and an
					// AssertionError would hide the exception that actually failed the test.
					assert !allTasksCompleted || neverStarted.isEmpty();
				}
			} finally {
				// must run no matter what happened above, otherwise the repository is leaked
				repository.shutDown();
			}
		}
	}

	/**
	 * Reads a classpath resource and parses it as TriG into a list of statements.
	 *
	 * @param resourceName classpath-relative name of the resource
	 * @return the parsed statements
	 * @throws IOException           if reading the resource fails
	 * @throws IllegalStateException if the resource is not on the classpath
	 */
	private static List<Statement> loadBenchmarkStatements(String resourceName) throws IOException {
		try (InputStream resource = MultithreadedTest.class.getClassLoader().getResourceAsStream(resourceName)) {
			if (resource == null) {
				throw new IllegalStateException("Test resource not found on classpath: " + resourceName);
			}
			return new ArrayList<>(Rio.parse(resource, "", RDFFormat.TRIG));
		}
	}

	/**
	 * Commits the active transaction of {@code connection}. If the commit fails the transaction is rolled back; the
	 * failure is swallowed when it was caused by a {@link ShaclSailValidationException} or a
	 * {@link SailConflictException} and rethrown otherwise.
	 */
	private static void commitToleratingExpectedFailures(SailRepositoryConnection connection) {
		try {
			connection.commit();
		} catch (RepositoryException e) {
			connection.rollback();
			Throwable cause = e.getCause();
			if (!(cause instanceof ShaclSailValidationException || cause instanceof SailConflictException)) {
				throw e;
			}
		}
	}

}