ValueStoreRandomLookupTest.java
/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
// Some portions generated by Codex
package org.eclipse.rdf4j.sail.nativerdf;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import java.util.zip.GZIPInputStream;

import org.eclipse.rdf4j.benchmark.common.BenchmarkResources;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.repository.sail.SailRepository;
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.nativerdf.config.NativeStoreConfig;
import org.eclipse.rdf4j.sail.nativerdf.config.NativeStoreFactory;
import org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWAL;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWalConfig;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWalReader;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWalRecord;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWalRecovery;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWalSearch;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWalTestUtils;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWalValueKind;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
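
/**
 * Exercises random-access value lookups against the native store's value write-ahead log (WAL).
 * The test ingests a benchmark dataset, validates the summary frames of sealed (gzip) segments,
 * deletes a subset of segments, and then checks that lookups through both {@link ValueStore} and
 * {@link ValueStoreWalSearch} behave consistently for surviving and deleted ids.
 */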
class ValueStoreRandomLookupTest {
private static final Pattern SEGMENT_PATTERN = Pattern.compile("wal-(\\d+)\\.v1(?:\\.gz)?");
	private static final JsonFactory JSON_FACTORY = new JsonFactory();

@TempDir
File dataDir;
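
	/**
	 * End-to-end scenario: ingest, verify sealed-segment summaries, simulate partial WAL loss,
	 * then perform 50 random lookups per verification pass.
	 */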
@Test
void randomLookup50() throws Exception {
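		// Keep WAL segments small (4 MiB) so the ingested dataset rolls over into multiple
		// segments, which the deletion scenario below depends on.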
NativeStoreConfig cfg = new NativeStoreConfig("spoc,ospc,psoc");
cfg.setWalMaxSegmentBytes(1024 * 1024 * 4);
NativeStore store = (NativeStore) new NativeStoreFactory().getSail(cfg);
store.setDataDir(dataDir);
SailRepository repository = new SailRepository(store);
repository.init();
try (SailRepositoryConnection connection = repository.getConnection()) {
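			// Bulk-load the benchmark dataset in a single transaction.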
connection.begin(IsolationLevels.NONE);
try (InputStream in = BenchmarkResources.openDecompressedStream("benchmarkFiles/datagovbe-valid.ttl.gz")) {
assertThat(in).as("benchmarkFiles/datagovbe-valid.ttl.gz should be on classpath").isNotNull();
connection.add(in, "", RDFFormat.TURTLE);
}
connection.commit();
}
repository.shutDown();
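		// With the repository shut down, inspect the WAL directory directly; the store UUID
		// recorded at init time is passed to the WAL config below.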
Path walDir = dataDir.toPath().resolve(ValueStoreWalConfig.DEFAULT_DIRECTORY_NAME);
String storeUuid = Files.readString(walDir.resolve("store.uuid"), StandardCharsets.UTF_8).trim();
try (DataStore ds = new DataStore(dataDir, "values");
ValueStore vs = new ValueStore(dataDir, false)) {
int maxId = ds.getMaxID();
assertThat(maxId).isGreaterThan(0);
ValueStoreWalConfig walConfig = ValueStoreWalConfig.builder()
.walDirectory(walDir)
.storeUuid(storeUuid)
.build();
			Map<Path, SegmentStats> statsBySegment = analyzeSegments(walDir);
assertThat(statsBySegment).isNotEmpty();
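			// Replay the complete WAL (all segments are still present at this point) to build
			// the id -> record dictionary.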
ValueStoreWalRecovery recovery = new ValueStoreWalRecovery();
Map<Integer, ValueStoreWalRecord> dict;
try (ValueStoreWalReader reader = ValueStoreWalReader.open(walConfig)) {
dict = recovery.replay(reader);
}
assertThat(dict).isNotEmpty();
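			// Keep only ids whose kind resolves to a concrete RDF value; other kinds
			// (e.g. NAMESPACE) are skipped.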
List<Integer> ids = new ArrayList<>();
for (Map.Entry<Integer, ValueStoreWalRecord> entry : dict.entrySet()) {
ValueStoreWalValueKind kind = entry.getValue().valueKind();
if (kind == ValueStoreWalValueKind.IRI || kind == ValueStoreWalValueKind.BNODE
|| kind == ValueStoreWalValueKind.LITERAL) {
ids.add(entry.getKey());
}
}
assertThat(ids).isNotEmpty();
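			// Every sealed (gzip-compressed) segment must end with a summary frame whose lastId
			// matches the highest minted id and whose CRC32 covers the bytes that precede the
			// summary frame.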
List<SegmentStats> compressedStats = statsBySegment.values()
.stream()
.filter(SegmentStats::isCompressed)
.sorted(Comparator.comparingInt(SegmentStats::sequence))
.collect(Collectors.toList());
assertThat(compressedStats).isNotEmpty();
for (SegmentStats stat : compressedStats) {
assertThat(stat.summaryLastId)
.as("Summary should exist for %s", stat.path.getFileName())
.isNotNull();
assertThat(stat.summaryCRC32)
.as("Summary CRC should exist for %s", stat.path.getFileName())
.isNotNull();
assertThat(stat.summaryLastId).isEqualTo(stat.highestMintedId);
long actualCrc = crc32(stat.uncompressedBytes, stat.summaryOffset);
assertThat(stat.summaryCRC32).isEqualTo(actualCrc);
}
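			// Simulate partial WAL loss: always delete the first and the current (most recent)
			// segment, then randomly delete some of the rest.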
List<Path> orderedSegments = new ArrayList<>(statsBySegment.keySet());
orderedSegments.sort(Comparator.comparingInt(p -> statsBySegment.get(p).sequence()));
assertThat(orderedSegments).isNotEmpty();
Path firstSegment = orderedSegments.get(0);
Path currentSegment = orderedSegments.get(orderedSegments.size() - 1);
Set<Path> deleted = new HashSet<>();
Files.deleteIfExists(firstSegment);
deleted.add(firstSegment);
Files.deleteIfExists(currentSegment);
deleted.add(currentSegment);
ThreadLocalRandom random = ThreadLocalRandom.current();
			// Keep at least one middle segment so the surviving-id assertions below cannot run
			// out of ids to verify.
			Path keeper = orderedSegments.size() > 2 ? orderedSegments.get(1) : null;
			for (Path segment : orderedSegments) {
				if (deleted.contains(segment) || segment.equals(keeper)) {
					continue;
				}
				if (random.nextBoolean()) {
					Files.deleteIfExists(segment);
					deleted.add(segment);
				}
			}
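			// Partition minted ids by whether their segment survived.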
Set<Integer> deletedIds = new HashSet<>();
Set<Integer> survivingIds = new HashSet<>();
for (Map.Entry<Path, SegmentStats> entry : statsBySegment.entrySet()) {
if (deleted.contains(entry.getKey())) {
deletedIds.addAll(entry.getValue().mintedIds);
} else {
survivingIds.addAll(entry.getValue().mintedIds);
}
}
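			// Pass 1: 50 random ids across the whole dictionary. Ids from deleted segments must
			// miss in the WAL search; surviving ids must resolve to the same value through the
			// ValueStore and the WAL.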
ValueStoreWalSearch search = ValueStoreWalSearch.open(walConfig);
int walMatches = 0;
for (int i = 0; i < 50; i++) {
int id = ids.get(random.nextInt(ids.size()));
assertThat(id).isBetween(1, maxId);
Value value = null;
try {
value = vs.getValue(id);
} catch (SailException e) {
if (!deletedIds.contains(id)) {
throw e;
}
}
Value walValue = search.findValueById(id);
if (deletedIds.contains(id)) {
assertThat(walValue).as("wal search should miss deleted segment id %s", id).isNull();
continue;
}
				assertThat(value).as("ValueStore should resolve surviving id %s", id).isNotNull();
assertThat(walValue).as("wal search should recover surviving id %s", id).isEqualTo(value);
walMatches++;
}
assertThat(walMatches).as("should recover at least one id via WAL").isGreaterThan(0);
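			// Pass 2: sample up to 50 ids known to live in surviving segments and check that the
			// WAL search agrees with the ValueStore.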
List<Integer> survivorList = new ArrayList<>(survivingIds);
Collections.shuffle(survivorList);
int sampleCount = Math.min(50, survivorList.size());
for (int i = 0; i < sampleCount; i++) {
int id = survivorList.get(i);
Value expected = vs.getValue(id);
Value fromWal = search.findValueById(id);
assertThat(expected).isNotNull();
assertThat(fromWal).isEqualTo(expected);
}
assertThat(sampleCount).as("should have surviving ids to verify").isGreaterThan(0);
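			// Pass 3: another 50 random ids, counting those that resolve identically through
			// both paths.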
int found = 0;
			for (int i = 0; i < 50; i++) {
				int id = ids.get(random.nextInt(ids.size()));
				assertThat(id).isBetween(1, maxId);
				Value v = null;
				try {
					v = vs.getValue(id);
				} catch (SailException e) {
					// As in pass 1, ids from deleted segments may fail to resolve.
					if (!deletedIds.contains(id)) {
						throw e;
					}
				}
				Value w = search.findValueById(id);
				if (w != null && v != null && v.equals(w)) {
					found++;
				}
			}
assertThat(found).as("Should resolve values for surviving WAL segments").isGreaterThan(0);
}
}
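
	/**
	 * Scans the WAL directory for segment files (wal-N.v1, optionally gzipped) and gathers
	 * per-segment statistics.
	 */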
	private static Map<Path, SegmentStats> analyzeSegments(Path walDir) throws IOException {
Map<Path, SegmentStats> stats = new HashMap<>();
if (!Files.isDirectory(walDir)) {
return stats;
}
try (var stream = Files.list(walDir)) {
for (Path path : stream.collect(Collectors.toList())) {
Matcher matcher = SEGMENT_PATTERN.matcher(path.getFileName().toString());
if (matcher.matches()) {
stats.put(path, analyzeSingleSegment(path));
}
}
}
return stats;
}
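
	/**
	 * Decompresses the segment if needed, then walks its frames, recording minted ids and, for
	 * sealed segments, the trailing summary frame.
	 */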
private static SegmentStats analyzeSingleSegment(Path path) throws IOException {
boolean compressed = path.getFileName().toString().endsWith(".gz");
byte[] content;
if (compressed) {
try (GZIPInputStream gin = new GZIPInputStream(Files.newInputStream(path))) {
content = gin.readAllBytes();
}
} else {
content = Files.readAllBytes(path);
}
int sequence = ValueStoreWalTestUtils.readSegmentSequence(content);
SegmentStats stats = new SegmentStats(path, sequence, compressed, content);
ByteBuffer buffer = ByteBuffer.wrap(content).order(ByteOrder.LITTLE_ENDIAN);
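		// Each frame is length-prefixed: a little-endian int length, the JSON payload, and one
		// trailing int (presumably a per-frame checksum; it is not validated here). Record type
		// 'M' marks a minted value, 'S' a segment summary.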
while (buffer.remaining() >= Integer.BYTES) {
int frameStart = buffer.position();
int length = buffer.getInt();
if (length <= 0 || length > ValueStoreWAL.MAX_FRAME_BYTES) {
break;
}
if (buffer.remaining() < length + Integer.BYTES) {
break;
}
byte[] json = new byte[length];
buffer.get(json);
			buffer.getInt(); // consume the trailing per-frame int
ParsedRecord record = ParsedRecord.parse(json);
if (record.type == 'M') {
if (record.kind == ValueStoreWalValueKind.IRI || record.kind == ValueStoreWalValueKind.BNODE
|| record.kind == ValueStoreWalValueKind.LITERAL) {
stats.mintedIds.add(record.id);
}
stats.highestMintedId = Math.max(stats.highestMintedId, record.id);
} else if (record.type == 'S' && compressed) {
stats.summaryLastId = record.id;
stats.summaryCRC32 = record.crc32;
stats.summaryOffset = frameStart;
break;
}
}
return stats;
}
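
	/** CRC32 over the first {@code limit} bytes of {@code content}. */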
private static long crc32(byte[] content, int limit) {
if (limit <= 0) {
return 0L;
}
CRC32 crc32 = new CRC32();
crc32.update(content, 0, Math.min(limit, content.length));
return crc32.getValue();
}
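
	/** Per-segment statistics gathered while scanning the WAL. */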
private static final class SegmentStats {
final Path path;
final int sequence;
final boolean compressed;
final byte[] uncompressedBytes;
final List<Integer> mintedIds = new ArrayList<>();
Integer summaryLastId;
Long summaryCRC32;
int summaryOffset = -1;
int highestMintedId = 0;
SegmentStats(Path path, int sequence, boolean compressed, byte[] uncompressedBytes) {
this.path = path;
this.sequence = sequence;
this.compressed = compressed;
this.uncompressedBytes = uncompressedBytes;
}
boolean isCompressed() {
return compressed;
}
int sequence() {
return sequence;
}
}
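
	/** Minimal view of the JSON payload of a single WAL frame. */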
private static final class ParsedRecord {
final char type;
final int id;
final long crc32;
final ValueStoreWalValueKind kind;
final int segment;
ParsedRecord(char type, int id, long crc32, ValueStoreWalValueKind kind, int segment) {
this.type = type;
this.id = id;
this.crc32 = crc32;
this.kind = kind;
this.segment = segment;
}
static ParsedRecord parse(byte[] json) throws IOException {
try (JsonParser parser = JSON_FACTORY.createParser(json)) {
char type = '?';
int id = 0;
long crc32 = 0L;
				ValueStoreWalValueKind kind = ValueStoreWalValueKind.NAMESPACE; // placeholder until a "vk" field is seen
int segment = 0;
while (parser.nextToken() != null) {
JsonToken token = parser.currentToken();
if (token == JsonToken.FIELD_NAME) {
String field = parser.getCurrentName();
parser.nextToken();
if ("t".equals(field)) {
String value = parser.getValueAsString("");
type = value.isEmpty() ? '?' : value.charAt(0);
} else if ("id".equals(field) || "lastId".equals(field)) {
id = parser.getValueAsInt(0);
} else if ("crc32".equals(field)) {
crc32 = parser.getValueAsLong(0L);
} else if ("vk".equals(field)) {
String code = parser.getValueAsString("");
kind = ValueStoreWalValueKind.fromCode(code);
} else if ("segment".equals(field)) {
segment = parser.getValueAsInt(0);
} else {
parser.skipChildren();
}
}
}
return new ParsedRecord(type, id, crc32, kind, segment);
}
}
}
}