/*******************************************************************************
 * Copyright (c) 2025 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.wal;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import java.util.zip.CRC32C;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

/**
 * Write-ahead log (WAL) for the ValueStore. The WAL records minted values in append-only segments so they can be
 * recovered or searched independently of the on-disk ValueStore files. This class is thread-safe for concurrent
 * producers and uses a background writer thread to serialize and fsync according to the configured
 * {@link ValueStoreWalConfig.SyncPolicy}.
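 * <p>
 * Minimal usage sketch (how the {@link ValueStoreWalConfig} is constructed, and the exact
 * {@code ValueStoreWalValueKind} constant names, are assumed here rather than prescribed):
 *
 * <pre>{@code
 * try (ValueStoreWAL wal = ValueStoreWAL.open(config)) {
 *     long lsn = wal.logMint(42, ValueStoreWalValueKind.IRI, "http://example.org/a", "", "", 0);
 *     wal.awaitDurable(lsn); // under the INTERVAL policy this returns without waiting for the fsync
 * }
 * }</pre>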
 */
public final class ValueStoreWAL implements AutoCloseable {

	private static final Logger logger = LoggerFactory.getLogger(ValueStoreWAL.class);

	/**
	 * Returns the queue of pending WAL records, creating it lazily on first use via double-checked locking.
	 */
	public BlockingQueue<ValueStoreWalRecord> getQueue() {
		var queue = this.queue;
		if (queue != null) {
			return queue;
		}

		synchronized (this) {
			queue = this.queue;
			if (queue == null) {
				queue = new ArrayBlockingQueue<>(config.queueCapacity());
				this.queue = queue;
			}
			return queue;
		}
	}

	@FunctionalInterface
	interface FileChannelOpener {
		FileChannel open(Path path, OpenOption... options) throws IOException;
	}

	private static final FileChannelOpener DEFAULT_CHANNEL_OPENER = FileChannel::open;
	private static volatile FileChannelOpener channelOpener = DEFAULT_CHANNEL_OPENER;

	public static final long NO_LSN = -1L;

	static final Pattern SEGMENT_PATTERN = Pattern.compile("wal-(\\d+)\\.v1(?:\\.gz)?");
	public static final int MAX_FRAME_BYTES = 512 * 1024 * 1024; // 512 MiB safety cap
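	// On-disk frame layout (little-endian): [int32 payload length][NDJSON payload][int32 CRC32C of the payload].
	// Each segment starts with a header frame ("t":"V") followed by mint records ("t":"M"); gzipped segments end
	// with a summary frame ("t":"S").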

	private final ValueStoreWalConfig config;
	private volatile BlockingQueue<ValueStoreWalRecord> queue;
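	// LSN bookkeeping: nextLsn hands out LSNs to producers; lastAppendedLsn tracks the last record encoded into the
	// write buffer, lastForcedLsn the last record flushed (and, per the sync policy, fsynced), and requestedForceLsn
	// the highest LSN an awaitDurable() caller is currently waiting on.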
	private final AtomicLong nextLsn = new AtomicLong();
	private final AtomicLong lastAppendedLsn = new AtomicLong(NO_LSN);
	private final AtomicLong lastForcedLsn = new AtomicLong(NO_LSN);
	private final AtomicLong requestedForceLsn = new AtomicLong(NO_LSN);

	private final Object ackMonitor = new Object();

	private final LogWriter logWriter;
	private final Thread writerThread;

	private volatile boolean closed;
	private volatile Throwable writerFailure;

	// Reset/purge coordination
	private volatile boolean purgeRequested;
	private final Object purgeMonitor = new Object();
	private volatile boolean purgeInProgress;

	private final FileChannel lockChannel;
	private final FileLock directoryLock;

	private final boolean initialSegmentsPresent;
	private final int initialMaxSegmentSeq;

	static void setChannelOpenerForTesting(FileChannelOpener opener) {
		channelOpener = opener != null ? opener : DEFAULT_CHANNEL_OPENER;
	}

	static void resetChannelOpenerForTesting() {
		channelOpener = DEFAULT_CHANNEL_OPENER;
	}

	private static FileChannel openWalChannel(Path path, OpenOption... options) throws IOException {
		return channelOpener.open(path, options);
	}

	private ValueStoreWAL(ValueStoreWalConfig config) throws IOException {
		this.config = Objects.requireNonNull(config, "config");
		if (!Files.isDirectory(config.walDirectory())) {
			Files.createDirectories(config.walDirectory());
		}

		Path lockFile = config.walDirectory().resolve("lock");
		lockChannel = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
		try {
			directoryLock = lockChannel.tryLock();
		} catch (IOException e) {
			lockChannel.close();
			throw e;
		}
		if (directoryLock == null) {
			throw new IOException("WAL directory is already locked: " + config.walDirectory());
		}

		DirectoryState state = analyzeDirectory(config.walDirectory());
		this.initialSegmentsPresent = state.hasSegments;
		this.initialMaxSegmentSeq = state.maxSequence;
		// Seed next LSN from existing WAL, if any, to ensure monotonic LSNs across restarts
		if (initialSegmentsPresent) {
			try (ValueStoreWalReader reader = ValueStoreWalReader.open(config)) {
				var it = reader.iterator();
				while (it.hasNext()) {
					it.next();
				}
				long last = reader.lastValidLsn();
				if (last > NO_LSN) {
					nextLsn.set(last);
				}
			}
		}
		this.logWriter = new LogWriter(initialMaxSegmentSeq);
		this.writerThread = new Thread(logWriter, "ValueStoreWalWriter-" + config.storeUuid());
		this.writerThread.setDaemon(true);
		this.writerThread.start();
	}

	/**
	 * Open a ValueStore WAL for the provided configuration. The WAL directory is created if it does not already exist.
	 * If existing segments are detected, the next LSN is seeded from the last valid record to ensure monotonicity
	 * across restarts.
	 */
	public static ValueStoreWAL open(ValueStoreWalConfig config) throws IOException {
		return new ValueStoreWAL(config);
	}

	public ValueStoreWalConfig config() {
		return config;
	}

	/**
	 * Append a minted value record to the WAL.
	 *
	 * @param id       the ValueStore internal id
	 * @param kind     the kind of value (IRI, BNODE, LITERAL, NAMESPACE)
	 * @param lexical  the lexical form (may be empty but never null)
	 * @param datatype the datatype IRI string for literals, otherwise empty
	 * @param language the language tag for literals, otherwise empty
	 * @param hash     a hash of the underlying serialized value
	 * @return the log sequence number (LSN) assigned to the record
	 */
	public long logMint(int id, ValueStoreWalValueKind kind, String lexical, String datatype, String language, int hash)
			throws IOException {
		ensureOpen();
		long lsn = nextLsn.incrementAndGet();
		ValueStoreWalRecord record = new ValueStoreWalRecord(lsn, id, kind, lexical, datatype, language, hash);
		enqueue(record);
		return lsn;
	}

	/**
	 * Block until the given LSN is durably forced to disk according to the configured sync policy. This is a no-op when
	 * {@code lsn <= NO_LSN} or after the WAL is closed.
	 */
	public void awaitDurable(long lsn) throws InterruptedException, IOException {
		if (lsn <= NO_LSN || closed) {
			return;
		}
		ensureOpen();
		if (lastForcedLsn.get() >= lsn) {
			return;
		}
		requestForce(lsn);

		// fsync is slow, so when using the INTERVAL sync policy we won't wait for fsync to finish
		if (config.syncPolicy() == ValueStoreWalConfig.SyncPolicy.INTERVAL) {
			return;
		}
		synchronized (ackMonitor) {
			while (lastForcedLsn.get() < lsn && writerFailure == null && !closed) {
				ackMonitor.wait(10);
			}
		}
		if (writerFailure != null) {
			throw propagate(writerFailure);
		}
	}

	/**
	 * Returns {@code true} if WAL segments were already present in the directory when this WAL was opened.
	 */
	public boolean hasInitialSegments() {
		return initialSegmentsPresent;
	}

	/**
	 * Returns {@code true} once {@link #close()} has been invoked and the writer thread has terminated.
	 */
	public boolean isClosed() {
		return closed;
	}

	/**
	 * Purges all existing WAL segments from the WAL directory. This is used when the associated ValueStore is cleared,
	 * to ensure that a subsequent WAL recovery cannot resurrect deleted values.
	 * <p>
	 * The purge is coordinated with the writer thread: the current segment (if any) is closed before files are deleted,
	 * and the writer is reset to create a fresh segment on the next append.
	 */
	public void purgeAllSegments() throws IOException {
		ensureOpen();
		// Signal the writer to perform a coordinated purge and wait for completion
		synchronized (purgeMonitor) {
			purgeRequested = true;
			purgeInProgress = true;
			purgeMonitor.notifyAll();
			long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
			while (purgeInProgress && writerFailure == null && !closed) {
				long remaining = deadline - System.nanoTime();
				if (remaining <= 0) {
					throw new IOException("Timed out waiting for WAL purge to complete");
				}
				try {
					// Clamp to at least 1 ms so a sub-millisecond remainder does not become wait(0), i.e. no timeout.
					purgeMonitor.wait(Math.max(1L, Math.min(TimeUnit.NANOSECONDS.toMillis(remaining), 50L)));
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					throw new IOException("Interrupted while waiting for WAL purge", e);
				}
			}
			if (writerFailure != null) {
				throw propagate(writerFailure);
			}
			if (closed) {
				throw new IOException("WAL is closed");
			}
		}
	}

	@Override
	public void close() throws IOException {
		if (closed) {
			return;
		}
		closed = true;
		logWriter.shutdown();
		try {
			writerThread.join();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		try {
			logWriter.close();
		} finally {
			try {
				if (directoryLock != null && directoryLock.isValid()) {
					directoryLock.release();
				}
			} finally {
				if (lockChannel != null && lockChannel.isOpen()) {
					lockChannel.close();
				}
			}
		}
		if (writerFailure != null) {
			throw propagate(writerFailure);
		}
	}

	private void requestForce(long lsn) {
		requestedForceLsn.updateAndGet(prev -> Math.max(prev, lsn));
	}

	// Spin briefly while the queue is full so short bursts avoid parking the producer thread, then fall back to a
	// blocking put.
	private void enqueue(ValueStoreWalRecord record) throws IOException {
		boolean offered = false;
		int spins = 0;
		while (!offered) {
			offered = getQueue().offer(record);
			if (!offered) {
				if (spins < 100) {
					Thread.onSpinWait();
					spins++;
				} else {
					try {
						getQueue().put(record);
						offered = true;
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt();
						throw new IOException("Interrupted while enqueueing WAL record", e);
					}
				}
			}
		}
	}

	private void ensureOpen() throws IOException {
		if (closed) {
			throw new IOException("WAL is closed");
		}
		if (writerFailure != null) {
			throw propagate(writerFailure);
		}
	}

	private IOException propagate(Throwable throwable) {
		if (throwable instanceof IOException) {
			return (IOException) throwable;
		}
		return new IOException("WAL writer failure", throwable);
	}

	private DirectoryState analyzeDirectory(Path walDirectory) throws IOException {
		if (!Files.isDirectory(walDirectory)) {
			return new DirectoryState(false, 0);
		}
		int maxSequence = 0;
		boolean hasSegments = false;
		List<Path> paths;
		try (var stream = Files.list(walDirectory)) {
			paths = stream.collect(Collectors.toList());
		}
		for (Path path : paths) {
			Matcher matcher = SEGMENT_PATTERN.matcher(path.getFileName().toString());
			if (matcher.matches()) {
				hasSegments = true;
				try {
					int segment = readSegmentSequence(path);
					if (segment > maxSequence) {
						maxSequence = segment;
					}
				} catch (IOException e) {
					logger.warn("Failed to read WAL segment header for {}", path.getFileName(), e);
				}
			}
		}
		return new DirectoryState(hasSegments, maxSequence);
	}

	static int readSegmentSequence(Path path) throws IOException {
		boolean compressed = path.getFileName().toString().endsWith(".gz");
		try (var rawIn = new BufferedInputStream(Files.newInputStream(path));
				InputStream in = compressed ? new GZIPInputStream(rawIn) : rawIn) {
			byte[] lenBytes = in.readNBytes(4);
			if (lenBytes.length < 4) {
				return 0;
			}
			ByteBuffer lenBuf = ByteBuffer.wrap(lenBytes).order(ByteOrder.LITTLE_ENDIAN);
			int frameLen = lenBuf.getInt();
			if (frameLen <= 0) {
				return 0;
			}
			byte[] jsonBytes = in.readNBytes(frameLen);
			if (jsonBytes.length < frameLen) {
				return 0;
			}
			// skip CRC
			in.readNBytes(4);
			JsonFactory factory = new JsonFactory();
			try (JsonParser parser = factory.createParser(jsonBytes)) {
				// Guard against a truncated header: nextToken() returns null at end of input, which would otherwise
				// loop forever waiting for END_OBJECT.
				JsonToken token;
				while ((token = parser.nextToken()) != null && token != JsonToken.END_OBJECT) {
					if (token == JsonToken.FIELD_NAME) {
						String field = parser.getCurrentName();
						parser.nextToken();
						if ("segment".equals(field)) {
							return parser.getIntValue();
						}
					}
				}
			}
		}
		return 0;
	}

	private static final class DirectoryState {
		final boolean hasSegments;
		final int maxSequence;

		DirectoryState(boolean hasSegments, int maxSequence) {
			this.hasSegments = hasSegments;
			this.maxSequence = maxSequence;
		}
	}

	private final class LogWriter implements Runnable {

		private final CRC32C crc32c = new CRC32C();
		private final int batchSize;
		private FileChannel segmentChannel;
		private Path segmentPath;
		private int segmentSequence;
		private long segmentBytes;
		private int segmentLastMintedId;
		private int segmentFirstMintedId;
		private volatile ByteBuffer ioBuffer;
		// Reuse JSON infrastructure to reduce allocations per record
		private final JsonFactory jsonFactory = new JsonFactory();
		private final ReusableByteArrayOutputStream jsonBuffer = new ReusableByteArrayOutputStream(256);
		private volatile boolean running = true;

		LogWriter(int existingSegments) {
			this.segmentSequence = existingSegments;
			this.batchSize = config.batchBufferBytes();
			this.segmentChannel = null;
			this.segmentPath = null;
			this.segmentBytes = 0L;
			this.segmentLastMintedId = 0;
			this.segmentFirstMintedId = 0;
		}

		private ByteBuffer getIoBuffer() {
			if (ioBuffer == null) {
				synchronized (this) {
					if (ioBuffer == null) {
						ioBuffer = ByteBuffer.allocateDirect(batchSize).order(ByteOrder.LITTLE_ENDIAN);
					}
				}
			}
			return ioBuffer;
		}

		@Override
		public void run() {
			try {
				long lastSyncCheck = System.nanoTime();
				while (running || !getQueue().isEmpty()) {
					// Handle purge requests promptly
					if (purgeRequested) {
						performPurgeInternal();
					}
					ValueStoreWalRecord record;
					try {
						record = getQueue().poll(config.idlePollInterval().toNanos(), TimeUnit.NANOSECONDS);
					} catch (InterruptedException e) {
						if (!running) {
							break;
						}
						continue;
					}
					if (record != null) {
						append(record);
					}

					// Decide whether to flush and fsync now: an outstanding awaitDurable() request, the ALWAYS
					// policy, or the INTERVAL timer elapsing while the queue is idle.
					boolean pendingForce = requestedForceLsn.get() > NO_LSN
							&& requestedForceLsn.get() > lastForcedLsn.get();
					boolean syncIntervalElapsed = config.syncPolicy() == ValueStoreWalConfig.SyncPolicy.INTERVAL
							&& System.nanoTime() - lastSyncCheck >= config.syncInterval().toNanos();
					if (record == null) {
						if (pendingForce || config.syncPolicy() == ValueStoreWalConfig.SyncPolicy.ALWAYS
								|| syncIntervalElapsed) {
							flushAndForce();
							lastSyncCheck = System.nanoTime();
						}
					} else if (config.syncPolicy() == ValueStoreWalConfig.SyncPolicy.ALWAYS) {
						flushAndForce();
						lastSyncCheck = System.nanoTime();
					} else if (pendingForce && requestedForceLsn.get() <= lastAppendedLsn.get()) {
						flushAndForce();
						lastSyncCheck = System.nanoTime();
					}
				}
				flushAndForce();
			} catch (Throwable t) {
				writerFailure = t;
			} finally {
				try {
					flushAndForce();
				} catch (Throwable t) {
					writerFailure = t;
				}
				closeQuietly(segmentChannel);
				synchronized (ackMonitor) {
					ackMonitor.notifyAll();
				}
			}
		}

		void shutdown() {
			running = false;
		}

		void close() throws IOException {
			closeQuietly(segmentChannel);
		}

		// Guards against the active segment file being deleted externally: fatal under the ALWAYS policy; otherwise
		// any buffered bytes are preserved and replayed into a replacement segment.
		private void ensureSegmentWritable() throws IOException {
			if (segmentPath == null || segmentChannel == null) {
				return;
			}
			if (Files.exists(segmentPath)) {
				return;
			}
			if (config.syncPolicy() == ValueStoreWalConfig.SyncPolicy.ALWAYS) {
				throw new IOException("Current WAL segment has been removed: " + segmentPath);
			}
			logger.error("Detected deletion of active WAL segment {}; continuing with a new segment",
					segmentPath.getFileName());
			ByteBuffer pending = null;
			if (getIoBuffer().position() > 0) {
				ByteBuffer duplicate = getIoBuffer().duplicate();
				duplicate.flip();
				if (duplicate.hasRemaining()) {
					pending = ByteBuffer.allocate(duplicate.remaining());
					pending.put(duplicate);
					pending.flip();
				}
			}
			getIoBuffer().clear();
			closeQuietly(segmentChannel);
			int previousFirstId = segmentFirstMintedId;
			int previousLastId = segmentLastMintedId;
			segmentChannel = null;
			segmentPath = null;
			segmentBytes = 0L;
			segmentFirstMintedId = 0;
			if (previousFirstId > 0) {
				startSegment(previousFirstId, false);
				segmentLastMintedId = previousLastId;
				if (pending != null) {
					while (pending.hasRemaining()) {
						segmentChannel.write(pending);
					}
					segmentBytes += pending.limit();
				}
			} else {
				segmentLastMintedId = previousLastId;
			}
		}

		private void append(ValueStoreWalRecord record) throws IOException {
			ensureSegmentWritable();
			if (segmentChannel == null) {
				startSegment(record.id());
			}
			// Encode JSON for the record into reusable buffer without copying
			int jsonLength = encodeIntoReusableBuffer(record);
			int framedLength = 4 + jsonLength + 4;
			if (segmentBytes + framedLength > config.maxSegmentBytes()) {
				flushBuffer();
				finishCurrentSegment();
				startSegment(record.id());
			}
			// Write payload length prefix (4 bytes)
			if (getIoBuffer().remaining() < 4) {
				flushBuffer();
			}
			getIoBuffer().putInt(jsonLength);

			// Write JSON payload in chunks to avoid BufferOverflowException
			int offset = 0;
			byte[] jsonBytes = jsonBuffer.buffer();
			while (offset < jsonLength) {
				if (getIoBuffer().remaining() == 0) {
					flushBuffer();
				}
				int toWrite = Math.min(getIoBuffer().remaining(), jsonLength - offset);
				getIoBuffer().put(jsonBytes, offset, toWrite);
				offset += toWrite;
			}

			// Write CRC (4 bytes)
			int crc = checksum(jsonBytes, jsonLength);
			if (getIoBuffer().remaining() < 4) {
				flushBuffer();
			}
			getIoBuffer().putInt(crc);

			segmentBytes += framedLength;
			if (record.id() > segmentLastMintedId) {
				segmentLastMintedId = record.id();
			}
			lastAppendedLsn.set(record.lsn());
		}

		private void performPurgeInternal() {
			try {
				// Ensure any buffered data is not left around; close current segment
				closeQuietly(segmentChannel);
				// Drop any frames that were queued prior to purge using dequeue semantics to ensure
				// any producers blocked in queue.put() are signalled via notFull.
				while (getQueue().poll() != null) {
					// intentionally empty: draining via poll() triggers the normal signalling path
				}
				getIoBuffer().clear();
				// Delete all existing segments from disk
				deleteAllSegments();
				// Reset writer state so the next append starts a fresh segment
				segmentPath = null;
				segmentChannel = null;
				segmentBytes = 0L;
				segmentFirstMintedId = 0;
				segmentLastMintedId = 0;
			} catch (IOException e) {
				writerFailure = e;
			} finally {
				purgeRequested = false;
				synchronized (purgeMonitor) {
					purgeInProgress = false;
					purgeMonitor.notifyAll();
				}
			}
		}

		private void flushAndForce() throws IOException {
			flushAndForce(false);
		}

		// No-op when nothing has been appended since the last force. Otherwise flushes the batch buffer and fsyncs
		// the segment (skipped under INTERVAL unless forceEvenForInterval), then notifies waiters on ackMonitor.
		private void flushAndForce(boolean forceEvenForInterval) throws IOException {
			if (lastAppendedLsn.get() <= lastForcedLsn.get()) {
				return;
			}
			flushBuffer();
			if (segmentChannel != null && segmentChannel.isOpen()) {
				try {
					boolean shouldForce = forceEvenForInterval
							|| config.syncPolicy() != ValueStoreWalConfig.SyncPolicy.INTERVAL;
					if (shouldForce) {
						segmentChannel.force(false);
						if (segmentPath != null) {
							ValueStoreWalDebug.fireForceEvent(segmentPath);
						}
					}
				} catch (ClosedChannelException e) {
					// ignore; channel already closed during shutdown
				}
			}
			long forced = lastAppendedLsn.get();
			lastForcedLsn.set(forced);
			// Clear pending force request without dropping newer requests that may arrive concurrently.
			// Use CAS to only clear if the observed value is still <= forced; if another thread published
			// a higher LSN in the meantime, we must not overwrite it with NO_LSN.
			long cur = requestedForceLsn.get();
			while (cur != NO_LSN && cur <= forced) {
				if (requestedForceLsn.compareAndSet(cur, NO_LSN)) {
					break;
				}
				cur = requestedForceLsn.get();
			}
			synchronized (ackMonitor) {
				ackMonitor.notifyAll();
			}
		}

		private void flushBuffer() throws IOException {
			ensureSegmentWritable();
			if (segmentChannel == null) {
				getIoBuffer().clear();
				return;
			}
			getIoBuffer().flip();
			while (getIoBuffer().hasRemaining()) {
				segmentChannel.write(getIoBuffer());
			}
			getIoBuffer().clear();
		}

		private void finishCurrentSegment() throws IOException {
			if (segmentChannel == null) {
				return;
			}
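			// Make the outgoing segment durable (even under the INTERVAL policy) before it is gzipped and the
			// original deleted.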
			boolean forceInterval = config.syncPolicy() == ValueStoreWalConfig.SyncPolicy.INTERVAL;
			flushAndForce(forceInterval);
			int summaryLastId = segmentLastMintedId;
			Path toCompress = segmentPath;
			closeQuietly(segmentChannel);
			segmentChannel = null;
			segmentPath = null;
			segmentBytes = 0L;
			segmentFirstMintedId = 0;
			segmentLastMintedId = 0;
			if (toCompress != null) {
				gzipAndDelete(toCompress, summaryLastId);
			}
		}

		/**
		 * Rotate the current WAL segment. This is a small wrapper used by tests to ensure that rotation forces the
		 * previous segment to disk before closing it. New segments will be started lazily on the next append.
		 */
		@SuppressWarnings("unused")
		private void rotateSegment() throws IOException {
			finishCurrentSegment();
		}

		private void startSegment(int firstId) throws IOException {
			startSegment(firstId, true);
		}

		private void startSegment(int firstId, boolean incrementSequence) throws IOException {
			if (incrementSequence) {
				segmentSequence++;
			}
			segmentPath = config.walDirectory().resolve(buildSegmentFileName(firstId));
			if (Files.exists(segmentPath)) {
				logger.warn("Overwriting existing WAL segment {}", segmentPath.getFileName());
			}
			segmentChannel = openWalChannel(segmentPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE,
					StandardOpenOption.TRUNCATE_EXISTING);
			segmentBytes = 0L;
			segmentFirstMintedId = firstId;
			segmentLastMintedId = 0;
			writeHeader(firstId);
		}

		private String buildSegmentFileName(int firstId) {
			return "wal-" + firstId + ".v1";
		}

		private void gzipAndDelete(Path src, int lastMintedId) {
			Path gz = src.resolveSibling(src.getFileName().toString() + ".gz");
			long srcSize;
			try {
				srcSize = Files.size(src);
			} catch (IOException e) {
				// If we can't stat the file, don't attempt compression
				logger.warn("Skipping compression of WAL segment {} because it is no longer accessible",
						src.getFileName());
				return;
			}
			int summaryFrameLength;
			CRC32 crc32 = new CRC32();
			try (var in = Files.newInputStream(src);
					FileChannel gzChannel = openWalChannel(gz, StandardOpenOption.CREATE, StandardOpenOption.WRITE,
							StandardOpenOption.TRUNCATE_EXISTING);
					GZIPOutputStream gzOut = new GZIPOutputStream(Channels.newOutputStream(gzChannel))) {
				byte[] buf = new byte[1 << 16];
				int r;
				while ((r = in.read(buf)) >= 0) {
					gzOut.write(buf, 0, r);
					crc32.update(buf, 0, r);
				}
				byte[] summaryFrame = buildSummaryFrame(lastMintedId, crc32.getValue());
				summaryFrameLength = summaryFrame.length;
				gzOut.write(summaryFrame);
				gzOut.finish();
				gzOut.flush();
				gzChannel.force(false);
				ValueStoreWalDebug.fireForceEvent(gz);
			} catch (IOException e) {
				// Compression failed: do not delete original; clean up partial gzip if present
				logger.warn("Failed to compress WAL segment {}: {}", src.getFileName(), e.getMessage());
				try {
					Files.deleteIfExists(gz);
				} catch (IOException ignore) {
				}
				return;
			}
			// Verify gzip contains full original data plus summary by reading back and counting bytes
			long decompressedBytes = 0L;
			byte[] verifyBuf = new byte[1 << 16];
			try (var gin = new GZIPInputStream(Files.newInputStream(gz))) {
				int r;
				while ((r = gin.read(verifyBuf)) >= 0) {
					decompressedBytes += r;
				}
			} catch (IOException e) {
				logger.warn("Failed to verify compressed WAL segment {}: {}", gz.getFileName(), e.getMessage());
				try {
					Files.deleteIfExists(gz);
				} catch (IOException ignore) {
				}
				return;
			}
			if (decompressedBytes != srcSize + summaryFrameLength) {
				// Verification failed: keep original, remove corrupt gzip
				logger.warn("Compressed WAL segment {} failed size verification; keeping uncompressed original",
						gz.getFileName());
				try {
					Files.deleteIfExists(gz);
				} catch (IOException ignore) {
				}
				return;
			}
			try {
				Files.deleteIfExists(src);
			} catch (IOException e) {
				logger.warn("Failed to delete WAL segment {} after compression: {}", src.getFileName(), e.getMessage());
			}
		}

		private byte[] buildSummaryFrame(int lastMintedId, long crc32Value) throws IOException {
			JsonFactory factory = new JsonFactory();
			ByteArrayOutputStream baos = new ByteArrayOutputStream(128);
			try (JsonGenerator gen = factory.createGenerator(baos)) {
				gen.writeStartObject();
				gen.writeStringField("t", "S");
				gen.writeNumberField("lastId", lastMintedId);
				gen.writeNumberField("crc32", crc32Value & 0xFFFFFFFFL);
				gen.writeEndObject();
			}
			baos.write('\n');
			byte[] jsonBytes = baos.toByteArray();
			ByteBuffer buffer = ByteBuffer.allocate(4 + jsonBytes.length + 4).order(ByteOrder.LITTLE_ENDIAN);
			buffer.putInt(jsonBytes.length);
			buffer.put(jsonBytes);
			int crc = checksum(jsonBytes);
			buffer.putInt(crc);
			buffer.flip();
			byte[] framed = new byte[buffer.remaining()];
			buffer.get(framed);
			return framed;
		}

		private void writeHeader(int firstId) throws IOException {
			JsonFactory factory = new JsonFactory();
			ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
			try (JsonGenerator gen = factory.createGenerator(baos)) {
				gen.writeStartObject();
				gen.writeStringField("t", "V");
				gen.writeNumberField("ver", 1);
				gen.writeStringField("store", config.storeUuid());
				gen.writeStringField("engine", "valuestore");
				gen.writeNumberField("created", Instant.now().getEpochSecond());
				gen.writeNumberField("segment", segmentSequence);
				gen.writeNumberField("firstId", firstId);
				gen.writeEndObject();
			}
			// NDJSON: newline-delimited JSON
			baos.write('\n');
			byte[] jsonBytes = baos.toByteArray();
			ByteBuffer buffer = ByteBuffer.allocate(4 + jsonBytes.length + 4).order(ByteOrder.LITTLE_ENDIAN);
			buffer.putInt(jsonBytes.length);
			buffer.put(jsonBytes);
			int crc = checksum(jsonBytes);
			buffer.putInt(crc);
			buffer.flip();
			while (buffer.hasRemaining()) {
				segmentChannel.write(buffer);
			}
			segmentBytes += buffer.limit();
		}

		private int checksum(byte[] data) {
			return checksum(data, data.length);
		}

		private int checksum(byte[] data, int len) {
			crc32c.reset();
			crc32c.update(data, 0, len);
			return (int) crc32c.getValue();
		}

		private int encodeIntoReusableBuffer(ValueStoreWalRecord record) throws IOException {
			jsonBuffer.reset();
			try (JsonGenerator gen = jsonFactory.createGenerator(jsonBuffer)) {
				gen.writeStartObject();
				gen.writeStringField("t", "M");
				gen.writeNumberField("lsn", record.lsn());
				gen.writeNumberField("id", record.id());
				gen.writeStringField("vk", String.valueOf(record.valueKind().code()));
				gen.writeStringField("lex", record.lexical() == null ? "" : record.lexical());
				gen.writeStringField("dt", record.datatype() == null ? "" : record.datatype());
				gen.writeStringField("lang", record.language() == null ? "" : record.language());
				gen.writeNumberField("hash", record.hash());
				gen.writeEndObject();
			}
			jsonBuffer.write('\n'); // NDJSON newline
			return jsonBuffer.size();
		}

		private void closeQuietly(FileChannel channel) {
			if (channel != null) {
				try {
					channel.close();
				} catch (IOException ignore) {
					// ignore
				}
			}
		}

		// Minimal extension to access internal buffer without copying
		private final class ReusableByteArrayOutputStream extends ByteArrayOutputStream {
			ReusableByteArrayOutputStream(int size) {
				super(size);
			}

			byte[] buffer() {
				return this.buf;
			}
		}
	}

	private void deleteAllSegments() throws IOException {
		List<Path> toDelete;
		try (var stream = Files.list(config.walDirectory())) {
			toDelete = stream
					.filter(Files::isRegularFile)
					.filter(path -> SEGMENT_PATTERN.matcher(path.getFileName().toString()).matches())
					.collect(Collectors.toList());
		}
		for (Path p : toDelete) {
			try {
				Files.deleteIfExists(p);
			} catch (IOException e) {
				logger.warn("Failed to delete WAL segment {}", p.getFileName(), e);
				throw e;
			}
		}
	}

}