ValueStoreWalConfig.java
/*******************************************************************************
* Copyright (c) 2025 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.wal;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Objects;
/**
 * Immutable configuration for the ValueStore WAL implementation.
 * <p>
 * Instances are created through {@link #builder()}; all settings are validated in {@link Builder#build()} and are
 * exposed through read-only accessors afterwards.
 */
public final class ValueStoreWalConfig {

    /**
     * Conventional directory name for the ValueStore WAL. This class does not reference the constant itself; callers
     * presumably resolve it against the store's data directory before passing the result to
     * {@link Builder#walDirectory(Path)}.
     */
    public static final String DEFAULT_DIRECTORY_NAME = "value-store-wal";

    /** Name of the snapshots sub-directory that is resolved against the WAL directory when none is set explicitly. */
    private static final String DEFAULT_SNAPSHOTS_SUBDIRECTORY = "snapshots";

    /**
     * Controls when the WAL writer flushes buffered frames to disk and when it invokes
     * {@link java.nio.channels.FileChannel#force(boolean)} to guarantee durability. Choose the policy that matches the
     * desired durability vs. throughput trade-off.
     */
    public enum SyncPolicy {
        /**
         * Forces the WAL after every append (and whenever the writer wakes up without new work). This delivers the
         * strongest durability and detects deleted segments immediately, at the cost of running
         * {@code FileChannel.force} for every minted value.
         */
        ALWAYS,
        /**
         * Flushes buffered frames to the WAL file whenever the configured {@link ValueStoreWalConfig#syncInterval()}
         * elapses without new work but never calls {@code FileChannel.force}. Even
         * {@link org.eclipse.rdf4j.sail.nativerdf.ValueStore#awaitWalDurable(long)} only waits for frames to leave the
         * in-memory queue, so crashes can drop recently minted values. Choose this for maximum throughput with
         * best-effort persistence.
         */
        INTERVAL,
        /**
         * Leaves frames queued until {@link org.eclipse.rdf4j.sail.nativerdf.ValueStore#awaitWalDurable(long)} (invoked
         * during NativeStore commit) requests a force. This keeps ingestion fast during a transaction but issues
         * {@code FileChannel.force} when the transaction commits, providing durability at commit boundaries only.
         */
        COMMIT
    }

    private final Path walDirectory;
    private final Path snapshotsDirectory;
    private final String storeUuid;
    private final long maxSegmentBytes;
    private final int queueCapacity;
    private final int batchBufferBytes;
    private final SyncPolicy syncPolicy;
    private final Duration syncInterval;
    private final Duration idlePollInterval;
    private final boolean syncBootstrapOnOpen;
    private final boolean recoverValueStoreOnOpen;

    /**
     * Copies the (already validated) builder state into the immutable configuration.
     *
     * @param builder                    the validated builder
     * @param resolvedSnapshotsDirectory the snapshots directory to use; either the explicitly configured one or the
     *                                   default derived from the final WAL directory
     */
    private ValueStoreWalConfig(Builder builder, Path resolvedSnapshotsDirectory) {
        this.walDirectory = builder.walDirectory;
        this.snapshotsDirectory = resolvedSnapshotsDirectory;
        this.storeUuid = builder.storeUuid;
        this.maxSegmentBytes = builder.maxSegmentBytes;
        this.queueCapacity = builder.queueCapacity;
        this.batchBufferBytes = builder.batchBufferBytes;
        this.syncPolicy = builder.syncPolicy;
        this.syncInterval = builder.syncInterval;
        this.idlePollInterval = builder.idlePollInterval;
        this.syncBootstrapOnOpen = builder.syncBootstrapOnOpen;
        this.recoverValueStoreOnOpen = builder.recoverValueStoreOnOpen;
    }

    /**
     * @return the directory configured for the WAL; never {@code null}
     */
    public Path walDirectory() {
        return walDirectory;
    }

    /**
     * @return the configured snapshots directory; never {@code null}. Defaults to the {@code snapshots} sub-directory
     *         of {@link #walDirectory()} when not set explicitly.
     */
    public Path snapshotsDirectory() {
        return snapshotsDirectory;
    }

    /**
     * @return the configured store UUID; never {@code null} or empty
     */
    public String storeUuid() {
        return storeUuid;
    }

    /**
     * @return the configured maximum segment size in bytes; always positive (default 128 MB)
     */
    public long maxSegmentBytes() {
        return maxSegmentBytes;
    }

    /**
     * @return the configured queue capacity; always positive (default 16384)
     */
    public int queueCapacity() {
        return queueCapacity;
    }

    /**
     * @return the configured batch buffer size in bytes; always greater than 4096 (default 128 KB)
     */
    public int batchBufferBytes() {
        return batchBufferBytes;
    }

    /**
     * @return the configured {@link SyncPolicy}; never {@code null} (default {@link SyncPolicy#INTERVAL})
     */
    public SyncPolicy syncPolicy() {
        return syncPolicy;
    }

    /**
     * @return the configured sync interval; never {@code null} (default one second)
     */
    public Duration syncInterval() {
        return syncInterval;
    }

    /**
     * @return the configured idle poll interval; never {@code null} (default 100 milliseconds)
     */
    public Duration idlePollInterval() {
        return idlePollInterval;
    }

    /**
     * When true, the ValueStore will synchronously rebuild the WAL from existing values during open before allowing any
     * new values to be added. When false (default), bootstrap runs asynchronously in the background.
     */
    public boolean syncBootstrapOnOpen() {
        return syncBootstrapOnOpen;
    }

    /**
     * When true, the ValueStore will attempt to reconstruct missing or empty ValueStore files from the WAL during open
     * before allowing any operations.
     */
    public boolean recoverValueStoreOnOpen() {
        return recoverValueStoreOnOpen;
    }

    /**
     * Creates a new builder pre-populated with the default settings.
     *
     * @return a fresh {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Mutable builder for {@link ValueStoreWalConfig}. {@code walDirectory} and {@code storeUuid} are mandatory; every
     * other setting has a default. Numeric settings are only validated when {@link #build()} is called.
     */
    public static final class Builder {
        private Path walDirectory;
        // Stays null unless set explicitly so that build() can derive the default from the final walDirectory.
        private Path snapshotsDirectory;
        private String storeUuid;
        private long maxSegmentBytes = 128 * 1024 * 1024; // 128 MB
        private int queueCapacity = 16 * 1024;
        private int batchBufferBytes = 128 * 1024; // 128 KB
        private SyncPolicy syncPolicy = SyncPolicy.INTERVAL;
        private Duration syncInterval = Duration.ofSeconds(1);
        private Duration idlePollInterval = Duration.ofMillis(100);
        private boolean syncBootstrapOnOpen = false;
        private boolean recoverValueStoreOnOpen = false;

        private Builder() {
        }

        /**
         * Sets the WAL directory (mandatory). Unless {@link #snapshotsDirectory(Path)} is called, the snapshots
         * directory is derived from the value in effect when {@link #build()} runs.
         *
         * @param walDirectory the WAL directory
         * @return this builder
         * @throws NullPointerException if {@code walDirectory} is {@code null}
         */
        public Builder walDirectory(Path walDirectory) {
            // Deliberately does not derive snapshotsDirectory here: deriving eagerly would pin the default to the
            // first WAL directory ever passed in, even if a later call replaces it.
            this.walDirectory = Objects.requireNonNull(walDirectory, "walDirectory");
            return this;
        }

        /**
         * Sets the snapshots directory explicitly, overriding the default derived from the WAL directory.
         *
         * @param snapshotsDirectory the snapshots directory
         * @return this builder
         * @throws NullPointerException if {@code snapshotsDirectory} is {@code null}
         */
        public Builder snapshotsDirectory(Path snapshotsDirectory) {
            this.snapshotsDirectory = Objects.requireNonNull(snapshotsDirectory, "snapshotsDirectory");
            return this;
        }

        /**
         * Sets the store UUID (mandatory, must not be empty).
         *
         * @param storeUuid the store UUID
         * @return this builder
         * @throws NullPointerException if {@code storeUuid} is {@code null}
         */
        public Builder storeUuid(String storeUuid) {
            this.storeUuid = Objects.requireNonNull(storeUuid, "storeUuid");
            return this;
        }

        /**
         * Sets the maximum segment size in bytes. Must be positive; checked in {@link #build()}.
         *
         * @param maxSegmentBytes the maximum segment size in bytes
         * @return this builder
         */
        public Builder maxSegmentBytes(long maxSegmentBytes) {
            this.maxSegmentBytes = maxSegmentBytes;
            return this;
        }

        /**
         * Sets the queue capacity. Must be positive; checked in {@link #build()}.
         *
         * @param queueCapacity the queue capacity
         * @return this builder
         */
        public Builder queueCapacity(int queueCapacity) {
            this.queueCapacity = queueCapacity;
            return this;
        }

        /**
         * Sets the batch buffer size in bytes. Must be greater than 4096; checked in {@link #build()}.
         *
         * @param batchBufferBytes the batch buffer size in bytes
         * @return this builder
         */
        public Builder batchBufferBytes(int batchBufferBytes) {
            this.batchBufferBytes = batchBufferBytes;
            return this;
        }

        /**
         * Sets the {@link SyncPolicy}. Default is {@link SyncPolicy#INTERVAL}.
         *
         * @param syncPolicy the sync policy
         * @return this builder
         * @throws NullPointerException if {@code syncPolicy} is {@code null}
         */
        public Builder syncPolicy(SyncPolicy syncPolicy) {
            this.syncPolicy = Objects.requireNonNull(syncPolicy, "syncPolicy");
            return this;
        }

        /**
         * Sets the sync interval. Default is one second.
         *
         * @param syncInterval the sync interval
         * @return this builder
         * @throws NullPointerException if {@code syncInterval} is {@code null}
         */
        public Builder syncInterval(Duration syncInterval) {
            this.syncInterval = Objects.requireNonNull(syncInterval, "syncInterval");
            return this;
        }

        /**
         * Sets the idle poll interval. Default is 100 milliseconds.
         *
         * @param idlePollInterval the idle poll interval
         * @return this builder
         * @throws NullPointerException if {@code idlePollInterval} is {@code null}
         */
        public Builder idlePollInterval(Duration idlePollInterval) {
            this.idlePollInterval = Objects.requireNonNull(idlePollInterval, "idlePollInterval");
            return this;
        }

        /**
         * Control whether WAL bootstrap happens synchronously during open. Default is false.
         */
        public Builder syncBootstrapOnOpen(boolean syncBootstrapOnOpen) {
            this.syncBootstrapOnOpen = syncBootstrapOnOpen;
            return this;
        }

        /** Enable automatic ValueStore recovery from WAL during open. */
        public Builder recoverValueStoreOnOpen(boolean recoverValueStoreOnOpen) {
            this.recoverValueStoreOnOpen = recoverValueStoreOnOpen;
            return this;
        }

        /**
         * Validates the current settings and creates the configuration. The builder itself is not modified, so it can
         * be adjusted and reused for further {@code build()} calls.
         *
         * @return a new immutable {@link ValueStoreWalConfig}
         * @throws IllegalStateException if {@code walDirectory} is unset, {@code storeUuid} is unset or empty,
         *                               {@code maxSegmentBytes} or {@code queueCapacity} is not positive, or
         *                               {@code batchBufferBytes} is not greater than 4096
         */
        public ValueStoreWalConfig build() {
            if (walDirectory == null) {
                throw new IllegalStateException("walDirectory must be set");
            }
            if (storeUuid == null || storeUuid.isEmpty()) {
                throw new IllegalStateException("storeUuid must be set");
            }
            if (maxSegmentBytes <= 0) {
                throw new IllegalStateException("maxSegmentBytes must be positive");
            }
            if (queueCapacity <= 0) {
                throw new IllegalStateException("queueCapacity must be positive");
            }
            if (batchBufferBytes <= 4096) {
                throw new IllegalStateException("batchBufferBytes must be > 4KB");
            }
            // Resolve the default against the final WAL directory without writing it back into the builder, so a
            // reused builder never carries a stale derived path.
            Path resolvedSnapshotsDirectory = snapshotsDirectory != null
                    ? snapshotsDirectory
                    : walDirectory.resolve(DEFAULT_SNAPSHOTS_SUBDIRECTORY);
            return new ValueStoreWalConfig(this, resolvedSnapshotsDirectory);
        }
    }
}