ValueStoreWalCompressedSummaryCrcValidationTest.java

/*******************************************************************************
 * Copyright (c) 2025 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.wal;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.zip.CRC32C;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.sail.nativerdf.ValueStore;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

/**
 * Validates that ValueStoreWalReader verifies the CRC32 summary embedded in compressed segments and marks the scan as
 * incomplete when the summary does not match the decompressed content.
 */
class ValueStoreWalCompressedSummaryCrcValidationTest {

	private static final Pattern SEGMENT_GZ = Pattern.compile("wal-(\\d+)\\.v1\\.gz");

	/** Shared factory for parsing and generating the small JSON frame payloads handled by this test. */
	private static final JsonFactory JSON_FACTORY = new JsonFactory();

	/** Size in bytes of a frame's little-endian length prefix and of its trailing little-endian CRC32C. */
	private static final int FRAME_INT_BYTES = 4;

	@TempDir
	Path tempDir;

	@Test
	void mismatchSummaryCrcMarksScanIncomplete() throws Exception {
		// Arrange: create a WAL with at least one compressed segment
		Path walDir = tempDir.resolve(ValueStoreWalConfig.DEFAULT_DIRECTORY_NAME);
		Files.createDirectories(walDir);
		ValueStoreWalConfig config = ValueStoreWalConfig.builder()
				.walDirectory(walDir)
				.storeUuid(UUID.randomUUID().toString())
				.maxSegmentBytes(4096) // small to ensure rotation + gzip
				.build();

		// Write enough values to rotate segments and compress the first
		try (ValueStoreWAL wal = ValueStoreWAL.open(config)) {
			Path valuesDir = tempDir.resolve("values");
			Files.createDirectories(valuesDir);
			try (ValueStore store = new ValueStore(valuesDir.toFile(), false, ValueStore.VALUE_CACHE_SIZE,
					ValueStore.VALUE_ID_CACHE_SIZE, ValueStore.NAMESPACE_CACHE_SIZE,
					ValueStore.NAMESPACE_ID_CACHE_SIZE, wal)) {
				var vf = SimpleValueFactory.getInstance();
				for (int i = 0; i < 1000; i++) {
					store.storeValue(vf.createLiteral("val-" + i));
				}
			}
		}

		// Pick the compressed segment with the lowest sequence number (deterministic choice)
		Path compressed = locateFirstCompressed(walDir);
		assertThat(compressed).as("compressed WAL segment").isNotNull();

		// Corrupt the summary CRC inside the compressed segment while keeping per-frame CRCs valid
		corruptSummaryCrc32(compressed);

		// Act: scan with reader
		try (ValueStoreWalReader reader = ValueStoreWalReader.open(config)) {
			ValueStoreWalReader.ScanResult scan = reader.scan();
			// Assert: the scan is marked incomplete due to summary CRC mismatch
			assertThat(scan.complete()).as("scan completeness should be false if summary CRC mismatches").isFalse();
		}
	}

	/**
	 * Returns the compressed segment in {@code walDir} with the lowest sequence number, or {@code null} if there is
	 * none. {@link Files#list(Path)} makes no ordering guarantee, so segments are ordered explicitly by the numeric part
	 * of their file name to keep the test deterministic.
	 *
	 * @param walDir the WAL directory to search
	 * @return the first compressed segment, or {@code null} if none exists
	 * @throws IOException if the directory cannot be listed
	 */
	private static Path locateFirstCompressed(Path walDir) throws IOException {
		try (var stream = Files.list(walDir)) {
			return stream.filter(p -> SEGMENT_GZ.matcher(p.getFileName().toString()).matches())
					.min(Comparator.comparingLong(ValueStoreWalCompressedSummaryCrcValidationTest::segmentSequence))
					.orElse(null);
		}
	}

	/**
	 * Extracts the numeric sequence from a compressed segment file name such as {@code wal-12.v1.gz}.
	 *
	 * @param segment a path whose file name matches {@link #SEGMENT_GZ}
	 * @return the parsed sequence number
	 * @throws IllegalArgumentException if the file name does not match {@link #SEGMENT_GZ}
	 */
	private static long segmentSequence(Path segment) {
		var matcher = SEGMENT_GZ.matcher(segment.getFileName().toString());
		if (!matcher.matches()) {
			throw new IllegalArgumentException("Not a compressed WAL segment: " + segment);
		}
		return Long.parseLong(matcher.group(1));
	}

	/**
	 * Rewrites the gzip-compressed segment {@code gz} in place so that its summary frame carries a {@code crc32} value
	 * guaranteed to differ from the original one. All frames before and after the summary are preserved byte-for-byte.
	 * The replacement summary frame carries a valid per-frame CRC32C, so only the summary's CRC32 is wrong.
	 *
	 * @param gz the compressed WAL segment to rewrite
	 * @throws IOException if the segment cannot be read or written, or contains no summary frame
	 */
	private static void corruptSummaryCrc32(Path gz) throws IOException {
		// Decompress entire segment; each stream is its own resource so the file handle is closed even if the
		// GZIPInputStream constructor fails on a bad header.
		byte[] decompressed;
		try (var in = Files.newInputStream(gz); GZIPInputStream gin = new GZIPInputStream(in)) {
			decompressed = gin.readAllBytes();
		}

		SummaryFrame summary = findSummaryFrame(decompressed);
		if (summary == null) {
			throw new IOException("No summary frame found in compressed WAL segment: " + gz);
		}

		// Flipping every bit of the original value guarantees a mismatch; a fixed constant (e.g. 0) could coincide
		// with the real CRC and silently leave the segment valid.
		// NOTE(review): the replacement frame only carries t/lastId/crc32; any other summary fields are dropped.
		byte[] newSummary = buildSummaryFrameWithCrc(summary.lastId, ~summary.crc32 & 0xFFFFFFFFL);

		// Rebuild gz: intact prefix, corrupted summary, intact suffix
		try (var out = Files.newOutputStream(gz); GZIPOutputStream gout = new GZIPOutputStream(out)) {
			gout.write(decompressed, 0, summary.start);
			gout.write(newSummary);
			gout.write(decompressed, summary.end, decompressed.length - summary.end);
			gout.finish();
		}
	}

	/**
	 * Walks the decompressed segment frame by frame ({@code lenLE + json + crc32cLE}) and returns the first frame whose
	 * JSON object has {@code "t":"S"}. Per-frame CRC32C values are skipped, not verified. Non-object payloads are
	 * ignored.
	 *
	 * @param data the decompressed segment bytes
	 * @return the summary frame, or {@code null} if none is found before the end of data or a malformed/truncated frame
	 *         is reached
	 * @throws IOException if a frame payload is not parseable JSON
	 */
	private static SummaryFrame findSummaryFrame(byte[] data) throws IOException {
		int pos = 0;
		while (data.length - pos >= FRAME_INT_BYTES) {
			int frameStart = pos;
			int length = getIntLE(data, pos);
			pos += FRAME_INT_BYTES;
			// Overflow-safe bounds check: payload plus trailing CRC32C must fit; negative lengths are corrupt.
			if (length < 0 || length > data.length - pos - FRAME_INT_BYTES) {
				return null;
			}
			int payloadStart = pos;
			pos += length + FRAME_INT_BYTES; // skip payload and frame CRC32C

			try (JsonParser jp = JSON_FACTORY.createParser(data, payloadStart, length)) {
				if (jp.nextToken() != JsonToken.START_OBJECT) {
					continue;
				}
				String type = null;
				int lastId = 0;
				long crc32 = 0L;
				while (jp.nextToken() != JsonToken.END_OBJECT) {
					String field = jp.getCurrentName();
					jp.nextToken();
					if ("t".equals(field)) {
						type = jp.getValueAsString("");
					} else if ("lastId".equals(field)) {
						lastId = jp.getValueAsInt(0);
					} else if ("crc32".equals(field)) {
						crc32 = jp.getValueAsLong(0L);
					} else {
						jp.skipChildren();
					}
				}
				if ("S".equals(type)) {
					return new SummaryFrame(frameStart, pos, lastId, crc32);
				}
			}
		}
		return null;
	}

	/**
	 * Builds a complete summary frame ({@code lenLE + json + crc32cLE(json)}) whose JSON carries the given
	 * {@code lastId} and {@code crc32} values. The frame's own CRC32C is computed correctly over the JSON payload.
	 *
	 * @param lastMintedId value for the {@code lastId} field
	 * @param wrongCrc32   value for the {@code crc32} field; only its low 32 bits are written
	 * @return the framed bytes
	 * @throws IOException if JSON generation fails
	 */
	private static byte[] buildSummaryFrameWithCrc(int lastMintedId, long wrongCrc32) throws IOException {
		ByteArrayOutputStream baos = new ByteArrayOutputStream(128);
		try (JsonGenerator gen = JSON_FACTORY.createGenerator(baos)) {
			gen.writeStartObject();
			gen.writeStringField("t", "S");
			gen.writeNumberField("lastId", lastMintedId);
			gen.writeNumberField("crc32", wrongCrc32 & 0xFFFFFFFFL);
			gen.writeEndObject();
		}
		baos.write('\n');
		byte[] json = baos.toByteArray();

		CRC32C crc32c = new CRC32C();
		crc32c.update(json, 0, json.length);
		return ByteBuffer.allocate(FRAME_INT_BYTES + json.length + FRAME_INT_BYTES)
				.order(ByteOrder.LITTLE_ENDIAN)
				.putInt(json.length)
				.put(json)
				.putInt((int) crc32c.getValue())
				.array();
	}

	/** Reads a little-endian 32-bit int from {@code arr} at {@code off}. */
	private static int getIntLE(byte[] arr, int off) {
		return (arr[off] & 0xFF) | ((arr[off + 1] & 0xFF) << 8) | ((arr[off + 2] & 0xFF) << 16)
				| ((arr[off + 3] & 0xFF) << 24);
	}

	/** Location (offsets within the decompressed segment) and relevant fields of a summary frame. */
	private static final class SummaryFrame {
		/** Offset of the frame's length prefix (inclusive start). */
		final int start;
		/** Offset just past the frame's trailing CRC32C (exclusive end). */
		final int end;
		/** Value of the {@code lastId} field, or 0 when absent. */
		final int lastId;
		/** Value of the {@code crc32} field, or 0 when absent. */
		final long crc32;

		SummaryFrame(int start, int end, int lastId, long crc32) {
			this.start = start;
			this.end = end;
			this.lastId = lastId;
			this.crc32 = crc32;
		}
	}
}