ValueStoreWalIntegrationTest.java
/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.wal;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.XMLSchema;
import org.eclipse.rdf4j.sail.nativerdf.ValueStore;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
class ValueStoreWalIntegrationTest {
private static final ValueFactory VF = SimpleValueFactory.getInstance();
@TempDir
Path tempDir;
@Test
void purgeDropsQueuedFramesOnClear() throws Exception {
Path walDir = tempDir.resolve("wal-purge");
Files.createDirectories(walDir);
ValueStoreWalConfig config = ValueStoreWalConfig.builder()
.walDirectory(walDir)
.storeUuid(UUID.randomUUID().toString())
// Default COMMIT policy: do not auto-flush unless forced
.syncPolicy(ValueStoreWalConfig.SyncPolicy.COMMIT)
.build();
File valueDir = tempDir.resolve("values-purge").toFile();
Files.createDirectories(valueDir.toPath());
// Enqueue a single value and immediately clear() the store, which purges the WAL.
try (ValueStoreWAL wal = ValueStoreWAL.open(config);
ValueStore store = new ValueStore(valueDir, false, ValueStore.VALUE_CACHE_SIZE,
ValueStore.VALUE_ID_CACHE_SIZE, ValueStore.NAMESPACE_CACHE_SIZE,
ValueStore.NAMESPACE_ID_CACHE_SIZE, wal)) {
store.storeValue(VF.createLiteral("to-be-dropped"));
// Intentionally do not awaitDurable: the record remains queued/in-memory
store.clear(); // triggers WAL purge
// Now add a post-clear value and force durability. If the purge didn't drop queued frames,
// the pre-clear value will be flushed together with this post-clear record.
store.storeValue(VF.createLiteral("after-clear"));
var lsn = store.drainPendingWalHighWaterMark();
if (lsn.isPresent()) {
wal.awaitDurable(lsn.getAsLong());
}
}
// Give the background writer a brief window to act after purge.
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
boolean hasMinted = false;
while (System.nanoTime() < deadline) {
try (ValueStoreWalReader reader = ValueStoreWalReader.open(config)) {
var scan = reader.scan();
hasMinted = scan.records().stream().anyMatch(r -> "to-be-dropped".equals(r.lexical()));
}
if (hasMinted) {
break; // if bug exists, record may appear quickly
}
TimeUnit.MILLISECONDS.sleep(25);
}
// After purge, no pre-clear minted value must be recoverable from the WAL.
assertThat(hasMinted).isFalse();
}
void logsMintedValueRecords() throws Exception {
Path walDir = tempDir.resolve(ValueStoreWalConfig.DEFAULT_DIRECTORY_NAME);
Files.createDirectories(walDir);
ValueStoreWalConfig config = ValueStoreWalConfig.builder()
.walDirectory(walDir)
.storeUuid(UUID.randomUUID().toString())
.build();
try (ValueStoreWAL wal = ValueStoreWAL.open(config)) {
File valueDir = tempDir.resolve("values").toFile();
Files.createDirectories(valueDir.toPath());
try (ValueStore store = new ValueStore(valueDir, false, ValueStore.VALUE_CACHE_SIZE,
ValueStore.VALUE_ID_CACHE_SIZE, ValueStore.NAMESPACE_CACHE_SIZE,
ValueStore.NAMESPACE_ID_CACHE_SIZE, wal)) {
Literal literal = VF.createLiteral("hello");
store.storeValue(literal);
OptionalLong lsn = store.drainPendingWalHighWaterMark();
assertThat(lsn).isPresent();
wal.awaitDurable(lsn.getAsLong());
}
ValueStoreWalReader reader = ValueStoreWalReader.open(config);
ValueStoreWalReader.ScanResult scan = reader.scan();
reader.close();
assertThat(scan.records()).hasSize(3);
assertThat(scan.records())
.anyMatch(record -> record.valueKind() == ValueStoreWalValueKind.NAMESPACE
&& record.lexical().equals(XMLSchema.NAMESPACE));
assertThat(scan.records())
.anyMatch(record -> record.valueKind() == ValueStoreWalValueKind.IRI
&& record.lexical().equals(XMLSchema.STRING.stringValue()));
assertThat(scan.records())
.anyMatch(record -> record.valueKind() == ValueStoreWalValueKind.LITERAL
&& record.lexical().equals("hello")
&& record.datatype().equals(XMLSchema.STRING.stringValue()));
}
}
@Test
void recoveryRebuildsMintedEntries() throws Exception {
Path walDir = tempDir.resolve("wal2");
Files.createDirectories(walDir);
ValueStoreWalConfig config = ValueStoreWalConfig.builder()
.walDirectory(walDir)
.storeUuid(UUID.randomUUID().toString())
.build();
Literal literal = VF.createLiteral("world", "en");
IRI datatype = VF.createIRI("http://example.com/datatype");
try (ValueStoreWAL wal = ValueStoreWAL.open(config)) {
File valueDir = tempDir.resolve("values2").toFile();
Files.createDirectories(valueDir.toPath());
try (ValueStore store = new ValueStore(valueDir, false, ValueStore.VALUE_CACHE_SIZE,
ValueStore.VALUE_ID_CACHE_SIZE, ValueStore.NAMESPACE_CACHE_SIZE,
ValueStore.NAMESPACE_ID_CACHE_SIZE, wal)) {
store.storeValue(literal);
store.storeValue(VF.createIRI("http://example.com/resource"));
store.storeValue(datatype);
OptionalLong lsn = store.drainPendingWalHighWaterMark();
assertThat(lsn).isPresent();
wal.awaitDurable(lsn.getAsLong());
}
}
try (ValueStoreWalReader reader = ValueStoreWalReader.open(config)) {
ValueStoreWalRecovery recovery = new ValueStoreWalRecovery();
Map<Integer, ValueStoreWalRecord> dictionary = recovery.replay(reader);
assertThat(dictionary).isNotEmpty();
assertThat(dictionary.values())
.anyMatch(record -> record.valueKind() == ValueStoreWalValueKind.LITERAL
&& record.lexical().equals("world"));
assertThat(dictionary.values())
.anyMatch(record -> record.valueKind() == ValueStoreWalValueKind.IRI
&& record.lexical().equals("http://example.com/resource"));
}
}
@Test
void enablingWalOnPopulatedStoreRebuildsExistingEntries() throws Exception {
Path valuesPath = tempDir.resolve("values-existing");
Files.createDirectories(valuesPath);
File valueDir = valuesPath.toFile();
IRI existingIri = VF.createIRI("http://example.com/existing/one");
Literal existingLiteral = VF.createLiteral("existing-literal", "en");
try (ValueStore store = new ValueStore(valueDir, false, ValueStore.VALUE_CACHE_SIZE,
ValueStore.VALUE_ID_CACHE_SIZE, ValueStore.NAMESPACE_CACHE_SIZE,
ValueStore.NAMESPACE_ID_CACHE_SIZE, null)) {
store.storeValue(existingIri);
store.storeValue(existingLiteral);
}
Path walDir = tempDir.resolve("wal-existing");
Files.createDirectories(walDir);
ValueStoreWalConfig config = ValueStoreWalConfig.builder()
.walDirectory(walDir)
.storeUuid(UUID.randomUUID().toString())
.build();
IRI newIri = VF.createIRI("http://example.com/new");
try (ValueStoreWAL wal = ValueStoreWAL.open(config);
ValueStore store = new ValueStore(valueDir, false, ValueStore.VALUE_CACHE_SIZE,
ValueStore.VALUE_ID_CACHE_SIZE, ValueStore.NAMESPACE_CACHE_SIZE,
ValueStore.NAMESPACE_ID_CACHE_SIZE, wal)) {
store.storeValue(newIri);
OptionalLong lsn = store.drainPendingWalHighWaterMark();
if (lsn.isPresent()) {
wal.awaitDurable(lsn.getAsLong());
}
ValueStoreWalRecovery recovery = new ValueStoreWalRecovery();
Map<Integer, ValueStoreWalRecord> dictionary = Map.of();
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
boolean hasExistingIri = false;
boolean hasExistingLiteral = false;
while (System.nanoTime() < deadline && (!hasExistingIri || !hasExistingLiteral)) {
try (ValueStoreWalReader reader = ValueStoreWalReader.open(config)) {
dictionary = recovery.replay(reader);
}
hasExistingIri = dictionary.values()
.stream()
.anyMatch(record -> record.valueKind() == ValueStoreWalValueKind.IRI
&& record.lexical().equals(existingIri.stringValue()));
hasExistingLiteral = dictionary.values()
.stream()
.anyMatch(record -> record.valueKind() == ValueStoreWalValueKind.LITERAL
&& record.lexical().equals(existingLiteral.getLabel())
&& Objects.toString(record.language(), "")
.equals(existingLiteral.getLanguage().orElse("")));
if (!hasExistingIri || !hasExistingLiteral) {
TimeUnit.MILLISECONDS.sleep(25);
}
}
assertThat(hasExistingIri).isTrue();
assertThat(hasExistingLiteral).isTrue();
assertThat(dictionary.values())
.anyMatch(record -> record.valueKind() == ValueStoreWalValueKind.IRI
&& record.lexical().equals(newIri.stringValue()));
}
try (var stream = Files.list(walDir)) {
assertThat(stream
.filter(Files::isRegularFile)
.map(path -> path.getFileName().toString())
.filter(name -> name.startsWith("wal-")))
.allMatch(name -> name.matches("wal-[1-9]\\d*\\.v1(?:\\.gz)?"));
}
}
}