/*******************************************************************************
 * Copyright (c) 2025 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.parallel.Isolated;

/**
 * Attempts to deterministically reproduce corruption caused by a race between {@code DataFile.flush()} and concurrent
 * buffered writes. The test hammers the {@link ValueStore} with small writes while a separate thread forces frequent
 * {@code DataFile.getData(...)} calls (which invoke {@code flush()}). Finally, after closing and reopening the store, a
 * consistency check is expected to fail if corruption has occurred.
 *
 * <p>
 * The workload and explicit flusher increase the likelihood of the following race:
 * <ul>
 * <li>a writer appends to the in-memory write buffer and advances {@code nioFileSize} without flushing;</li>
 * <li>a concurrent read forces {@code flush()}, which computes the flush offset from the current
 * {@code nioFileSize} while copying a smaller snapshot of the buffer content, producing misaligned writes.</li>
 * </ul>
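 * <p>
 * The test is {@code @Isolated} because it flips the global
 * {@code NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES} flag for its duration.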
 */
@Isolated
public class DataFileFlushRaceReproducerTest {

	@TempDir
	File dataDir;

	private ValueStore valueStore;

	@BeforeEach
	public void setup() throws IOException {
		valueStore = new ValueStore(dataDir);
		// surface corruption as exceptions (not soft-fail)
		NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false;
	}

	@AfterEach
	public void tearDown() throws IOException {
		try {
			if (valueStore != null) {
				valueStore.close();
			}
		} finally {
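			// put the global flag back into a known state so subsequent tests are not affected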
			NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false;
		}
	}

	@Test
	public void flushDuringConcurrentWritesTriggersCorruption() throws Exception {
		int writerThreads = 8; // DataStore serializes storeData, but this keeps the buffer hot
		int valuesPerWriter = 50_000; // small values -> buffered path, many flush opportunities
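		// each iteration stores one IRI plus one literal, so roughly 8 x 50_000 x 2 = 800_000 small values in total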

		ExecutorService pool = Executors.newFixedThreadPool(writerThreads + 1);
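		// all submitted tasks await this latch, so the writers and the flusher start at the same instant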
		CountDownLatch start = new CountDownLatch(1);

		// reflectively access the underlying DataStore so we can force getData(id) to
		// call DataFile.flush() on every iteration (ValueStore caches may avoid reads otherwise)
		DataStore ds = (DataStore) reflect(valueStore, "dataStore");

		List<Future<?>> futures = new ArrayList<>();

		// Writers
		for (int i = 0; i < writerThreads; i++) {
			final int seed = 4242 + i;
			futures.add(pool.submit(() -> {
				Random rnd = new Random(seed);
				try {
					start.await();
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					throw new RuntimeException(e);
				}

				for (int j = 0; j < valuesPerWriter; j++) {
					try {
						// keep values small to stay on the buffered write path
						String ns = "http://ex/" + rnd.nextInt(64) + "/";
						String local = "l" + rnd.nextInt(20_000);
						valueStore.storeValue(valueStore.createIRI(ns + local));

						switch (rnd.nextInt(3)) {
						case 0:
							valueStore.storeValue(valueStore.createLiteral("v" + rnd.nextInt(10_000)));
							break;
						case 1:
							valueStore.storeValue(valueStore.createLiteral("v" + rnd.nextInt(10_000), "en"));
							break;
						default:
							valueStore.storeValue(valueStore
									.createLiteral("v" + rnd.nextInt(10_000),
											valueStore.createIRI("http://dt/" + rnd.nextInt(128))));
							break;
						}
					} catch (IOException e) {
						throw new RuntimeException(e);
					}
				}
				return null;
			}));
		}

		// Flusher: repeatedly read an early id to force DataFile.getData(...)->flush()
		futures.add(pool.submit(() -> {
			try {
				start.await();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new RuntimeException(e);
			}
			// spin for a fixed window that approximates the writers' runtime; the flusher is not tied to writer completion
			long until = System.nanoTime() + TimeUnit.SECONDS.toNanos(25);
			while (System.nanoTime() < until) {
				try {
					int max = ds.getMaxID();
					if (max >= 1) {
						// focus on a small id to avoid cache churn and ensure frequent flushes
						ds.getData(1);
					}
				} catch (IOException e) {
					// propagate to fail the test
					throw new RuntimeException(e);
				}
				// spin-wait hint; keeps the polling loop tight without sleeping
				Thread.onSpinWait();
			}
			return null;
		}));

		// go!
		start.countDown();

		try {
			for (Future<?> f : futures) {
				f.get();
			}
		} finally {
			pool.shutdownNow();
		}

		// Close and reopen to ensure we load from disk only
		valueStore.close();
		valueStore = new ValueStore(dataDir);

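		// checkConsistency() is expected to throw if the reopened store contains corrupt or misaligned records;
		// a clean return means this run did not reproduce the corruption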
		valueStore.checkConsistency();
	}

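	/**
	 * Reads a private field declared directly on the target's class. Used to reach the {@link ValueStore}'s internal
	 * {@link DataStore}; the field name is an implementation detail of the native store.
	 */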
	private static Object reflect(Object target, String field) {
		try {
			Field f = target.getClass().getDeclaredField(field);
			f.setAccessible(true);
			return f.get(target);
		} catch (ReflectiveOperationException e) {
			throw new RuntimeException(e);
		}
	}
}