ValueStorePartialWriteCorruptionTest.java

/*******************************************************************************
 * Copyright (c) 2025 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.parallel.Isolated;

/**
 * Regression test for ValueStore consistency when the underlying channel performs partial (short) writes.
 * <p>
 * The test replaces the FileChannel inside the internal NioFile used by ValueStore's DataFile with a wrapper that
 * reports fewer bytes written than requested. If the low-level write is not retried until the buffer is drained, such
 * short writes can leave truncated records (e.g., zeroed payloads or invalid length prefixes), reproducing the
 * user-observed corruption symptoms. The test therefore drives a concurrent write load through the wrapped channel,
 * reopens the store from disk and expects {@link ValueStore#checkConsistency()} to succeed.
 */
@Isolated
public class ValueStorePartialWriteCorruptionTest {

	@TempDir
	File dataDir;

	private ValueStore valueStore;

	/** Value of the global soft-fail flag before this test changed it; restored in {@link #tearDown()}. */
	private boolean previousSoftFail;

	/**
	 * Opens a fresh ValueStore in the temp directory, disables soft-fail handling and installs the partial-write
	 * channel wrapper.
	 *
	 * @throws IOException if the ValueStore cannot be created
	 */
	@BeforeEach
	public void setup() throws IOException {
		// Remember the global flag first so tearDown can put it back even if the rest of setup fails
		previousSoftFail = NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES;
		valueStore = new ValueStore(dataDir);
		// Disable soft-fail to surface corruption as exceptions in checkConsistency
		NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = false;
		// Frequency 1: every single-buffer write larger than 4 bytes is cut short
		injectChoppyChannel(valueStore, /* partialWriteFrequency= */1);
	}

	/**
	 * Closes the ValueStore (if one was opened) and restores the global soft-fail flag to its pre-test value.
	 *
	 * @throws IOException if closing the ValueStore fails
	 */
	@AfterEach
	public void tearDown() throws IOException {
		try {
			if (valueStore != null) {
				valueStore.close();
			}
		} finally {
			NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES = previousSoftFail;
		}
	}

	/**
	 * Stores values from several threads through the partial-write channel, reopens the store and verifies that the
	 * consistency check passes.
	 *
	 * @throws Exception if a writer task fails or the consistency check reports a problem
	 */
	@Test
	public void partialWritesCauseValueStoreInconsistency() throws Exception {
		int writers = 8;
		int valuesPerWriter = 20_000;

		ExecutorService pool = Executors.newFixedThreadPool(writers);
		try {
			CountDownLatch start = new CountDownLatch(1);
			List<Future<?>> futures = new ArrayList<>(writers);

			for (int i = 0; i < writers; i++) {
				final int seed = 2025 + i;
				futures.add(pool.submit(() -> {
					writeValues(seed, valuesPerWriter, start);
					return null;
				}));
			}

			// Release all writers at once to maximise contention
			start.countDown();
			for (Future<?> f : futures) {
				f.get();
			}
		} finally {
			// Always stop the pool, also when a writer failed, and wait for the workers to finish so that no
			// thread is still using the store when it is closed below or in tearDown.
			pool.shutdownNow();
			pool.awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS);
		}

		// Close and reopen to force reload from disk only
		valueStore.close();
		valueStore = new ValueStore(dataDir);

		// After the write-loop fix in NioFile, partial low-level writes are retried until complete.
		// Therefore, checkConsistency should not throw anymore.
		valueStore.checkConsistency();
	}

	/**
	 * Body of a single writer task: waits for the start signal, then stores {@code valuesPerWriter} IRIs, each followed
	 * by a literal.
	 *
	 * @param seed            seed for this writer's random generator, making the generated values reproducible
	 * @param valuesPerWriter number of IRI/literal pairs to store
	 * @param start           latch the writer waits on before it starts storing values
	 * @throws RuntimeException wrapping an {@link InterruptedException} if interrupted while waiting, or an
	 *                          {@link IOException} raised while storing a value
	 */
	private void writeValues(int seed, int valuesPerWriter, CountDownLatch start) {
		Random rnd = new Random(seed);
		try {
			start.await();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new RuntimeException(e);
		}
		for (int j = 0; j < valuesPerWriter; j++) {
			try {
				// Mix of small IRIs and occasionally large literals to trigger both buffered and direct writes
				String ns = "http://ex/" + rnd.nextInt(64) + "/";
				String local = "s" + rnd.nextInt(50_000);
				valueStore.storeValue(valueStore.createIRI(ns, local));
				if ((j % 200) == 0) {
					String big = buildString(12_000 + rnd.nextInt(6000));
					valueStore.storeValue(valueStore.createLiteral(big));
				} else {
					valueStore.storeValue(valueStore.createLiteral("v" + j));
				}
			} catch (IOException e) {
				throw new RuntimeException(e);
			}
		}
	}

	/**
	 * Builds a string consisting of {@code len} repetitions of the character {@code 'x'}.
	 *
	 * @param len requested length; must not be negative
	 * @return a string of exactly {@code len} {@code 'x'} characters
	 */
	private static String buildString(int len) {
		char[] chars = new char[len];
		java.util.Arrays.fill(chars, 'x');
		return new String(chars);
	}

	/**
	 * Replace the internal FileChannel of the ValueStore's DataFile with a wrapper that performs partial writes.
	 * <p>
	 * The channel is reached reflectively through the private fields {@code dataStore}, {@code dataFile},
	 * {@code nioFile} and {@code fc}.
	 *
	 * @param vs                    the ValueStore whose data file channel is to be wrapped
	 * @param partialWriteFrequency every n-th single-buffer write is cut short; values below 1 are treated as 1
	 * @throws IllegalStateException if one of the expected private fields cannot be found or accessed
	 */
	private static void injectChoppyChannel(ValueStore vs, int partialWriteFrequency) {
		try {
			DataStore ds = (DataStore) accessibleField(ValueStore.class, "dataStore").get(vs);
			Object dataFile = accessibleField(DataStore.class, "dataFile").get(ds);
			Object nioFile = accessibleField(dataFile.getClass(), "nioFile").get(dataFile);

			Field fcField = accessibleField(nioFile.getClass(), "fc");
			FileChannel original = (FileChannel) fcField.get(nioFile);
			fcField.set(nioFile, new ChoppyFileChannel(original, partialWriteFrequency));
		} catch (ReflectiveOperationException e) {
			throw new IllegalStateException("Unable to inject ChoppyFileChannel into ValueStore internals", e);
		}
	}

	/**
	 * Looks up a declared field and makes it accessible.
	 *
	 * @param owner class declaring the field
	 * @param name  name of the field
	 * @return the accessible field
	 * @throws NoSuchFieldException if {@code owner} declares no field with that name
	 */
	private static Field accessibleField(Class<?> owner, String name) throws NoSuchFieldException {
		Field field = owner.getDeclaredField(name);
		field.setAccessible(true);
		return field;
	}

	/**
	 * A FileChannel wrapper that writes fewer bytes than requested on every n-th single-buffer write.
	 * <p>
	 * Only {@link #write(ByteBuffer)} and {@link #write(ByteBuffer, long)} are cut short; all other operations,
	 * including the gathering write, are passed straight to the delegate. Buffers holding 4 bytes or fewer are always
	 * written in full.
	 */
	static class ChoppyFileChannel extends FileChannel {
		/** Buffers with at most this many remaining bytes are never cut short. */
		private static final int MAX_UNCHOPPED_BYTES = 4;

		private final FileChannel delegate;
		private final int frequency;
		// Atomic because the channel is driven by several writer threads at once
		private final java.util.concurrent.atomic.AtomicInteger opCount = new java.util.concurrent.atomic.AtomicInteger();

		/**
		 * @param delegate  the real channel that performs the I/O
		 * @param frequency every {@code frequency}-th single-buffer write is cut short; values below 1 are treated as 1
		 */
		ChoppyFileChannel(FileChannel delegate, int frequency) {
			this.delegate = delegate;
			this.frequency = Math.max(1, frequency);
		}

		/**
		 * Counts this write operation and decides whether it should be cut short.
		 *
		 * @param src the buffer about to be written
		 * @return {@code true} if this is an n-th operation and the buffer is large enough to be chopped
		 */
		private boolean shouldChop(ByteBuffer src) {
			// Count every call, also those that end up being written in full
			boolean nthOperation = opCount.incrementAndGet() % frequency == 0;
			return nthOperation && src.remaining() > MAX_UNCHOPPED_BYTES;
		}

		/**
		 * Creates a view of the first half of the remaining bytes of {@code src}; the position of {@code src} itself
		 * is left untouched.
		 */
		private static ByteBuffer firstHalf(ByteBuffer src) {
			ByteBuffer slice = src.slice();
			slice.limit(Math.max(1, src.remaining() / 2));
			return slice;
		}

		@Override
		public int write(ByteBuffer src, long position) throws IOException {
			if (!shouldChop(src)) {
				return delegate.write(src, position);
			}
			// write only a part
			int written = delegate.write(firstHalf(src), position);
			// advance original buffer by 'written' bytes but leave unwritten bytes unflushed
			src.position(src.position() + written);
			return written;
		}

		@Override
		public int write(ByteBuffer src) throws IOException {
			if (!shouldChop(src)) {
				return delegate.write(src);
			}
			int written = delegate.write(firstHalf(src));
			src.position(src.position() + written);
			return written;
		}

		// Plain delegations for the remaining abstract methods

		@Override
		public int read(ByteBuffer dst) throws IOException {
			return delegate.read(dst);
		}

		@Override
		public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
			return delegate.read(dsts, offset, length);
		}

		@Override
		public int read(ByteBuffer dst, long position) throws IOException {
			return delegate.read(dst, position);
		}

		@Override
		public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
			return delegate.write(srcs, offset, length);
		}

		@Override
		public long position() throws IOException {
			return delegate.position();
		}

		@Override
		public FileChannel position(long newPosition) throws IOException {
			delegate.position(newPosition);
			return this;
		}

		@Override
		public long size() throws IOException {
			return delegate.size();
		}

		@Override
		public FileChannel truncate(long size) throws IOException {
			delegate.truncate(size);
			return this;
		}

		@Override
		public void force(boolean metaData) throws IOException {
			delegate.force(metaData);
		}

		@Override
		public long transferTo(long position, long count, java.nio.channels.WritableByteChannel target)
				throws IOException {
			return delegate.transferTo(position, count, target);
		}

		@Override
		public long transferFrom(java.nio.channels.ReadableByteChannel src, long position, long count)
				throws IOException {
			return delegate.transferFrom(src, position, count);
		}

		@Override
		public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
			return delegate.map(mode, position, size);
		}

		@Override
		public FileLock lock(long position, long size, boolean shared) throws IOException {
			return delegate.lock(position, size, shared);
		}

		@Override
		public FileLock tryLock(long position, long size, boolean shared) throws IOException {
			return delegate.tryLock(position, size, shared);
		}

		@Override
		protected void implCloseChannel() throws IOException {
			delegate.close();
		}
	}
}