/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/

package org.eclipse.rdf4j.sail.nativerdf.datastore;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.CRC32;

import org.eclipse.rdf4j.common.io.ByteArrayUtil;
import org.eclipse.rdf4j.sail.nativerdf.NativeStore;
import org.eclipse.rdf4j.sail.nativerdf.ValueStore;
import org.eclipse.rdf4j.sail.nativerdf.model.NativeValue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class that provides indexed storage and retrieval of arbitrary-length data.
*
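* <p>
* Typical usage (an illustrative sketch; the data directory and file prefix are hypothetical):
*
* <pre>{@code
* try (DataStore store = new DataStore(new File("/path/to/data"), "values")) {
*     int id = store.storeData("example".getBytes(StandardCharsets.UTF_8));
*     byte[] data = store.getData(id); // returns the stored bytes
*     int sameId = store.getID(data); // resolves the ID back from the value
* }
* }</pre>
*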
* @author Arjohn Kampman
*/
public class DataStore implements Closeable {
/*-----------*
* Variables *
*-----------*/
private static final Logger logger = LoggerFactory.getLogger(DataStore.class);
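// The store is backed by three files: the data file holds the raw values (each record prefixed by a 4-byte
// length), the ID file maps value IDs to offsets in the data file, and the hash file maps data hashes to
// candidate IDs.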
private final DataFile dataFile;
private final IDFile idFile;
private final HashFile hashFile;
private ValueStore valueStore;
/*--------------*
* Constructors *
*--------------*/
public DataStore(File dataDir, String filePrefix) throws IOException {
this(dataDir, filePrefix, false);
}
public DataStore(File dataDir, String filePrefix, boolean forceSync) throws IOException {
dataFile = new DataFile(new File(dataDir, filePrefix + ".dat"), forceSync);
idFile = new IDFile(new File(dataDir, filePrefix + ".id"), forceSync);
hashFile = new HashFile(new File(dataDir, filePrefix + ".hash"), forceSync);
}
public DataStore(File dataDir, String filePrefix, boolean forceSync, ValueStore valueStore) throws IOException {
this(dataDir, filePrefix, forceSync);
this.valueStore = valueStore;
}
/*---------*
* Methods *
*---------*/
/**
* Gets the value for the specified ID.
*
* @param id A value ID, should be larger than 0.
* @return The value for the ID, or <var>null</var> if no such value could be found.
* @throws IOException If an I/O error occurred.
*/
public byte[] getData(int id) throws IOException {
assert id > 0 : "id must be larger than 0, is: " + id;
long offset = idFile.getOffset(id);
if (offset != 0L) {
byte[] data = dataFile.getData(offset);
assert data != null : "data must not be null";
if (logger.isDebugEnabled()) {
logger.debug("getData thread={} id={} offset={} resultLength={}", threadName(), id, offset,
data.length);
}
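// A zero-length result for a non-zero offset indicates data corruption; optionally attempt a best-effort recovery.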
if (data.length == 0 && NativeStore.SOFT_FAIL_ON_CORRUPT_DATA_AND_REPAIR_INDEXES) {
data = attemptToRecoverCorruptData(id, offset, data);
}
return data;
}
return null;
}
/**
* Attempts to recover the data for an ID whose offset points to a record that could not be read. The offsets of
* neighboring entries in the ID file are used to infer the bounds of the corrupt record in the data file.
*
* @param id     The ID whose data appears to be corrupt.
* @param offset The offset for the ID, as read from the ID file.
* @param data   The (empty) data that was read for the ID.
* @return The original (empty) data if recovery was not possible.
* @throws IOException            If an I/O error occurred while reading the backing files.
* @throws RecoveredDataException If recovery was successful; the exception carries the recovered data.
*/
private byte[] attemptToRecoverCorruptData(int id, long offset, byte[] data) throws IOException {
// Data corruption detected: ID points to an offset, but no data could be read. Attempt to recover.
try {
long offsetNoCache = idFile.getOffsetNoCache(id);
if (offset != offsetNoCache) {
logger.error("IDFile cache mismatch for id {}: cached={}, raw={}. Using raw.", id, offset,
offsetNoCache);
offset = offsetNoCache;
data = dataFile.getData(offset);
}
} catch (IOException e) {
// If raw read fails, keep cached offset
}
// Attempt recovery by using neighboring offsets to infer the bounds
long startData = offset + 4; // default start if no previous valid entry
// Find previous entry end: prevOffset + 4 + prevLength
int prev = id - 1;
for (; prev >= 1; prev--) {
long po = idFile.getOffset(prev);
try {
long poRaw = idFile.getOffsetNoCache(prev);
if (po != poRaw) {
logger.error("IDFile cache mismatch for prev id {}: cached={}, raw={}. Using raw.", prev,
po, poRaw);
po = poRaw;
}
} catch (IOException e) {
// use cached po if raw read fails
}
if (po > 0L) {
try {
byte[] prevData = dataFile.getData(po);
if (prevData != null && prevData.length > 0) {
try {
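// Logging the neighboring value is best-effort; the stack-depth check guards against overly deep (possibly
// recursive) call chains, since decoding the value may read from this store again.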
if (valueStore != null && Thread.currentThread().getStackTrace().length < 512) {
NativeValue nativeValue = valueStore.data2value(prev, prevData);
logger.warn("Data in previous ID ({}) is: {}", prev, nativeValue);
} else {
logger.warn("Data in previous ID ({}) is: {}", prev,
new String(prevData, StandardCharsets.UTF_8));
}
} catch (Exception ignored) {
}
startData = po + 4L + prevData.length;
break;
}
} catch (Exception ignored) {
}
}
}
// Find next entry start as the end bound
long endOffset = 0L;
int maxId = idFile.getMaxID();
int next = id + 1;
for (; next <= maxId; next++) {
long no = idFile.getOffset(next);
try {
long noRaw = idFile.getOffsetNoCache(next);
if (no != noRaw) {
logger.error("IDFile cache mismatch for next id {}: cached={}, raw={}. Using raw.", next,
no, noRaw);
no = noRaw;
}
} catch (IOException e) {
// use cached value if raw read fails
}
if (no > 0L) {
try {
byte[] nextData = dataFile.getData(no);
if (nextData != null && nextData.length > 0) {
try {
if (valueStore != null && Thread.currentThread().getStackTrace().length < 512) {
NativeValue nativeValue = valueStore.data2value(next, nextData);
logger.warn("Data in next ID ({}) is: {}", next, nativeValue);
} else {
logger.warn("Data in next ID ({}) is: {}", next,
new String(nextData, StandardCharsets.UTF_8));
}
} catch (Exception ignored) {
}
endOffset = no;
break;
}
} catch (Exception ignored) {
// ignore and keep scanning for a readable next entry
}
}
}
if (endOffset == 0L) {
// Fallback: use current file size as end bound
endOffset = dataFile.getFileSize();
}
if (endOffset > startData) {
// tryRecoverBetweenOffsets expects an offset to a 4-byte length, so pass (startData - 4)
byte[] recovered = dataFile.tryRecoverBetweenOffsets(Math.max(0L, startData - 4L), endOffset);
throw new RecoveredDataException(id, recovered);
}
assert data.length == 0;
return data;
}
/**
* Gets the ID for the specified value.
*
* @param queryData The value to get the ID for, must not be <var>null</var>.
* @return The ID for the specified value, or <var>-1</var> if no such ID could be found.
* @throws IOException If an I/O error occurred.
*/
public int getID(byte[] queryData) throws IOException {
assert queryData != null : "queryData must not be null";
int id;
int hash = getDataHash(queryData);
if (logger.isDebugEnabled()) {
logger.debug("getID start thread={} hash={} summary={}", threadName(), hash, summarize(queryData));
}
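// The hash file may map a single hash to multiple candidate IDs; scan the candidates and compare the actual bytes.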
HashFile.IDIterator iter = hashFile.getIDIterator(hash);
try {
while ((id = iter.next()) >= 0) {
long offset = idFile.getOffset(id);
byte[] data = dataFile.getData(offset);
boolean match = Arrays.equals(queryData, data);
if (logger.isDebugEnabled()) {
logger.debug(
"getID candidate thread={} hash={} candidateId={} offset={} match={} candidateSummary={}",
threadName(), hash, id, offset, match, summarize(data));
}
if (match) {
break;
}
}
} finally {
iter.close();
}
if (logger.isDebugEnabled()) {
logger.debug("getID result thread={} hash={} id={}", threadName(), hash, id);
}
return id;
}
/**
* Returns the maximum value-ID that is in use.
*
* @return The largest ID, or <var>0</var> if the store does not contain any values.
*/
public int getMaxID() {
return idFile.getMaxID();
}
/**
* Stores the supplied value and returns the ID that has been assigned to it. In case the data to store is already
* present, the ID of this existing data is returned.
*
* @param data The data to store, must not be <var>null</var>.
* @return The ID that has been assigned to the value.
* @throws IOException If an I/O error occurred.
*/
public synchronized int storeData(byte[] data) throws IOException {
assert data != null : "data must not be null";
if (logger.isDebugEnabled()) {
int hash = getDataHash(data);
logger.debug("storeData start thread={} hash={} summary={}", threadName(), hash, summarize(data));
}
int id = getID(data);
if (id == -1) {
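// Data not stored yet: append it to the data file, register the offset under a new ID and index that ID by hash.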
int hash = getDataHash(data);
long offset = dataFile.storeData(data);
id = idFile.storeOffset(offset);
hashFile.storeID(hash, id);
if (logger.isDebugEnabled()) {
logger.debug("storeData stored thread={} hash={} id={} offset={} summary={}", threadName(), hash, id,
offset, summarize(data));
}
} else if (logger.isDebugEnabled()) {
int hash = getDataHash(data);
logger.debug("storeData reuse thread={} hash={} existingId={} summary={}", threadName(), hash, id,
summarize(data));
}
return id;
}
/**
* Synchronizes any recent changes to the data to disk.
*
* @throws IOException If an I/O error occurred.
*/
public void sync() throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("sync thread={} invoked", threadName());
}
hashFile.sync();
idFile.sync();
dataFile.sync();
}
/**
* Removes all values from the DataStore.
*
* @throws IOException If an I/O error occurred.
*/
public void clear() throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("clear thread={} invoked", threadName());
}
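// Nested try/finally blocks ensure that all three files are cleared, even if clearing an earlier one fails.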
try {
hashFile.clear();
} finally {
try {
idFile.clear();
} finally {
dataFile.clear();
}
}
}
/**
* Closes the DataStore, releasing any file references, etc. Once closed, the DataStore can no longer be used.
*
* @throws IOException If an I/O error occurred.
*/
@Override
public void close() throws IOException {
try {
hashFile.close();
} finally {
try {
idFile.close();
} finally {
dataFile.close();
}
}
}
/**
* Produces a short, log-friendly summary (length and hash code) of the supplied data.
*
* @param data The data to summarize, may be <var>null</var>.
* @return A summary string for the supplied data.
*/
private static String summarize(byte[] data) {
if (data == null) {
return "null";
}
return "len=" + data.length + ",hash=" + Arrays.hashCode(data);
}
private static String threadName() {
return Thread.currentThread().getName();
}
/**
* Gets a hash code for the supplied data, based on its CRC32 checksum.
*
* @param data The data to calculate the hash code for.
* @return A hash code for the supplied data.
*/
private int getDataHash(byte[] data) {
CRC32 crc32 = new CRC32();
crc32.update(data);
return (int) crc32.getValue();
}
/*--------------------*
* Test/debug methods *
*--------------------*/
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println(
"Usage: java org.eclipse.rdf4j.sesame.sailimpl.nativerdf.datastore.DataStore <data-dir> <file-prefix>");
return;
}
System.out.println("Dumping DataStore contents...");
File dataDir = new File(args[0]);
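// Dump every record in the data file as a hex string, iterating the data file directly rather than via the IDs.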
try (DataStore dataStore = new DataStore(dataDir, args[1])) {
DataFile.DataIterator iter = dataStore.dataFile.iterator();
while (iter.hasNext()) {
byte[] data = iter.next();
System.out.println(ByteArrayUtil.toHexString(data));
}
}
}
}