ValueStoreWalReader.java

/*******************************************************************************
 * Copyright (c) 2025 Eclipse RDF4J contributors.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Distribution License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 *******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.wal;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.zip.CRC32;
import java.util.zip.CRC32C;
import java.util.zip.GZIPInputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

/**
 * Reader for ValueStore WAL segments that yields minted ({@code "t":"M"}) records across segments. Segments are
 * visited in ascending order of the sequence number read from their headers (ties broken by the first id encoded in
 * the file name), and records are yielded in file order within each segment. It tolerates truncated or missing tail
 * data by stopping at the last valid record observed. Completeness can be queried via {@link #isComplete()} and by
 * inspecting {@link ScanResult#complete()}.
 * <p>
 * Each frame is a little-endian {@code int} length, that many bytes of JSON, and a little-endian CRC32C of the JSON
 * bytes. Compressed ({@code .gz}) segments are expected to contain a summary ({@code "t":"S"}) frame whose
 * {@code crc32} field is the CRC32 of all preceding frame bytes of that segment.
 * <p>
 * Instances are stateful and not thread-safe: every iterator created from a reader advances the same segment cursor,
 * so records consumed through one iterator are not returned by another.
 */
public final class ValueStoreWalReader implements AutoCloseable {

	private static final Pattern SEGMENT_PATTERN = Pattern.compile("wal-(\\d+)\\.v1(?:\\.gz)?");
	private static final Logger logger = LoggerFactory.getLogger(ValueStoreWalReader.class);

	/** Byte width of a frame's length prefix and of its trailing CRC32C. */
	private static final int INT_BYTES = Integer.BYTES;

	private final ValueStoreWalConfig config;
	private final JsonFactory jsonFactory = new JsonFactory();
	// Segments discovered when the reader was opened, sorted by sequence number.
	private final List<SegmentEntry> segments;
	// True when segments are missing (sequence gaps) or the WAL directory could not be listed.
	private final boolean missingSegments;

	// Streaming iteration state, shared by every iterator created from this reader.
	private int segIndex = -1;
	private FileChannel channel;
	private GZIPInputStream gzIn;
	private boolean stop; // set on a truncated/corrupt frame or an I/O error; ends iteration
	private boolean eos; // end-of-segment indicator for current stream
	private long lastValidLsn = ValueStoreWAL.NO_LSN;
	private boolean summaryMissing;
	private boolean currentSegmentCompressed;
	private boolean currentSegmentSummarySeen;
	// CRC32 of the original (uncompressed) segment contents, accumulated while reading a compressed segment.
	private CRC32 currentSegmentCrc32;

	private ValueStoreWalReader(ValueStoreWalConfig config) {
		this.config = Objects.requireNonNull(config, "config");
		List<SegmentEntry> segs;
		boolean listingFailed = false;
		try {
			segs = listSegments();
		} catch (IOException e) {
			// Without a listing nothing can be vouched for, so the (empty) result must not be reported as complete.
			logger.warn("Failed to list WAL segments in {}", config.walDirectory(), e);
			segs = List.of();
			listingFailed = true;
		}
		this.segments = segs;
		this.missingSegments = listingFailed || hasSequenceGaps(segs);
	}

	/**
	 * Create a reader for the given configuration. The WAL directory is listed and each segment's sequence number is
	 * read from its header right away to establish segment order; record data is only read once iteration begins.
	 */
	public static ValueStoreWalReader open(ValueStoreWalConfig config) {
		return new ValueStoreWalReader(config);
	}

	/**
	 * Scan the WAL and return all minted records together with bookkeeping about last valid LSN and completeness.
	 * <p>
	 * This drains the reader's shared cursor: records already consumed through an earlier iterator are not returned
	 * again.
	 *
	 * @throws IOException declared for API compatibility; read failures end the scan and are reported through
	 *                     {@link ScanResult#complete()} rather than thrown
	 */
	public ScanResult scan() throws IOException {
		List<ValueStoreWalRecord> records = new ArrayList<>();
		iterator().forEachRemaining(records::add);
		return new ScanResult(records, lastValidLsn, isComplete());
	}

	/** On-demand iterator over minted WAL records. */
	public Iterator<ValueStoreWalRecord> iterator() {
		return new RecordIterator();
	}

	/** Highest valid LSN observed during reading (iterator/scan). */
	public long lastValidLsn() {
		return lastValidLsn;
	}

	// ---------------------------------------------------------------------------------------------------------------
	// Segment handling
	// ---------------------------------------------------------------------------------------------------------------

	/**
	 * Closes the current segment and opens the next one.
	 *
	 * @return {@code false} when there are no more segments
	 */
	private boolean openNextSegment() throws IOException {
		closeCurrentSegment();
		segIndex++;
		if (segIndex >= segments.size()) {
			return false;
		}
		SegmentEntry entry = segments.get(segIndex);
		if (entry.compressed) {
			gzIn = openGzip(entry.path);
			currentSegmentCrc32 = new CRC32();
		} else {
			channel = FileChannel.open(entry.path, StandardOpenOption.READ);
		}
		// Only flag the segment as open-and-compressed once opening actually succeeded.
		currentSegmentCompressed = entry.compressed;
		currentSegmentSummarySeen = false;
		return true;
	}

	/** Opens {@code path} as a gzip stream, closing the raw file stream if the gzip header cannot be read. */
	private static GZIPInputStream openGzip(Path path) throws IOException {
		InputStream raw = Files.newInputStream(path);
		try {
			return new GZIPInputStream(raw);
		} catch (IOException | RuntimeException e) {
			try {
				raw.close();
			} catch (IOException suppressed) {
				e.addSuppressed(suppressed);
			}
			throw e;
		}
	}

	/**
	 * Closes the current segment (if any) and resets per-segment state. A compressed segment closed before its
	 * summary frame was seen marks the reader as incomplete. State is reset even if closing a stream fails.
	 */
	private void closeCurrentSegment() throws IOException {
		if (currentSegmentCompressed && !currentSegmentSummarySeen) {
			summaryMissing = true;
		}
		FileChannel ch = channel;
		GZIPInputStream gz = gzIn;
		channel = null;
		gzIn = null;
		currentSegmentCrc32 = null;
		eos = false;
		currentSegmentCompressed = false;
		currentSegmentSummarySeen = false;
		try {
			if (ch != null) {
				ch.close(); // no-op if already closed
			}
		} finally {
			if (gz != null) {
				gz.close();
			}
		}
	}

	/**
	 * Lists segment files matching {@code wal-<firstId>.v1[.gz]} in the WAL directory, sorted by header sequence
	 * number and then by first id. A segment whose header cannot be read is kept with sequence {@code 0}.
	 */
	private List<SegmentEntry> listSegments() throws IOException {
		Path dir = config.walDirectory();
		if (!Files.isDirectory(dir)) {
			return List.of();
		}
		List<SegmentEntry> found = new ArrayList<>();
		try (var stream = Files.list(dir)) {
			for (Path p : (Iterable<Path>) stream::iterator) {
				String fileName = p.getFileName().toString();
				var m = SEGMENT_PATTERN.matcher(fileName);
				if (!m.matches()) {
					continue;
				}
				long firstId = Long.parseLong(m.group(1));
				boolean compressed = fileName.endsWith(".gz");
				int sequence = 0;
				try {
					sequence = ValueStoreWAL.readSegmentSequence(p);
				} catch (IOException e) {
					logger.warn("Failed to read WAL segment header for {}", p.getFileName(), e);
				}
				found.add(new SegmentEntry(p, firstId, sequence, compressed));
			}
		} catch (UncheckedIOException e) {
			// Files.list reports directory iteration failures unchecked; surface them as the declared IOException.
			throw e.getCause();
		}
		// Tie-break on firstId so segments with equal (e.g. unreadable) sequence numbers get a deterministic order.
		found.sort(Comparator.comparingInt((SegmentEntry e) -> e.sequence).thenComparingLong(e -> e.firstId));
		return found;
	}

	/**
	 * Returns {@code true} if the first sequence number is greater than 1 (earlier segments are gone) or any later
	 * entry does not follow its predecessor by exactly one.
	 */
	private static boolean hasSequenceGaps(List<SegmentEntry> entries) {
		if (entries.isEmpty()) {
			return false;
		}
		int expected = entries.get(0).sequence;
		if (expected > 1) {
			return true;
		}
		for (SegmentEntry entry : entries) {
			if (entry.sequence != expected) {
				return true;
			}
			expected++;
		}
		return false;
	}

	/** Path of the segment currently (or last) being read, or the WAL directory when none was opened. */
	private Path currentLocation() {
		return segIndex >= 0 && segIndex < segments.size() ? segments.get(segIndex).path : config.walDirectory();
	}

	// ---------------------------------------------------------------------------------------------------------------
	// Frame reading
	// ---------------------------------------------------------------------------------------------------------------

	/**
	 * Reads one frame from the uncompressed segment channel.
	 *
	 * @return the minted record, or {@code null} if the frame was not minted, the segment ended ({@link #eos}) or the
	 *         data was truncated/corrupt ({@link #stop})
	 */
	private ValueStoreWalRecord readOneFromChannel() throws IOException {
		ByteBuffer header = ByteBuffer.allocate(INT_BYTES).order(ByteOrder.LITTLE_ENDIAN);
		int read = readFully(channel, header);
		if (read == -1) {
			eos = true;
			return null; // clean end of segment
		}
		if (read < INT_BYTES) {
			stop = true; // truncated header
			return null;
		}
		header.flip();
		int length = header.getInt();
		if (length <= 0 || (long) length > ValueStoreWAL.MAX_FRAME_BYTES) {
			stop = true;
			return null;
		}
		byte[] data = new byte[length];
		if (readFully(channel, ByteBuffer.wrap(data)) < length) {
			stop = true; // truncated record
			return null;
		}
		ByteBuffer crcBuf = ByteBuffer.allocate(INT_BYTES).order(ByteOrder.LITTLE_ENDIAN);
		if (readFully(channel, crcBuf) < INT_BYTES) {
			stop = true; // truncated checksum
			return null;
		}
		crcBuf.flip();
		if (!crc32cMatches(data, crcBuf.getInt())) {
			stop = true;
			return null;
		}
		return acceptFrame(parseJson(data));
	}

	/**
	 * Reads from {@code ch} until {@code buf} is full or end of file is reached; a single {@code read} call may
	 * return fewer bytes than requested.
	 *
	 * @return the number of bytes read, or {@code -1} if end of file was reached before any byte was read
	 */
	private static int readFully(FileChannel ch, ByteBuffer buf) throws IOException {
		int total = 0;
		while (buf.hasRemaining()) {
			int n = ch.read(buf);
			if (n < 0) {
				return total == 0 ? -1 : total;
			}
			total += n;
		}
		return total;
	}

	/**
	 * Reads one frame from the compressed segment stream, feeding every non-summary frame into the running segment
	 * CRC32 and validating it when the summary frame is reached.
	 *
	 * @return the minted record, or {@code null} if the frame was not minted, the segment ended ({@link #eos}) or the
	 *         data was truncated/corrupt ({@link #stop})
	 */
	private ValueStoreWalRecord readOneFromGzip() throws IOException {
		byte[] lenBytes = gzIn.readNBytes(INT_BYTES);
		if (lenBytes.length == 0) {
			eos = true;
			return null; // clean end of segment
		}
		if (lenBytes.length < INT_BYTES) {
			stop = true; // truncated header
			return null;
		}
		int length = decodeIntLE(lenBytes);
		if (length <= 0 || (long) length > ValueStoreWAL.MAX_FRAME_BYTES) {
			stop = true;
			return null;
		}
		byte[] data = gzIn.readNBytes(length);
		if (data.length < length) {
			stop = true; // truncated record
			return null;
		}
		byte[] crcBytes = gzIn.readNBytes(INT_BYTES);
		if (crcBytes.length < INT_BYTES) {
			stop = true; // truncated checksum
			return null;
		}
		if (!crc32cMatches(data, decodeIntLE(crcBytes))) {
			stop = true;
			return null;
		}
		Parsed parsed = parseJson(data);
		if (parsed.type == 'S') {
			currentSegmentSummarySeen = true;
			// The summary must match the CRC32 of everything read before it in this segment.
			if (currentSegmentCrc32 != null
					&& parsed.summaryCrc32 != (currentSegmentCrc32.getValue() & 0xFFFFFFFFL)) {
				stop = true; // mark stream as invalid/incomplete
			}
		} else if (currentSegmentCrc32 != null) {
			// Accumulate over the original segment bytes of this frame: lenLE + data + crcLE.
			currentSegmentCrc32.update(lenBytes, 0, INT_BYTES);
			currentSegmentCrc32.update(data, 0, data.length);
			currentSegmentCrc32.update(crcBytes, 0, INT_BYTES);
		}
		return acceptFrame(parsed);
	}

	/**
	 * Records the frame's LSN as the highest valid one seen so far and, for a minted frame, converts it to a record.
	 *
	 * @return the minted record, or {@code null} for any other frame type
	 */
	private ValueStoreWalRecord acceptFrame(Parsed parsed) {
		lastValidLsn = Math.max(lastValidLsn, parsed.lsn);
		if (parsed.type != 'M') {
			return null;
		}
		return new ValueStoreWalRecord(parsed.lsn, parsed.id, parsed.kind, parsed.lex, parsed.dt, parsed.lang,
				parsed.hash);
	}

	/** Whether the CRC32C of {@code data} equals the stored (signed 32-bit) checksum. */
	private static boolean crc32cMatches(byte[] data, int expectedCrc) {
		CRC32C crc32c = new CRC32C();
		crc32c.update(data, 0, data.length);
		return (int) crc32c.getValue() == expectedCrc;
	}

	/** Decodes the first four bytes of {@code b} as a little-endian {@code int}. */
	private static int decodeIntLE(byte[] b) {
		return ByteBuffer.wrap(b).order(ByteOrder.LITTLE_ENDIAN).getInt();
	}

	private final class RecordIterator implements Iterator<ValueStoreWalRecord> {
		private ValueStoreWalRecord next;
		private boolean prepared;

		@Override
		public boolean hasNext() {
			if (!prepared) {
				try {
					prepareNext();
				} catch (IOException e) {
					// Treat read failures like corruption: stop at the last valid record and report incompleteness.
					logger.warn("Stopping WAL read at {} after I/O error", currentLocation(), e);
					stop = true;
					next = null;
				}
				prepared = true;
			}
			return next != null;
		}

		@Override
		public ValueStoreWalRecord next() {
			if (!hasNext()) {
				throw new NoSuchElementException();
			}
			prepared = false;
			ValueStoreWalRecord r = next;
			next = null;
			return r;
		}

		/** Advances through frames and segments until a minted record is found, the WAL ends, or reading stops. */
		private void prepareNext() throws IOException {
			next = null;
			while (!stop) {
				if (channel == null && gzIn == null && !openNextSegment()) {
					return; // no more segments
				}
				ValueStoreWalRecord r = gzIn != null ? readOneFromGzip() : readOneFromChannel();
				if (r != null) {
					next = r;
					return;
				}
				if (eos) {
					closeCurrentSegment();
				}
			}
		}
	}

	/**
	 * Parses a frame's JSON object into a {@link Parsed}. Unknown fields are skipped; a payload that is not a JSON
	 * object yields the defaults.
	 */
	private Parsed parseJson(byte[] jsonBytes) throws IOException {
		Parsed parsed = new Parsed();
		try (JsonParser jp = jsonFactory.createParser(jsonBytes)) {
			if (jp.nextToken() != JsonToken.START_OBJECT) {
				return parsed;
			}
			JsonToken token;
			// The null check guards against looping on end of input if the object is never closed.
			while ((token = jp.nextToken()) != null && token != JsonToken.END_OBJECT) {
				String field = jp.getCurrentName();
				jp.nextToken();
				if ("t".equals(field)) {
					String t = jp.getValueAsString("");
					parsed.type = t.isEmpty() ? '?' : t.charAt(0);
				} else if ("lsn".equals(field)) {
					parsed.lsn = jp.getValueAsLong(ValueStoreWAL.NO_LSN);
				} else if ("id".equals(field) || "lastId".equals(field)) {
					parsed.id = jp.getValueAsInt(0);
				} else if ("vk".equals(field)) {
					parsed.kind = ValueStoreWalValueKind.fromCode(jp.getValueAsString(""));
				} else if ("lex".equals(field)) {
					parsed.lex = jp.getValueAsString("");
				} else if ("dt".equals(field)) {
					parsed.dt = jp.getValueAsString("");
				} else if ("lang".equals(field)) {
					parsed.lang = jp.getValueAsString("");
				} else if ("hash".equals(field)) {
					parsed.hash = jp.getValueAsInt(0);
				} else if ("crc32".equals(field)) {
					parsed.summaryCrc32 = jp.getValueAsLong(0L);
				} else {
					jp.skipChildren();
				}
			}
		}
		return parsed;
	}

	/** A segment file discovered in the WAL directory. */
	private static final class SegmentEntry {
		final Path path;
		final long firstId;
		final int sequence;
		final boolean compressed;

		SegmentEntry(Path path, long firstId, int sequence, boolean compressed) {
			this.path = path;
			this.firstId = firstId;
			this.sequence = sequence;
			this.compressed = compressed;
		}
	}

	/** Fields extracted from one frame's JSON; absent fields keep these defaults. */
	private static final class Parsed {
		char type = '?';
		long lsn = ValueStoreWAL.NO_LSN;
		int id = 0;
		ValueStoreWalValueKind kind = ValueStoreWalValueKind.NAMESPACE;
		String lex = "";
		String dt = "";
		String lang = "";
		int hash = 0;
		long summaryCrc32 = 0L;
	}

	@Override
	public void close() {
		try {
			closeCurrentSegment();
		} catch (IOException e) {
			// Best effort: the reader is being discarded, so a failing close must not propagate.
			logger.debug("Ignoring error while closing WAL segment", e);
		}
	}

	/**
	 * Whether the reader observed a complete, contiguous sequence of segments and a valid summary for compressed
	 * segments, and did not encounter validation errors.
	 */
	boolean isComplete() {
		return !missingSegments && !summaryMissing && !stop;
	}

	/** Result of a full WAL scan. */
	public static final class ScanResult {
		private final List<ValueStoreWalRecord> records;
		private final long lastValidLsn;
		private final boolean complete;

		public ScanResult(List<ValueStoreWalRecord> records, long lastValidLsn, boolean complete) {
			this.records = List.copyOf(records);
			this.lastValidLsn = lastValidLsn;
			this.complete = complete;
		}

		/**
		 * All minted records encountered, in the order they were read (segment order, then position within the
		 * segment). The returned list is unmodifiable.
		 */
		public List<ValueStoreWalRecord> records() {
			return records;
		}

		/** Highest valid LSN observed during the scan. */
		public long lastValidLsn() {
			return lastValidLsn;
		}

		/** Whether the scan covered a complete and validated set of segments. */
		public boolean complete() {
			return complete;
		}
	}
}