ValueStoreWalCompressedSegmentRestoreTest.java
/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.wal;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.CRC32C;
import java.util.zip.GZIPInputStream;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.sail.nativerdf.ValueStore;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
/**
 * Restores a value record from a compressed ValueStore WAL segment. The compressed segments are binary-searched on
 * the LSN of their first mint ('M') record, and then only the selected segment is scanned for the target LSN.
 */
class ValueStoreWalCompressedSegmentRestoreTest {

	private static final ValueFactory VF = SimpleValueFactory.getInstance();

	/** Matches compressed segment file names; group 1 is the numeric id used to order segments. */
	private static final Pattern SEGMENT_GZ = Pattern.compile("wal-(\\d+)\\.v1\\.gz");

	private static final JsonFactory JSON_FACTORY = new JsonFactory();

	@TempDir
	Path tempDir;

	@Test
	void restoreFromCompressedSegmentUsingBinarySearch() throws Exception {
		// Force multiple segments by limiting segment size
		Path walDir = tempDir.resolve(ValueStoreWalConfig.DEFAULT_DIRECTORY_NAME);
		Files.createDirectories(walDir);
		ValueStoreWalConfig config = ValueStoreWalConfig.builder()
				.walDirectory(walDir)
				.storeUuid(UUID.randomUUID().toString())
				.maxSegmentBytes(4096) // small to ensure rotation + gzip
				.build();

		// Write enough values to rotate segments
		String targetLex = null;
		long targetLsn = -1;
		try (ValueStoreWAL wal = ValueStoreWAL.open(config)) {
			Path valuesDir = tempDir.resolve("values");
			Files.createDirectories(valuesDir);
			try (ValueStore store = new ValueStore(valuesDir.toFile(), false, ValueStore.VALUE_CACHE_SIZE,
					ValueStore.VALUE_ID_CACHE_SIZE, ValueStore.NAMESPACE_CACHE_SIZE,
					ValueStore.NAMESPACE_ID_CACHE_SIZE, wal)) {
				// Mint many literal values to span several segments
				for (int i = 0; i < 1000; i++) {
					String lex = "val-" + i;
					store.storeValue(VF.createLiteral(lex));
					var lsn = store.drainPendingWalHighWaterMark();
					if (i == 123) { // pick an early target to likely land in a compressed segment
						targetLex = lex;
						targetLsn = lsn.orElse(-1);
					}
				}
				wal.awaitDurable(targetLsn);
			}
		}

		// Ensure we have compressed segments
		List<Path> compressed = listCompressedSegments(walDir);
		assertThat(compressed).isNotEmpty();

		// Compute the LSN of the first 'M' record of every compressed segment
		List<Long> firstLsns = new ArrayList<>(compressed.size());
		for (Path gz : compressed) {
			firstLsns.add(firstMintLsn(gz));
		}
		// The binary search below is only meaningful if every segment yielded an LSN and they are ordered
		assertThat(firstLsns)
				.withFailMessage("compressed segments must each have a mint LSN, in ascending order: %s", firstLsns)
				.allMatch(lsn -> lsn >= 0)
				.isSorted();

		// If our chosen target is not safely inside the compressed range, pick one from the first compressed segment
		long maxFirst = firstLsns.get(firstLsns.size() - 1);
		if (targetLsn <= 0 || targetLsn < firstLsns.get(0) || targetLsn >= maxFirst) {
			Target t = pickTargetFromCompressed(compressed.get(0));
			targetLex = t.lex;
			targetLsn = t.lsn;
		}

		// The candidate is the last segment whose first LSN is <= target (clamped to 0 if the target precedes all)
		int segIdx = Math.max(0, upperBound(firstLsns, targetLsn) - 1);
		Path candidate = compressed.get(segIdx);

		// Scan the candidate compressed segment to find our target and restore its lexical
		ValueStoreWalRecord rec = scanSegmentForLsn(candidate, targetLsn);
		assertThat(rec).withFailMessage("target LSN not found in compressed segment").isNotNull();
		assertThat(rec.lexical()).isEqualTo(targetLex);
	}

	/**
	 * Returns the index of the first element strictly greater than {@code lsn}, or {@code firstLsns.size()} if no
	 * such element exists. Assumes {@code firstLsns} is sorted ascending.
	 */
	private static int upperBound(List<Long> firstLsns, long lsn) {
		int lo = 0;
		int hi = firstLsns.size();
		while (lo < hi) {
			int mid = (lo + hi) >>> 1;
			if (firstLsns.get(mid) <= lsn) {
				lo = mid + 1;
			} else {
				hi = mid;
			}
		}
		return lo;
	}

	/**
	 * Lists the files in {@code walDir} whose names match {@link #SEGMENT_GZ}, ordered by the numeric id captured
	 * from the file name.
	 */
	private static List<Path> listCompressedSegments(Path walDir) throws IOException {
		class Item {
			final Path path;
			final long firstId;

			Item(Path path, long firstId) {
				this.path = path;
				this.firstId = firstId;
			}
		}
		List<Item> items = new ArrayList<>();
		try (var stream = Files.list(walDir)) {
			stream.forEach(p -> {
				Matcher m = SEGMENT_GZ.matcher(p.getFileName().toString());
				if (m.matches()) {
					items.add(new Item(p, Long.parseLong(m.group(1))));
				}
			});
		}
		items.sort(Comparator.comparingLong(it -> it.firstId));
		List<Path> segments = new ArrayList<>(items.size());
		for (Item item : items) {
			segments.add(item.path);
		}
		return segments;
	}

	/**
	 * Opens a gzip-compressed segment for reading. If the gzip header cannot be read, the underlying file stream is
	 * closed before the exception propagates, so no file handle is leaked.
	 */
	private static InputStream openCompressed(Path gz) throws IOException {
		InputStream raw = Files.newInputStream(gz);
		try {
			return new GZIPInputStream(raw);
		} catch (IOException | RuntimeException e) {
			try {
				raw.close();
			} catch (IOException suppressed) {
				e.addSuppressed(suppressed);
			}
			throw e;
		}
	}

	/**
	 * Consumes the segment header frame: a little-endian int32 length, the payload, and a trailing int32 CRC. The
	 * header CRC is read but not verified.
	 *
	 * @return {@code false} if the length is non-positive (including end of stream) or the payload is truncated
	 */
	private static boolean skipHeaderFrame(InputStream in) throws IOException {
		int headerLen = readIntLE(in);
		if (headerLen <= 0) {
			return false;
		}
		if (in.readNBytes(headerLen).length < headerLen) {
			return false;
		}
		readIntLE(in); // header CRC
		return true;
	}

	/**
	 * Reads the next record frame (little-endian int32 length, JSON payload, little-endian int32 CRC32C of the
	 * payload).
	 *
	 * @return the verified JSON payload, or {@code null} when the length is non-positive (including end of stream),
	 *         the payload is truncated, or the CRC does not match
	 */
	private static byte[] readFrame(InputStream in) throws IOException {
		int length = readIntLE(in);
		if (length <= 0) {
			return null; // also guards readNBytes, which rejects negative lengths
		}
		byte[] payload = in.readNBytes(length);
		if (payload.length < length) {
			return null;
		}
		int expected = readIntLE(in);
		CRC32C crc = new CRC32C();
		crc.update(payload, 0, payload.length);
		return (int) crc.getValue() == expected ? payload : null;
	}

	/**
	 * Returns the {@code lsn} of the first record of type 'M' in the segment, skipping any other frames before it.
	 * Returns -1 if the header is unreadable or no readable 'M' record exists.
	 */
	private static long firstMintLsn(Path gz) throws IOException {
		try (InputStream in = openCompressed(gz)) {
			if (!skipHeaderFrame(in)) {
				return -1;
			}
			byte[] json;
			while ((json = readFrame(in)) != null) {
				Parsed p = parseJson(json);
				if (p.type == 'M') {
					return p.lsn;
				}
			}
			return -1;
		}
	}

	/**
	 * Scans the segment for the 'M' record whose {@code lsn} equals {@code targetLsn}.
	 *
	 * @return the matching record, or {@code null} if it is absent or the scan stops early on an unreadable frame
	 */
	private static ValueStoreWalRecord scanSegmentForLsn(Path gz, long targetLsn) throws IOException {
		try (InputStream in = openCompressed(gz)) {
			if (!skipHeaderFrame(in)) {
				return null;
			}
			byte[] json;
			while ((json = readFrame(in)) != null) {
				Parsed p = parseJson(json);
				if (p.type == 'M' && p.lsn == targetLsn) {
					return new ValueStoreWalRecord(p.lsn, p.id, p.kind, p.lex, p.dt, p.lang, p.hash);
				}
			}
			return null;
		}
	}

	/**
	 * Reads a little-endian int32 from {@code in}. Returns -1 if fewer than four bytes remain; callers treat -1 as
	 * end of stream where a positive value is expected.
	 */
	private static int readIntLE(InputStream in) throws IOException {
		byte[] b = in.readNBytes(4);
		if (b.length < 4) {
			return -1;
		}
		return (b[0] & 0xFF) | ((b[1] & 0xFF) << 8) | ((b[2] & 0xFF) << 16) | ((b[3] & 0xFF) << 24);
	}

	/**
	 * Parses a record's JSON payload into a {@link Parsed}. Unknown fields are skipped; fields that are absent keep
	 * the defaults declared on {@link Parsed}. A payload that does not start with an object yields all defaults.
	 */
	private static Parsed parseJson(byte[] jsonBytes) throws IOException {
		Parsed parsed = new Parsed();
		try (JsonParser jp = JSON_FACTORY.createParser(jsonBytes)) {
			if (jp.nextToken() != JsonToken.START_OBJECT) {
				return parsed;
			}
			while (jp.nextToken() != JsonToken.END_OBJECT) {
				String field = jp.getCurrentName();
				jp.nextToken();
				if ("t".equals(field)) {
					String t = jp.getValueAsString("");
					parsed.type = t.isEmpty() ? '?' : t.charAt(0);
				} else if ("lsn".equals(field)) {
					parsed.lsn = jp.getValueAsLong(ValueStoreWAL.NO_LSN);
				} else if ("id".equals(field)) {
					parsed.id = jp.getValueAsInt(0);
				} else if ("vk".equals(field)) {
					String code = jp.getValueAsString("");
					parsed.kind = ValueStoreWalValueKind.fromCode(code);
				} else if ("lex".equals(field)) {
					parsed.lex = jp.getValueAsString("");
				} else if ("dt".equals(field)) {
					parsed.dt = jp.getValueAsString("");
				} else if ("lang".equals(field)) {
					parsed.lang = jp.getValueAsString("");
				} else if ("hash".equals(field)) {
					parsed.hash = jp.getValueAsInt(0);
				} else {
					jp.skipChildren();
				}
			}
		}
		return parsed;
	}

	/** Mutable holder for the fields read from one record's JSON payload. */
	private static final class Parsed {
		char type = '?';
		long lsn = ValueStoreWAL.NO_LSN;
		int id = 0;
		ValueStoreWalValueKind kind = ValueStoreWalValueKind.NAMESPACE;
		String lex = "";
		String dt = "";
		String lang = "";
		int hash = 0;
	}

	/** An LSN together with the lexical form expected for it. */
	private static final class Target {
		final long lsn;
		final String lex;

		Target(long lsn, String lex) {
			this.lsn = lsn;
			this.lex = lex;
		}
	}

	/**
	 * Picks a restore target from a compressed segment. The second 'M' record is preferred (likely a user value);
	 * the first is used if only one exists. Returns {@code Target(-1, "")} if the segment has no readable 'M' record.
	 */
	private static Target pickTargetFromCompressed(Path gz) throws IOException {
		try (InputStream in = openCompressed(gz)) {
			if (!skipHeaderFrame(in)) {
				return new Target(-1, "");
			}
			Parsed firstMint = null;
			byte[] json;
			while ((json = readFrame(in)) != null) {
				Parsed p = parseJson(json);
				if (p.type != 'M') {
					continue;
				}
				if (firstMint == null) {
					firstMint = p;
				} else {
					return new Target(p.lsn, p.lex);
				}
			}
			return firstMint == null ? new Target(-1, "") : new Target(firstMint.lsn, firstMint.lex);
		}
	}
}