/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.wal;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import java.util.zip.CRC32C;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
/**
* Write-ahead log (WAL) for the ValueStore. The WAL records minted values in append-only segments so they can be
* recovered or searched independently of the on-disk ValueStore files. This class is thread-safe for concurrent
* producers and uses a background writer thread to serialize and fsync according to the configured
* {@link ValueStoreWalConfig.SyncPolicy}.
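* <p>
* Typical usage (a sketch; configuration construction is elided):
*
* <pre>{@code
* try (ValueStoreWAL wal = ValueStoreWAL.open(config)) {
*     long lsn = wal.logMint(1, ValueStoreWalValueKind.IRI, "http://example.org/a", "", "", 0);
*     wal.awaitDurable(lsn); // durable according to the configured sync policy
* }
* }</pre>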
*/
public final class ValueStoreWAL implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ValueStoreWAL.class);
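/**
 * Returns the internal record queue, creating it lazily on first use. Double-checked locking on the volatile
 * {@code queue} field ensures that a single queue instance is safely published to all producer threads.
 */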
public BlockingQueue<ValueStoreWalRecord> getQueue() {
var queue = this.queue;
if (queue != null) {
return queue;
}
synchronized (this) {
queue = this.queue;
if (queue == null) {
queue = new ArrayBlockingQueue<>(config.queueCapacity());
this.queue = queue;
}
return queue;
}
}
@FunctionalInterface
interface FileChannelOpener {
FileChannel open(Path path, OpenOption... options) throws IOException;
}
private static final FileChannelOpener DEFAULT_CHANNEL_OPENER = FileChannel::open;
private static volatile FileChannelOpener channelOpener = DEFAULT_CHANNEL_OPENER;
public static final long NO_LSN = -1L;
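// Segment files are named "wal-<firstId>.v1", optionally with a ".gz" suffix once compressed. The numeric
// component is the first minted value id in the segment, not the segment sequence; the sequence is stored
// in the "segment" field of the header frame (see readSegmentSequence).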
static final Pattern SEGMENT_PATTERN = Pattern.compile("wal-(\\d+)\\.v1(?:\\.gz)?");
public static final int MAX_FRAME_BYTES = 512 * 1024 * 1024; // 512 MiB safety cap
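// Every entry in a segment uses the same framing (all integers little-endian):
//
//   +----------------+-----------------------+----------------+
//   | length (int32) | NDJSON payload + '\n' | CRC32C (int32) |
//   +----------------+-----------------------+----------------+
//
// Both the length and the checksum cover the JSON payload including its trailing newline.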
private final ValueStoreWalConfig config;
private volatile BlockingQueue<ValueStoreWalRecord> queue;
private final AtomicLong nextLsn = new AtomicLong();
private final AtomicLong lastAppendedLsn = new AtomicLong(NO_LSN);
private final AtomicLong lastForcedLsn = new AtomicLong(NO_LSN);
private final AtomicLong requestedForceLsn = new AtomicLong(NO_LSN);
private final Object ackMonitor = new Object();
private final LogWriter logWriter;
private final Thread writerThread;
private volatile boolean closed;
private volatile Throwable writerFailure;
// Reset/purge coordination
private volatile boolean purgeRequested;
private final Object purgeMonitor = new Object();
private volatile boolean purgeInProgress;
private final FileChannel lockChannel;
private final FileLock directoryLock;
private final boolean initialSegmentsPresent;
private final int initialMaxSegmentSeq;
static void setChannelOpenerForTesting(FileChannelOpener opener) {
channelOpener = opener != null ? opener : DEFAULT_CHANNEL_OPENER;
}
static void resetChannelOpenerForTesting() {
channelOpener = DEFAULT_CHANNEL_OPENER;
}
private static FileChannel openWalChannel(Path path, OpenOption... options) throws IOException {
return channelOpener.open(path, options);
}
private ValueStoreWAL(ValueStoreWalConfig config) throws IOException {
this.config = Objects.requireNonNull(config, "config");
if (!Files.isDirectory(config.walDirectory())) {
Files.createDirectories(config.walDirectory());
}
Path lockFile = config.walDirectory().resolve("lock");
lockChannel = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
try {
directoryLock = lockChannel.tryLock();
} catch (IOException e) {
lockChannel.close();
throw e;
}
if (directoryLock == null) {
throw new IOException("WAL directory is already locked: " + config.walDirectory());
}
DirectoryState state = analyzeDirectory(config.walDirectory());
this.initialSegmentsPresent = state.hasSegments;
this.initialMaxSegmentSeq = state.maxSequence;
// Seed next LSN from existing WAL, if any, to ensure monotonic LSNs across restarts
if (initialSegmentsPresent) {
try (ValueStoreWalReader reader = ValueStoreWalReader.open(config)) {
// Fully consume the reader so that lastValidLsn() reflects the last valid record
var it = reader.iterator();
while (it.hasNext()) {
it.next();
}
long last = reader.lastValidLsn();
if (last > NO_LSN) {
nextLsn.set(last);
}
}
}
this.logWriter = new LogWriter(initialMaxSegmentSeq);
this.writerThread = new Thread(logWriter, "ValueStoreWalWriter-" + config.storeUuid());
this.writerThread.setDaemon(true);
this.writerThread.start();
}
/**
* Open a ValueStore WAL for the provided configuration. The WAL directory is created if it does not already exist.
* If existing segments are detected, the next LSN is seeded from the last valid record to ensure monotonicity
* across restarts.
*/
public static ValueStoreWAL open(ValueStoreWalConfig config) throws IOException {
return new ValueStoreWAL(config);
}
public ValueStoreWalConfig config() {
return config;
}
/**
* Append a minted value record to the WAL.
*
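* <p>
* Example (a sketch; the id and hash values are illustrative):
*
* <pre>{@code
* long lsn = wal.logMint(42, ValueStoreWalValueKind.LITERAL, "42",
*         "http://www.w3.org/2001/XMLSchema#int", "", 0x1234);
* wal.awaitDurable(lsn);
* }</pre>
*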
* @param id the ValueStore internal id
* @param kind the kind of value (IRI, BNODE, LITERAL, NAMESPACE)
* @param lexical the lexical form (may be empty but never null)
* @param datatype the datatype IRI string for literals, otherwise empty
* @param language the language tag for literals, otherwise empty
* @param hash a hash of the underlying serialized value
* @return the log sequence number (LSN) assigned to the record
*/
public long logMint(int id, ValueStoreWalValueKind kind, String lexical, String datatype, String language, int hash)
throws IOException {
ensureOpen();
long lsn = nextLsn.incrementAndGet();
ValueStoreWalRecord record = new ValueStoreWalRecord(lsn, id, kind, lexical, datatype, language, hash);
enqueue(record);
return lsn;
}
/**
* Block until the given LSN is durably forced to disk according to the configured sync policy. This is a no-op when
* {@code lsn <= NO_LSN} or after the WAL is closed. Under the INTERVAL sync policy this method only registers a
* force request and returns immediately, without waiting for the fsync to complete.
*/
public void awaitDurable(long lsn) throws InterruptedException, IOException {
if (lsn <= NO_LSN || closed) {
return;
}
ensureOpen();
if (lastForcedLsn.get() >= lsn) {
return;
}
requestForce(lsn);
// fsync is slow, so when using the INTERVAL sync policy we won't wait for fsync to finish
if (config.syncPolicy() == ValueStoreWalConfig.SyncPolicy.INTERVAL) {
return;
}
synchronized (ackMonitor) {
while (lastForcedLsn.get() < lsn && writerFailure == null && !closed) {
ackMonitor.wait(10);
}
}
if (writerFailure != null) {
throw propagate(writerFailure);
}
}
/**
* Returns {@code true} if WAL segments were already present in the directory when this WAL was opened.
*/
public boolean hasInitialSegments() {
return initialSegmentsPresent;
}
/**
* Returns {@code true} once {@link #close()} has been invoked; the flag is set before the writer thread has fully
* terminated.
*/
public boolean isClosed() {
return closed;
}
/**
* Purges all existing WAL segments from the WAL directory. This is used when the associated ValueStore is cleared,
* to ensure that a subsequent WAL recovery cannot resurrect deleted values.
* <p>
* The purge is coordinated with the writer thread: the current segment (if any) is closed before files are deleted,
* and the writer is reset to create a fresh segment on the next append.
*/
public void purgeAllSegments() throws IOException {
ensureOpen();
// Signal the writer to perform a coordinated purge and wait for completion
synchronized (purgeMonitor) {
purgeRequested = true;
purgeInProgress = true;
purgeMonitor.notifyAll();
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
while (purgeInProgress && writerFailure == null && !closed) {
long remaining = deadline - System.nanoTime();
if (remaining <= 0) {
throw new IOException("Timed out waiting for WAL purge to complete");
}
try {
// Clamp to at least 1 ms: Object.wait(0) would block indefinitely
purgeMonitor.wait(Math.max(1L, Math.min(TimeUnit.NANOSECONDS.toMillis(remaining), 50L)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting for WAL purge", e);
}
}
if (writerFailure != null) {
throw propagate(writerFailure);
}
if (closed) {
throw new IOException("WAL is closed");
}
}
}
@Override
public void close() throws IOException {
if (closed) {
return;
}
closed = true;
logWriter.shutdown();
try {
writerThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
try {
logWriter.close();
} finally {
try {
if (directoryLock != null && directoryLock.isValid()) {
directoryLock.release();
}
} finally {
if (lockChannel != null && lockChannel.isOpen()) {
lockChannel.close();
}
}
}
if (writerFailure != null) {
throw propagate(writerFailure);
}
}
private void requestForce(long lsn) {
requestedForceLsn.updateAndGet(prev -> Math.max(prev, lsn));
}
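// Producers spin briefly on offer() before falling back to a blocking put(), keeping the common
// (non-full) case cheap while still applying back-pressure when the writer falls behind.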
private void enqueue(ValueStoreWalRecord record) throws IOException {
boolean offered = false;
int spins = 0;
while (!offered) {
offered = getQueue().offer(record);
if (!offered) {
if (spins < 100) {
Thread.onSpinWait();
spins++;
} else {
try {
getQueue().put(record);
offered = true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while enqueueing WAL record", e);
}
}
}
}
}
private void ensureOpen() throws IOException {
if (closed) {
throw new IOException("WAL is closed");
}
if (writerFailure != null) {
throw propagate(writerFailure);
}
}
private IOException propagate(Throwable throwable) {
if (throwable instanceof IOException) {
return (IOException) throwable;
}
return new IOException("WAL writer failure", throwable);
}
private DirectoryState analyzeDirectory(Path walDirectory) throws IOException {
if (!Files.isDirectory(walDirectory)) {
return new DirectoryState(false, 0);
}
int maxSequence = 0;
boolean hasSegments = false;
List<Path> paths;
try (var stream = Files.list(walDirectory)) {
paths = stream.collect(Collectors.toList());
}
for (Path path : paths) {
Matcher matcher = SEGMENT_PATTERN.matcher(path.getFileName().toString());
if (matcher.matches()) {
hasSegments = true;
try {
int segment = readSegmentSequence(path);
if (segment > maxSequence) {
maxSequence = segment;
}
} catch (IOException e) {
logger.warn("Failed to read WAL segment header for {}", path.getFileName(), e);
}
}
}
return new DirectoryState(hasSegments, maxSequence);
}
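/**
 * Reads the segment sequence number from the header frame (the first frame of the file), transparently
 * decompressing {@code .gz} segments. Returns 0 if the header is truncated or unreadable.
 */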
static int readSegmentSequence(Path path) throws IOException {
boolean compressed = path.getFileName().toString().endsWith(".gz");
try (var rawIn = new BufferedInputStream(Files.newInputStream(path));
InputStream in = compressed ? new GZIPInputStream(rawIn) : rawIn) {
byte[] lenBytes = in.readNBytes(4);
if (lenBytes.length < 4) {
return 0;
}
ByteBuffer lenBuf = ByteBuffer.wrap(lenBytes).order(ByteOrder.LITTLE_ENDIAN);
int frameLen = lenBuf.getInt();
if (frameLen <= 0) {
return 0;
}
byte[] jsonBytes = in.readNBytes(frameLen);
if (jsonBytes.length < frameLen) {
return 0;
}
// skip CRC
in.readNBytes(4);
JsonFactory factory = new JsonFactory();
try (JsonParser parser = factory.createParser(jsonBytes)) {
// A null token means truncated input; bail out rather than loop indefinitely
JsonToken token;
while ((token = parser.nextToken()) != null && token != JsonToken.END_OBJECT) {
if (token == JsonToken.FIELD_NAME) {
String field = parser.getCurrentName();
parser.nextToken();
if ("segment".equals(field)) {
return parser.getIntValue();
}
}
}
}
}
return 0;
}
private static final class DirectoryState {
final boolean hasSegments;
final int maxSequence;
DirectoryState(boolean hasSegments, int maxSequence) {
this.hasSegments = hasSegments;
this.maxSequence = maxSequence;
}
}
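/**
 * Single background writer thread: drains the record queue, frames records into a reusable direct buffer,
 * rotates segments once {@code maxSegmentBytes} is exceeded, compresses finished segments, and honours
 * force and purge requests from producer threads.
 */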
private final class LogWriter implements Runnable {
private final CRC32C crc32c = new CRC32C();
private final int batchSize;
private FileChannel segmentChannel;
private Path segmentPath;
private int segmentSequence;
private long segmentBytes;
private int segmentLastMintedId;
private int segmentFirstMintedId;
private volatile ByteBuffer ioBuffer;
// Reuse JSON infrastructure to reduce allocations per record
private final JsonFactory jsonFactory = new JsonFactory();
private final ReusableByteArrayOutputStream jsonBuffer = new ReusableByteArrayOutputStream(256);
private volatile boolean running = true;
LogWriter(int existingSegments) {
this.segmentSequence = existingSegments;
this.batchSize = config.batchBufferBytes();
this.segmentChannel = null;
this.segmentPath = null;
this.segmentBytes = 0L;
this.segmentLastMintedId = 0;
this.segmentFirstMintedId = 0;
}
private ByteBuffer getIoBuffer() {
if (ioBuffer == null) {
synchronized (this) {
if (ioBuffer == null) {
ioBuffer = ByteBuffer.allocateDirect(batchSize).order(ByteOrder.LITTLE_ENDIAN);
}
}
}
return ioBuffer;
}
@Override
public void run() {
try {
long lastSyncCheck = System.nanoTime();
while (running || !getQueue().isEmpty()) {
// Handle purge requests promptly
if (purgeRequested) {
performPurgeInternal();
}
ValueStoreWalRecord record;
try {
record = getQueue().poll(config.idlePollInterval().toNanos(), TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
if (!running) {
break;
}
continue;
}
if (record != null) {
append(record);
}
long requested = requestedForceLsn.get();
boolean pendingForce = requested > NO_LSN && requested > lastForcedLsn.get();
boolean syncIntervalElapsed = config.syncPolicy() == ValueStoreWalConfig.SyncPolicy.INTERVAL
&& System.nanoTime() - lastSyncCheck >= config.syncInterval().toNanos();
if (record == null) {
if (pendingForce || config.syncPolicy() == ValueStoreWalConfig.SyncPolicy.ALWAYS
|| syncIntervalElapsed) {
flushAndForce();
lastSyncCheck = System.nanoTime();
}
} else if (config.syncPolicy() == ValueStoreWalConfig.SyncPolicy.ALWAYS) {
flushAndForce();
lastSyncCheck = System.nanoTime();
} else if (pendingForce && requested <= lastAppendedLsn.get()) {
flushAndForce();
lastSyncCheck = System.nanoTime();
}
}
flushAndForce();
} catch (Throwable t) {
writerFailure = t;
} finally {
try {
flushAndForce();
} catch (Throwable t) {
writerFailure = t;
}
closeQuietly(segmentChannel);
synchronized (ackMonitor) {
ackMonitor.notifyAll();
}
}
}
void shutdown() {
running = false;
}
void close() throws IOException {
closeQuietly(segmentChannel);
}
private void ensureSegmentWritable() throws IOException {
if (segmentPath == null || segmentChannel == null) {
return;
}
if (Files.exists(segmentPath)) {
return;
}
if (config.syncPolicy() == ValueStoreWalConfig.SyncPolicy.ALWAYS) {
throw new IOException("Current WAL segment has been removed: " + segmentPath);
}
logger.error("Detected deletion of active WAL segment {}; continuing with a new segment",
segmentPath.getFileName());
ByteBuffer pending = null;
if (getIoBuffer().position() > 0) {
ByteBuffer duplicate = getIoBuffer().duplicate();
duplicate.flip();
if (duplicate.hasRemaining()) {
pending = ByteBuffer.allocate(duplicate.remaining());
pending.put(duplicate);
pending.flip();
}
}
getIoBuffer().clear();
closeQuietly(segmentChannel);
int previousFirstId = segmentFirstMintedId;
int previousLastId = segmentLastMintedId;
segmentChannel = null;
segmentPath = null;
segmentBytes = 0L;
segmentFirstMintedId = 0;
if (previousFirstId > 0) {
startSegment(previousFirstId, false);
segmentLastMintedId = previousLastId;
if (pending != null) {
while (pending.hasRemaining()) {
segmentChannel.write(pending);
}
segmentBytes += pending.limit();
}
} else {
segmentLastMintedId = previousLastId;
}
}
private void append(ValueStoreWalRecord record) throws IOException {
ensureSegmentWritable();
if (segmentChannel == null) {
startSegment(record.id());
}
// Encode JSON for the record into reusable buffer without copying
int jsonLength = encodeIntoReusableBuffer(record);
int framedLength = 4 + jsonLength + 4;
if (segmentBytes + framedLength > config.maxSegmentBytes()) {
flushBuffer();
finishCurrentSegment();
startSegment(record.id());
}
// Write header length (4 bytes)
if (getIoBuffer().remaining() < 4) {
flushBuffer();
}
getIoBuffer().putInt(jsonLength);
// Write JSON payload in chunks to avoid BufferOverflowException
int offset = 0;
byte[] jsonBytes = jsonBuffer.buffer();
while (offset < jsonLength) {
if (getIoBuffer().remaining() == 0) {
flushBuffer();
}
int toWrite = Math.min(getIoBuffer().remaining(), jsonLength - offset);
getIoBuffer().put(jsonBytes, offset, toWrite);
offset += toWrite;
}
// Write CRC (4 bytes)
int crc = checksum(jsonBytes, jsonLength);
if (getIoBuffer().remaining() < 4) {
flushBuffer();
}
getIoBuffer().putInt(crc);
segmentBytes += framedLength;
if (record.id() > segmentLastMintedId) {
segmentLastMintedId = record.id();
}
lastAppendedLsn.set(record.lsn());
}
private void performPurgeInternal() {
try {
// Ensure any buffered data is not left around; close current segment
closeQuietly(segmentChannel);
// Drain frames queued before the purge via poll() rather than clear(), so that producers
// blocked in queue.put() are woken as capacity frees up.
while (getQueue().poll() != null) {
// intentionally empty: draining via poll() triggers the normal signalling path
}
getIoBuffer().clear();
// Delete all existing segments from disk
deleteAllSegments();
// Reset writer state so the next append starts a fresh segment
segmentPath = null;
segmentChannel = null;
segmentBytes = 0L;
segmentFirstMintedId = 0;
segmentLastMintedId = 0;
} catch (IOException e) {
writerFailure = e;
} finally {
purgeRequested = false;
synchronized (purgeMonitor) {
purgeInProgress = false;
purgeMonitor.notifyAll();
}
}
}
private void flushAndForce() throws IOException {
flushAndForce(false);
}
private void flushAndForce(boolean forceEvenForInterval) throws IOException {
if (lastAppendedLsn.get() <= lastForcedLsn.get()) {
return;
}
flushBuffer();
if (segmentChannel != null && segmentChannel.isOpen()) {
try {
boolean shouldForce = forceEvenForInterval
|| config.syncPolicy() != ValueStoreWalConfig.SyncPolicy.INTERVAL;
if (shouldForce) {
segmentChannel.force(false);
if (segmentPath != null) {
ValueStoreWalDebug.fireForceEvent(segmentPath);
}
}
} catch (ClosedChannelException e) {
// ignore; channel already closed during shutdown
}
}
long forced = lastAppendedLsn.get();
lastForcedLsn.set(forced);
// Clear pending force request without dropping newer requests that may arrive concurrently.
// Use CAS to only clear if the observed value is still <= forced; if another thread published
// a higher LSN in the meantime, we must not overwrite it with NO_LSN.
long cur = requestedForceLsn.get();
while (cur != NO_LSN && cur <= forced) {
if (requestedForceLsn.compareAndSet(cur, NO_LSN)) {
break;
}
cur = requestedForceLsn.get();
}
synchronized (ackMonitor) {
ackMonitor.notifyAll();
}
}
private void flushBuffer() throws IOException {
ensureSegmentWritable();
if (segmentChannel == null) {
getIoBuffer().clear();
return;
}
getIoBuffer().flip();
while (getIoBuffer().hasRemaining()) {
segmentChannel.write(getIoBuffer());
}
getIoBuffer().clear();
}
private void finishCurrentSegment() throws IOException {
if (segmentChannel == null) {
return;
}
boolean forceInterval = config.syncPolicy() == ValueStoreWalConfig.SyncPolicy.INTERVAL;
flushAndForce(forceInterval);
int summaryLastId = segmentLastMintedId;
Path toCompress = segmentPath;
closeQuietly(segmentChannel);
segmentChannel = null;
segmentPath = null;
segmentBytes = 0L;
segmentFirstMintedId = 0;
segmentLastMintedId = 0;
if (toCompress != null) {
gzipAndDelete(toCompress, summaryLastId);
}
}
/**
* Rotate the current WAL segment. This is a small wrapper used by tests to ensure that rotation forces the
* previous segment to disk before closing it. New segments will be started lazily on the next append.
*/
@SuppressWarnings("unused")
private void rotateSegment() throws IOException {
finishCurrentSegment();
}
private void startSegment(int firstId) throws IOException {
startSegment(firstId, true);
}
private void startSegment(int firstId, boolean incrementSequence) throws IOException {
if (incrementSequence) {
segmentSequence++;
}
segmentPath = config.walDirectory().resolve(buildSegmentFileName(firstId));
if (Files.exists(segmentPath)) {
logger.warn("Overwriting existing WAL segment {}", segmentPath.getFileName());
}
segmentChannel = openWalChannel(segmentPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING);
segmentBytes = 0L;
segmentFirstMintedId = firstId;
segmentLastMintedId = 0;
writeHeader(firstId);
}
private String buildSegmentFileName(int firstId) {
return "wal-" + firstId + ".v1";
}
private void gzipAndDelete(Path src, int lastMintedId) {
Path gz = src.resolveSibling(src.getFileName().toString() + ".gz");
long srcSize;
try {
srcSize = Files.size(src);
} catch (IOException e) {
// If we can't stat the file, don't attempt compression
logger.warn("Skipping compression of WAL segment {} because it is no longer accessible",
src.getFileName());
return;
}
int summaryFrameLength;
CRC32 crc32 = new CRC32();
try (var in = Files.newInputStream(src);
FileChannel gzChannel = openWalChannel(gz, StandardOpenOption.CREATE, StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING);
GZIPOutputStream gzOut = new GZIPOutputStream(Channels.newOutputStream(gzChannel))) {
byte[] buf = new byte[1 << 16];
int r;
while ((r = in.read(buf)) >= 0) {
gzOut.write(buf, 0, r);
crc32.update(buf, 0, r);
}
byte[] summaryFrame = buildSummaryFrame(lastMintedId, crc32.getValue());
summaryFrameLength = summaryFrame.length;
gzOut.write(summaryFrame);
gzOut.finish();
gzOut.flush();
gzChannel.force(false);
ValueStoreWalDebug.fireForceEvent(gz);
} catch (IOException e) {
// Compression failed: do not delete original; clean up partial gzip if present
logger.warn("Failed to compress WAL segment {}: {}", src.getFileName(), e.getMessage());
try {
Files.deleteIfExists(gz);
} catch (IOException ignore) {
}
return;
}
// Verify gzip contains full original data plus summary by reading back and counting bytes
long decompressedBytes = 0L;
byte[] verifyBuf = new byte[1 << 16];
try (var gin = new GZIPInputStream(Files.newInputStream(gz))) {
int r;
while ((r = gin.read(verifyBuf)) >= 0) {
decompressedBytes += r;
}
} catch (IOException e) {
logger.warn("Failed to verify compressed WAL segment {}: {}", gz.getFileName(), e.getMessage());
try {
Files.deleteIfExists(gz);
} catch (IOException ignore) {
}
return;
}
if (decompressedBytes != srcSize + summaryFrameLength) {
// Verification failed: keep original, remove corrupt gzip
logger.warn("Compressed WAL segment {} failed size verification; keeping uncompressed original",
gz.getFileName());
try {
Files.deleteIfExists(gz);
} catch (IOException ignore) {
}
return;
}
try {
Files.deleteIfExists(src);
} catch (IOException e) {
logger.warn("Failed to delete WAL segment {} after compression: {}", src.getFileName(), e.getMessage());
}
}
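/**
 * Builds the summary frame appended to a compressed segment, framed like any other record. The payload is
 * e.g. {@code {"t":"S","lastId":1234,"crc32":305419896}}, where {@code crc32} is a plain CRC32 over the
 * uncompressed segment bytes (the frame checksum itself remains CRC32C).
 */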
private byte[] buildSummaryFrame(int lastMintedId, long crc32Value) throws IOException {
JsonFactory factory = new JsonFactory();
ByteArrayOutputStream baos = new ByteArrayOutputStream(128);
try (JsonGenerator gen = factory.createGenerator(baos)) {
gen.writeStartObject();
gen.writeStringField("t", "S");
gen.writeNumberField("lastId", lastMintedId);
gen.writeNumberField("crc32", crc32Value & 0xFFFFFFFFL);
gen.writeEndObject();
}
baos.write('\n');
byte[] jsonBytes = baos.toByteArray();
ByteBuffer buffer = ByteBuffer.allocate(4 + jsonBytes.length + 4).order(ByteOrder.LITTLE_ENDIAN);
buffer.putInt(jsonBytes.length);
buffer.put(jsonBytes);
int crc = checksum(jsonBytes);
buffer.putInt(crc);
buffer.flip();
byte[] framed = new byte[buffer.remaining()];
buffer.get(framed);
return framed;
}
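/**
 * Writes the segment header frame. The payload is e.g.
 * {@code {"t":"V","ver":1,"store":"<uuid>","engine":"valuestore","created":1735689600,"segment":3,"firstId":42}}
 * (field values illustrative), wrapped in the usual length/CRC32C framing.
 */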
private void writeHeader(int firstId) throws IOException {
JsonFactory factory = new JsonFactory();
ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
try (JsonGenerator gen = factory.createGenerator(baos)) {
gen.writeStartObject();
gen.writeStringField("t", "V");
gen.writeNumberField("ver", 1);
gen.writeStringField("store", config.storeUuid());
gen.writeStringField("engine", "valuestore");
gen.writeNumberField("created", Instant.now().getEpochSecond());
gen.writeNumberField("segment", segmentSequence);
gen.writeNumberField("firstId", firstId);
gen.writeEndObject();
}
// NDJSON: newline-delimited JSON
baos.write('\n');
byte[] jsonBytes = baos.toByteArray();
ByteBuffer buffer = ByteBuffer.allocate(4 + jsonBytes.length + 4).order(ByteOrder.LITTLE_ENDIAN);
buffer.putInt(jsonBytes.length);
buffer.put(jsonBytes);
int crc = checksum(jsonBytes);
buffer.putInt(crc);
buffer.flip();
while (buffer.hasRemaining()) {
segmentChannel.write(buffer);
}
segmentBytes += buffer.limit();
}
private int checksum(byte[] data) {
return checksum(data, data.length);
}
private int checksum(byte[] data, int len) {
crc32c.reset();
crc32c.update(data, 0, len);
return (int) crc32c.getValue();
}
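/**
 * Encodes a mint record as one NDJSON line into the reusable buffer and returns its length in bytes. The
 * payload is e.g. {@code {"t":"M","lsn":7,"id":42,"vk":"I","lex":"http://example.org/a","dt":"","lang":"","hash":99}}
 * (the {@code vk} code shown is illustrative; actual codes come from {@link ValueStoreWalValueKind#code()}).
 */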
private int encodeIntoReusableBuffer(ValueStoreWalRecord record) throws IOException {
jsonBuffer.reset();
try (JsonGenerator gen = jsonFactory.createGenerator(jsonBuffer)) {
gen.writeStartObject();
gen.writeStringField("t", "M");
gen.writeNumberField("lsn", record.lsn());
gen.writeNumberField("id", record.id());
gen.writeStringField("vk", String.valueOf(record.valueKind().code()));
gen.writeStringField("lex", record.lexical() == null ? "" : record.lexical());
gen.writeStringField("dt", record.datatype() == null ? "" : record.datatype());
gen.writeStringField("lang", record.language() == null ? "" : record.language());
gen.writeNumberField("hash", record.hash());
gen.writeEndObject();
}
jsonBuffer.write('\n'); // NDJSON newline
return jsonBuffer.size();
}
private void closeQuietly(FileChannel channel) {
if (channel != null) {
try {
channel.close();
} catch (IOException ignore) {
// ignore
}
}
}
// Minimal extension to access internal buffer without copying
private final class ReusableByteArrayOutputStream extends ByteArrayOutputStream {
ReusableByteArrayOutputStream(int size) {
super(size);
}
byte[] buffer() {
return this.buf;
}
}
}
private void deleteAllSegments() throws IOException {
List<Path> toDelete;
try (var stream = Files.list(config.walDirectory())) {
toDelete = stream
.filter(Files::isRegularFile)
.filter(path -> SEGMENT_PATTERN.matcher(path.getFileName().toString()).matches())
.collect(Collectors.toList());
}
for (Path p : toDelete) {
try {
Files.deleteIfExists(p);
} catch (IOException e) {
logger.warn("Failed to delete WAL segment {}", p.getFileName(), e);
throw e;
}
}
}
}