/*******************************************************************************
* Copyright (c) 2015 Eclipse RDF4J contributors, Aduna, and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.sail.nativerdf.datastore;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.BitSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.eclipse.rdf4j.common.io.NioFile;
/**
* Class supplying access to a hash file.
*
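* <p>
* A minimal usage sketch (the file name and the stored hash/ID values are
* illustrative only):
*
* <pre>{@code
* try (HashFile hashFile = new HashFile(new File("values.hash"))) {
*     int hash = "foo".hashCode();
*     hashFile.storeID(hash, 42);
*     hashFile.sync();
*     HashFile.IDIterator iter = hashFile.getIDIterator(hash);
*     try {
*         for (int id = iter.next(); id >= 0; id = iter.next()) {
*             // IDs are candidates only: distinct values can share a hash
*             // code, so callers must verify the stored value themselves
*         }
*     } finally {
*         iter.close();
*     }
* }
* }</pre>
*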
* @author Arjohn Kampman
*/
public class HashFile implements Closeable {
/*-----------*
* Constants *
*-----------*/
// The size of an item (32-bit hash + 32-bit ID), in bytes
private static final int ITEM_SIZE = 8;
/**
* Magic number "Native Hash File" to detect whether the file is actually a hash file. The first three bytes of the
* file should be equal to this magic number.
*/
private static final byte[] MAGIC_NUMBER = new byte[] { 'n', 'h', 'f' };
/**
* File format version, stored as the fourth byte in hash files.
*/
private static final byte FILE_FORMAT_VERSION = 1;
/**
* The size of the file header in bytes. The file header contains the following data: magic number (3 bytes),
* file format version (1 byte), number of buckets (4 bytes), bucket size (4 bytes) and number of stored items
* (4 bytes).
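* <p>
* Header layout (byte offsets):
*
* <pre>
* 0       3   4             8            12          16
* | "nhf" | v | bucketCount | bucketSize | itemCount |
* </pre>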
*/
private static final long HEADER_LENGTH = 16;
private static final int INIT_BUCKET_SIZE = 8;
/*-----------*
* Variables *
*-----------*/
private final NioFile nioFile;
private final boolean forceSync;
// The number of (non-overflow) buckets in the hash file
private volatile int bucketCount;
// The number of items that can be stored in a bucket
private final int bucketSize;
// The number of items in the hash file
private volatile int itemCount;
// Load factor (fixed, for now)
private final float loadFactor;
// recordSize = ITEM_SIZE * bucketSize + 4
private final int recordSize;
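// On disk, each bucket record consists of bucketSize (hash, ID) item pairs
// followed by a 4-byte overflow bucket ID, where 0 means "no overflow bucket":
// [hash_0][id_0] ... [hash_{bucketSize-1}][id_{bucketSize-1}][overflowID]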
// Bit set used as a simple membership filter over the hashes that have been
// stored; null when the file was opened with pre-existing items
private final BitSet poorMansBloomFilter;
// True when the opened file already contained items; the membership filter is
// skipped in that case because it would miss the pre-existing hashes
boolean loadedHashFileFromDisk = false;
/**
* A read/write lock that is used to prevent structural changes to the hash file while readers are active in order
* to prevent concurrency issues.
*/
private final ReentrantReadWriteLock structureLock = new ReentrantReadWriteLock();
/*--------------*
* Constructors *
*--------------*/
public HashFile(File file) throws IOException {
this(file, false);
}
public HashFile(File file, boolean forceSync) throws IOException {
this(file, forceSync, 512); // 512 is default initial size
}
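/**
* Creates or opens a hash file. For a new file, roughly <var>initialSize</var>
* item slots are allocated up front (rounded up to whole buckets of 8 items);
* the table doubles in size once the item count reaches the load factor (0.75)
* times that capacity.
*/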
public HashFile(File file, boolean forceSync, int initialSize) throws IOException {
this.nioFile = new NioFile(file);
this.forceSync = forceSync;
loadFactor = 0.75f;
try {
if (nioFile.size() == 0L) {
// Empty file, insert bucket count, bucket size
// and item count at the start of the file
// round up so that initial sizes not divisible by INIT_BUCKET_SIZE still fit
bucketCount = (int) Math.ceil(initialSize * 1.0 / INIT_BUCKET_SIZE);
bucketSize = INIT_BUCKET_SIZE;
itemCount = 0;
recordSize = ITEM_SIZE * bucketSize + 4;
// Initialize the file by writing <_bucketCount> empty buckets
writeEmptyBuckets(HEADER_LENGTH, bucketCount);
sync();
} else {
// Read bucket count, bucket size and item count from the file
ByteBuffer buf = ByteBuffer.allocate((int) HEADER_LENGTH);
nioFile.read(buf, 0L);
// flip() limits the buffer to the bytes actually read; rewind() would reset
// the limit to the full capacity and make the short-read check below pass
// vacuously
buf.flip();
if (buf.remaining() < HEADER_LENGTH) {
throw new IOException("File too short to be a compatible hash file");
}
byte[] magicNumber = new byte[MAGIC_NUMBER.length];
buf.get(magicNumber);
byte version = buf.get();
bucketCount = buf.getInt();
bucketSize = buf.getInt();
itemCount = buf.getInt();
if (!Arrays.equals(MAGIC_NUMBER, magicNumber)) {
throw new IOException("File doesn't contain compatible hash file data");
}
if (version > FILE_FORMAT_VERSION) {
throw new IOException("Unable to read hash file; it uses a newer file format");
} else if (version != FILE_FORMAT_VERSION) {
throw new IOException("Unable to read hash file; invalid file format version: " + version);
}
recordSize = ITEM_SIZE * bucketSize + 4;
loadedHashFileFromDisk = itemCount > 0;
}
if (!loadedHashFileFromDisk) {
// 41943049 bits is just over 5 MB; 41943049 is prime
if (initialSize > 41943049) {
// initialSize is at most Integer.MAX_VALUE bits, which is roughly 256 MB
poorMansBloomFilter = new BitSet(initialSize);
} else {
poorMansBloomFilter = new BitSet(41943049);
}
} else {
poorMansBloomFilter = null;
}
} catch (IOException e) {
this.nioFile.close();
throw e;
}
}
/*---------*
* Methods *
*---------*/
public File getFile() {
return nioFile.getFile();
}
public int getItemCount() {
return itemCount;
}
/**
* Gets an iterator that iterates over the IDs with hash codes that match the specified hash code.
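* <p>
* If this hash file was created fresh by the current process, a bit-set
* membership filter records every stored hash; when the filter shows that the
* requested hash was never stored, a shared empty iterator is returned without
* touching the disk.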
*/
public IDIterator getIDIterator(int hash) throws IOException {
if (!loadedHashFileFromDisk && !poorMansBloomFilter.get(getBloomFilterIndex(hash))) {
return emptyIDIterator;
} else {
return new IDIterator(hash);
}
}
private int getBloomFilterIndex(int hash) {
// Math.abs(Integer.MIN_VALUE) is still negative, so use floorMod to guarantee
// a non-negative index
return Math.floorMod(hash, poorMansBloomFilter.size());
}
/**
* Stores ID under the specified hash code in this hash file.
*/
public void storeID(int hash, int id) throws IOException {
structureLock.readLock().lock();
try {
// Set the membership filter bit inside the try block so that the lock is
// always released, even if this fails
if (!loadedHashFileFromDisk) {
poorMansBloomFilter.set(getBloomFilterIndex(hash), true);
}
// Calculate bucket offset for initial bucket
long bucketOffset = getBucketOffset(hash);
storeID(bucketOffset, hash, id);
} finally {
structureLock.readLock().unlock();
}
if (++itemCount >= loadFactor * bucketCount * bucketSize) {
structureLock.writeLock().lock();
try {
increaseHashTable();
} finally {
structureLock.writeLock().unlock();
}
}
}
private void storeID(long bucketOffset, int hash, int id) throws IOException {
ByteBuffer bucket = ByteBuffer.allocate(recordSize);
while (true) {
nioFile.read(bucket, bucketOffset);
// Find first empty slot in bucket
int slotID = findEmptySlotInBucket(bucket);
if (slotID >= 0) {
// Empty slot found; store the (hash, ID) item in it
ByteBuffer item = ByteBuffer.allocate(ITEM_SIZE);
item.putInt(hash);
item.putInt(id);
item.rewind();
nioFile.write(item, bucketOffset + (long) ITEM_SIZE * slotID);
break;
} else {
// No empty slot found, check if bucket has an overflow bucket
int overflowID = bucket.getInt(ITEM_SIZE * bucketSize);
if (overflowID == 0) {
// No overflow bucket yet, create one
overflowID = createOverflowBucket();
// Link overflow bucket to current bucket
bucket.putInt(ITEM_SIZE * bucketSize, overflowID);
bucket.rewind();
nioFile.write(bucket, bucketOffset);
}
// Continue searching for an empty slot in the overflow bucket
bucketOffset = getOverflowBucketOffset(overflowID);
bucket.clear();
}
}
}
public void clear() throws IOException {
structureLock.writeLock().lock();
try {
// The membership filter is null when the file was opened with pre-existing
// items
if (poorMansBloomFilter != null) {
poorMansBloomFilter.clear();
}
// Truncate the file to remove any overflow buckets
nioFile.truncate(HEADER_LENGTH + (long) bucketCount * recordSize);
// Overwrite normal buckets with empty ones
writeEmptyBuckets(HEADER_LENGTH, bucketCount);
itemCount = 0;
} finally {
structureLock.writeLock().unlock();
}
}
/**
* Syncs any unstored data to the hash file.
*/
public void sync() throws IOException {
structureLock.readLock().lock();
try {
// Update the file header
writeFileHeader();
} finally {
structureLock.readLock().unlock();
}
if (forceSync) {
nioFile.force(false);
}
}
public void sync(boolean force) throws IOException {
sync();
nioFile.force(force);
}
@Override
public void close() throws IOException {
nioFile.close();
}
/*-----------------*
* Utility methods *
*-----------------*/
private RandomAccessFile createEmptyFile(File file) throws IOException {
// Make sure the file exists
if (!file.exists()) {
boolean created = file.createNewFile();
if (!created) {
throw new IOException("Failed to create file " + file);
}
}
// Open the file in read-write mode and make sure the file is empty
RandomAccessFile raf = new RandomAccessFile(file, "rw");
raf.setLength(0L);
return raf;
}
/**
* Writes the bucket count, bucket size and item count to the file header.
*/
private void writeFileHeader() throws IOException {
ByteBuffer buf = ByteBuffer.allocate((int) HEADER_LENGTH);
buf.put(MAGIC_NUMBER);
buf.put(FILE_FORMAT_VERSION);
buf.putInt(bucketCount);
buf.putInt(bucketSize);
buf.putInt(itemCount);
buf.rewind();
nioFile.write(buf, 0L);
}
/**
* Returns the offset of the bucket for the specified hash code.
*/
private long getBucketOffset(int hash) {
int bucketNo = hash % bucketCount;
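// Java's % operator yields a negative remainder for a negative hash, so the
// result is shifted into the range [0, bucketCount)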
if (bucketNo < 0) {
bucketNo += bucketCount;
}
return HEADER_LENGTH + (long) bucketNo * recordSize;
}
/**
* Returns the offset of the overflow bucket with the specified ID.
*/
private long getOverflowBucketOffset(int bucketID) {
return HEADER_LENGTH + ((long) bucketCount + (long) bucketID - 1L) * recordSize;
}
/**
* Creates a new overflow bucket and returns its ID.
*/
private int createOverflowBucket() throws IOException {
long offset = nioFile.size();
writeEmptyBuckets(offset, 1);
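// Derive the bucket ID by inverting getOverflowBucketOffset(): the first
// overflow bucket, appended directly after the bucketCount normal buckets,
// gets ID 1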
return (int) ((offset - HEADER_LENGTH) / recordSize) - bucketCount + 1;
}
private void writeEmptyBuckets(long fileOffset, int bucketCount) throws IOException {
ByteBuffer emptyBucket = ByteBuffer.allocate(recordSize);
for (int i = 0; i < bucketCount; i++) {
nioFile.write(emptyBucket, fileOffset + i * (long) recordSize);
emptyBucket.rewind();
}
}
private int findEmptySlotInBucket(ByteBuffer bucket) {
for (int slotNo = 0; slotNo < bucketSize; slotNo++) {
// An ID of 0 marks an empty slot
if (bucket.getInt(ITEM_SIZE * slotNo + 4) == 0) {
return slotNo;
}
}
return -1;
}
/**
* Doubles the number of buckets in the hash file and rehashes the stored items.
*/
private void increaseHashTable() throws IOException {
long oldTableSize = HEADER_LENGTH + (long) bucketCount * recordSize;
long newTableSize = HEADER_LENGTH + (long) bucketCount * recordSize * 2;
long oldFileSize = nioFile.size(); // includes overflow buckets
// Move any overflow buckets out of the way to a temporary file
File tmpFile = new File(getFile().getParentFile(), "rehash_" + getFile().getName());
try (RandomAccessFile tmpRaf = createEmptyFile(tmpFile)) {
FileChannel tmpChannel = tmpRaf.getChannel();
// Transfer the overflow buckets to the temp file
// FIXME: work around java bug 6431344:
// "FileChannel.transferTo() doesn't work if address space runs out"
nioFile.transferTo(oldTableSize, oldFileSize - oldTableSize, tmpChannel);
// Increase hash table by factor 2
writeEmptyBuckets(oldTableSize, bucketCount);
bucketCount *= 2;
// Discard any remaining overflow buckets
nioFile.truncate(newTableSize);
ByteBuffer bucket = ByteBuffer.allocate(recordSize);
ByteBuffer newBucket = ByteBuffer.allocate(recordSize);
// Rehash items in non-overflow buckets, half of these will move to a
// new location, but none of them will trigger the creation of new overflow
// buckets. Any (now deprecated) references to overflow buckets are
// removed too.
// All items that are moved to a new location end up in one and the same
// new and empty bucket. All items are divided between the old and the
// new bucket and the changes to the buckets are written to disk only once.
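// Example with illustrative numbers: if bucketCount grows from 4 to 8, an item
// that hashed to bucket 1 rehashes to either bucket 1 or bucket 5 (1 + 4), so
// every item that leaves a given bucket ends up in one and the same new bucket.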
for (long bucketOffset = HEADER_LENGTH; bucketOffset < oldTableSize; bucketOffset += recordSize) {
nioFile.read(bucket, bucketOffset);
boolean bucketChanged = false;
long newBucketOffset = 0L;
for (int slotNo = 0; slotNo < bucketSize; slotNo++) {
int id = bucket.getInt(ITEM_SIZE * slotNo + 4);
if (id != 0) {
// Slot is not empty
int hash = bucket.getInt(ITEM_SIZE * slotNo);
long newOffset = getBucketOffset(hash);
if (newOffset != bucketOffset) {
// Move this item to new bucket...
newBucket.putInt(hash);
newBucket.putInt(id);
// ...and remove it from the current bucket
bucket.putInt(ITEM_SIZE * slotNo, 0);
bucket.putInt(ITEM_SIZE * slotNo + 4, 0);
bucketChanged = true;
newBucketOffset = newOffset;
}
}
}
if (bucketChanged) {
// Some of the items were moved to the new bucket, write it to
// the file
newBucket.flip();
nioFile.write(newBucket, newBucketOffset);
newBucket.clear();
}
// Reset overflow ID in the old bucket to 0 if necessary
if (bucket.getInt(ITEM_SIZE * bucketSize) != 0) {
bucket.putInt(ITEM_SIZE * bucketSize, 0);
bucketChanged = true;
}
if (bucketChanged) {
// Some of the items were moved to the new bucket or the
// overflow ID has been reset; write the bucket back to the file
bucket.rewind();
nioFile.write(bucket, bucketOffset);
}
bucket.clear();
}
// Rehash items in the overflow buckets. This might trigger the creation of
// new overflow buckets, so we can't optimize this in the same way as for
// the normal buckets.
long tmpFileSize = tmpChannel.size();
for (long bucketOffset = 0L; bucketOffset < tmpFileSize; bucketOffset += recordSize) {
tmpChannel.read(bucket, bucketOffset);
for (int slotNo = 0; slotNo < bucketSize; slotNo++) {
int id = bucket.getInt(ITEM_SIZE * slotNo + 4);
if (id != 0) {
// Slot is not empty
int hash = bucket.getInt(ITEM_SIZE * slotNo);
long newBucketOffset = getBucketOffset(hash);
// Copy this item to its new location
storeID(newBucketOffset, hash, id);
}
}
bucket.clear();
}
}
// Discard the temp file
tmpFile.delete();
}
/*------------------------*
* Inner class IDIterator *
*------------------------*/
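// Shared iterator returned when the membership filter shows that no ID was
// ever stored under the requested hash; it never acquires the read lock, so
// its close() must not release it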
private final IDIterator emptyIDIterator = new IDIterator() {
@Override
public void close() {
}
@Override
public int next() {
return -1;
}
};
public class IDIterator {
private final int queryHash;
private ByteBuffer bucketBuffer;
private int slotNo;
private boolean closed = false;
private IDIterator(int hash) throws IOException {
queryHash = hash;
bucketBuffer = ByteBuffer.allocate(recordSize);
structureLock.readLock().lock();
try {
// Read initial bucket
long bucketOffset = getBucketOffset(hash);
nioFile.read(bucketBuffer, bucketOffset);
slotNo = -1;
} catch (IOException | RuntimeException e) {
structureLock.readLock().unlock();
throw e;
}
}
// No-op variant used only by emptyIDIterator; it deliberately does not acquire
// the structure read lock
IDIterator() {
queryHash = 0;
}
public void close() {
// Idempotent: releasing the read lock more than once would throw an
// IllegalMonitorStateException
if (!closed) {
closed = true;
bucketBuffer = null;
structureLock.readLock().unlock();
}
}
/**
* Returns the next ID that has been mapped to the specified hash code, or <var>-1</var> if no more IDs were
* found.
*/
public int next() throws IOException {
while (bucketBuffer != null) {
// Search in current bucket
while (++slotNo < bucketSize) {
int id = bucketBuffer.getInt(ITEM_SIZE * slotNo + 4);
// Skip empty slots (ID 0); otherwise a queryHash of 0 would match them
if (id != 0 && bucketBuffer.getInt(ITEM_SIZE * slotNo) == queryHash) {
return id;
}
}
// No matching hash code in current bucket, check overflow
// bucket
int overflowID = bucketBuffer.getInt(ITEM_SIZE * bucketSize);
if (overflowID == 0) {
// No overflow bucket, end the search
bucketBuffer = null;
break;
} else {
// Continue with overflow bucket
bucketBuffer.clear();
long bucketOffset = getOverflowBucketOffset(overflowID);
nioFile.read(bucketBuffer, bucketOffset);
slotNo = -1;
}
}
return -1;
}
} // End inner class IDIterator
} // End class HashFile