NativeStoreRepositoryCorruptionReproducerTestIT.java
/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.RepositoryResult;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.parallel.Isolated;
/**
 * Reproduces a corruption scenario using only public repository APIs. Multiple writer threads perform rapid
 * begin/add/commit cycles with {@link IsolationLevels#NONE} while a concurrent reader thread continuously iterates over
 * all statements. This stresses NativeStore's value writes (buffered and direct-write) and flush-on-read behavior
 * without manipulating files directly.
 * <p>
 * With soft-fail disabled, iterating all statements after the workload (and after reopening the store from disk) is
 * expected to throw a {@link RepositoryException} if values have become corrupted on disk. This is a reproducer: it is
 * expected to fail on affected implementations.
 */
@Tag("slow")
@Isolated
public class NativeStoreRepositoryCorruptionReproducerTestIT {

	/** Triple indexes used for both the initial and the reopened store. */
	private static final String INDEXES = "spoc,posc";

	/** Number of concurrent writer threads. */
	private static final int WRITERS = 8;

	/** Number of begin/add/commit cycles each writer performs. */
	private static final int OPS_PER_WRITER = 15_000;

	/** Every N-th cycle a writer additionally adds a large literal (to hit the direct-write path). */
	private static final int LARGE_VALUE_INTERVAL = 200;

	/** How long the reader keeps re-scanning the store. */
	private static final long READER_DURATION_SECONDS = 30;

	/** Upper bound on the whole concurrent phase, so a deadlock fails the test instead of hanging the build. */
	private static final long WORKLOAD_TIMEOUT_MINUTES = 30;

	@TempDir
	File dataDir;

	/** Value of the global soft-fail flag before this test instance changed it; restored after each test. */
	private final boolean originalSoftFail = NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES;

	@AfterEach
	public void resetSoftFail() {
		NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = originalSoftFail;
	}

	@Test
	public void concurrentAddAndReadMayCorrupt() throws Exception {
		NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false; // surface corruption

		NativeStore store = new NativeStore(dataDir, INDEXES);
		SailRepository repo = new SailRepository(store);
		try {
			store.init();
			repo.init();
			runConcurrentWorkload(repo);
		} finally {
			// Always release the store (and its open files) so @TempDir cleanup does not mask the real failure.
			repo.shutDown();
			store.shutDown();
		}

		// Close and reopen to ensure disk state is reloaded
		NativeStore reopenedStore = new NativeStore(dataDir, INDEXES);
		SailRepository reopenedRepo = new SailRepository(reopenedStore);
		try {
			reopenedStore.init();
			reopenedRepo.init();
			try (RepositoryConnection conn = reopenedRepo.getConnection()) {
				// If corruption occurred, iterating statements should throw RepositoryException
				materializeAllStatements(conn);
			}
		} finally {
			reopenedRepo.shutDown();
			reopenedStore.shutDown();
		}
	}

	/**
	 * Runs {@link #WRITERS} writer tasks and one reader task concurrently against {@code repo}, releasing them
	 * simultaneously, and waits for all of them.
	 *
	 * @param repo initialized repository to stress
	 * @throws Exception the first task failure (wrapped in an {@link java.util.concurrent.ExecutionException}), or a
	 *                   timeout if the workload does not finish within {@link #WORKLOAD_TIMEOUT_MINUTES}
	 */
	private static void runConcurrentWorkload(SailRepository repo) throws Exception {
		ExecutorService pool = Executors.newFixedThreadPool(WRITERS + 1);
		CountDownLatch start = new CountDownLatch(1);
		List<Future<?>> futures = new ArrayList<>(WRITERS + 1);
		try {
			// Writer threads: add small and occasional large values to exercise both buffered and direct-write paths
			for (int i = 0; i < WRITERS; i++) {
				final int seed = 1234 + i;
				futures.add(pool.submit(() -> {
					runWriter(repo, start, seed);
					return null;
				}));
			}
			// Reader thread: continuously iterate, forcing flush-on-read in DataFile through ValueStore
			futures.add(pool.submit(() -> {
				runReader(repo, start);
				return null;
			}));

			// kick off all tasks at once to maximise contention
			start.countDown();

			long deadline = System.nanoTime() + TimeUnit.MINUTES.toNanos(WORKLOAD_TIMEOUT_MINUTES);
			for (Future<?> f : futures) {
				f.get(Math.max(0L, deadline - System.nanoTime()), TimeUnit.NANOSECONDS);
			}
		} finally {
			// Interrupts tasks still waiting on the latch or still running after a failure/timeout.
			pool.shutdownNow();
			pool.awaitTermination(10, TimeUnit.SECONDS);
		}
	}

	/**
	 * Performs {@link #OPS_PER_WRITER} transactions with {@link IsolationLevels#NONE}, each adding one small statement;
	 * every {@link #LARGE_VALUE_INTERVAL}-th transaction also adds a large literal.
	 *
	 * @param repo  repository to write to
	 * @param start latch that releases all tasks together
	 * @param seed  seed for the deterministic random IRIs this writer generates
	 * @throws RuntimeException wrapping any failure of a transaction (after attempting a rollback)
	 */
	private static void runWriter(SailRepository repo, CountDownLatch start, int seed) {
		Random rnd = new Random(seed);
		try (RepositoryConnection conn = repo.getConnection()) {
			awaitStart(start);
			ValueFactory vf = conn.getValueFactory();
			for (int j = 0; j < OPS_PER_WRITER; j++) {
				conn.begin(IsolationLevels.NONE);
				try {
					// small IRI + literal
					String ns = "http://ex/" + rnd.nextInt(128) + "/";
					String local = "l" + rnd.nextInt(50_000);
					conn.add(vf.createIRI(ns, local), vf.createIRI("urn:p"), vf.createLiteral("v" + j));
					// occasionally write larger literal to hit direct write path
					if (j % LARGE_VALUE_INTERVAL == 0) {
						String big = buildString(12_000 + rnd.nextInt(4000));
						conn.add(vf.createIRI("urn:s" + j), vf.createIRI("urn:p"), vf.createLiteral(big));
					}
					conn.commit();
				} catch (Throwable t) {
					rollbackQuietly(conn, t);
					// surface failures in worker
					throw new RuntimeException(t);
				}
			}
		}
	}

	/**
	 * Repeatedly scans every statement inside a single {@link IsolationLevels#NONE} transaction for
	 * {@link #READER_DURATION_SECONDS}, or until the thread is interrupted.
	 *
	 * @param repo  repository to read from
	 * @param start latch that releases all tasks together
	 * @throws RuntimeException wrapping any failure while reading (after attempting a rollback)
	 */
	private static void runReader(SailRepository repo, CountDownLatch start) {
		try (RepositoryConnection conn = repo.getConnection()) {
			awaitStart(start);
			conn.begin(IsolationLevels.NONE);
			try {
				long until = System.nanoTime() + TimeUnit.SECONDS.toNanos(READER_DURATION_SECONDS);
				// Compare nanoTime values by difference, as System.nanoTime requires, to stay overflow-safe.
				while (System.nanoTime() - until < 0 && !Thread.currentThread().isInterrupted()) {
					materializeAllStatements(conn);
					Thread.onSpinWait();
				}
				conn.commit();
			} catch (Throwable t) {
				rollbackQuietly(conn, t);
				throw new RuntimeException(t);
			}
		}
	}

	/** Blocks until {@code start} is released, restoring the interrupt flag and failing if interrupted. */
	private static void awaitStart(CountDownLatch start) {
		try {
			start.await();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new RuntimeException(e);
		}
	}

	/** Rolls back the active transaction; a rollback failure is recorded as suppressed on {@code primary}. */
	private static void rollbackQuietly(RepositoryConnection conn, Throwable primary) {
		try {
			conn.rollback();
		} catch (Throwable rollbackFailure) {
			if (rollbackFailure != primary) {
				primary.addSuppressed(rollbackFailure);
			}
		}
	}

	/** Iterates every statement visible to {@code conn}, calling {@code toString()} on each to force materialization. */
	private static void materializeAllStatements(RepositoryConnection conn) {
		try (RepositoryResult<Statement> statements = conn.getStatements(null, null, null, false)) {
			statements.forEachRemaining(Statement::toString);
		}
	}

	/** Returns a string of {@code len} {@code 'x'} characters. */
	private static String buildString(int len) {
		StringBuilder sb = new StringBuilder(len);
		for (int i = 0; i < len; i++) {
			sb.append('x');
		}
		return sb.toString();
	}
}