ValueStoreWalCombinatoricsTest.java

/*******************************************************************************
 * Copyright (c) 2025 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/

package org.eclipse.rdf4j.sail.nativerdf.wal;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Parameterized combinatorics tests that exercise the ValueStore WAL writer under every combination of sync policy,
 * durability forcing, purge behaviour and initial on-disk state. Each case writes one or two batches of mint records,
 * closes the writer, scans the WAL back and checks that the scan contains exactly the expected records in write order,
 * that LSNs are strictly increasing, and that records removed by a mid-stream purge do not reappear.
 */
@TestInstance(Lifecycle.PER_CLASS)
class ValueStoreWalCombinatoricsTest {

	private static final Duration SYNC_INTERVAL = Duration.ofMillis(2);
	private static final Duration IDLE_POLL_INTERVAL = Duration.ofMillis(1);
	// Small segment size so that the seeded and per-test batches span several segment files.
	private static final long MAX_SEGMENT_BYTES = 2048;
	private static final int BATCH_BUFFER_BYTES = 1 << 15;
	private static final int QUEUE_CAPACITY = 16;
	private static final int SEED_RECORDS = 24;

	/** How {@link #mintBatch} waits for durability: never, once after the batch, or after every record. */
	private enum ForceMode {
		NONE,
		FINAL,
		EACH
	}

	/** Whether {@link ValueStoreWAL#purgeAllSegments()} is called between the two batches. */
	private enum PurgeMode {
		NEVER,
		MID_STREAM
	}

	/** Whether the WAL directory already contains records written by a previous WAL instance. */
	private enum InitialState {
		EMPTY,
		SEEDED
	}

	@TempDir
	Path tempDir;

	// PER_CLASS lifecycle shares this instance across invocations, so the counter is reset in tearDown().
	private final AtomicInteger idCounter = new AtomicInteger();
	private String storeUuid;

	@BeforeEach
	void setUp() {
		storeUuid = UUID.randomUUID().toString();
	}

	@AfterEach
	void tearDown() {
		idCounter.set(0);
	}

	/**
	 * Writes a first batch, optionally purges, writes a second batch, then scans the WAL and verifies its contents.
	 *
	 * @param syncPolicy   sync policy used by the WAL under test
	 * @param forceMode    durability-forcing strategy applied while minting
	 * @param purgeMode    whether all segments are purged between the two batches
	 * @param initialState whether the directory is pre-seeded by a separate WAL instance
	 */
	@ParameterizedTest(name = "{index}: policy={0}, force={1}, purge={2}, seed={3}")
	@MethodSource("walCombinationCases")
	void walHandlesCombinations(ValueStoreWalConfig.SyncPolicy syncPolicy, ForceMode forceMode, PurgeMode purgeMode,
			InitialState initialState) throws Exception {

		Path walDir = createWalDirectory(syncPolicy, forceMode, purgeMode, initialState);

		List<String> expectedLexicals = new ArrayList<>();
		if (initialState == InitialState.SEEDED) {
			expectedLexicals.addAll(seedInitialSegments(walDir));
		}

		ValueStoreWalConfig config = ValueStoreWalConfig.builder()
				.walDirectory(walDir)
				.storeUuid(storeUuid)
				.maxSegmentBytes(MAX_SEGMENT_BYTES)
				.queueCapacity(QUEUE_CAPACITY)
				.batchBufferBytes(BATCH_BUFFER_BYTES)
				.syncPolicy(syncPolicy)
				.syncInterval(SYNC_INTERVAL)
				.idlePollInterval(IDLE_POLL_INTERVAL)
				.build();

		try (ValueStoreWAL wal = ValueStoreWAL.open(config)) {
			BatchResult firstBatch = mintBatch(wal, "first", 6, forceMode);
			expectedLexicals.addAll(firstBatch.lexicals());

			if (purgeMode == PurgeMode.MID_STREAM) {
				wal.purgeAllSegments();
				// Everything written so far (seed + first batch) must be gone after the purge.
				expectedLexicals.clear();
			}

			BatchResult secondBatch = mintBatch(wal, "second", 5, forceMode);
			expectedLexicals.addAll(secondBatch.lexicals());
		}

		// NOTE(review): the reader returned by open() is never closed here; if ValueStoreWalReader holds file
		// handles and implements AutoCloseable, wrap it in try-with-resources — confirm against its definition.
		ValueStoreWalReader.ScanResult result = ValueStoreWalReader.open(config).scan();
		List<String> actualLexicals = result.records()
				.stream()
				.map(ValueStoreWalRecord::lexical)
				.collect(Collectors.toList());

		assertThat(actualLexicals).containsExactlyElementsOf(expectedLexicals);
		// Completeness is only asserted for the no-purge cases.
		if (purgeMode == PurgeMode.NEVER) {
			assertThat(result.complete())
					.as("WAL scan should be complete when no purge occurs")
					.isTrue();
		}

		List<Long> lsns = result.records()
				.stream()
				.map(ValueStoreWalRecord::lsn)
				.collect(Collectors.toList());
		for (int i = 1; i < lsns.size(); i++) {
			assertThat(lsns.get(i))
					.as("LSN at index %d must be strictly greater than LSN at index %d", i, i - 1)
					.isGreaterThan(lsns.get(i - 1));
		}

		if (expectedLexicals.isEmpty()) {
			assertThat(lsns).isEmpty();
			assertThat(result.lastValidLsn()).isEqualTo(ValueStoreWAL.NO_LSN);
		} else {
			assertThat(lsns).isNotEmpty();
			assertThat(result.lastValidLsn())
					.as("lastValidLsn must cover the last scanned record")
					.isGreaterThanOrEqualTo(lsns.get(lsns.size() - 1));
		}
	}

	/**
	 * Produces the full cartesian product of sync policy, force mode, purge mode and initial state.
	 *
	 * @return one {@link Arguments} per combination, in nested-loop order
	 */
	private Stream<Arguments> walCombinationCases() {
		List<Arguments> arguments = new ArrayList<>();
		for (ValueStoreWalConfig.SyncPolicy policy : ValueStoreWalConfig.SyncPolicy.values()) {
			for (ForceMode forceMode : ForceMode.values()) {
				for (PurgeMode purgeMode : PurgeMode.values()) {
					for (InitialState seed : EnumSet.allOf(InitialState.class)) {
						arguments.add(Arguments.of(policy, forceMode, purgeMode, seed));
					}
				}
			}
		}
		return arguments.stream();
	}

	/**
	 * Creates (if absent) a subdirectory of {@link #tempDir} whose name is derived from the combination, so that
	 * invocations sharing the PER_CLASS temp directory do not see each other's segments.
	 *
	 * @return the created directory
	 * @throws IOException if the directory cannot be created
	 */
	private Path createWalDirectory(ValueStoreWalConfig.SyncPolicy syncPolicy, ForceMode forceMode,
			PurgeMode purgeMode, InitialState seed) throws IOException {
		// Locale.ROOT keeps the name stable regardless of the JVM default locale (e.g. Turkish dotless i).
		String dirName = (syncPolicy.name() + "-" + forceMode.name() + "-" + purgeMode.name() + "-" + seed.name())
				.toLowerCase(Locale.ROOT);
		Path dir = tempDir.resolve(dirName);
		Files.createDirectories(dir);
		return dir;
	}

	/**
	 * Writes {@link #SEED_RECORDS} records into {@code walDir} using a separate WAL instance with the
	 * {@code ALWAYS} sync policy. Durability of the last record is awaited before that WAL is closed.
	 *
	 * @param walDir directory to seed
	 * @return the lexical values written, in write order
	 */
	private List<String> seedInitialSegments(Path walDir) throws Exception {
		ValueStoreWalConfig seedConfig = ValueStoreWalConfig.builder()
				.walDirectory(walDir)
				.storeUuid(storeUuid)
				.maxSegmentBytes(MAX_SEGMENT_BYTES)
				.queueCapacity(QUEUE_CAPACITY)
				.batchBufferBytes(BATCH_BUFFER_BYTES)
				.syncPolicy(ValueStoreWalConfig.SyncPolicy.ALWAYS)
				.syncInterval(SYNC_INTERVAL)
				.idlePollInterval(IDLE_POLL_INTERVAL)
				.build();

		try (ValueStoreWAL wal = ValueStoreWAL.open(seedConfig)) {
			return new ArrayList<>(mintBatch(wal, "seed", SEED_RECORDS, ForceMode.FINAL).lexicals());
		}
	}

	/**
	 * Logs {@code count} literal mint records with ids taken from {@link #idCounter}, waiting for durability
	 * according to {@code forceMode}.
	 *
	 * @param wal       WAL to write to
	 * @param prefix    prefix embedded in each lexical value
	 * @param count     number of records to write
	 * @param forceMode when to call {@link ValueStoreWAL#awaitDurable(long)}
	 * @return the written lexicals and the LSN of the last record ({@link ValueStoreWAL#NO_LSN} if none)
	 */
	private BatchResult mintBatch(ValueStoreWAL wal, String prefix, int count, ForceMode forceMode)
			throws IOException, InterruptedException {
		List<String> lexicals = new ArrayList<>(count);
		long lastLsn = ValueStoreWAL.NO_LSN;
		for (int i = 0; i < count; i++) {
			int id = idCounter.incrementAndGet();
			String lexical = lexicalToken(prefix, id);
			long lsn = wal.logMint(id, ValueStoreWalValueKind.LITERAL, lexical, "http://example/dt", "",
					lexical.hashCode());
			lexicals.add(lexical);
			if (forceMode == ForceMode.EACH) {
				wal.awaitDurable(lsn);
			}
			lastLsn = lsn;
		}
		if (forceMode == ForceMode.FINAL && lastLsn > ValueStoreWAL.NO_LSN) {
			wal.awaitDurable(lastLsn);
		}
		return new BatchResult(lexicals, lastLsn);
	}

	/** Builds a padded lexical value so that a handful of records is enough to roll over small segments. */
	private static String lexicalToken(String prefix, int id) {
		return prefix + "-" + id + "-payload-0123456789abcdefghijklmnopqrstuvwxyz";
	}

	/** Immutable result of {@link #mintBatch}: the lexicals written and the LSN of the final record. */
	private static final class BatchResult {
		private final List<String> lexicals;
		private final long lastLsn;

		private BatchResult(List<String> lexicals, long lastLsn) {
			this.lexicals = List.copyOf(lexicals);
			this.lastLsn = lastLsn;
		}

		private List<String> lexicals() {
			return lexicals;
		}

		private long lastLsn() {
			return lastLsn;
		}
	}
}