ValueStoreWalRecoveryCorruptionTest.java
/*******************************************************************************
 * Copyright (c) 2025 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.wal;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import org.eclipse.rdf4j.common.io.ByteArrayUtil;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.util.Values;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.model.vocabulary.RDFS;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.sail.nativerdf.NativeStore;
import org.eclipse.rdf4j.sail.nativerdf.ValueStore;
import org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests that corrupt or missing ValueStore files can be reconstructed from the ValueStore WAL, restoring consistent
 * IDs so existing triple indexes remain valid.
 */
class ValueStoreWalRecoveryCorruptionTest {

	private static final ValueFactory VF = SimpleValueFactory.getInstance();

	@TempDir
	Path tempDir;

	@Test
	@Timeout(10)
	void rebuildsAfterDeletingAllValueFiles() throws Exception {
		File dataDir = tempDir.resolve("store").toFile();
		dataDir.mkdirs();
		// Pre-create an empty context index to avoid ContextStore reconstruction during init
		ensureEmptyContextIndex(dataDir.toPath());

		Repository repo = new SailRepository(new NativeStore(dataDir, "spoc,posc"));
		repo.init();
		try (RepositoryConnection conn = repo.getConnection()) {
			conn.begin();
			IRI exA = VF.createIRI("http://example.org/a0");
			IRI exB = VF.createIRI("http://example.org/b1");
			IRI exC = VF.createIRI("http://example.org/c2");
			Literal lit0 = VF.createLiteral("zero");
			Literal lit1 = VF.createLiteral("one");
			Literal lit2 = VF.createLiteral("two");
			Literal lit2en = VF.createLiteral("two", "en");
			Literal litTyped = VF.createLiteral(1.2);
			conn.add(exA, RDFS.LABEL, lit0);
			conn.add(exB, RDFS.LABEL, lit1, VF.createIRI("urn:one"));
			conn.add(exC, RDFS.LABEL, lit2, VF.createIRI("urn:two"));
			conn.add(exC, RDFS.LABEL, lit2, VF.createIRI("urn:two"));
			conn.add(Values.bnode(), RDF.TYPE, Values.bnode(), VF.createIRI("urn:two"));
			conn.add(exC, RDFS.LABEL, lit2en, VF.createIRI("urn:two"));
			conn.add(exC, RDFS.LABEL, litTyped, VF.createIRI("urn:two"));
			conn.commit();
		}
		repo.shutDown();

		// Simulate corruption: delete all ValueStore files
		deleteIfExists(dataDir.toPath().resolve("values.dat"));
		deleteIfExists(dataDir.toPath().resolve("values.id"));
		deleteIfExists(dataDir.toPath().resolve("values.hash"));

		recoverValueStoreFromWal(dataDir.toPath());
		validateDictionaryMatchesWal(dataDir.toPath());
	}

	@Test
	@Timeout(10)
	void rebuildsAfterCorruptingValuesDat() throws Exception {
		File dataDir = tempDir.resolve("store2").toFile();
		dataDir.mkdirs();
		ensureEmptyContextIndex(dataDir.toPath());

		Repository repo = new SailRepository(new NativeStore(dataDir, "spoc,posc"));
		repo.init();
		try (RepositoryConnection conn = repo.getConnection()) {
			conn.begin();
			conn.add(VF.createIRI("http://ex.com/s"), RDFS.LABEL, VF.createLiteral("hello"));
			conn.add(VF.createIRI("http://ex.com/t"), RDFS.LABEL, VF.createLiteral("world", "en"));
			conn.add(VF.createIRI("http://ex.com/u"), RDFS.LABEL, VF.createLiteral(42));
			conn.commit();
		}
		repo.shutDown();
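
		// Simulate corruption: truncate values.dat to zero length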
		Path valuesDat = dataDir.toPath().resolve("values.dat");
		if (Files.exists(valuesDat)) {
			try (var channel = Files.newByteChannel(valuesDat, Set.of(StandardOpenOption.WRITE))) {
				channel.truncate(0);
			}
		}

		recoverValueStoreFromWal(dataDir.toPath());
		validateDictionaryMatchesWal(dataDir.toPath());
	}

	private void deleteIfExists(Path path) throws IOException {
		if (Files.exists(path)) {
			Files.delete(path);
		}
	}

	private void recoverValueStoreFromWal(Path dataDir) throws Exception {
		Path walDir = dataDir.resolve(ValueStoreWalConfig.DEFAULT_DIRECTORY_NAME);
		Path uuidFile = walDir.resolve("store.uuid");
		String storeUuid = Files.exists(uuidFile) ? Files.readString(uuidFile, StandardCharsets.UTF_8).trim()
				: UUID.randomUUID().toString();
		ValueStoreWalConfig config = ValueStoreWalConfig.builder().walDirectory(walDir).storeUuid(storeUuid).build();
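
		// Replay the WAL to rebuild the insertion-ordered id -> value dictionary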
		Map<Integer, ValueStoreWalRecord> dictionary;
		try (ValueStoreWalReader reader = ValueStoreWalReader.open(config)) {
			ValueStoreWalRecovery recovery = new ValueStoreWalRecovery();
			dictionary = new LinkedHashMap<>(recovery.replay(reader));
		}
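
		// Re-store every value in its original WAL order so the rebuilt DataStore hands out the same
		// IDs that the existing triple indexes reference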
		try (DataStore ds = new DataStore(dataDir.toFile(), "values", false)) {
			for (ValueStoreWalRecord record : dictionary.values()) {
				switch (record.valueKind()) {
				case NAMESPACE: {
					// Namespace strings are stored as raw UTF-8 data records
					byte[] nsBytes = record.lexical().getBytes(StandardCharsets.UTF_8);
					ds.storeData(nsBytes);
					break;
				}
				case IRI: {
					byte[] iriBytes = encodeIri(record.lexical(), ds);
					ds.storeData(iriBytes);
					break;
				}
				case BNODE: {
					// BNode record layout: [0x2 type marker][node ID in UTF-8]
					byte[] idData = record.lexical().getBytes(StandardCharsets.UTF_8);
					byte[] bnode = new byte[1 + idData.length];
					bnode[0] = 0x2;
					ByteArrayUtil.put(idData, bnode, 1);
					ds.storeData(bnode);
					break;
				}
				case LITERAL: {
					byte[] litBytes = encodeLiteral(record.lexical(), record.datatype(), record.language(), ds);
					ds.storeData(litBytes);
					break;
				}
				default:
					break;
				}
			}
			ds.sync();
		}
	}

	private byte[] encodeIri(String lexical, DataStore ds) throws IOException {
		IRI iri = VF.createIRI(lexical);
		String ns = iri.getNamespace();
		String local = iri.getLocalName();
		int nsId = ds.getID(ns.getBytes(StandardCharsets.UTF_8));
		if (nsId == -1) {
			nsId = ds.storeData(ns.getBytes(StandardCharsets.UTF_8));
		}
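		// IRI record layout: [0x1 type marker][4-byte namespace ID][local name in UTF-8]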
		byte[] localBytes = local.getBytes(StandardCharsets.UTF_8);
		byte[] data = new byte[1 + 4 + localBytes.length];
		data[0] = 0x1;
		ByteArrayUtil.putInt(nsId, data, 1);
		ByteArrayUtil.put(localBytes, data, 5);
		return data;
	}

	private byte[] encodeLiteral(String label, String datatype, String language, DataStore ds) throws IOException {
		int dtId = -1; // -1 denotes UNKNOWN_ID
		if (datatype != null && !datatype.isEmpty()) {
			byte[] dtBytes = encodeIri(datatype, ds);
			int id = ds.getID(dtBytes);
			dtId = id == -1 ? ds.storeData(dtBytes) : id;
		}
		byte[] langBytes = language == null ? new byte[0] : language.getBytes(StandardCharsets.UTF_8);
		byte[] labelBytes = label.getBytes(StandardCharsets.UTF_8);
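		// Literal record layout: [0x3 type marker][4-byte datatype ID][1-byte language length][language][label]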
		byte[] data = new byte[1 + 4 + 1 + langBytes.length + labelBytes.length];
		data[0] = 0x3;
		ByteArrayUtil.putInt(dtId, data, 1);
		data[5] = (byte) (langBytes.length & 0xFF);
		if (langBytes.length > 0) {
			ByteArrayUtil.put(langBytes, data, 6);
		}
		ByteArrayUtil.put(labelBytes, data, 6 + langBytes.length);
		return data;
	}

	private void validateDictionaryMatchesWal(Path dataDir) throws Exception {
		Path walDir = dataDir.resolve(ValueStoreWalConfig.DEFAULT_DIRECTORY_NAME);
		String storeUuid = Files.readString(walDir.resolve("store.uuid"), StandardCharsets.UTF_8).trim();
		ValueStoreWalConfig config = ValueStoreWalConfig.builder().walDirectory(walDir).storeUuid(storeUuid).build();
		Map<Integer, ValueStoreWalRecord> dictionary;
		try (ValueStoreWalReader reader = ValueStoreWalReader.open(config)) {
			ValueStoreWalRecovery recovery = new ValueStoreWalRecovery();
			dictionary = new LinkedHashMap<>(recovery.replay(reader));
		}
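
		// Every value recorded in the WAL must resolve to a known ID and round-trip through the rebuilt ValueStore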
		try (ValueStore vs = new ValueStore(dataDir.toFile(), false, ValueStore.VALUE_CACHE_SIZE,
				ValueStore.VALUE_ID_CACHE_SIZE, ValueStore.NAMESPACE_CACHE_SIZE, ValueStore.NAMESPACE_ID_CACHE_SIZE,
				null)) {
			for (ValueStoreWalRecord record : dictionary.values()) {
				switch (record.valueKind()) {
				case IRI: {
					IRI iri = VF.createIRI(record.lexical());
					int id = vs.getID(iri);
					assertThat(id).isNotEqualTo(-1);
					assertThat(vs.getValue(id).stringValue()).isEqualTo(record.lexical());
					break;
				}
				case BNODE: {
					int id = vs.getID(VF.createBNode(record.lexical()));
					assertThat(id).isNotEqualTo(-1);
					assertThat(vs.getValue(id).stringValue()).isEqualTo(record.lexical());
					break;
				}
				case LITERAL: {
					Literal lit;
					if (record.language() != null && !record.language().isEmpty()) {
						lit = VF.createLiteral(record.lexical(), record.language());
					} else if (record.datatype() != null && !record.datatype().isEmpty()) {
						lit = VF.createLiteral(record.lexical(), VF.createIRI(record.datatype()));
					} else {
						lit = VF.createLiteral(record.lexical());
					}
					int id = vs.getID(lit);
					assertThat(id).isNotEqualTo(-1);
					assertThat(vs.getValue(id).stringValue()).isEqualTo(lit.stringValue());
					break;
				}
				case NAMESPACE:
					// Namespaces are validated indirectly through the IRIs that reference them
					break;
				default:
					break;
				}
			}
		}
	}

	private void ensureEmptyContextIndex(Path dataDir) throws IOException {
		Path file = dataDir.resolve("contexts.dat");
		if (Files.exists(file)) {
			return;
		}
		Files.createDirectories(dataDir);
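
		// Write a minimal empty context index so NativeStore init does not attempt ContextStore reconstruction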
		try (var out = new DataOutputStream(
				new BufferedOutputStream(new FileOutputStream(file.toFile())))) {
			out.write(new byte[] { 'n', 'c', 'f' });
			out.writeByte(1);
			out.writeInt(0);
			out.flush();
		}
	}
}