ValueStoreWalSearch.java
/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.wal;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.zip.CRC32C;
import java.util.zip.GZIPInputStream;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
/**
 * Utility to search a ValueStore WAL for a specific minted value ID efficiently.
 *
 * <p>
 * Strategy: list the segment files in the WAL directory, treat the number embedded in each segment file name as the
 * first ID held by that segment, pick the segment with the greatest first ID that does not exceed the requested ID,
 * and scan only that segment for a matching minted ({@code "M"}) record.
 *
 * <p>
 * The segment listing is cached between lookups. A lookup that misses discards the cache, lists the directory again
 * and tries exactly once more.
 */
public final class ValueStoreWalSearch {

    /** Segment file names: {@code wal-<digits>.v1}, optionally followed by {@code .gz}. */
    private static final Pattern SEGMENT_PATTERN = Pattern.compile("wal-(\\d+)\\.v1(?:\\.gz)?");

    private final ValueStoreWalConfig config;
    private final JsonFactory jsonFactory = new JsonFactory();

    /** Cached directory listing; {@code null} means "not loaded yet" or "invalidated". */
    private volatile List<SegFirst> cachedSegments;

    private ValueStoreWalSearch(ValueStoreWalConfig config) {
        this.config = Objects.requireNonNull(config, "config");
    }

    /**
     * Creates a search helper for the WAL directory named by {@code config}.
     *
     * @param config WAL configuration; must not be {@code null}
     * @return a new search instance (no I/O is performed until the first lookup)
     * @throws NullPointerException if {@code config} is {@code null}
     */
    public static ValueStoreWalSearch open(ValueStoreWalConfig config) {
        return new ValueStoreWalSearch(config);
    }

    /**
     * Find and reconstruct a {@link org.eclipse.rdf4j.model.Value} by its ValueStore id using WAL contents only.
     *
     * @param id the ValueStore id to look up
     * @return the reconstructed value if present; {@code null} otherwise
     * @throws IOException if listing the WAL directory or reading/parsing a segment fails; a segment file that
     *                     disappears between listing and opening is treated as a miss, not as an error
     */
    public Value findValueById(int id) throws IOException {
        if (!Files.isDirectory(config.walDirectory())) {
            invalidateSegmentCache();
            return null;
        }
        LookupOutcome firstAttempt = locateCandidate(id, false);
        if (firstAttempt.value != null || !firstAttempt.retry) {
            return firstAttempt.value;
        }
        // The cached listing may be stale (segments added or removed since it was read): refresh and retry once.
        LookupOutcome secondAttempt = locateCandidate(id, true);
        return secondAttempt.value;
    }

    /** A segment file together with the first id taken from its file name. */
    private static final class SegFirst {
        final Path path;
        final int firstId;

        SegFirst(Path p, int id) {
            this.path = p;
            this.firstId = id;
        }
    }

    /**
     * Runs one lookup attempt: choose the candidate segment and scan it.
     *
     * @param targetId     id to look for
     * @param forceRefresh when {@code true} the cached listing is discarded first; a miss on such an attempt is final
     * @return a hit, or a miss whose {@code retry} flag is set only when this attempt used the (possibly stale) cache
     */
    private LookupOutcome locateCandidate(int targetId, boolean forceRefresh) throws IOException {
        List<SegFirst> segments = loadSegments(forceRefresh);
        if (segments.isEmpty()) {
            return LookupOutcome.miss(!forceRefresh);
        }
        SegFirst candidate = selectSegment(segments, targetId);
        if (candidate == null) {
            return LookupOutcome.miss(!forceRefresh);
        }
        Optional<Value> value;
        try {
            value = scanSegmentForId(candidate.path, targetId);
        } catch (NoSuchFileException missingSegment) {
            // The file was listed but is gone now; drop the listing so the next attempt sees the current directory.
            invalidateSegmentCache();
            return LookupOutcome.miss(!forceRefresh);
        }
        if (value.isPresent()) {
            return LookupOutcome.hit(value.get());
        }
        return LookupOutcome.miss(!forceRefresh);
    }

    /**
     * Returns the cached segment listing, reading it from disk when absent or when {@code forceRefresh} is set.
     * Loading happens under this instance's monitor and re-checks the cache after acquiring it.
     */
    private List<SegFirst> loadSegments(boolean forceRefresh) throws IOException {
        if (forceRefresh) {
            invalidateSegmentCache();
        }
        List<SegFirst> snapshot = cachedSegments;
        if (snapshot != null) {
            return snapshot;
        }
        synchronized (this) {
            snapshot = cachedSegments;
            if (snapshot == null) {
                snapshot = readSegmentsFromDisk();
                cachedSegments = snapshot;
            }
            return snapshot;
        }
    }

    /**
     * Lists the WAL directory and keeps every entry whose name matches {@link #SEGMENT_PATTERN} and whose number fits
     * an {@code int}.
     *
     * @return an immutable list of segments; empty when the directory does not exist
     */
    private List<SegFirst> readSegmentsFromDisk() throws IOException {
        Path walDirectory = config.walDirectory();
        if (!Files.isDirectory(walDirectory)) {
            return List.of();
        }
        List<SegFirst> segments = new ArrayList<>();
        try (var listing = Files.list(walDirectory)) {
            listing.forEach(entry -> {
                SegFirst segment = toSegment(entry);
                if (segment != null) {
                    segments.add(segment);
                }
            });
        }
        return List.copyOf(segments);
    }

    /**
     * Interprets a directory entry as a segment file.
     *
     * @return the segment descriptor, or {@code null} when the name does not match or its number is out of
     *         {@code int} range
     */
    private static SegFirst toSegment(Path entry) {
        var matcher = SEGMENT_PATTERN.matcher(entry.getFileName().toString());
        if (!matcher.matches()) {
            return null;
        }
        long firstId;
        try {
            firstId = Long.parseLong(matcher.group(1));
        } catch (NumberFormatException tooManyDigits) {
            // "\d+" does not bound the digit count, so the number may not even fit a long. Such a file is skipped
            // like any other out-of-range segment instead of failing the whole lookup with an unchecked exception.
            return null;
        }
        // The capture group holds digits only, so the value cannot be negative; only the upper bound needs a check.
        if (firstId > Integer.MAX_VALUE) {
            return null;
        }
        return new SegFirst(entry, (int) firstId);
    }

    /**
     * Picks the segment with the greatest {@code firstId} that is not greater than {@code targetId}.
     *
     * @return the chosen segment, or {@code null} when every segment starts after {@code targetId}
     */
    private SegFirst selectSegment(List<SegFirst> segments, int targetId) {
        SegFirst best = null;
        for (SegFirst segment : segments) {
            if (segment.firstId > targetId) {
                continue;
            }
            if (best == null || segment.firstId > best.firstId) {
                best = segment;
            }
        }
        return best;
    }

    private void invalidateSegmentCache() {
        cachedSegments = null;
    }

    /** Result of one lookup attempt: the value found (or {@code null}) and whether a refreshed retry is worthwhile. */
    private static final class LookupOutcome {
        final Value value;
        final boolean retry;

        private LookupOutcome(Value value, boolean retry) {
            this.value = value;
            this.retry = retry;
        }

        static LookupOutcome hit(Value value) {
            return new LookupOutcome(value, false);
        }

        static LookupOutcome miss(boolean retry) {
            return new LookupOutcome(null, retry);
        }
    }

    /**
     * Scans one segment file for a minted record with the given id.
     *
     * <p>
     * Files whose name ends in {@code .gz} are decompressed on the fly; all others are read as-is. Both kinds go
     * through the same frame loop.
     *
     * @throws NoSuchFileException if the segment file no longer exists
     * @throws IOException         on any other read, decompression or JSON parsing failure
     */
    private Optional<Value> scanSegmentForId(Path segment, int targetId) throws IOException {
        boolean compressed = segment.getFileName().toString().endsWith(".gz");
        // The file stream is its own resource so that it is closed even when the GZIPInputStream constructor throws
        // (for example on an empty or non-gzip file). Closing it a second time through the wrapper is harmless.
        try (InputStream raw = Files.newInputStream(segment);
                InputStream frames = compressed ? new GZIPInputStream(raw) : raw) {
            return scanFrames(frames, targetId);
        }
    }

    /**
     * Reads frames until a matching record is found or the stream can no longer be trusted.
     *
     * <p>
     * Frame layout as consumed here: a 4-byte little-endian payload length, the payload bytes, then a 4-byte
     * little-endian CRC32C of the payload. Scanning stops with an empty result at end of stream, at a truncated frame,
     * at a length that is non-positive or larger than {@code ValueStoreWAL.MAX_FRAME_BYTES}, or at a CRC mismatch.
     */
    private Optional<Value> scanFrames(InputStream in, int targetId) throws IOException {
        byte[] word = new byte[Integer.BYTES];
        while (true) {
            // readNBytes keeps reading until the requested count is reached or the stream ends, so a short count
            // here really means end of data rather than a partial read.
            if (in.readNBytes(word, 0, word.length) < word.length) {
                return Optional.empty();
            }
            int length = decodeIntLE(word);
            if (length <= 0 || (long) length > ValueStoreWAL.MAX_FRAME_BYTES) {
                return Optional.empty();
            }
            byte[] data = new byte[length];
            if (in.readNBytes(data, 0, length) < length) {
                return Optional.empty();
            }
            // Truncation of the checksum is detected by byte count, not by a sentinel value: every 32-bit pattern,
            // including 0xFFFFFFFF, is a legitimate CRC.
            if (in.readNBytes(word, 0, word.length) < word.length) {
                return Optional.empty();
            }
            int expectedCrc = decodeIntLE(word);
            CRC32C crc32c = new CRC32C();
            crc32c.update(data, 0, data.length);
            if ((int) crc32c.getValue() != expectedCrc) {
                return Optional.empty();
            }
            Parsed p = parseJson(data);
            if (p.type == 'M' && p.id == targetId) {
                Value value = toValue(p);
                if (value != null) {
                    return Optional.of(value);
                }
                // A matching record of a kind that cannot be turned into a Value: keep scanning.
            }
        }
    }

    /** Decodes the first four bytes of {@code word} as a little-endian {@code int}. */
    private static int decodeIntLE(byte[] word) {
        return ByteBuffer.wrap(word).order(ByteOrder.LITTLE_ENDIAN).getInt(0);
    }

    /**
     * Parses one frame payload as a flat JSON object and copies the recognised fields ({@code t}, {@code lsn},
     * {@code id}, {@code vk}, {@code lex}, {@code dt}, {@code lang}, {@code hash}) into a {@link Parsed}. Unrecognised
     * fields are skipped, including nested structures. A payload that does not start with an object yields a
     * {@link Parsed} holding only defaults.
     */
    private Parsed parseJson(byte[] jsonBytes) throws IOException {
        Parsed parsed = new Parsed();
        try (JsonParser jp = jsonFactory.createParser(jsonBytes)) {
            if (jp.nextToken() != JsonToken.START_OBJECT) {
                return parsed;
            }
            while (jp.nextToken() != JsonToken.END_OBJECT) {
                String field = jp.getCurrentName();
                jp.nextToken(); // advance from the field name to its value
                if ("t".equals(field)) {
                    String t = jp.getValueAsString("");
                    parsed.type = t.isEmpty() ? '?' : t.charAt(0);
                } else if ("lsn".equals(field)) {
                    parsed.lsn = jp.getValueAsLong(ValueStoreWAL.NO_LSN);
                } else if ("id".equals(field)) {
                    parsed.id = jp.getValueAsInt(0);
                } else if ("vk".equals(field)) {
                    String code = jp.getValueAsString("");
                    parsed.kind = ValueStoreWalValueKind.fromCode(code);
                } else if ("lex".equals(field)) {
                    parsed.lex = jp.getValueAsString("");
                } else if ("dt".equals(field)) {
                    parsed.dt = jp.getValueAsString("");
                } else if ("lang".equals(field)) {
                    parsed.lang = jp.getValueAsString("");
                } else if ("hash".equals(field)) {
                    parsed.hash = jp.getValueAsInt(0);
                } else {
                    jp.skipChildren();
                }
            }
        }
        return parsed;
    }

    /**
     * Builds an RDF value from a parsed record.
     *
     * @return an IRI, blank node or literal (language tag takes precedence over datatype), or {@code null} when the
     *         record's kind is none of those
     */
    private Value toValue(Parsed p) {
        // NOTE(review): whether ValueStoreWalValueKind.fromCode can return null is not visible from this file; a
        // null kind would make the switch below throw, so it is treated like any other non-value kind.
        if (p.kind == null) {
            return null;
        }
        var vf = SimpleValueFactory.getInstance();
        switch (p.kind) {
        case IRI:
            return vf.createIRI(p.lex);
        case BNODE:
            return vf.createBNode(p.lex);
        case LITERAL:
            if (p.lang != null && !p.lang.isEmpty()) {
                return vf.createLiteral(p.lex, p.lang);
            }
            if (p.dt != null && !p.dt.isEmpty()) {
                return vf.createLiteral(p.lex, vf.createIRI(p.dt));
            }
            return vf.createLiteral(p.lex);
        default:
            return null;
        }
    }

    /** Mutable holder for the fields read from one JSON record, pre-filled with defaults. */
    private static final class Parsed {
        char type = '?';
        long lsn = ValueStoreWAL.NO_LSN;
        int id = 0;
        ValueStoreWalValueKind kind = ValueStoreWalValueKind.NAMESPACE;
        String lex = "";
        String dt = "";
        String lang = "";
        int hash = 0;
    }
}