ChangesetTest.java

/*******************************************************************************
 * Copyright (c) 2020 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.base;

import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.impl.LinkedHashModel;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.model.vocabulary.RDFS;
import org.eclipse.rdf4j.sail.SailException;
import org.junit.jupiter.api.Test;

public class ChangesetTest {

	SimpleValueFactory vf = SimpleValueFactory.getInstance();
	Resource[] allGraph = {};

	@Test
	public void testConcurrency() {

		Changeset changeset = getChangeset();

		CountDownLatch countDownLatch = new CountDownLatch(3);

		ExecutorService executorService = Executors.newFixedThreadPool(3,
				r -> {
					Thread t = Executors.defaultThreadFactory().newThread(r);
					// this thread pool does not need to stick around if the all other threads are done
					t.setDaemon(true);
					return t;
				});

		Runnable addingData = () -> {
			countDownLatch.countDown();
			try {
				countDownLatch.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			for (int i = 0; i < 100000; i++) {
				changeset.approve(vf.createStatement(vf.createBNode(), RDF.TYPE, RDFS.RESOURCE));
			}

		};

		Runnable readingData = () -> {
			countDownLatch.countDown();
			try {
				countDownLatch.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			for (int i = 0; i < 100000; i++) {
				changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph);
			}
		};

		Runnable readingDataIterator = () -> {
			countDownLatch.countDown();
			try {
				countDownLatch.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			while (!changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph)) {
				Thread.yield();
			}

			for (int i = 0; i < 100; i++) {
				Iterator<Statement> approvedStatements = changeset.getApprovedStatements(null, RDF.TYPE, null, allGraph)
						.iterator();

				for (int j = 0; j < 100 && approvedStatements.hasNext(); j++) {
					approvedStatements.next();
				}

			}
		};

		Stream.of(addingData, readingData, readingDataIterator)
				.map(executorService::submit)
				.collect(Collectors.toList())
				.forEach(future -> {
					try {
						future.get();
					} catch (InterruptedException | ExecutionException e) {
						throw new RuntimeException(e);
					}
				});

		executorService.shutdown();
		executorService.shutdownNow();
	}

	private Changeset getChangeset() {
		return new Changeset() {
			@Override
			public void flush() throws SailException {

			}

			@Override
			public Model createEmptyModel() {
				// don't use the dynamic model here, we don't want to test upgrading
				return new LinkedHashModel();
			}
		};
	}

}