// ValueStoreConcurrentWriteCorruptionTest.java

/**
 *******************************************************************************
 * Copyright (c) 2025 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************
 */
package org.eclipse.rdf4j.sail.nativerdf;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.parallel.Isolated;

/**
 * Induces concurrent writes directly against {@link ValueStore} to demonstrate a race condition that corrupts the
 * underlying values data files. This does not go through {@link NativeStore}'s internal locking and therefore
 * deliberately stresses {@link org.eclipse.rdf4j.sail.nativerdf.datastore.DataFile} concurrent write behavior.
 * <p>
 * The test switches the static {@link NativeStore#SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES} flag off for its
 * duration and restores the value it found afterwards, so the JVM-wide setting does not leak into other tests.
 */
@Isolated
public class ValueStoreConcurrentWriteCorruptionTest {

	/** Number of threads that write to the shared {@link ValueStore} at the same time. */
	private static final int WRITERS = 16;

	/** Number of loop iterations per writer; every iteration stores one IRI and one literal. */
	private static final int VALUES_PER_WRITER = 8000;

	/** Upper bound for waiting on writer threads to stop once the pool has been shut down. */
	private static final long TERMINATION_TIMEOUT_SECONDS = 30;

	@TempDir
	File dataDir;

	private ValueStore valueStore;

	private final ValueFactory vf = SimpleValueFactory.getInstance();

	/** Value of the global soft-fail flag as found in {@link #setup()}, restored in {@link #tearDown()}. */
	private boolean previousSoftFail;

	/**
	 * Remembers the current soft-fail setting, opens a fresh {@link ValueStore} in the temporary directory and then
	 * disables soft-fail for the duration of the test.
	 *
	 * @throws IOException if the value store cannot be created
	 */
	@BeforeEach
	public void setup() throws IOException {
		previousSoftFail = NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES;
		valueStore = new ValueStore(dataDir);
		NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false;
	}

	/**
	 * Closes the value store (if one is open) and restores the soft-fail flag to the value it had before the test.
	 *
	 * @throws IOException if closing the value store fails
	 */
	@AfterEach
	public void tearDown() throws IOException {
		try {
			if (valueStore != null) {
				valueStore.close();
			}
		} finally {
			// Restore rather than hard-code: the flag is a mutable static shared by the whole JVM.
			NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = previousSoftFail;
		}
	}

	/**
	 * Runs {@link #WRITERS} threads that all store values into the same {@link ValueStore} at once, then reopens the
	 * store from disk and calls {@link ValueStore#checkConsistency()} on it.
	 * <p>
	 * NOTE(review): the method name says "ShouldCorrupt", but as written the test only passes when every writer
	 * finishes and {@code checkConsistency()} completes without throwing.
	 *
	 * @throws Exception if a writer fails, or if reopening or checking the store fails
	 */
	@Test
	public void concurrentStoreValueShouldCorrupt() throws Exception {
		ExecutorService pool = Executors.newFixedThreadPool(WRITERS);
		CountDownLatch start = new CountDownLatch(1);

		try {
			List<Future<?>> futures = new ArrayList<>(WRITERS);

			for (int i = 0; i < WRITERS; i++) {
				final int seed = 42 + i;
				futures.add(pool.submit(() -> writeLoad(start, seed, VALUES_PER_WRITER)));
			}

			// start all writers concurrently
			start.countDown();

			// wait for completion (propagate any exception from workers)
			for (Future<?> f : futures) {
				f.get();
			}
		} finally {
			pool.shutdownNow();
			// Best effort: make sure no straggler is still writing when the store gets closed below or in
			// tearDown(). On the success path every task has already finished, so this returns immediately; on
			// the failure path the original exception is what matters, hence the result is ignored.
			pool.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
		}

		// Close and reopen the ValueStore to force data reload from disk
		valueStore.close();
		// Drop the reference first so tearDown() does not close the same instance again if reopening fails.
		valueStore = null;
		valueStore = new ValueStore(dataDir);

		valueStore.checkConsistency();

	}

	/**
	 * Work performed by one writer thread: waits for the start signal, then stores {@code count} pseudo-random IRIs,
	 * each followed by one pseudo-random literal (plain, language-tagged or datatyped).
	 *
	 * @param start latch that releases all writers at the same moment
	 * @param seed  seed for this writer's {@link Random}, making the generated values reproducible per writer
	 * @param count number of iterations to perform
	 * @throws RuntimeException      if the thread is interrupted while waiting for the start signal, or if storing
	 *                               a value fails with an {@link IOException}
	 * @throws IllegalStateException if the thread is interrupted between two iterations
	 */
	private void writeLoad(CountDownLatch start, int seed, int count) {
		try {
			start.await();
		} catch (InterruptedException e) {
			// Preserve the interrupt status for code further up the stack before bailing out.
			Thread.currentThread().interrupt();
			throw new RuntimeException(e);
		}

		Random rnd = new Random(seed);
		for (int i = 0; i < count; i++) {
			// Honour pool.shutdownNow(): stop promptly once the test has been abandoned.
			if (Thread.currentThread().isInterrupted()) {
				throw new IllegalStateException("Writer interrupted after " + i + " of " + count + " iterations");
			}

			try {
				// small values to exercise buffered write path
				String ns = "http://ex/" + rnd.nextInt(100) + "/";
				String local = "l" + rnd.nextInt(50_000);
				valueStore.storeValue(vf.createIRI(ns + local));

				switch (rnd.nextInt(3)) {
				case 0:
					valueStore.storeValue(vf.createLiteral("v" + rnd.nextInt(10_000)));
					break;
				case 1:
					valueStore.storeValue(vf.createLiteral("v" + rnd.nextInt(10_000), "en"));
					break;
				default:
					valueStore.storeValue(
							vf.createLiteral("v" + rnd.nextInt(10_000), vf.createIRI("http://dt/" + rnd.nextInt(100))));
					break;
				}
			} catch (IOException e) {
				throw new RuntimeException(e);
			}
		}
	}
}