ValueStore.java
/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf;
import static org.eclipse.rdf4j.sail.nativerdf.NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import java.util.zip.CRC32C;
import org.eclipse.rdf4j.common.annotation.InternalUseOnly;
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.common.concurrent.locks.ReadWriteLockManager;
import org.eclipse.rdf4j.common.concurrent.locks.WritePrefReadWriteLockManager;
import org.eclipse.rdf4j.common.io.ByteArrayUtil;
import org.eclipse.rdf4j.model.BNode;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.base.CoreDatatype;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.util.Literals;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.model.vocabulary.XSD;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.nativerdf.datastore.DataStore;
import org.eclipse.rdf4j.sail.nativerdf.datastore.RecoveredDataException;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptIRI;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptIRIOrBNode;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptLiteral;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptUnknownValue;
import org.eclipse.rdf4j.sail.nativerdf.model.CorruptValue;
import org.eclipse.rdf4j.sail.nativerdf.model.NativeBNode;
import org.eclipse.rdf4j.sail.nativerdf.model.NativeIRI;
import org.eclipse.rdf4j.sail.nativerdf.model.NativeLiteral;
import org.eclipse.rdf4j.sail.nativerdf.model.NativeResource;
import org.eclipse.rdf4j.sail.nativerdf.model.NativeValue;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWAL;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWalConfig;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWalReader;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWalRecord;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWalRecovery;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWalSearch;
import org.eclipse.rdf4j.sail.nativerdf.wal.ValueStoreWalValueKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* File-based indexed storage and retrieval of RDF values. ValueStore maps RDF values to integer IDs and vice-versa.
*
* @author Arjohn Kampman
* @apiNote This feature is for internal use only: its existence, signature or behavior may change without warning from
* one release to the next.
*/
@InternalUseOnly
public class ValueStore extends SimpleValueFactory implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ValueStore.class);
/**
 * System property selecting the log level used when a value is recovered from the WAL: {@code trace} or
 * {@code debug}; any other value disables that message (see logRecovered).
 */
private static final String WAL_RECOVERY_LOG_PROP = "org.eclipse.rdf4j.sail.nativerdf.valuestorewal.recoveryLog";
// Resolved value of WAL_RECOVERY_LOG_PROP, defaulting to "debug".
// NOTE(review): toLowerCase() uses the default locale; toLowerCase(Locale.ROOT) would be locale-independent.
private static final String WAL_RECOVERY_LOG = System.getProperty(WAL_RECOVERY_LOG_PROP, "debug").toLowerCase();
/**
 * The default value cache size.
 */
public static final int VALUE_CACHE_SIZE = 512;
/**
 * The default value id cache size.
 */
public static final int VALUE_ID_CACHE_SIZE = 128;
/**
 * The default namespace cache size.
 */
public static final int NAMESPACE_CACHE_SIZE = 64;
/**
 * The default namespace id cache size.
 */
public static final int NAMESPACE_ID_CACHE_SIZE = 32;
/**
 * File name prefix handed to the {@link DataStore}, i.e. the {@code values.*} files inside {@link #dataDir}.
 */
private static final String FILENAME_PREFIX = "values";
// Type marker stored as the first byte of every serialized value record. Records whose first byte is none of
// these markers are treated as namespace records (see isNamespaceData).
private static final byte URI_VALUE = 0x1; // 0000 0001
private static final byte BNODE_VALUE = 0x2; // 0000 0010
private static final byte LITERAL_VALUE = 0x3; // 0000 0011
/*-----------*
 * Variables *
 *-----------*/
/**
 * Directory containing the value store files; also reported in WAL-related log messages.
 */
private final File dataDir;
/**
 * Used to do the actual storage of values, once they're translated to byte arrays.
 */
private final DataStore dataStore;
/**
 * Optional write-ahead log for minted values and namespaces; {@code null} when no WAL is configured.
 */
private final ValueStoreWAL wal;
/**
 * Per-thread pending WAL LSN, read and reset by drainPendingWalHighWaterMark(); {@code null} when no WAL is
 * configured. Presumably advanced by recordWalLsn — confirm.
 */
private final ThreadLocal<Long> walPendingLsn;
/**
 * Asynchronous WAL bootstrap task, if one was scheduled; awaited by close().
 */
private volatile CompletableFuture<Void> walBootstrapFuture;
/**
 * Lazily opened WAL search used to recover values by ID (see getOrCreateWalSearch()).
 */
private volatile ValueStoreWalSearch walSearch;
/**
 * Lock manager used to prevent the removal of values over multiple method calls. Note that values can still be
 * added when read locks are active.
 */
private final ReadWriteLockManager lockManager = new WritePrefReadWriteLockManager();
/**
 * An object that indicates the revision of the value store, which is used to check if cached value IDs are still
 * valid. In order to be valid, the ValueStoreRevision object of a NativeValue needs to be equal to this object.
 */
private volatile ValueStoreRevision revision;
/**
 * A simple cache containing the most-recently used values stored by their ID (capacity set by the constructor,
 * {@link #VALUE_CACHE_SIZE} by default).
 */
private final ConcurrentCache<Integer, NativeValue> valueCache;
/**
 * A simple cache containing the most-recently used value-IDs stored by their value (capacity set by the
 * constructor, {@link #VALUE_ID_CACHE_SIZE} by default).
 */
private final ConcurrentCache<NativeValue, Integer> valueIDCache;
/**
 * A simple cache containing the most-recently used namespaces stored by their ID (capacity set by the
 * constructor, {@link #NAMESPACE_CACHE_SIZE} by default).
 */
private final ConcurrentCache<Integer, String> namespaceCache;
/**
 * A simple cache containing the most-recently used namespace-IDs stored by their namespace (capacity set by the
 * constructor, {@link #NAMESPACE_ID_CACHE_SIZE} by default).
 */
private final ConcurrentCache<String, Integer> namespaceIDCache;
/*--------------*
* Constructors *
*--------------*/
/**
 * Opens a value store in {@code dataDir} without forced syncing, with default cache sizes and no WAL.
 *
 * @param dataDir the directory holding the value store files
 * @throws IOException If an I/O error occurred while opening the store.
 */
public ValueStore(File dataDir) throws IOException {
    this(dataDir, false, VALUE_CACHE_SIZE, VALUE_ID_CACHE_SIZE, NAMESPACE_CACHE_SIZE, NAMESPACE_ID_CACHE_SIZE,
            null);
}
/**
 * Opens a value store in {@code dataDir} with default cache sizes and no WAL.
 *
 * @param dataDir   the directory holding the value store files
 * @param forceSync passed through to the underlying DataStore
 * @throws IOException If an I/O error occurred while opening the store.
 */
public ValueStore(File dataDir, boolean forceSync) throws IOException {
    this(dataDir, forceSync,
            VALUE_CACHE_SIZE, VALUE_ID_CACHE_SIZE,
            NAMESPACE_CACHE_SIZE, NAMESPACE_ID_CACHE_SIZE,
            null);
}
/**
 * Opens a value store in {@code dataDir} with explicit cache sizes and no WAL.
 *
 * @param dataDir              the directory holding the value store files
 * @param forceSync            passed through to the underlying DataStore
 * @param valueCacheSize       capacity of the ID-to-value cache
 * @param valueIDCacheSize     capacity of the value-to-ID cache
 * @param namespaceCacheSize   capacity of the ID-to-namespace cache
 * @param namespaceIDCacheSize capacity of the namespace-to-ID cache
 * @throws IOException If an I/O error occurred while opening the store.
 */
public ValueStore(File dataDir, boolean forceSync, int valueCacheSize, int valueIDCacheSize, int namespaceCacheSize,
        int namespaceIDCacheSize) throws IOException {
    this(dataDir,
            forceSync,
            valueCacheSize,
            valueIDCacheSize,
            namespaceCacheSize,
            namespaceIDCacheSize,
            null);
}
/**
 * Opens a value store in {@code dataDir}, optionally backed by a value-store WAL.
 * <p>
 * After the underlying DataStore is opened, the store may auto-recover from the WAL and schedule a WAL bootstrap.
 * If either of those steps fails, the DataStore is closed again before the exception propagates, so a failed
 * construction does not keep the {@code values.*} files open.
 *
 * @param dataDir              the directory holding the value store files
 * @param forceSync            passed through to the underlying DataStore
 * @param valueCacheSize       capacity of the ID-to-value cache
 * @param valueIDCacheSize     capacity of the value-to-ID cache
 * @param namespaceCacheSize   capacity of the ID-to-namespace cache
 * @param namespaceIDCacheSize capacity of the namespace-to-ID cache
 * @param wal                  the WAL to log minted values to, or {@code null} to run without a WAL
 * @throws IOException If an I/O error occurred while opening or recovering the store.
 */
public ValueStore(File dataDir, boolean forceSync, int valueCacheSize, int valueIDCacheSize, int namespaceCacheSize,
        int namespaceIDCacheSize, ValueStoreWAL wal) throws IOException {
    super();
    this.dataDir = dataDir;
    dataStore = new DataStore(dataDir, FILENAME_PREFIX, forceSync, this);
    valueCache = new ConcurrentCache<>(valueCacheSize);
    valueIDCache = new ConcurrentCache<>(valueIDCacheSize);
    namespaceCache = new ConcurrentCache<>(namespaceCacheSize);
    namespaceIDCache = new ConcurrentCache<>(namespaceIDCacheSize);
    this.wal = wal;
    this.walPendingLsn = wal != null ? ThreadLocal.withInitial(() -> ValueStoreWAL.NO_LSN) : null;
    try {
        autoRecoverValueStoreIfConfigured();
        setNewRevision();
        maybeScheduleWalBootstrap();
    } catch (IOException | RuntimeException e) {
        // The caller never receives a reference to this half-initialised object, so nobody else could close the
        // data store opened above; release it here before propagating the original failure.
        try {
            dataStore.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/*---------*
* Methods *
*---------*/
/**
 * Installs a fresh revision object. IDs cached inside NativeValue objects created under the previous revision are
 * no longer considered current afterwards.
 */
private void setNewRevision() {
    ValueStoreRevision fresh = new ValueStoreRevision(this);
    revision = fresh;
}
/**
 * Returns the current revision of this value store. A NativeValue's cached ID is only trusted when its revision
 * equals this object.
 *
 * @return the current revision
 */
public ValueStoreRevision getRevision() {
    final ValueStoreRevision current = revision;
    return current;
}
/**
 * Acquires a read lock on this value store. While it is held, values cannot be removed (e.g. by clear()), although
 * new values may still be added.
 *
 * @return the acquired read lock; the caller must release it
 * @throws InterruptedException if the thread is interrupted while waiting for the lock
 */
public Lock getReadLock() throws InterruptedException {
    Lock readLock = lockManager.getReadLock();
    return readLock;
}
/**
 * Gets the value for the specified ID.
 * <p>
 * The value cache is consulted first. On a miss the record is read from the data store and decoded. When a WAL is
 * configured, missing or corrupt records are recovered from the WAL where possible. In soft-fail mode, readable
 * records are also cross-checked against the WAL. Corrupt values that could not be recovered are returned as
 * {@link CorruptValue} instances and are never cached.
 *
 * @param id A value ID.
 * @return The value for the ID, or <var>null</var> if no such value could be found.
 * @throws IOException If an I/O error occurred.
 */
public NativeValue getValue(int id) throws IOException {
    // Check value cache
    Integer cacheID = id;
    NativeValue resultValue = valueCache.get(cacheID);
    if (resultValue != null) {
        return resultValue;
    }
    // Set only for recoveries whose log messages have not been emitted yet.
    boolean recoveredDirectlyFromWal = false;
    try {
        // Value not in cache, fetch it from file
        byte[] data = dataStore.getData(id);
        if (data != null) {
            resultValue = data2value(id, data);
            if (resultValue instanceof CorruptValue) {
                // data2value already attempted (and logged) a WAL recovery for corrupt records.
                NativeValue recovered = ((CorruptValue) resultValue).getRecovered();
                if (recovered != null) {
                    resultValue = recovered;
                }
            } else if (shouldValidateAgainstWal()) {
                NativeValue walValue = recoverValueFromWal(id, false);
                if (walValue != null && !valuesMatch(resultValue, walValue)) {
                    resultValue = walValue;
                    recoveredDirectlyFromWal = true;
                }
            }
        } else {
            resultValue = recoverValueFromWal(id, false);
            recoveredDirectlyFromWal = resultValue != null;
        }
    } catch (RecoveredDataException rde) {
        byte[] recovered = rde.getData();
        CorruptValue corruptValue;
        if (recovered == null || recovered.length == 0) {
            corruptValue = new CorruptUnknownValue(revision, id, recovered);
        } else {
            switch (recovered[0]) {
            case URI_VALUE:
                corruptValue = new CorruptIRI(revision, id, null, recovered);
                break;
            case BNODE_VALUE:
                corruptValue = new CorruptIRIOrBNode(revision, id, recovered);
                break;
            case LITERAL_VALUE:
                corruptValue = new CorruptLiteral(revision, id, recovered);
                break;
            default:
                corruptValue = new CorruptUnknownValue(revision, id, recovered);
            }
        }
        // tryRecoverFromWal logs the recovery and the repair hint itself, so the result is deliberately NOT
        // flagged as recoveredDirectlyFromWal (that would emit the ERROR-level hint a second time).
        tryRecoverFromWal(id, corruptValue);
        NativeValue recoveredValue = corruptValue.getRecovered();
        resultValue = recoveredValue != null ? recoveredValue : corruptValue;
    }
    if (recoveredDirectlyFromWal && resultValue != null) {
        logRecovered(id, resultValue);
        logWalRepairHint(id);
    }
    if (resultValue != null && !(resultValue instanceof CorruptValue)) {
        // Store value in cache
        valueCache.put(cacheID, resultValue);
    }
    return resultValue;
}
/**
 * Gets the Resource for the specified ID.
 * <p>
 * If the stored value is not a Resource and soft-fail mode is enabled, a corrupt value is re-wrapped as a
 * {@link CorruptIRIOrBNode}. Otherwise a warning is logged and the value is returned unchanged. The unchecked cast
 * may then fail in the caller.
 *
 * @param id A value ID.
 * @return The Resource for the ID, or <var>null</var> if no such value could be found.
 * @throws IOException If an I/O error occurred.
 */
public <T extends NativeValue & Resource> T getResource(int id) throws IOException {
    NativeValue stored = getValue(id);
    if (stored == null || stored instanceof Resource) {
        return (T) stored;
    }
    if (SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES && stored instanceof CorruptValue) {
        byte[] rawData = ((CorruptValue) stored).getData();
        return (T) new CorruptIRIOrBNode(revision, id, rawData);
    }
    logger.warn(
            "NativeStore is possibly corrupt. To attempt to repair or retrieve the data, read the documentation on http://rdf4j.org about the system property org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes");
    return (T) stored;
}
/**
 * Gets the IRI for the specified ID.
 * <p>
 * If the stored value is not an IRI and soft-fail mode is enabled, a corrupt value is returned as a
 * {@link CorruptIRI}: existing CorruptIRIs are kept, other corrupt values are re-wrapped. Otherwise a warning is
 * logged and the value is returned unchanged.
 *
 * @param id A value ID.
 * @return The IRI for the ID, or <var>null</var> if no such value could be found.
 * @throws IOException If an I/O error occurred.
 */
public <T extends NativeValue & IRI> T getIRI(int id) throws IOException {
    NativeValue stored = getValue(id);
    if (stored == null || stored instanceof IRI) {
        return (T) stored;
    }
    if (SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES && stored instanceof CorruptValue) {
        if (stored instanceof CorruptIRI) {
            return (T) stored;
        }
        byte[] rawData = ((CorruptValue) stored).getData();
        return (T) new CorruptIRI(revision, id, null, rawData);
    }
    logger.warn(
            "NativeStore is possibly corrupt. To attempt to repair or retrieve the data, read the documentation on http://rdf4j.org about the system property org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes");
    return (T) stored;
}
/**
 * Gets the ID for the specified value without storing it.
 * <p>
 * The lookup order is:
 * <ol>
 * <li>the ID cached inside an own NativeValue of the current revision;</li>
 * <li>the value-ID cache;</li>
 * <li>the data store.</li>
 * </ol>
 * For literals, the legacy encoding (xsd:string / rdf:langString stored without an explicit datatype) is tried as
 * a fallback when the primary encoding is not found.
 *
 * @param value A value.
 * @return The ID for the specified value, or {@link NativeValue#UNKNOWN_ID} if no such ID could be found.
 * @throws IOException If an I/O error occurred.
 */
public int getID(Value value) throws IOException {
    if (logger.isDebugEnabled()) {
        logger.debug("getID start thread={} value={}", threadName(), describeValue(value));
    }
    // Try to get the internal ID from the value itself
    boolean isOwnValue = isOwnValue(value);
    if (isOwnValue) {
        NativeValue nativeValue = (NativeValue) value;
        if (revisionIsCurrent(nativeValue)) {
            int id = nativeValue.getInternalID();
            if (id != NativeValue.UNKNOWN_ID) {
                if (logger.isDebugEnabled()) {
                    logger.debug("getID returning cached internal id {} for value={} thread={}", id,
                            describeValue(value), threadName());
                }
                return id;
            }
        }
    }
    // Check cache
    Integer cachedID = valueIDCache.get(value);
    if (cachedID != null) {
        int id = cachedID.intValue();
        if (isOwnValue) {
            // Store id in value for fast access in any consecutive calls
            ((NativeValue) value).setInternalID(id, revision);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("getID returning cache id {} for value={} thread={}", id, describeValue(value),
                    threadName());
        }
        return id;
    }
    // ID not cached, search in file
    boolean isLiteral = value instanceof Literal;
    byte[] data = value2data(value, false);
    boolean dataIsLegacy = false;
    if (data == null && isLiteral) {
        data = literal2legacy((Literal) value);
        dataIsLegacy = true;
    }
    if (data == null) {
        if (logger.isDebugEnabled()) {
            logger.debug("getID returning UNKNOWN for value={} thread={}", describeValue(value), threadName());
        }
        return NativeValue.UNKNOWN_ID;
    }
    if (logger.isDebugEnabled()) {
        logger.debug("getID querying datastore for value={} thread={} dataSummary={}", describeValue(value),
                threadName(), summarize(data));
    }
    int id = dataStore.getID(data);
    if (id == NativeValue.UNKNOWN_ID && isLiteral && !dataIsLegacy) {
        byte[] legacyData = literal2legacy((Literal) value);
        // Only query again when the legacy encoding actually differs: for most datatypes it is byte-identical to
        // the primary encoding, and it is null when the datatype itself is unknown.
        if (legacyData != null && !Arrays.equals(legacyData, data)) {
            id = dataStore.getID(legacyData);
        }
    }
    if (id != NativeValue.UNKNOWN_ID) {
        if (isOwnValue) {
            // Store id in value for fast access in any consecutive calls
            ((NativeValue) value).setInternalID(id, revision);
        } else {
            // Store id in cache
            NativeValue nv = getNativeValue(value);
            nv.setInternalID(id, revision);
            valueIDCache.put(nv, Integer.valueOf(id));
        }
        if (logger.isDebugEnabled()) {
            logger.debug("getID resolved value={} id={} thread={}", describeValue(value), id, threadName());
        }
    }
    return id;
}
/**
 * Builds a short log-friendly description of a byte array: its length and content hash, or {@code "null"}.
 */
private static String summarize(byte[] data) {
    return data == null ? "null" : "len=" + data.length + ",hash=" + Arrays.hashCode(data);
}
/**
 * Returns the name of the calling thread, for diagnostic log messages.
 */
private static String threadName() {
    Thread current = Thread.currentThread();
    return current.getName();
}
/**
* Stores the supplied value and returns the ID that has been assigned to it. In case the value was already present,
* the value will not be stored again and the ID of the existing value is returned.
* <p>
* This method is {@code synchronized}. The ID is first looked up cheaply: the ID cached inside an own NativeValue of
* the current revision, then the value-ID cache. Only on a miss is the value serialized and handed to the data
* store. When a WAL is enabled, an ID that was newly minted by this call is also logged to the WAL.
*
* @param value The Value to store.
* @return The ID that has been assigned to the value, or {@link NativeValue#UNKNOWN_ID} if no data could be computed
* for the value.
* @throws IOException If an I/O error occurred.
*/
public synchronized int storeValue(Value value) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("storeValue start thread={} value={}", threadName(), describeValue(value));
}
// Try to get the internal ID from the value itself
boolean isOwnValue = isOwnValue(value);
if (isOwnValue) {
NativeValue nativeValue = (NativeValue) value;
if (revisionIsCurrent(nativeValue)) {
// Value's ID is still current
int id = nativeValue.getInternalID();
if (id != NativeValue.UNKNOWN_ID) {
if (logger.isDebugEnabled()) {
logger.debug("storeValue returning cached internal id {} for value={} thread={}", id,
describeValue(value), threadName());
}
return id;
}
}
}
// ID not stored in value itself, try the ID cache
Integer cachedID = valueIDCache.get(value);
if (cachedID != null) {
int id = cachedID.intValue();
if (isOwnValue) {
// Store id in value for fast access in any consecutive calls
((NativeValue) value).setInternalID(id, revision);
}
if (logger.isDebugEnabled()) {
logger.debug("storeValue returning cached id {} for value={} thread={}", id, describeValue(value),
threadName());
}
return id;
}
// Unable to get internal ID in a cheap way, just store it in the data
// store which will handle duplicates
byte[] valueData = value2data(value, true);
// Captured AFTER value2data on purpose: with create=true it may itself mint a namespace (uri2data) or a datatype
// (literal2data -> storeValue). An ID above this maximum therefore means storeData below minted this value.
int previousMaxID = walEnabled() ? dataStore.getMaxID() : 0;
if (valueData == null) {
if (logger.isDebugEnabled()) {
logger.debug("storeValue computed no data for value={} thread={}", describeValue(value), threadName());
}
return NativeValue.UNKNOWN_ID;
}
int id = dataStore.storeData(valueData);
NativeValue nv = isOwnValue ? (NativeValue) value : getNativeValue(value);
// Store id in value for fast access in any consecutive calls
nv.setInternalID(id, revision);
// Update cache
valueIDCache.put(nv, id);
// Only newly minted IDs are written to the WAL; pre-existing values returned by storeData are not re-logged.
if (walEnabled() && id > previousMaxID) {
logMintedValue(id, nv);
}
if (logger.isDebugEnabled()) {
logger.debug("storeValue stored value={} assigned id={} thread={} dataSummary={}", describeValue(nv), id,
threadName(), summarize(valueData));
}
return id;
}
/**
 * Removes all values from the ValueStore.
 * <p>
 * Runs while holding the lock manager's write lock. When a WAL is enabled, its segments are purged first, so that a
 * later WAL recovery cannot resurrect values that existed before the clear. Then the data store and all caches are
 * cleared, and a new revision is created. This invalidates IDs cached in previously returned NativeValue objects.
 *
 * @throws IOException If an I/O error occurred, or if the thread was interrupted while waiting for the write lock
 *                     (the thread's interrupt status is restored in that case).
 */
public void clear() throws IOException {
    try {
        Lock writeLock = lockManager.getWriteLock();
        try {
            // Purge any existing WAL segments so a subsequent WAL recovery cannot
            // resurrect values that were present before the clear().
            if (walEnabled()) {
                try {
                    wal.purgeAllSegments();
                } catch (IOException e) {
                    logger.warn("Failed to purge ValueStore WAL during clear for {}", dataDir, e);
                    throw e;
                }
            }
            // NOTE(review): the lazily opened walSearch is kept as-is; confirm it cannot still serve entries from
            // the purged segments for IDs that get reused after the clear.
            dataStore.clear();
            valueCache.clear();
            valueIDCache.clear();
            namespaceCache.clear();
            namespaceIDCache.clear();
            setNewRevision();
        } finally {
            writeLock.release();
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag (as awaitWalDurable does) so callers further up can still observe it.
        Thread.currentThread().interrupt();
        throw new IOException("Failed to acquire write lock", e);
    }
}
/**
 * Flushes changes buffered by the underlying data store to disk.
 *
 * @throws IOException If an I/O error occurred.
 */
public void sync() throws IOException {
    this.dataStore.sync();
}
/**
 * Closes the ValueStore, releasing any file references, etc. Once closed, the ValueStore can no longer be used.
 * <p>
 * A pending asynchronous WAL bootstrap is awaited first. Its failure or cancellation is logged as a warning and
 * does not prevent the data store from being closed.
 *
 * @throws IOException If an I/O error occurred.
 */
@Override
public void close() throws IOException {
    CompletableFuture<Void> pendingBootstrap = walBootstrapFuture;
    if (pendingBootstrap != null) {
        try {
            pendingBootstrap.join();
        } catch (CancellationException cancelled) {
            logger.warn("ValueStore WAL bootstrap was cancelled during close");
        } catch (CompletionException failure) {
            Throwable rootCause = failure.getCause();
            logger.warn("ValueStore WAL bootstrap failed during close", rootCause != null ? rootCause : failure);
        }
    }
    dataStore.close();
}
/**
* Checks that every value has exactly one ID.
* <p>
* Walks all IDs from 1 to the data store's maximum ID. Namespace records must resolve back to their own ID and
* form an absolute URI when a local part is appended. Value records must resolve back to their own ID when looked
* up again through getID. The first violation aborts the check.
*
* @throws SailException if an empty record or an inconsistent namespace/value is found
* @throws IOException If an I/O error occurred.
*/
public void checkConsistency() throws SailException, IOException {
int maxID = dataStore.getMaxID();
for (int id = 1; id <= maxID; id++) {
try {
byte[] data = dataStore.getData(id);
if (data == null || data.length == 0) {
// Defensive guard against truncated/empty records which otherwise cause AIOOBE in isNamespaceData
throw new SailException("Empty data array for value with id " + id);
}
if (isNamespaceData(data)) {
String namespace = data2namespace(data);
try {
// Consistent: the namespace maps back to this very ID and is usable as an IRI prefix.
if (id == getNamespaceID(namespace, false)
&& URI.create(namespace + "part").isAbsolute()) {
continue;
}
} catch (IllegalArgumentException e) {
// throw SailException
// (URI.create rejected the namespace; fall through to the error below)
}
logger.error("Inconsistent namespace data for id {} (also id {}): {}", id,
getNamespaceID(namespace, false), namespace);
throw new SailException(
"Store must be manually exported and imported to fix namespaces like " + namespace);
} else {
// Rebuild the value from its lexical form and look it up through getID; a different ID means the
// same value is stored more than once.
Value value = this.data2value(id, data);
if (id != this.getID(copy(value))) {
throw new SailException(
"Store must be manually exported and imported to merge values like " + value);
}
}
} catch (RecoveredDataException rde) {
// Treat as a corrupt unknown value during consistency check
// NOTE(review): unless CorruptUnknownValue implements IRI or Literal, copy() turns it into a BNode built from
// its string value; confirm this lookup is meaningful.
Value value = new CorruptUnknownValue(revision, id, rde.getData());
if (id != this.getID(copy(value))) {
throw new SailException(
"Store must be manually exported and imported to merge values like " + value);
}
}
}
}
/**
 * Rebuilds {@code value} from its lexical form using this factory's create methods.
 * <p>
 * IRIs become IRIs. Language-tagged literals keep their language tag, and other literals keep their datatype.
 * Anything else is recreated as a blank node with the same string value.
 */
private Value copy(Value value) {
    String lexical = value.stringValue();
    if (value instanceof IRI) {
        return createIRI(lexical);
    }
    if (!(value instanceof Literal)) {
        return createBNode(lexical);
    }
    Literal literal = (Literal) value;
    return Literals.isLanguageLiteral(literal)
            ? createLiteral(lexical, literal.getLanguage().orElse(null))
            : createLiteral(lexical, literal.getDatatype());
}
/**
 * Returns {@code true} when {@code value} is a NativeValue whose revision belongs to this ValueStore instance
 * (compared by identity).
 */
private boolean isOwnValue(Value value) {
    if (!(value instanceof NativeValue)) {
        return false;
    }
    ValueStoreRevision valueRevision = ((NativeValue) value).getValueStoreRevision();
    return valueRevision.getValueStore() == this;
}
/**
 * Returns {@code true} when the revision recorded in {@code value} equals this store's current revision, i.e. the
 * ID cached inside the value may still be trusted.
 */
private boolean revisionIsCurrent(NativeValue value) {
    ValueStoreRevision current = revision;
    return current.equals(value.getValueStoreRevision());
}
/**
 * Serializes an IRI, blank node or literal into its data store record.
 *
 * @param value  the value to encode
 * @param create whether dependent entries (namespace, datatype) may be created while encoding
 * @return the encoded record, or {@code null} if a required namespace/datatype is unknown and {@code create} is
 *         {@code false}
 * @throws IllegalArgumentException if the value is neither an IRI, a BNode nor a Literal
 */
private byte[] value2data(Value value, boolean create) throws IOException {
    boolean supported = value instanceof IRI || value instanceof BNode || value instanceof Literal;
    if (!supported) {
        throw new IllegalArgumentException("value parameter should be a URI, BNode or Literal");
    }
    byte[] encoded = value instanceof IRI ? uri2data((IRI) value, create)
            : value instanceof BNode ? bnode2data((BNode) value, create)
            : literal2data((Literal) value, create);
    if (logger.isDebugEnabled()) {
        logger.debug("value2data thread={} value={} create={} summary={}", threadName(), describeValue(value),
                create, summarize(encoded));
    }
    return encoded;
}
/**
 * Encodes an IRI as a {@link #URI_VALUE} byte, the 4-byte namespace ID and the UTF-8 local name.
 *
 * @return the encoded record, or {@code null} when the namespace has no ID (it is only created if {@code create})
 */
private byte[] uri2data(IRI uri, boolean create) throws IOException {
    int namespaceId = getNamespaceID(uri.getNamespace(), create);
    if (logger.isDebugEnabled()) {
        logger.debug("uri2data thread={} namespace='{}' nsId={} create={}", threadName(), uri.getNamespace(),
                namespaceId, create);
    }
    if (namespaceId == -1) {
        // Without a namespace ID the IRI cannot be stored in (or found in) the data store.
        return null;
    }
    byte[] localName = uri.getLocalName().getBytes(StandardCharsets.UTF_8);
    byte[] encoded = new byte[5 + localName.length];
    encoded[0] = URI_VALUE;
    ByteArrayUtil.putInt(namespaceId, encoded, 1);
    ByteArrayUtil.put(localName, encoded, 5);
    if (logger.isDebugEnabled()) {
        logger.debug("uri2data produced len={} summary={} thread={}", encoded.length, summarize(encoded),
                threadName());
    }
    return encoded;
}
/**
 * Encodes a blank node as a {@link #BNODE_VALUE} byte followed by its UTF-8 ID. {@code create} is not used.
 */
private byte[] bnode2data(BNode bNode, boolean create) {
    byte[] nodeId = bNode.getID().getBytes(StandardCharsets.UTF_8);
    byte[] encoded = new byte[nodeId.length + 1];
    encoded[0] = BNODE_VALUE;
    ByteArrayUtil.put(nodeId, encoded, 1);
    return encoded;
}
/**
 * Encodes a literal using its label, optional language tag and datatype.
 */
private byte[] literal2data(Literal literal, boolean create) throws IOException {
    String label = literal.getLabel();
    Optional<String> language = literal.getLanguage();
    IRI datatype = literal.getDatatype();
    return literal2data(label, language, datatype, create);
}
/**
 * Encodes a literal in the legacy form, where xsd:string and rdf:langString literals carry no explicit datatype.
 * Never creates entries.
 *
 * @return the legacy encoding, or {@code null} if a required datatype has no ID
 */
private byte[] literal2legacy(Literal literal) throws IOException {
    IRI datatype = literal.getDatatype();
    boolean implicitDatatype = XSD.STRING.equals(datatype) || RDF.LANGSTRING.equals(datatype);
    IRI encodedDatatype = implicitDatatype ? null : datatype;
    return literal2data(literal.getLabel(), literal.getLanguage(), encodedDatatype, false);
}
/**
 * Serializes a literal into its data store record, which consists of:
 * <ul>
 * <li>a {@link #LITERAL_VALUE} byte;</li>
 * <li>the 4-byte datatype ID;</li>
 * <li>one byte holding the UTF-8 length of the language tag;</li>
 * <li>the language tag bytes;</li>
 * <li>the UTF-8 label.</li>
 * </ul>
 *
 * @param label  the literal's label
 * @param lang   the optional language tag
 * @param dt     the datatype; when not creating, {@code null} is encoded as {@link NativeValue#UNKNOWN_ID}
 * @param create whether the datatype is stored (via storeValue) instead of merely looked up
 * @return the encoded record, or {@code null} if not creating and the datatype has no ID
 * @throws IllegalArgumentException if the UTF-8 language tag is longer than 255 bytes
 */
private byte[] literal2data(String label, Optional<String> lang, IRI dt, boolean create)
        throws IOException, UnsupportedEncodingException {
    int datatypeID;
    if (create) {
        datatypeID = storeValue(dt);
    } else if (dt == null) {
        datatypeID = NativeValue.UNKNOWN_ID;
    } else {
        datatypeID = getID(dt);
        if (datatypeID == NativeValue.UNKNOWN_ID) {
            // A literal with an unknown datatype cannot be in the store either.
            return null;
        }
    }
    if (logger.isDebugEnabled()) {
        logger.debug("literal2data thread={} valueLength={} langPresent={} datatype={} datatypeId={} create={}",
                threadName(), label.length(), lang.isPresent(), dt, datatypeID, create);
    }
    byte[] languageBytes = null;
    if (lang.isPresent()) {
        languageBytes = lang.get().getBytes(StandardCharsets.UTF_8);
        if (languageBytes.length > 255) {
            // The length must fit in the single length byte of the record.
            throw new IllegalArgumentException(
                    "Language tag too long (length " + languageBytes.length + " > maximum 255): " + lang.get());
        }
    }
    int languageLength = languageBytes == null ? 0 : languageBytes.length;
    byte[] labelBytes = label.getBytes(StandardCharsets.UTF_8);
    int labelOffset = 6 + languageLength;
    byte[] encoded = new byte[labelOffset + labelBytes.length];
    encoded[0] = LITERAL_VALUE;
    ByteArrayUtil.putInt(datatypeID, encoded, 1);
    encoded[5] = (byte) (languageLength & 0xFF);
    if (languageBytes != null) {
        ByteArrayUtil.put(languageBytes, encoded, 6);
    }
    ByteArrayUtil.put(labelBytes, encoded, labelOffset);
    if (logger.isDebugEnabled()) {
        logger.debug("literal2data produced len={} summary={} thread={}", encoded.length,
                summarize(encoded), threadName());
    }
    return encoded;
}
/**
 * Returns {@code true} when the record's first byte is not one of the value type markers, i.e. the record holds a
 * namespace. The array must not be empty.
 */
private boolean isNamespaceData(byte[] data) {
    switch (data[0]) {
    case URI_VALUE:
    case BNODE_VALUE:
    case LITERAL_VALUE:
        return false;
    default:
        return true;
    }
}
/**
 * Decodes a data store record into a NativeValue, dispatching on the type marker in the first byte.
 * <p>
 * Empty records and unknown type markers are errors. In soft-fail mode they are logged and returned as a
 * {@link CorruptUnknownValue} for which WAL recovery has been attempted. Otherwise a {@link SailException} is
 * thrown.
 *
 * @param id   the ID of the record, used for error reporting and in the returned value
 * @param data the raw record
 * @return the decoded (possibly corrupt) value
 * @throws IOException If an I/O error occurred while decoding.
 */
@InternalUseOnly
public NativeValue data2value(int id, byte[] data) throws IOException {
    if (data.length > 0) {
        byte type = data[0];
        if (type == URI_VALUE) {
            return data2uri(id, data);
        }
        if (type == BNODE_VALUE) {
            return data2bnode(id, data);
        }
        if (type == LITERAL_VALUE) {
            return data2literal(id, data);
        }
        if (!SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
            throw new SailException("Invalid type " + type + " for value with id " + id
                    + " consider setting the system property org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes to true");
        }
        logger.error("Soft fail on corrupt data: Invalid type {} for value with id {}", type, id);
    } else {
        if (!SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
            throw new SailException("Empty data array for value with id " + id
                    + " consider setting the system property org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes to true");
        }
        logger.error("Soft fail on corrupt data: Empty data array for value with id {}", id);
    }
    CorruptUnknownValue corrupt = new CorruptUnknownValue(revision, id, data);
    tryRecoverFromWal(id, corrupt);
    return corrupt;
}
/**
 * Decodes an IRI record (namespace ID at offset 1, UTF-8 local name from offset 5).
 * <p>
 * In soft-fail mode, any Exception or AssertionError during decoding yields a {@link CorruptIRI}, for which WAL
 * recovery has been attempted. Otherwise a warning is logged and the failure is rethrown.
 */
private <T extends IRI & NativeValue> T data2uri(int id, byte[] data) throws IOException {
    String namespace = null;
    try {
        namespace = getNamespace(ByteArrayUtil.getInt(data, 1));
        String localName = new String(data, 5, data.length - 5, StandardCharsets.UTF_8);
        return (T) new NativeIRI(revision, namespace, localName, id);
    } catch (Throwable failure) {
        boolean recoverable = failure instanceof Exception || failure instanceof AssertionError;
        if (SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES && recoverable) {
            CorruptIRI corrupt = new CorruptIRI(revision, id, namespace, data);
            tryRecoverFromWal(id, corrupt);
            return (T) corrupt;
        }
        logger.warn(
                "NativeStore is possibly corrupt. To attempt to repair or retrieve the data, read the documentation on http://rdf4j.org about the system property org.eclipse.rdf4j.sail.nativerdf.softFailOnCorruptDataAndRepairIndexes");
        throw failure;
    }
}
/**
 * Decodes a blank node record: everything after the type byte is the UTF-8 node ID.
 */
private NativeBNode data2bnode(int id, byte[] data) {
    int idLength = data.length - 1;
    return new NativeBNode(revision, new String(data, 1, idLength, StandardCharsets.UTF_8), id);
}
/**
 * Decodes a literal record produced by literal2data. A language tag takes precedence over the datatype. Without
 * either, the literal is typed as xsd:string.
 * <p>
 * In soft-fail mode, any Exception or AssertionError during decoding yields a {@link CorruptLiteral}, for which WAL
 * recovery has been attempted. Otherwise the failure is rethrown.
 */
private <T extends NativeValue & Literal> T data2literal(int id, byte[] data) throws IOException {
    try {
        int datatypeID = ByteArrayUtil.getInt(data, 1);
        IRI datatype = datatypeID == NativeValue.UNKNOWN_ID ? null : (IRI) getValue(datatypeID);
        int langLength = data[5] & 0xFF;
        String lang = langLength > 0 ? new String(data, 6, langLength, StandardCharsets.UTF_8) : null;
        int labelOffset = 6 + langLength;
        String label = new String(data, labelOffset, data.length - labelOffset, StandardCharsets.UTF_8);
        NativeLiteral literal;
        if (lang != null) {
            literal = new NativeLiteral(revision, label, lang, id);
        } else if (datatype != null) {
            literal = new NativeLiteral(revision, label, datatype, id);
        } else {
            literal = new NativeLiteral(revision, label, CoreDatatype.XSD.STRING, id);
        }
        return (T) literal;
    } catch (Throwable failure) {
        boolean recoverable = failure instanceof Exception || failure instanceof AssertionError;
        if (SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES && recoverable) {
            CorruptLiteral corrupt = new CorruptLiteral(revision, id, data);
            tryRecoverFromWal(id, corrupt);
            return (T) corrupt;
        }
        throw failure;
    }
}
/**
 * Attempts to recover the value for {@code id} from the WAL (with logging). On success, it is attached to
 * {@code holder} via setRecovered.
 */
private void tryRecoverFromWal(int id, CorruptValue holder) {
    NativeValue recovered = recoverValueFromWal(id);
    if (recovered == null) {
        return;
    }
    holder.setRecovered(recovered);
}
/**
 * Recovers the value for {@code id} from the WAL, logging the recovery and the repair hint on success.
 */
private NativeValue recoverValueFromWal(int id) {
    final boolean logRecovery = true;
    return recoverValueFromWal(id, logRecovery);
}
/**
 * Looks up the value recorded for {@code id} in the WAL. The result is converted to a NativeValue whose internal
 * ID is set to {@code id} under the current revision.
 *
 * @param id  the value ID to look up
 * @param log whether to log the recovery (logRecovered) and the ERROR-level repair hint (logWalRepairHint)
 * @return the recovered value, or {@code null} if no WAL is configured, the WAL has no usable entry for the ID, or
 *         the lookup failed with an I/O error
 */
private NativeValue recoverValueFromWal(int id, boolean log) {
    ValueStoreWalSearch search = getOrCreateWalSearch();
    if (search == null) {
        return null;
    }
    try {
        Value walValue = search.findValueById(id);
        if (walValue == null) {
            return null;
        }
        NativeValue nv = getNativeValue(walValue);
        if (nv == null) {
            return null;
        }
        nv.setInternalID(id, revision);
        if (log) {
            logRecovered(id, nv);
            logWalRepairHint(id);
        }
        return nv;
    } catch (IOException ioe) {
        // Recovery is best-effort and must not turn a read into a failure, but the cause should not vanish
        // silently either.
        logger.debug("Unable to recover value id {} from the ValueStore WAL for {}", id, dataDir, ioe);
        return null;
    }
}
/**
 * Returns the WAL search, opening it on first use, or {@code null} when no WAL is configured.
 * <p>
 * The volatile {@code walSearch} field is read without locking. It is re-checked and, if still unset, opened while
 * synchronized on this store, so that it is opened at most once.
 */
private ValueStoreWalSearch getOrCreateWalSearch() {
    if (wal == null) {
        return null;
    }
    ValueStoreWalSearch current = walSearch;
    if (current == null) {
        synchronized (this) {
            current = walSearch;
            if (current == null) {
                current = ValueStoreWalSearch.open(wal.config());
                walSearch = current;
            }
        }
    }
    return current;
}
/**
 * Readable records are cross-checked against the WAL only when a WAL is enabled and soft-fail mode is on.
 */
private boolean shouldValidateAgainstWal() {
    if (!walEnabled()) {
        return false;
    }
    return SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES;
}
/**
 * Compares a value decoded from the data store with the value recorded for the same ID in the WAL.
 * <p>
 * Values of different kinds (IRI, blank node, literal) never match, even when their lexical forms are equal.
 * Literals are compared by label, language tag and datatype IRI. All other values are compared by their string
 * value.
 *
 * @return {@code true} if both denote the same RDF term (two {@code null}s also match)
 */
private boolean valuesMatch(NativeValue storeValue, NativeValue walValue) {
    if (storeValue == walValue) {
        return true;
    }
    if (storeValue == null || walValue == null) {
        return false;
    }
    boolean storeIsLiteral = storeValue instanceof Literal;
    if (storeIsLiteral != (walValue instanceof Literal)
            || (storeValue instanceof IRI) != (walValue instanceof IRI)
            || (storeValue instanceof BNode) != (walValue instanceof BNode)) {
        // A kind mismatch is a real difference; comparing only string values would report e.g. the IRI
        // <http://x> and the literal "http://x" as equal and hide the corruption.
        return false;
    }
    if (storeIsLiteral) {
        Literal a = (Literal) storeValue;
        Literal b = (Literal) walValue;
        return Objects.equals(a.getLabel(), b.getLabel())
                && Objects.equals(a.getLanguage().orElse(null), b.getLanguage().orElse(null))
                && Objects.equals(datatypeIri(a), datatypeIri(b));
    }
    return Objects.equals(storeValue.stringValue(), walValue.stringValue());
}
/**
 * Returns the literal's datatype IRI as a string, or the empty string when it has no datatype.
 */
private String datatypeIri(Literal literal) {
    IRI datatype = literal.getDatatype();
    return datatype != null ? datatype.stringValue() : "";
}
/**
 * Logs a recovered value at the level selected by the recovery-log system property. "trace" and "debug" log at
 * that level; any other setting logs nothing.
 */
private void logRecovered(int id, NativeValue nv) {
    if ("trace".equals(WAL_RECOVERY_LOG)) {
        if (logger.isTraceEnabled()) {
            logger.trace("Recovered value for id {} from WAL as {}", id, nv.stringValue());
        }
    } else if ("debug".equals(WAL_RECOVERY_LOG)) {
        if (logger.isDebugEnabled()) {
            logger.debug("Recovered value for id {} from WAL as {}", id, nv.stringValue());
        }
    }
    // any other setting (e.g. "off"): nothing is logged
}
/**
 * Logs, at ERROR level, that a value had to be recovered from the WAL, with instructions for repairing the
 * on-disk value files.
 */
private void logWalRepairHint(int id) {
    final String hint = "ValueStore {} recovered value id {} from WAL because the values.* files are corrupt. Enable NativeStore#setWalAutoRecoverOnOpen(true) (config:native.walAutoRecoverOnOpen) and restart, or run ValueStoreWalRecovery to replay the WAL and rebuild values.dat/values.id/values.hash so the on-disk data matches the WAL again.";
    logger.error(hint, dataDir, id);
}
/**
 * Rebuilds a value from a WAL record.
 * <p>
 * For literals, a non-empty language tag wins over a non-empty datatype; without either, a plain literal is
 * created.
 *
 * @return the value, or {@code null} for namespace records and unknown kinds
 */
private NativeValue fromWalRecord(ValueStoreWalRecord rec) {
    NativeValue result;
    switch (rec.valueKind()) {
    case IRI:
        result = createIRI(rec.lexical());
        break;
    case BNODE:
        result = createBNode(rec.lexical());
        break;
    case LITERAL: {
        String language = rec.language();
        String datatype = rec.datatype();
        boolean hasLanguage = language != null && !language.isEmpty();
        boolean hasDatatype = datatype != null && !datatype.isEmpty();
        if (hasLanguage) {
            result = createLiteral(rec.lexical(), language);
        } else if (hasDatatype) {
            result = createLiteral(rec.lexical(), createIRI(datatype));
        } else {
            result = createLiteral(rec.lexical());
        }
        break;
    }
    case NAMESPACE:
        // namespaces are not values
    default:
        result = null;
    }
    return result;
}
/**
 * Decodes a namespace record: the whole record is the UTF-8 namespace string.
 */
private String data2namespace(byte[] data) {
    String namespace = new String(data, StandardCharsets.UTF_8);
    return namespace;
}
/**
 * Resolves the ID of a namespace, consulting the namespace-ID cache first.
 * <p>
 * With {@code create}, the namespace is stored if needed. A newly minted ID is logged to the WAL when one is
 * enabled.
 *
 * @return the namespace ID, or -1 if it is unknown and {@code create} is {@code false}
 */
private int getNamespaceID(String namespace, boolean create) throws IOException {
    if (logger.isDebugEnabled()) {
        logger.debug("getNamespaceID thread={} namespace='{}' create={}", threadName(), namespace, create);
    }
    Integer cached = namespaceIDCache.get(namespace);
    if (cached != null) {
        if (logger.isDebugEnabled()) {
            logger.debug("getNamespaceID cache hit namespace='{}' id={} thread={}", namespace, cached,
                    threadName());
        }
        return cached.intValue();
    }
    byte[] encoded = namespace.getBytes(StandardCharsets.UTF_8);
    int namespaceId;
    if (!create) {
        namespaceId = dataStore.getID(encoded);
    } else {
        int maxIdBefore = walEnabled() ? dataStore.getMaxID() : 0;
        namespaceId = dataStore.storeData(encoded);
        if (walEnabled() && namespaceId > maxIdBefore) {
            logNamespaceMint(namespaceId, namespace);
        }
    }
    if (namespaceId == -1) {
        if (logger.isDebugEnabled()) {
            logger.debug("getNamespaceID unresolved namespace='{}' thread={}", namespace, threadName());
        }
        return namespaceId;
    }
    namespaceIDCache.put(namespace, namespaceId);
    if (logger.isDebugEnabled()) {
        logger.debug("getNamespaceID resolved namespace='{}' id={} thread={}", namespace, namespaceId,
                threadName());
    }
    return namespaceId;
}
/**
 * Returns the highest WAL LSN recorded since the last drain and resets the pending marker to
 * {@code ValueStoreWAL.NO_LSN}.
 *
 * @return the pending LSN, or empty if none is pending or the WAL is not in use
 */
public OptionalLong drainPendingWalHighWaterMark() {
	if (walPendingLsn != null) {
		long pending = walPendingLsn.get();
		if (pending > ValueStoreWAL.NO_LSN) {
			walPendingLsn.set(ValueStoreWAL.NO_LSN);
			return OptionalLong.of(pending);
		}
	}
	return OptionalLong.empty();
}
/**
 * Blocks until the WAL reports {@code lsn} as durable. Returns immediately if the WAL is disabled or {@code lsn} is
 * not a real LSN.
 *
 * @param lsn the log sequence number to wait for
 * @throws IOException if the wait is interrupted (the interrupt flag is restored)
 */
public void awaitWalDurable(long lsn) throws IOException {
	boolean nothingToWaitFor = !walEnabled() || lsn <= ValueStoreWAL.NO_LSN;
	if (nothingToWaitFor) {
		return;
	}
	try {
		wal.awaitDurable(lsn);
	} catch (InterruptedException interrupted) {
		Thread.currentThread().interrupt();
		throw new IOException("Interrupted while awaiting WAL durability", interrupted);
	}
}
/**
 * Appends a mint record for {@code value} under {@code id} to the WAL and tracks the resulting LSN.
 */
private void logMintedValue(int id, Value value) throws IOException {
	ValueStoreWalDescription desc = describeValue(value);
	int checksum = computeWalHash(desc.kind, desc.lexical, desc.datatype, desc.language);
	recordWalLsn(wal.logMint(id, desc.kind, desc.lexical, desc.datatype, desc.language, checksum));
}
/**
 * Appends a NAMESPACE mint record for {@code namespace} under {@code id} to the WAL and tracks the resulting LSN.
 * Namespaces carry no datatype or language, so both are logged as empty strings.
 */
private void logNamespaceMint(int id, String namespace) throws IOException {
	ValueStoreWalValueKind kind = ValueStoreWalValueKind.NAMESPACE;
	int checksum = computeWalHash(kind, namespace, "", "");
	recordWalLsn(wal.logMint(id, kind, namespace, "", "", checksum));
}
/**
 * Populates the WAL from the existing data store when it lacks entries for the stored ids. The WAL is considered
 * lacking when it has no initial segments or {@link #walNeedsBootstrap(int)} says so. Depending on the WAL
 * configuration this runs synchronously, or asynchronously at most once (guarded by {@code walBootstrapFuture}).
 */
private void maybeScheduleWalBootstrap() {
	if (!walEnabled()) {
		return;
	}
	final int maxId = dataStore.getMaxID();
	if (maxId <= 0) {
		return;
	}
	if (wal.hasInitialSegments() && !walNeedsBootstrap(maxId)) {
		return;
	}

	boolean syncBootstrap;
	try {
		syncBootstrap = wal.config().syncBootstrapOnOpen();
	} catch (Throwable ignore) {
		// defensive: if config not accessible, default to async
		syncBootstrap = false;
	}

	if (syncBootstrap) {
		// finish the rebuild before the store is used any further
		rebuildWalFromExistingValues(maxId);
		return;
	}
	if (walBootstrapFuture != null) {
		return;
	}
	CompletableFuture<Void> bootstrap = CompletableFuture.runAsync(() -> rebuildWalFromExistingValues(maxId));
	walBootstrapFuture = bootstrap;
	bootstrap.whenComplete((ignored, failure) -> {
		if (failure != null) {
			logger.warn("ValueStore WAL bootstrap failed", failure);
		}
	});
}
/**
 * Re-emits a WAL mint record for every id in {@code 1..maxId} that currently has data in the data store, so the
 * WAL again describes the existing values. Namespace entries are logged as NAMESPACE mints, everything else via
 * {@code logMintedValue}.
 * <p>
 * This is best-effort: per-id read or logging failures are logged and skipped. It stops early when the thread is
 * interrupted or the WAL is closed. Any other throwable is caught and logged, so this method never throws. After
 * the loop it waits until the highest LSN it produced is durable (unless the WAL was closed meanwhile).
 *
 * @param maxId highest id to re-log (inclusive)
 */
private void rebuildWalFromExistingValues(int maxId) {
try {
for (int id = 1; id <= maxId; id++) {
// NOTE(review): isInterrupted() does not clear the flag, so the interrupt() below is redundant but harmless.
if (Thread.currentThread().isInterrupted()) {
Thread.currentThread().interrupt();
return;
}
if (wal.isClosed()) {
return;
}
byte[] data;
try {
data = dataStore.getData(id);
} catch (IOException e) {
logger.warn("Failed to read value {} while rebuilding WAL", id, e);
continue;
}
// no stored data for this id: nothing to log
if (data == null) {
continue;
}
try {
if (isNamespaceData(data)) {
String namespace = data2namespace(data);
logNamespaceMint(id, namespace);
} else {
NativeValue value = data2value(id, data);
if (value != null) {
logMintedValue(id, value);
}
}
} catch (IOException e) {
// an I/O failure after the WAL was closed ends the rebuild quietly
if (wal.isClosed()) {
return;
}
logger.warn("Failed to rebuild WAL entry for id {}", id, e);
} catch (RuntimeException e) {
logger.warn("Unexpected failure while rebuilding WAL entry for id {}", id, e);
}
}
// wait for the highest LSN recorded during the rebuild to become durable
if (!wal.isClosed()) {
OptionalLong pending = drainPendingWalHighWaterMark();
if (pending.isPresent()) {
awaitWalDurable(pending.getAsLong());
}
}
} catch (Throwable t) {
logger.warn("Error while rebuilding ValueStore WAL", t);
}
}
/**
 * Replays the WAL and reports whether it must be rebuilt. A rebuild is needed when the WAL dictionary is empty,
 * when the replay was incomplete, or when any id in {@code 1..maxId} is absent from the dictionary.
 * <p>
 * If the WAL cannot be read, this returns {@code false} so that normal operation is not disturbed.
 */
private boolean walNeedsBootstrap(int maxId) {
	try (ValueStoreWalReader walReader = ValueStoreWalReader.open(wal.config())) {
		ValueStoreWalRecovery.ReplayReport replay = new ValueStoreWalRecovery().replayWithReport(walReader);
		Map<Integer, ValueStoreWalRecord> known = replay.dictionary();
		if (known.isEmpty() || !replay.complete()) {
			return true;
		}
		for (int candidate = 1; candidate <= maxId; candidate++) {
			if (!known.containsKey(candidate)) {
				return true;
			}
		}
		return false;
	} catch (IOException unreadable) {
		// if we cannot inspect WAL, avoid scheduling to not interfere with normal operations
		return false;
	}
}
/**
 * Raises the pending WAL high-water mark to {@code lsn} if it is a real LSN greater than the current mark. Does
 * nothing when WAL tracking is not in use.
 */
private void recordWalLsn(long lsn) {
	if (walPendingLsn != null && lsn > ValueStoreWAL.NO_LSN && lsn > walPendingLsn.get()) {
		walPendingLsn.set(lsn);
	}
}
/**
 * Breaks an RDF value into the (kind, lexical, datatype, language) fields used in WAL records. IRIs and blank
 * nodes have empty datatype/language.
 *
 * @throws IllegalArgumentException if {@code value} is not an IRI, blank node or literal
 */
private ValueStoreWalDescription describeValue(Value value) {
	if (value instanceof IRI) {
		return new ValueStoreWalDescription(ValueStoreWalValueKind.IRI, value.stringValue(), "", "");
	}
	if (value instanceof BNode) {
		return new ValueStoreWalDescription(ValueStoreWalValueKind.BNODE, value.stringValue(), "", "");
	}
	if (!(value instanceof Literal)) {
		throw new IllegalArgumentException("value parameter should be a URI, BNode or Literal");
	}
	Literal lit = (Literal) value;
	String languageTag = lit.getLanguage().orElse("");
	IRI datatypeIri = lit.getDatatype();
	String datatypeString = datatypeIri == null ? "" : datatypeIri.stringValue();
	return new ValueStoreWalDescription(ValueStoreWalValueKind.LITERAL, lit.getLabel(), datatypeString,
			languageTag);
}
/**
 * Computes the CRC32C checksum stored with a WAL record. The checksum covers the kind code byte, then the UTF-8
 * lexical, datatype and language fields separated by single zero bytes.
 */
private int computeWalHash(ValueStoreWalValueKind kind, String lexical, String datatype, String language) {
	CRC32C checksum = CRC32C_HOLDER.get();
	// the thread-local instance is reused, so clear any state left by a previous computation
	checksum.reset();
	checksum.update((byte) kind.code());
	String[] fields = { lexical, datatype, language };
	for (int i = 0; i < fields.length; i++) {
		if (i > 0) {
			checksum.update((byte) 0);
		}
		updateCrc(checksum, fields[i]);
	}
	return (int) checksum.getValue();
}
/**
 * Feeds the UTF-8 bytes of {@code value} into {@code crc32c}; null or empty strings contribute nothing.
 */
private void updateCrc(CRC32C crc32c, String value) {
	if (value != null && !value.isEmpty()) {
		crc32c.update(value.getBytes(StandardCharsets.UTF_8));
	}
}
/**
 * Returns whether this store was opened with a value-store WAL.
 */
private boolean walEnabled() {
	return Objects.nonNull(wal);
}
// One reusable CRC32C per thread for WAL checksums; callers must reset() it before use.
private static final ThreadLocal<CRC32C> CRC32C_HOLDER = ThreadLocal.withInitial(() -> new CRC32C());
/**
 * Immutable holder for the fields of a WAL record; {@code null} string fields are normalised to "".
 */
private static final class ValueStoreWalDescription {
	final ValueStoreWalValueKind kind;
	final String lexical;
	final String datatype;
	final String language;

	ValueStoreWalDescription(ValueStoreWalValueKind kind, String lexical, String datatype, String language) {
		this.kind = kind;
		this.lexical = Objects.toString(lexical, "");
		this.datatype = Objects.toString(datatype, "");
		this.language = Objects.toString(language, "");
	}
}
/**
 * Returns the namespace stored under {@code id}, using {@code namespaceCache} when possible. If the data store
 * signals a {@code RecoveredDataException}, the bytes it carries are decoded instead.
 */
private String getNamespace(int id) throws IOException {
	final Integer key = id;
	String namespace = namespaceCache.get(key);
	if (namespace != null) {
		return namespace;
	}
	byte[] raw;
	try {
		raw = dataStore.getData(id);
	} catch (RecoveredDataException recovered) {
		raw = recovered.getData();
	}
	namespace = data2namespace(raw);
	namespaceCache.put(key, namespace);
	return namespace;
}
/*-------------------------------------*
* Methods from interface ValueFactory *
*-------------------------------------*/
/**
 * Creates a {@link NativeIRI} for the full IRI string, tagged with this store's {@code revision}.
 */
@Override
public NativeIRI createIRI(String uri) {
	return new NativeIRI(this.revision, uri);
}

/**
 * Creates a {@link NativeIRI} from a namespace and local name, tagged with this store's {@code revision}.
 */
@Override
public NativeIRI createIRI(String namespace, String localName) {
	return new NativeIRI(this.revision, namespace, localName);
}

/**
 * Creates a {@link NativeBNode} with the given identifier, tagged with this store's {@code revision}.
 */
@Override
public NativeBNode createBNode(String nodeID) {
	return new NativeBNode(this.revision, nodeID);
}

/**
 * Creates an {@code xsd:string} {@link NativeLiteral}.
 */
@Override
public NativeLiteral createLiteral(String value) {
	return new NativeLiteral(this.revision, value, CoreDatatype.XSD.STRING);
}

/**
 * Creates a language-tagged {@link NativeLiteral}.
 */
@Override
public NativeLiteral createLiteral(String value, String language) {
	return new NativeLiteral(this.revision, value, language);
}

/**
 * Creates a {@link NativeLiteral} with an explicit datatype.
 */
@Override
public NativeLiteral createLiteral(String value, IRI datatype) {
	return new NativeLiteral(this.revision, value, datatype);
}
/*----------------------------------------------------------------------*
* Methods for converting model objects to NativeStore-specific objects *
*----------------------------------------------------------------------*/
/**
 * Converts any resource or literal into its NativeStore-specific counterpart.
 *
 * @throws IllegalArgumentException if {@code value} is neither a {@link Resource} nor a {@link Literal}
 */
public NativeValue getNativeValue(Value value) {
	if (value instanceof Resource) {
		return getNativeResource((Resource) value);
	}
	if (value instanceof Literal) {
		return getNativeLiteral((Literal) value);
	}
	throw new IllegalArgumentException("Unknown value type: " + value.getClass());
}
/**
 * Converts an IRI or blank node into its NativeStore-specific counterpart.
 *
 * @throws IllegalArgumentException if {@code resource} is neither an {@link IRI} nor a {@link BNode}
 */
public NativeResource getNativeResource(Resource resource) {
	if (resource instanceof IRI) {
		return getNativeURI((IRI) resource);
	}
	if (resource instanceof BNode) {
		return getNativeBNode((BNode) resource);
	}
	throw new IllegalArgumentException("Unknown resource type: " + resource.getClass());
}
/**
 * Returns a NativeIRI equal to {@code uri}. If {@code uri} already belongs to this ValueStore (per
 * {@code isOwnValue}), it is returned as-is to avoid creating a new object.
 *
 * @return a NativeIRI for the specified IRI
 */
public NativeIRI getNativeURI(IRI uri) {
	return isOwnValue(uri) ? (NativeIRI) uri : new NativeIRI(revision, uri.toString());
}
/**
 * Returns a NativeBNode equal to {@code bnode}. If {@code bnode} already belongs to this ValueStore (per
 * {@code isOwnValue}), it is returned as-is to avoid creating a new object.
 *
 * @return a NativeBNode for the specified blank node
 */
public NativeBNode getNativeBNode(BNode bnode) {
	return isOwnValue(bnode) ? (NativeBNode) bnode : new NativeBNode(revision, bnode.getID());
}
/**
 * Returns a NativeLiteral equal to {@code l}. If {@code l} already belongs to this ValueStore (per
 * {@code isOwnValue}), it is returned as-is. Otherwise a new literal is built that keeps the language tag of a
 * language literal, or the (converted) datatype of any other literal.
 *
 * @return a NativeLiteral for the specified literal
 */
public NativeLiteral getNativeLiteral(Literal l) {
	if (isOwnValue(l)) {
		return (NativeLiteral) l;
	}
	if (!Literals.isLanguageLiteral(l)) {
		NativeIRI nativeDatatype = getNativeURI(l.getDatatype());
		return new NativeLiteral(revision, l.getLabel(), nativeDatatype);
	}
	return new NativeLiteral(revision, l.getLabel(), l.getLanguage().get());
}
/*--------------------*
* Test/debug methods *
*--------------------*/
/**
 * Debug utility that prints every id of the value store in {@code args[0]} to standard output, one line per id.
 * Namespace entries are printed as their namespace string and other entries as the decoded value. Entries the data
 * store could only partially recover are printed as a {@code CorruptUnknownValue}. The store is always closed
 * afterwards.
 *
 * @param args a single argument: the value store's data directory
 * @throws Exception if the store cannot be opened, read or closed
 */
public static void main(String[] args) throws Exception {
	if (args.length < 1) {
		System.err.println("Usage: ValueStore <dataDir>");
		return;
	}
	File dataDir = new File(args[0]);
	ValueStore valueStore = new ValueStore(dataDir);
	try {
		int maxID = valueStore.dataStore.getMaxID();
		for (int id = 1; id <= maxID; id++) {
			try {
				byte[] data = valueStore.dataStore.getData(id);
				if (data == null) {
					// getData may return null for an id without stored data; report it instead of failing
					System.out.println("[" + id + "] <no data>");
				} else if (valueStore.isNamespaceData(data)) {
					String ns = valueStore.data2namespace(data);
					System.out.println("[" + id + "] " + ns);
				} else {
					Value value = valueStore.data2value(id, data);
					// string concatenation prints "null" rather than throwing if decoding yields no value
					System.out.println("[" + id + "] " + value);
				}
			} catch (RecoveredDataException rde) {
				System.out.println("[" + id + "] CorruptUnknownValue:"
						+ new CorruptUnknownValue(valueStore.revision, id, rde.getData()));
			}
		}
	} finally {
		// always close the store, even if dumping fails part-way
		valueStore.close();
	}
}
/**
 * When the WAL configuration asks for it ({@code recoverValueStoreOnOpen}), replays the WAL and rebuilds the data
 * store from its dictionary. The rebuild is skipped for an empty dictionary, an incomplete replay, a dictionary
 * with id gaps, or when {@code shouldRecoverFromWalDictionary} sees nothing to repair. I/O failures are logged,
 * not thrown.
 */
private void autoRecoverValueStoreIfConfigured() {
	if (wal == null) {
		return;
	}
	final ValueStoreWalConfig walConfig;
	try {
		walConfig = wal.config();
	} catch (Throwable t) {
		logger.warn("ValueStore WAL configuration unavailable for {}", dataDir, t);
		return;
	}
	if (!walConfig.recoverValueStoreOnOpen()) {
		return;
	}

	try {
		ValueStoreWalRecovery recovery = new ValueStoreWalRecovery();
		final ValueStoreWalRecovery.ReplayReport replay;
		try (ValueStoreWalReader walReader = ValueStoreWalReader.open(walConfig)) {
			replay = recovery.replayWithReport(walReader);
		}
		Map<Integer, ValueStoreWalRecord> walDictionary = replay.dictionary();
		if (walDictionary.isEmpty()) {
			// empty WAL dictionary: nothing to replay
			return;
		}
		if (!replay.complete()) {
			logger.warn("Skipping ValueStore WAL recovery for {}: WAL segments incomplete", dataDir);
			return;
		}
		if (hasDictionaryGaps(walDictionary)) {
			logger.warn("Skipping ValueStore WAL recovery for {}: WAL dictionary has gaps", dataDir);
			return;
		}
		if (shouldRecoverFromWalDictionary(walDictionary)) {
			recoverValueStoreFromWal(walDictionary);
			logAutoRecovery(walDictionary.size());
		}
	} catch (IOException e) {
		logger.warn("ValueStore WAL recovery failed for {}", dataDir, e);
	}
}
/**
 * Reports whether some id in {@code 1..maxId} is absent from the WAL dictionary, where {@code maxId} is the
 * largest key. A dictionary whose largest key is not positive is treated as gap-free.
 */
private boolean hasDictionaryGaps(Map<Integer, ValueStoreWalRecord> dictionary) {
	int maxId = dictionary.keySet().stream().mapToInt(Integer::intValue).max().orElse(0);
	if (maxId <= 0) {
		return false;
	}
	int minId = dictionary.keySet().stream().mapToInt(Integer::intValue).min().orElse(0);
	// Keys are distinct, so if they all lie in [1, maxId] and there are exactly maxId of them, every id is
	// present. A non-positive key (e.g. {0, 2}) would make size() == maxId despite a gap, so require minId >= 1.
	if (minId >= 1 && dictionary.size() == maxId) {
		return false;
	}
	for (int expected = 1; expected <= maxId; expected++) {
		if (!dictionary.containsKey(expected)) {
			return true;
		}
	}
	return false;
}
/**
 * Decides whether the WAL dictionary should be replayed into the data store. That is the case when the WAL knows
 * ids beyond the store's current maximum (which includes an empty store), or when any id named by the WAL has
 * missing, empty or unreadable data in the store.
 *
 * @return {@code false} when the dictionary holds no positive id
 */
private boolean shouldRecoverFromWalDictionary(Map<Integer, ValueStoreWalRecord> dictionary) {
	int maxWalId = dictionary.keySet().stream().mapToInt(Integer::intValue).max().orElse(0);
	if (maxWalId <= 0) {
		return false;
	}
	// maxWalId > 0 here, so this also covers the empty-store case (currentMaxId == 0)
	if (dataStore.getMaxID() < maxWalId) {
		return true;
	}
	// probe the store in ascending id order; stop at the first missing entry
	return dictionary.keySet()
			.stream()
			.mapToInt(Integer::intValue)
			.sorted()
			.anyMatch(this::isMissingValueData);
}
/**
 * Returns {@code true} if a positive {@code id} has no usable data in the store. That means null or empty data,
 * or a read failure. Non-positive ids are never reported as missing.
 */
private boolean isMissingValueData(int id) {
	if (id > 0) {
		try {
			byte[] stored = dataStore.getData(id);
			return stored == null || stored.length == 0;
		} catch (IOException unreadable) {
			return true;
		}
	}
	return false;
}
/**
 * Rebuilds the data store from a WAL dictionary. The store and all value/namespace caches are cleared first. Each
 * record is then re-encoded and stored in ascending id order, and the data store is synced at the end.
 * <p>
 * Every stored entry must receive exactly the id it had in the WAL; otherwise an {@link IOException} is thrown.
 * NOTE(review): because {@code dataStore.clear()} runs first, such a failure leaves the store only partially
 * rebuilt.
 * NOTE(review): {@code encodeIri}/{@code encodeLiteral} store a namespace or datatype that is not yet present.
 * That would consume an id out of WAL order and trigger the mismatch. Presumably the WAL always logs those entries
 * at lower ids first; confirm.
 *
 * @param dictionary WAL records keyed by id
 * @throws IOException on data-store failure or an id mismatch
 */
private void recoverValueStoreFromWal(Map<Integer, ValueStoreWalRecord> dictionary) throws IOException {
dataStore.clear();
valueCache.clear();
valueIDCache.clear();
namespaceCache.clear();
namespaceIDCache.clear();
// replay in ascending id order so storeData hands out the same ids as recorded in the WAL
List<Map.Entry<Integer, ValueStoreWalRecord>> entries = dictionary.entrySet()
.stream()
.sorted(Map.Entry.comparingByKey(Comparator.naturalOrder()))
.collect(Collectors.toList());
for (Map.Entry<Integer, ValueStoreWalRecord> entry : entries) {
ValueStoreWalRecord record = entry.getValue();
byte[] data;
switch (record.valueKind()) {
case NAMESPACE:
// namespaces are stored as their raw UTF-8 bytes
data = record.lexical().getBytes(StandardCharsets.UTF_8);
break;
case IRI:
data = encodeIri(record.lexical(), dataStore);
break;
case BNODE: {
// layout: BNODE_VALUE marker byte followed by the UTF-8 node id
byte[] idBytes = record.lexical().getBytes(StandardCharsets.UTF_8);
data = new byte[1 + idBytes.length];
data[0] = BNODE_VALUE;
ByteArrayUtil.put(idBytes, data, 1);
break;
}
case LITERAL:
data = encodeLiteral(record.lexical(), record.datatype(), record.language(), dataStore);
break;
default:
continue;
}
if (data == null) {
continue;
}
int assigned = dataStore.storeData(data);
if (assigned != record.id()) {
throw new IOException("ValueStore WAL recovery produced mismatched id " + assigned
+ " (expected " + record.id() + ")");
}
}
dataStore.sync();
}
/**
 * Reports how many entries were rebuilt from the WAL. The log level is selected by {@code WAL_RECOVERY_LOG}
 * ("trace" or "debug"); any other setting suppresses the message.
 */
private void logAutoRecovery(int recoveredCount) {
	switch (WAL_RECOVERY_LOG) {
	case "debug":
		if (logger.isDebugEnabled()) {
			logger.debug("Recovered {} ValueStore entries from WAL for {}", recoveredCount, dataDir);
		}
		return;
	case "trace":
		if (logger.isTraceEnabled()) {
			logger.trace("Recovered {} ValueStore entries from WAL for {}", recoveredCount, dataDir);
		}
		return;
	default:
		// off
		return;
	}
}
/**
 * Encodes an IRI in data-store layout: a {@code URI_VALUE} marker byte, the 4-byte namespace id, then the UTF-8
 * local name. The namespace is stored in {@code ds} if it is not already there.
 */
private byte[] encodeIri(String lexical, DataStore ds) throws IOException {
	IRI parsed = createIRI(lexical);
	byte[] namespaceBytes = parsed.getNamespace().getBytes(StandardCharsets.UTF_8);
	byte[] localBytes = parsed.getLocalName().getBytes(StandardCharsets.UTF_8);

	int namespaceId = ds.getID(namespaceBytes);
	if (namespaceId == -1) {
		namespaceId = ds.storeData(namespaceBytes);
	}

	byte[] encoded = new byte[5 + localBytes.length];
	encoded[0] = URI_VALUE;
	ByteArrayUtil.putInt(namespaceId, encoded, 1);
	ByteArrayUtil.put(localBytes, encoded, 5);
	return encoded;
}
/**
 * Encodes a literal in data-store layout:
 * <ul>
 * <li>a {@code LITERAL_VALUE} marker byte,</li>
 * <li>the 4-byte datatype id ({@code NativeValue.UNKNOWN_ID} when there is no datatype),</li>
 * <li>one byte holding the low 8 bits of the language-tag length,</li>
 * <li>the UTF-8 language tag,</li>
 * <li>the UTF-8 label.</li>
 * </ul>
 * A non-empty datatype IRI is stored in {@code ds} if it is not already there.
 */
private byte[] encodeLiteral(String label, String datatype, String language, DataStore ds) throws IOException {
	int datatypeId = NativeValue.UNKNOWN_ID;
	if (datatype != null && !datatype.isEmpty()) {
		byte[] datatypeBytes = encodeIri(datatype, ds);
		int existing = ds.getID(datatypeBytes);
		datatypeId = existing != -1 ? existing : ds.storeData(datatypeBytes);
	}

	byte[] langBytes = language != null ? language.getBytes(StandardCharsets.UTF_8) : new byte[0];
	byte[] labelBytes = label.getBytes(StandardCharsets.UTF_8);
	final int headerLength = 6; // marker byte + 4-byte datatype id + language-length byte
	byte[] encoded = new byte[headerLength + langBytes.length + labelBytes.length];
	encoded[0] = LITERAL_VALUE;
	ByteArrayUtil.putInt(datatypeId, encoded, 1);
	encoded[5] = (byte) (langBytes.length & 0xFF);
	if (langBytes.length > 0) {
		ByteArrayUtil.put(langBytes, encoded, headerLength);
	}
	ByteArrayUtil.put(labelBytes, encoded, headerLength + langBytes.length);
	return encoded;
}
}