ValueStoreWalDeletionDuringWriteTest.java
/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.wal;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.sail.nativerdf.ValueStore;
import org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
/**
 * Exercises a {@link ValueStore} wired to a {@link ValueStoreWAL} while the highest-numbered WAL segment file is
 * deleted from disk between two batches of writes.
 */
class ValueStoreWalDeletionDuringWriteTest {

	/** File-name shape of a WAL segment; group 1 captures the eight-digit sequence number. */
	private static final Pattern SEGMENT_PATTERN = Pattern.compile("wal-(\\d{8})\\.v1");

	@TempDir
	Path tempDir;

	/**
	 * Stores 80 IRIs, deletes the WAL segment file with the highest sequence number, stores 80 more IRIs, and then
	 * checks that (a) replaying the WAL yields every id minted after the deletion and (b) the values data store
	 * returns non-null data for every id minted before and after the deletion.
	 */
	@Test
	void asyncWalContinuesAfterCurrentSegmentDeletion() throws Exception {
		Path walDir = tempDir.resolve(ValueStoreWalConfig.DEFAULT_DIRECTORY_NAME);
		ValueStoreWalConfig config = ValueStoreWalConfig.builder()
				.walDirectory(walDir)
				.storeUuid(UUID.randomUUID().toString())
				.maxSegmentBytes(1 << 12)
				.syncPolicy(ValueStoreWalConfig.SyncPolicy.COMMIT)
				.build();

		Path valuesDir = tempDir.resolve("values");
		Files.createDirectories(valuesDir);

		List<Integer> idsBefore = new ArrayList<>();
		List<Integer> idsAfter = new ArrayList<>();

		try (ValueStoreWAL wal = ValueStoreWAL.open(config);
				ValueStore store = new ValueStore(valuesDir.toFile(), false, ValueStore.VALUE_CACHE_SIZE,
						ValueStore.VALUE_ID_CACHE_SIZE, ValueStore.NAMESPACE_CACHE_SIZE,
						ValueStore.NAMESPACE_ID_CACHE_SIZE, wal)) {
			// First batch: written while all segment files are still in place.
			mintInto(store, idsBefore, "before-", 0, 80);
			drainAndAwait(store);

			// Remove the segment with the highest sequence number while the WAL is still open.
			Path victim = locateCurrentSegment(walDir);
			assertThat(victim).as("current WAL segment").isNotNull();
			Files.deleteIfExists(victim);

			// Second batch: written after the segment file has been removed.
			mintInto(store, idsAfter, "after-", 80, 160);
			drainAndAwait(store);
		}

		try (ValueStoreWalReader reader = ValueStoreWalReader.open(config);
				DataStore ds = new DataStore(valuesDir.toFile(), "values")) {
			var dictionary = new ValueStoreWalRecovery().replay(reader);

			assertThat(idsAfter).isNotEmpty();
			assertThat(dictionary.keySet()).as("WAL should retain post-deletion ids")
					.containsAll(idsAfter);

			assertDataPresent(ds, idsBefore);
			assertDataPresent(ds, idsAfter);
		}
	}

	/**
	 * Stores one IRI per index in {@code [fromInclusive, toExclusive)}, using {@code prefix + index} as the token, and
	 * appends each returned id to {@code sink}.
	 */
	private static void mintInto(ValueStore store, List<Integer> sink, String prefix, int fromInclusive,
			int toExclusive) throws IOException {
		for (int n = fromInclusive; n < toExclusive; n++) {
			sink.add(mintUniqueIri(store, prefix + n));
		}
	}

	/** Asserts that {@code ds} returns non-null data for every id in {@code ids}. */
	private static void assertDataPresent(DataStore ds, List<Integer> ids) throws IOException {
		for (Integer id : ids) {
			assertThat(ds.getData(id)).as("ValueStore data should exist for id %s", id).isNotNull();
		}
	}

	/**
	 * Stores the IRI {@code http://example.com/value/<token>} in {@code store}.
	 *
	 * @return the value returned by {@code store.storeValue} for that IRI
	 */
	private static int mintUniqueIri(ValueStore store, String token) throws IOException {
		return store.storeValue(SimpleValueFactory.getInstance().createIRI("http://example.com/value/" + token));
	}

	/**
	 * Drains the store's pending WAL high-water mark and, when one is present, passes it to
	 * {@code awaitWalDurable}.
	 */
	private static void drainAndAwait(ValueStore store) throws IOException {
		OptionalLong highWaterMark = store.drainPendingWalHighWaterMark();
		if (!highWaterMark.isPresent()) {
			return;
		}
		store.awaitWalDurable(highWaterMark.getAsLong());
	}

	/**
	 * Finds the entry of {@code walDir} whose name ends in {@code .v1} and whose {@link #segmentSequence(Path)} is
	 * largest.
	 *
	 * @return that path, or {@code null} when {@code walDir} is not a directory or holds no such entry
	 */
	private static Path locateCurrentSegment(Path walDir) throws IOException {
		if (Files.isDirectory(walDir)) {
			Comparator<Path> bySequence = Comparator
					.comparingInt(ValueStoreWalDeletionDuringWriteTest::segmentSequence);
			// Files.list holds a directory handle open, so it is closed via try-with-resources.
			try (var entries = Files.list(walDir)) {
				var newest = entries.filter(p -> p.getFileName().toString().endsWith(".v1")).max(bySequence);
				return newest.orElse(null);
			}
		}
		return null;
	}

	/**
	 * Extracts the numeric sequence from a segment file name.
	 *
	 * @return the parsed eight-digit sequence, or {@code -1} when the file name does not match
	 *         {@link #SEGMENT_PATTERN}
	 */
	private static int segmentSequence(Path path) {
		Matcher m = SEGMENT_PATTERN.matcher(path.getFileName().toString());
		return m.matches() ? Integer.parseInt(m.group(1)) : -1;
	}
}