NativeStoreUtf8ExtremesCorruptionTestIT.java

/**
 *******************************************************************************
 * Copyright (c) 2025 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.parallel.Isolated;

/**
 * Attempts to reproduce a corrupt NativeStore by iteratively storing IRIs and BNodes whose UTF-8 encodings exercise
 * extremes and boundary lengths (1/2/3/4-byte code points; sizes near small and page-sized thresholds). A concurrent
 * reader thread repeatedly calls into the underlying DataStore to force frequent DataFile.flush() operations while
 * writes happen, increasing the chance of a flush/write race corrupting values.
 *
 * If corruption occurs, the subsequent checkConsistency() call will throw, causing this test to fail. This test is
 * designed as a reproducer (expected to fail when corruption is possible) rather than an assertion of correctness.
 */
@Tag("slow")
@Isolated
public class NativeStoreUtf8ExtremesCorruptionTestIT {

	@TempDir
	File dataDir;

	private NativeStore store;

	private ValueStore valueStore;

	@BeforeEach
	public void setup() throws Exception {
		// surface corruption as exceptions
		NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false;
		store = new NativeStore(dataDir, "spoc,posc");
		store.init();
		valueStore = (ValueStore) store.getValueFactory();
	}

	@AfterEach
	public void tearDown() throws Exception {
		try {
			if (store != null) {
				store.shutDown();
			}
		} finally {
			NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false;
		}
	}

	@Test
	public void utf8ExtremesForIrisAndBNodesMayCorrupt() throws Exception {
		// Reflect underlying DataStore to drive flushes via getData(id)
		DataStore ds = (DataStore) reflect(valueStore, "dataStore");

		ExecutorService pool = Executors.newFixedThreadPool(3);
		CountDownLatch start = new CountDownLatch(1);

		List<Future<?>> futures = new ArrayList<>();

		// Writer: iterate across code point classes and length boundaries
		futures.add(pool.submit(() -> {
			try {
				start.await();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new RuntimeException(e);
			}

			String ns = "http://example.org/ns/";

			// code point classes: ASCII(1B), U+00A2(2B), U+20AC(3B), U+1F600(4B)
			String[] cps = new String[] { "a", "\u00A2", "\u20AC", "\uD83D\uDE00" };

			// length targets (bytes) to exercise small/medium/page-ish boundaries
			int[] byteTargets = new int[] {
					1, 2, 3, 4, 5,
					15, 31, 63, 127, 255, 256, 257,
					512 - 8, 512 - 1, 512, 512 + 1,
					1024 - 8, 1024 - 1, 1024, 1024 + 1,
					2048 - 8, 2048 - 1, 2048, 2048 + 1,
					4096 - 16, 4096 - 8, 4096 - 5, 4096 - 4, 4096 - 1, 4096, 4096 + 1, (64 * 1024) - 1, (64 * 1024),
					(64 * 1024) + 1,
					(1024 * 1024) - 1, (1024 * 1024), (1024 * 1024) + 1
			};

			try {
				// prime namespace so ns id is stable
				valueStore.storeValue(valueStore.createIRI(ns, "seed"));

				for (String cp : cps) {
					for (int targetBytes : byteTargets) {
						// Build local name approximately targetBytes long in UTF-8
						String local = buildUtf8Length(cp, targetBytes);
						valueStore.storeValue(valueStore.createIRI(ns, local));

						// Also add a BNode id with similar UTF-8 size
						String bnodeId = buildUtf8Length(cp, targetBytes);
						valueStore.storeValue(valueStore.createBNode(bnodeId));

						// Interleave small literal writes to keep buffered path hot
						valueStore.storeValue(valueStore.createLiteral("v" + targetBytes));
					}
				}
			} catch (IOException e) {
				throw new RuntimeException(e);
			}
			return null;
		}));

		// Flusher: aggressively force reads to trigger DataFile.flush()
		futures.add(pool.submit(() -> {
			try {
				start.await();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new RuntimeException(e);
			}
			long until = System.nanoTime() + Duration.ofSeconds(30).toNanos();
			while (System.nanoTime() < until) {
				try {
					int max = ds.getMaxID();
					if (max >= 1) {
						// alternate between a stable low id and a random-ish recent id
						ds.getData(1);
						ds.getData(Math.max(1, max));
					}
				} catch (IOException e) {
					throw new RuntimeException(e);
				}
				Thread.onSpinWait();
			}
			return null;
		}));

		// Big IRI writer: periodically write very large IRI/BNode values to trigger direct-write path
		futures.add(pool.submit(() -> {
			try {
				start.await();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new RuntimeException(e);
			}
			String ns = "http://example.org/big/";
			try {
				// prime
				valueStore.storeValue(valueStore.createIRI(ns, "start"));
				for (int i = 0; i < 2000; i++) {
					String bigLocal = buildUtf8Length("a", 12_000 + (i % 200));
					valueStore.storeValue(valueStore.createIRI(ns, bigLocal));
					// large bnode ids too
					String bigBNode = buildUtf8Length("\uD83D\uDE00", 10_000 + (i % 300));
					valueStore.storeValue(valueStore.createBNode(bigBNode));
					// keep buffer hot in between
					valueStore.storeValue(valueStore.createLiteral("keep" + i));
				}
			} catch (IOException e) {
				throw new RuntimeException(e);
			}
			return null;
		}));

		// go!
		start.countDown();
		try {
			for (Future<?> f : futures) {
				f.get();
			}
		} finally {
			pool.shutdownNow();
			pool.awaitTermination(5, TimeUnit.SECONDS);
		}

		// Close and reopen to force load-from-disk
		store.shutDown();
		store = new NativeStore(dataDir, "spoc,posc");
		store.init();
		valueStore = (ValueStore) store.getValueFactory();

		// If corruption occurred this should throw, causing the test to fail (reproducer)
		valueStore.checkConsistency();
	}

	private static Object reflect(Object target, String field) {
		try {
			Field f = target.getClass().getDeclaredField(field);
			f.setAccessible(true);
			return f.get(target);
		} catch (ReflectiveOperationException e) {
			throw new RuntimeException(e);
		}
	}

	private static String buildUtf8Length(String unit, int targetBytes) {
		if (targetBytes <= 0) {
			return "";
		}
		StringBuilder sb = new StringBuilder();
		int unitBytes = unit.getBytes(StandardCharsets.UTF_8).length;
		if (unitBytes <= 0) {
			unitBytes = 1;
		}
		int reps = Math.max(1, targetBytes / unitBytes);
		for (int i = 0; i < reps; i++) {
			sb.append(unit);
		}
		// if we overshot, trim by bytes (safe: only append ASCII 'x' when under)
		while (sb.toString().getBytes(StandardCharsets.UTF_8).length > targetBytes && sb.length() > 0) {
			sb.setLength(sb.length() - 1);
		}
		while (sb.toString().getBytes(StandardCharsets.UTF_8).length < targetBytes) {
			sb.append('x');
		}
		return sb.toString();
	}
}