NativeStoreConcurrentValueStoreCorruptionTest.java

/*******************************************************************************
 * Copyright (c) 2025 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.parallel.Isolated;

/**
 * Exercises the {@link ValueStore} obtained from an initialized {@link NativeStore} with highly concurrent writes. The
 * workload mirrors {@link ValueStoreConcurrentWriteCorruptionTest} but goes through {@link NativeStore} to demonstrate
 * that the embedded {@link ValueStore} can still become corrupted.
 */
@Isolated
public class NativeStoreConcurrentValueStoreCorruptionTest {

	@TempDir
	File dataDir;

	@AfterEach
	public void resetSoftFailFlag() {
		NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = true;
	}

	@Test
	public void concurrentNativeValueStoreWritesTriggerCorruption() throws Exception {
		NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false;

		NativeStore store = new NativeStore(dataDir, "spoc,posc");
		store.init();

		ValueStore valueStore = (ValueStore) store.getValueFactory();

		int writers = 16;
		int valuesPerWriter = 1000;

		ExecutorService pool = Executors.newFixedThreadPool(writers);
		CountDownLatch start = new CountDownLatch(1);
		List<Future<?>> futures = new ArrayList<>();

		for (int i = 0; i < writers; i++) {
			final int seed = 42 + i;
			futures.add(pool.submit(() -> {
				Random random = new Random(seed);
				try {
					start.await();
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					throw new RuntimeException(e);
				}

				for (int j = 0; j < valuesPerWriter; j++) {
					try {
						storeRandomValue(random, valueStore);

					} catch (IOException e) {
						throw new RuntimeException(e);
					}
				}

				return null;
			}));
		}

		start.countDown();

		try {
			for (Future<?> future : futures) {
				future.get();
			}
		} finally {
			pool.shutdownNow();
		}

		store.shutDown();

		NativeStore reopened = new NativeStore(dataDir, "spoc,posc");
		reopened.init();

		ValueStore reopenedValueStore = (ValueStore) reopened.getValueFactory();

		reopenedValueStore.checkConsistency();

		reopened.shutDown();
	}

	private static void storeRandomValue(Random random, ValueStore valueStore) throws IOException {
		String namespace = "http://example.org/ns/" + random.nextInt(200) + "/";
		String localName = "s" + random.nextInt(50_000);
		valueStore.storeValue(valueStore.createIRI(namespace, localName));

		switch (random.nextInt(3)) {
		case 0:
			valueStore.storeValue(valueStore.createLiteral("value-" + random.nextInt(100_000)));
			break;
		case 1:
			valueStore.storeValue(valueStore.createLiteral("value-" + random.nextInt(100_000), "en"));
			break;
		default:
			valueStore.storeValue(valueStore.createLiteral(
					"value-" + random.nextInt(100_000),
					valueStore.createIRI("http://example.org/dt/" + random.nextInt(256))));
			break;
		}
	}
}