// ValueStoreWalRecoveryRebuildTest.java
/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.wal;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.stream.Collectors;
import org.eclipse.rdf4j.common.io.ByteArrayUtil;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.sail.nativerdf.ValueStore;
import org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
/**
 * Tests for rebuilding value data from the value-store WAL.
 * <p>
 * One test replays the WAL into a dictionary, writes that dictionary into a fresh {@link DataStore} and checks that a
 * {@link ValueStore} opened on the rebuilt files resolves every value to the id recorded in the WAL. The other test
 * deletes a WAL segment and checks that the replay report is flagged as incomplete.
 */
class ValueStoreWalRecoveryRebuildTest {

	private static final ValueFactory VF = SimpleValueFactory.getInstance();

	/**
	 * File-name prefix used for the rebuilt data store. NOTE(review): presumably the prefix {@link ValueStore} itself
	 * uses, since the rebuilt directory is subsequently opened through {@link ValueStore} - confirm.
	 */
	private static final String DATA_STORE_PREFIX = "values";

	/** Leading tag byte written in front of an encoded IRI. */
	private static final byte URI_TAG = 0x1;

	/** Leading tag byte written in front of an encoded blank node. */
	private static final byte BNODE_TAG = 0x2;

	/** Leading tag byte written in front of an encoded literal. */
	private static final byte LITERAL_TAG = 0x3;

	/** Id returned by {@link DataStore#getID(byte[])} for unknown data; also written for literals without datatype. */
	private static final int UNKNOWN_ID = -1;

	/** The language tag length is written into a single byte, so longer tags cannot be encoded. */
	private static final int MAX_LANGUAGE_BYTES = 0xFF;

	@TempDir
	Path tempDir;

	/**
	 * Mints an IRI and a language-tagged literal through a WAL-backed {@link ValueStore}, replays the WAL, rebuilds a
	 * {@link DataStore} from the replayed records and verifies that the rebuilt store yields the recorded ids.
	 */
	@Test
	void rebuildAssignsExactIds() throws Exception {
		Path walDir = tempDir.resolve(ValueStoreWalConfig.DEFAULT_DIRECTORY_NAME);
		Files.createDirectories(walDir);
		ValueStoreWalConfig config = ValueStoreWalConfig.builder()
				.walDirectory(walDir)
				.storeUuid(UUID.randomUUID().toString())
				.build();

		IRI iri = VF.createIRI("http://example.com/res");
		Literal lit = VF.createLiteral("value", "en");

		// Mint values and persist WAL. Resources close in reverse order: the store first, then the WAL.
		File valueDir = Files.createDirectories(tempDir.resolve("values")).toFile();
		try (ValueStoreWAL wal = ValueStoreWAL.open(config);
				ValueStore store = openValueStore(valueDir, wal)) {
			store.storeValue(iri);
			store.storeValue(lit);
			OptionalLong lsn = store.drainPendingWalHighWaterMark();
			assertThat(lsn).isPresent();
			wal.awaitDurable(lsn.getAsLong());
		}

		Map<Integer, ValueStoreWalRecord> dictionary;
		try (ValueStoreWalReader reader = ValueStoreWalReader.open(config)) {
			ValueStoreWalRecovery recovery = new ValueStoreWalRecovery();
			// Copy into a LinkedHashMap so both loops below iterate the records in the same order
			dictionary = new LinkedHashMap<>(recovery.replay(reader));
		}
		assertThat(dictionary).isNotEmpty();

		// Rebuild DataStore directly from WAL dictionary
		File dataDir = tempDir.resolve("rebuilt").toFile();
		Files.createDirectories(dataDir.toPath());
		rebuildDataStore(dataDir, dictionary.values());

		// Verify exact id equality using ValueStore on rebuilt data
		try (ValueStore vs = openValueStore(dataDir, null)) {
			for (ValueStoreWalRecord rec : dictionary.values()) {
				switch (rec.valueKind()) {
				case IRI:
					assertThat(vs.getID(VF.createIRI(rec.lexical())))
							.as("id of IRI %s", rec.lexical())
							.isEqualTo(rec.id());
					break;
				case BNODE:
					assertThat(vs.getID(VF.createBNode(rec.lexical())))
							.as("id of blank node %s", rec.lexical())
							.isEqualTo(rec.id());
					break;
				case LITERAL:
					assertThat(vs.getID(toLiteral(rec)))
							.as("id of literal %s", rec.lexical())
							.isEqualTo(rec.id());
					break;
				default:
					// skip NAMESPACE here
					break;
				}
			}
		}
	}

	/**
	 * Writes enough values to produce more than one WAL segment, deletes the first segment and verifies that the
	 * replay report is not complete.
	 */
	@Test
	void missingSegmentMarksIncomplete() throws Exception {
		Path walDir = tempDir.resolve("wal-missing");
		Files.createDirectories(walDir);
		ValueStoreWalConfig config = ValueStoreWalConfig.builder()
				.walDirectory(walDir)
				.storeUuid(UUID.randomUUID().toString())
				.maxSegmentBytes(1 << 12) // small segments so the values below span several of them
				.build();

		Path valueDir = tempDir.resolve("values-missing");
		Files.createDirectories(valueDir);
		try (ValueStoreWAL wal = ValueStoreWAL.open(config);
				ValueStore store = openValueStore(valueDir.toFile(), wal)) {
			for (int i = 0; i < 200; i++) {
				store.storeValue(VF.createIRI("http://example.com/value/" + i));
			}
			OptionalLong lsn = store.drainPendingWalHighWaterMark();
			// Fail here rather than silently skipping the durability wait and failing later on the segment count
			assertThat(lsn).isPresent();
			store.awaitWalDurable(lsn.getAsLong());
		}

		List<Path> segments;
		try (var stream = Files.list(walDir)) {
			segments = stream.filter(p -> p.getFileName().toString().startsWith("wal-"))
					.sorted()
					.collect(Collectors.toList());
		}
		assertThat(segments).hasSizeGreaterThan(1);
		Files.deleteIfExists(segments.get(0));

		ValueStoreWalRecovery recovery = new ValueStoreWalRecovery();
		ValueStoreWalRecovery.ReplayReport report;
		try (ValueStoreWalReader reader = ValueStoreWalReader.open(config)) {
			report = recovery.replayWithReport(reader);
		}
		assertThat(report.complete()).isFalse();
	}

	/**
	 * Opens a {@link ValueStore} on {@code dir} without forced sync and with the default cache sizes.
	 *
	 * @param dir directory holding (or receiving) the value store files
	 * @param wal WAL to attach, or {@code null} to open the store without one
	 * @return the opened store; the caller is responsible for closing it
	 * @throws IOException if the store cannot be opened
	 */
	private static ValueStore openValueStore(File dir, ValueStoreWAL wal) throws IOException {
		return new ValueStore(dir, false, ValueStore.VALUE_CACHE_SIZE, ValueStore.VALUE_ID_CACHE_SIZE,
				ValueStore.NAMESPACE_CACHE_SIZE, ValueStore.NAMESPACE_ID_CACHE_SIZE, wal);
	}

	/**
	 * Creates a {@link DataStore} in {@code dataDir}, stores every record in iteration order and syncs it.
	 *
	 * @param dataDir existing directory that receives the data store files
	 * @param records WAL records to write, in the order they should be stored
	 * @throws IOException if writing the data store fails
	 */
	private static void rebuildDataStore(File dataDir, Iterable<ValueStoreWalRecord> records) throws IOException {
		try (DataStore ds = new DataStore(dataDir, DATA_STORE_PREFIX, false)) {
			for (ValueStoreWalRecord rec : records) {
				storeRecord(rec, ds);
			}
			ds.sync();
		}
	}

	/**
	 * Encodes a single WAL record according to its kind and stores it in {@code ds}. Records of any other kind are
	 * ignored.
	 */
	private static void storeRecord(ValueStoreWalRecord rec, DataStore ds) throws IOException {
		switch (rec.valueKind()) {
		case NAMESPACE:
			// Namespaces are stored as their raw UTF-8 bytes, without a tag byte
			ds.storeData(rec.lexical().getBytes(StandardCharsets.UTF_8));
			break;
		case IRI:
			ds.storeData(encodeIri(rec.lexical(), ds));
			break;
		case BNODE:
			ds.storeData(encodeBNode(rec.lexical()));
			break;
		case LITERAL:
			ds.storeData(encodeLiteral(rec.lexical(), rec.datatype(), rec.language(), ds));
			break;
		default:
			// nothing to store for other kinds
			break;
		}
	}

	/**
	 * Recreates the literal described by a WAL record: a non-empty language wins over a non-empty datatype, and a
	 * record with neither yields a plain literal.
	 */
	private static Literal toLiteral(ValueStoreWalRecord rec) {
		String language = rec.language();
		if (language != null && !language.isEmpty()) {
			return VF.createLiteral(rec.lexical(), language);
		}
		String datatype = rec.datatype();
		if (datatype != null && !datatype.isEmpty()) {
			return VF.createLiteral(rec.lexical(), VF.createIRI(datatype));
		}
		return VF.createLiteral(rec.lexical());
	}

	/**
	 * Encodes an IRI as {@code [URI tag][4-byte namespace id][local name in UTF-8]}.
	 * <p>
	 * Side effect: the namespace is stored in {@code ds} if it is not present yet, because its id is part of the
	 * encoding.
	 *
	 * @param lexical the full IRI string
	 * @param ds      data store used to look up or create the namespace id
	 * @return the encoded IRI; the IRI itself is not stored by this method
	 * @throws IOException if the data store cannot be read or written
	 */
	private static byte[] encodeIri(String lexical, DataStore ds) throws IOException {
		IRI iri = VF.createIRI(lexical);
		byte[] nsBytes = iri.getNamespace().getBytes(StandardCharsets.UTF_8);
		int nsId = ds.getID(nsBytes);
		if (nsId == UNKNOWN_ID) {
			nsId = ds.storeData(nsBytes);
		}
		byte[] localBytes = iri.getLocalName().getBytes(StandardCharsets.UTF_8);
		byte[] data = new byte[1 + 4 + localBytes.length];
		data[0] = URI_TAG;
		ByteArrayUtil.putInt(nsId, data, 1);
		ByteArrayUtil.put(localBytes, data, 5);
		return data;
	}

	/**
	 * Encodes a blank node as {@code [BNODE tag][identifier in UTF-8]}.
	 *
	 * @param id the blank node identifier
	 * @return the encoded blank node
	 */
	private static byte[] encodeBNode(String id) {
		byte[] idData = id.getBytes(StandardCharsets.UTF_8);
		byte[] bnode = new byte[1 + idData.length];
		bnode[0] = BNODE_TAG;
		ByteArrayUtil.put(idData, bnode, 1);
		return bnode;
	}

	/**
	 * Encodes a literal as
	 * {@code [LITERAL tag][4-byte datatype id][1-byte language length][language in UTF-8][label in UTF-8]}.
	 * <p>
	 * Side effect: a non-empty datatype is stored in {@code ds} (together with its namespace) if it is not present
	 * yet, because its id is part of the encoding. Without a datatype the id field holds {@code -1}.
	 *
	 * @param label    the literal's label
	 * @param datatype the datatype IRI string; may be {@code null} or empty
	 * @param language the language tag; may be {@code null} or empty
	 * @param ds       data store used to look up or create the datatype id
	 * @return the encoded literal; the literal itself is not stored by this method
	 * @throws IOException              if the data store cannot be read or written
	 * @throws IllegalArgumentException if the language tag does not fit the one-byte length field
	 */
	private static byte[] encodeLiteral(String label, String datatype, String language, DataStore ds)
			throws IOException {
		int dtId = UNKNOWN_ID;
		if (datatype != null && !datatype.isEmpty()) {
			byte[] dtBytes = encodeIri(datatype, ds);
			int id = ds.getID(dtBytes);
			dtId = id == UNKNOWN_ID ? ds.storeData(dtBytes) : id;
		}
		byte[] langBytes = language == null ? new byte[0] : language.getBytes(StandardCharsets.UTF_8);
		if (langBytes.length > MAX_LANGUAGE_BYTES) {
			// The length prefix is a single byte; a longer tag would be written with a truncated length
			throw new IllegalArgumentException(
					"Language tag does not fit a one-byte length field: " + langBytes.length + " bytes");
		}
		byte[] labelBytes = label.getBytes(StandardCharsets.UTF_8);
		byte[] data = new byte[1 + 4 + 1 + langBytes.length + labelBytes.length];
		data[0] = LITERAL_TAG;
		ByteArrayUtil.putInt(dtId, data, 1);
		data[5] = (byte) langBytes.length;
		if (langBytes.length > 0) {
			ByteArrayUtil.put(langBytes, data, 6);
		}
		ByteArrayUtil.put(labelBytes, data, 6 + langBytes.length);
		return data;
	}
}