SequentialRecordCache.java
/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.rdf4j.common.io.NioFile;
import org.eclipse.rdf4j.sail.nativerdf.btree.RecordIterator;
/**
* A cache for fixed size byte array records. This cache uses a temporary file to store the records. This file is
* deleted upon calling {@link #discard()}.
*
* @author Arjohn Kampman
*/
final class SequentialRecordCache extends AbstractRecordCache {
private final static AtomicLong TEMP_FILE_COUNTER = new AtomicLong();
private final static String TEMP_FILE_PREFIX = "txncache" + UUID.randomUUID().toString().replace("-", "");
private final static String TEMP_FILE_SUFFIX = ".dat";
private static final EnumSet<StandardOpenOption> FILE_OPEN_OPTIONS = EnumSet.of(
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW,
StandardOpenOption.DELETE_ON_CLOSE
);
/**
* Magic number "Sequential Record Cache" to detect whether the file is actually a sequential record cache file. The
* first three bytes of the file should be equal to this magic number.
*/
private static final byte[] MAGIC_NUMBER = { 's', 'r', 'c' };
/**
* The file format version number, stored as the fourth byte in sequential record cache files.
*/
private static final byte FILE_FORMAT_VERSION = 1;
// 3 bytes for magic number and 1 byte for file format version
private static final int HEADER_LENGTH = MAGIC_NUMBER.length + 1;
/*------------*
* Attributes *
*------------*/
private final int recordSize;
private long currentSize;
private long extendedSize;
private final NioFile nioFile;
/*--------------*
* Constructors *
*--------------*/
public SequentialRecordCache(File cacheDir, int recordSize) throws IOException {
this(cacheDir, recordSize, Long.MAX_VALUE);
}
public SequentialRecordCache(File cacheDir, int recordSize, long maxRecords) throws IOException {
super(maxRecords);
this.recordSize = recordSize;
Path path = Paths.get(cacheDir.getCanonicalPath(),
TEMP_FILE_PREFIX + TEMP_FILE_COUNTER.incrementAndGet() + TEMP_FILE_SUFFIX);
this.nioFile = new NioFile(path, FILE_OPEN_OPTIONS);
// Write file header
append(MAGIC_NUMBER);
append(new byte[] { FILE_FORMAT_VERSION });
}
private void append(byte[] data) throws IOException {
if (currentSize >= extendedSize - recordSize) {
long newExtendedSize = (((currentSize + recordSize) / BLOCK_SIZE) + 1) * BLOCK_SIZE;
assert newExtendedSize > extendedSize;
int bytesToWrite = (int) (newExtendedSize - extendedSize);
nioFile.writeBytes(new byte[bytesToWrite], currentSize);
extendedSize += bytesToWrite;
}
nioFile.writeBytes(data, currentSize);
currentSize += data.length;
}
@Override
public void discard() throws IOException {
nioFile.delete();
}
@Override
protected void clearInternal() throws IOException {
nioFile.truncate(HEADER_LENGTH);
currentSize = nioFile.size();
extendedSize = currentSize;
}
@Override
protected void storeRecordInternal(byte[] data) throws IOException {
append(data);
}
@Override
protected RecordIterator getRecordsInternal() {
return new RecordCacheIterator();
}
/*---------------------------------*
* Inner class RecordCacheIterator *
*---------------------------------*/
protected class RecordCacheIterator implements RecordIterator {
private long position = HEADER_LENGTH;
private long readAheadBufferPosition = HEADER_LENGTH;
private final ByteBuffer readAheadBuffer;
{
readAheadBuffer = ByteBuffer.allocate(BLOCK_SIZE + recordSize);
readAheadBuffer.position(readAheadBuffer.limit());
}
private void fillReadAheadBuffer() throws IOException {
while (readAheadBuffer.limit() - readAheadBuffer.position() < recordSize
&& currentSize > readAheadBufferPosition) {
readAheadBuffer.compact();
// align with 4K boundary
if (readAheadBufferPosition == HEADER_LENGTH) {
readAheadBuffer.limit(readAheadBuffer.limit() - HEADER_LENGTH);
} else if (readAheadBufferPosition % BLOCK_SIZE != 0) {
readAheadBuffer.limit(
BLOCK_SIZE - (int) (readAheadBufferPosition % BLOCK_SIZE) + readAheadBuffer.position());
} else {
readAheadBuffer.limit(BLOCK_SIZE + readAheadBuffer.position());
}
int read = nioFile.read(readAheadBuffer, readAheadBufferPosition);
readAheadBufferPosition += read;
readAheadBuffer.flip();
// override the limit if we have read outside the actual size (e.g. into the extended area).
if (readAheadBufferPosition > currentSize) {
readAheadBuffer.limit((int) (readAheadBuffer.limit() - (readAheadBufferPosition - currentSize)));
readAheadBufferPosition = currentSize;
}
if (read <= 0) {
break;
}
}
}
@Override
public byte[] next() throws IOException {
fillReadAheadBuffer();
if (!readAheadBuffer.hasRemaining()) {
return null;
}
byte[] bytes = new byte[recordSize];
readAheadBuffer.get(bytes);
position += recordSize;
return bytes;
}
@Override
public void set(byte[] value) throws IOException {
if (position >= HEADER_LENGTH + recordSize && position <= currentSize) {
nioFile.writeBytes(value, position - recordSize);
}
}
@Override
public void close() {
}
}
}