ValueStoreWalDurabilityRecoveryTest.java
/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.wal;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import org.eclipse.rdf4j.common.io.ByteArrayUtil;
import org.eclipse.rdf4j.common.io.NioFile;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.util.Values;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.sail.nativerdf.NativeStore;
import org.eclipse.rdf4j.sail.nativerdf.testutil.FailureInjectingFileChannel;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
/**
* Proves that a NativeStore with forceSync disabled can be fully recovered from a WAL that runs with SyncPolicy.COMMIT
* and synchronous bootstrap on open ensuring durability before commit returns.
*/
class ValueStoreWalDurabilityRecoveryTest {
@TempDir
Path tempDir;
private static final ValueFactory VF = SimpleValueFactory.getInstance();
@Test
void recoversFromLostValueStoreUsingWALCommitDurability() throws Exception {
// Install a delegating FileChannel factory (no failures by default), proving injection works
NioFile.setChannelFactoryForTesting(
(path, options) -> new FailureInjectingFileChannel(java.nio.channels.FileChannel.open(path, options)));
File dataDir = tempDir.resolve("store").toFile();
dataDir.mkdirs();
NativeStore store = new NativeStore(dataDir, "spoc,posc");
store.setForceSync(false); // ValueStore won't fsync
store.setWalSyncPolicy(ValueStoreWalConfig.SyncPolicy.COMMIT); // WAL fsyncs on commit
store.setWalSyncBootstrapOnOpen(false);
Repository repo = new SailRepository(store);
repo.init();
IRI p = VF.createIRI("http://ex/p");
IRI s = VF.createIRI("http://ex/s");
IRI o = VF.createIRI("http://ex/o");
try (RepositoryConnection conn = repo.getConnection()) {
conn.begin();
conn.add(s, p, o, Values.iri("urn:g"));
conn.commit(); // WAL should force+persist before this returns
}
repo.shutDown();
// Simulate crash that loses the ValueStore by deleting the value files, WAL remains
Files.deleteIfExists(dataDir.toPath().resolve("values.dat"));
Files.deleteIfExists(dataDir.toPath().resolve("values.id"));
Files.deleteIfExists(dataDir.toPath().resolve("values.hash"));
// Manually recover the ValueStore from WAL to simulate crash recovery
Path walDir = dataDir.toPath().resolve(ValueStoreWalConfig.DEFAULT_DIRECTORY_NAME);
String storeUuid = Files.readString(walDir.resolve("store.uuid"), StandardCharsets.UTF_8).trim();
ValueStoreWalConfig cfg = ValueStoreWalConfig.builder().walDirectory(walDir).storeUuid(storeUuid).build();
java.util.Map<Integer, ValueStoreWalRecord> dictionary;
try (ValueStoreWalReader reader = ValueStoreWalReader.open(cfg)) {
ValueStoreWalRecovery recovery = new ValueStoreWalRecovery();
dictionary = new java.util.LinkedHashMap<>(recovery.replay(reader));
}
try (org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore ds = new org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore(
dataDir, "values", false)) {
for (ValueStoreWalRecord record : dictionary.values()) {
switch (record.valueKind()) {
case NAMESPACE:
ds.storeData(record.lexical().getBytes(StandardCharsets.UTF_8));
break;
case IRI:
ds.storeData(encodeIri(record.lexical(), ds));
break;
case BNODE:
byte[] idData = record.lexical().getBytes(StandardCharsets.UTF_8);
byte[] bnode = new byte[1 + idData.length];
bnode[0] = 0x2;
ByteArrayUtil.put(idData, bnode, 1);
ds.storeData(bnode);
break;
default:
ds.storeData(encodeLiteral(record.lexical(), record.datatype(), record.language(), ds));
break;
}
}
ds.sync();
}
// Restart store and verify statement is readable (dictionary present)
NativeStore store2 = new NativeStore(dataDir, "spoc,posc");
store2.setForceSync(false);
store2.setWalSyncPolicy(ValueStoreWalConfig.SyncPolicy.COMMIT);
store2.setWalSyncBootstrapOnOpen(true);
Repository repo2 = new SailRepository(store2);
repo2.init();
try (RepositoryConnection conn = repo2.getConnection()) {
long count = conn.getStatements(s, p, o, false, Values.iri("urn:g")).stream().count();
assertThat(count).isEqualTo(1L);
}
repo2.shutDown();
// Remove factory to avoid impacting other tests
NioFile.setChannelFactoryForTesting(null);
}
private byte[] encodeIri(String lexical, org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore ds) throws Exception {
IRI iri = VF.createIRI(lexical);
String ns = iri.getNamespace();
String local = iri.getLocalName();
int nsId = ds.getID(ns.getBytes(StandardCharsets.UTF_8));
if (nsId == -1) {
nsId = ds.storeData(ns.getBytes(StandardCharsets.UTF_8));
}
byte[] localBytes = local.getBytes(StandardCharsets.UTF_8);
byte[] data = new byte[1 + 4 + localBytes.length];
data[0] = 0x1;
ByteArrayUtil.putInt(nsId, data, 1);
ByteArrayUtil.put(localBytes, data, 5);
return data;
}
private byte[] encodeLiteral(String label, String datatype, String language,
org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore ds) throws Exception {
int dtId = -1; // -1 denotes UNKNOWN_ID
if (datatype != null && !datatype.isEmpty()) {
byte[] dtBytes = encodeIri(datatype, ds);
int id = ds.getID(dtBytes);
dtId = id == -1 ? ds.storeData(dtBytes) : id;
}
byte[] langBytes = language == null ? new byte[0] : language.getBytes(StandardCharsets.UTF_8);
byte[] labelBytes = label.getBytes(StandardCharsets.UTF_8);
byte[] data = new byte[1 + 4 + 1 + langBytes.length + labelBytes.length];
data[0] = 0x3;
ByteArrayUtil.putInt(dtId, data, 1);
data[5] = (byte) (langBytes.length & 0xFF);
if (langBytes.length > 0) {
ByteArrayUtil.put(langBytes, data, 6);
}
ByteArrayUtil.put(labelBytes, data, 6 + langBytes.length);
return data;
}
}