OffHeapChunkedMap.java

/*
 *      Copyright (C) 2014 Robert Stupp, Koeln, Germany, robert-stupp.de
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
package org.caffinitas.ohc.chunked;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import com.google.common.collect.AbstractIterator;
import com.google.common.primitives.Ints;

import org.caffinitas.ohc.CacheSerializer;
import org.caffinitas.ohc.OHCacheBuilder;
import org.caffinitas.ohc.Ticker;
import org.caffinitas.ohc.histo.EstimatedHistogram;
import sun.nio.ch.DirectBuffer;

import static org.caffinitas.ohc.util.ByteBufferCompat.*;

final class OffHeapChunkedMap
{
    // maximum hash table size
    private static final int TWO_POWER_30 = 1 << 30;

    // Fixed key/value lengths taken from the builder.
    // A fixedKeySize greater than 0 switches the map to the fixed-size entry layout (see isFixedSize()).
    private final int fixedKeySize;
    private final int fixedValueSize;

    // number of live entries: incremented when a new key is added, decremented on removal and eviction
    private long size;

    // operation counters, all zeroed by resetStatistics()
    private long hitCount;
    private long missCount;
    private long putAddCount;
    private long putReplaceCount;
    private long removeCount;

    // load factor used to derive the rehash threshold from the hash table size
    private final float loadFactor;

    private long rehashes;
    private long evictedEntries;
    // NOTE(review): never incremented anywhere in this class - only reset and returned
    private long expiredEntries;

    // Replacement for Unsafe.monitorEnter/monitorExit. Uses the thread-ID to indicate a lock
    // using a CAS operation on the primitive instance field.
    private final boolean unlocked;
    private volatile long lock;
    private static final AtomicLongFieldUpdater<OffHeapChunkedMap> lockFieldUpdater =
    AtomicLongFieldUpdater.newUpdater(OffHeapChunkedMap.class, "lock");

    // passed through to Uns.allocate() for every off-heap allocation made by this class
    private final boolean throwOOME;

    // time source for the per-chunk "last used" timestamps
    private final Ticker ticker;

    // current hash table; replaced by rehash() and set to null by release()
    private Table table;

    // rehash() is triggered when size reaches this value (table size * load factor)
    private long threshold;

    // capacity is chunkCount * chunk data size; freeCapacity is reduced by every appended entry
    // and increased again when a chunk is evicted
    private int capacity;
    private int freeCapacity;

    // chunkDataSize is the payload size of one chunk, chunkFullSize additionally includes the chunk header
    private final int chunkDataSize;
    private final int chunkFullSize;

    private final int chunkCount;

    // state of the chunk that currently receives new entries:
    // number of chunks used so far, index of the write chunk, the next write offset within it
    // (relative to the chunk start) and the number of payload bytes still free in it
    private int chunksUsed;
    private int writeChunk;
    private int writeChunkOffset;
    private int writeChunkFree;

    // single off-heap buffer holding all chunks back to back
    private final ByteBuffer memory;

    /**
     * Creates one map segment backed by a single off-heap buffer that is split into equally sized chunks
     * plus an off-heap hash table.
     *
     * @param builder      source of the configuration (load factor, hash table size, fixed key/value sizes,
     *                     ticker, locking mode and out-of-memory behaviour)
     * @param freeCapacity capacity of this segment in bytes, at most 2^30 and at least {@code chunkSize}
     * @param chunkSize    payload size of a single chunk in bytes, must be positive
     * @throws IllegalArgumentException if the capacity or the chunk size is out of range
     * @throws RuntimeException         if off-heap memory for the chunks or the hash table cannot be allocated
     */
    OffHeapChunkedMap(OHCacheBuilder builder, long freeCapacity, long chunkSize)
    {
        this.throwOOME = builder.isThrowOOME();

        this.ticker = builder.getTicker();

        this.unlocked = builder.isUnlocked();

        float lf = builder.getLoadFactor();
        if (lf <= .0d)
            lf = .75f;
        this.loadFactor = lf;

        if (freeCapacity > TWO_POWER_30)
            throw new IllegalArgumentException("Segment too big (max 2^30)");
        // fail early with a clear message instead of a division by zero or an out-of-bounds access below
        if (chunkSize <= 0L)
            throw new IllegalArgumentException("Chunk size must be positive but is " + chunkSize);
        if (freeCapacity < chunkSize)
            throw new IllegalArgumentException("Segment capacity " + freeCapacity +
                                               " is smaller than a single chunk of " + chunkSize + " bytes");

        this.fixedKeySize = builder.getFixedKeySize();
        this.fixedValueSize = builder.getFixedValueSize();

        this.chunkDataSize = Ints.checkedCast(chunkSize);
        this.chunkFullSize = Ints.checkedCast(chunkSize + Util.CHUNK_OFF_DATA);
        this.chunkCount = (int) (freeCapacity / chunkSize);
        this.capacity = Ints.checkedCast(chunkCount * chunkSize);
        this.freeCapacity = this.capacity;
        int allocSize = Ints.checkedCast((long) chunkCount * (long) chunkFullSize);
        memory = Uns.allocate(allocSize, throwOOME);
        // the allocator may hand back null instead of throwing (createTable() checks for the same condition)
        if (memory == null)
            throw new RuntimeException("unable to allocate off-heap memory for segment");

        boolean initialized = false;
        try
        {
            for (int i = 0; i < chunkCount; i++)
                resetChunk(i);
            initWriteChunk(0);
            chunksUsed = 1;

            int hts = builder.getHashTableSize();
            if (hts <= 0)
                hts = 8192;
            if (hts < 256)
                hts = 256;
            int msz = Ints.checkedCast(Util.roundUpToPowerOf2(hts, TWO_POWER_30));
            table = createTable(msz, throwOOME);
            if (table == null)
                throw new RuntimeException("unable to allocate off-heap memory for segment");

            threshold = (long) ((double) table.size() * loadFactor);

            initialized = true;
        }
        finally
        {
            // do not leak the chunk memory if anything after its allocation failed
            if (!initialized)
                Uns.free(memory);
        }
    }

    /**
     * Returns the current number of entries. The field is read without taking the segment lock.
     */
    long size()
    {
        return size;
    }

    /**
     * Returns the number of successful lookups counted by {@code getEntry} since the last statistics reset.
     */
    long hitCount()
    {
        return hitCount;
    }

    /**
     * Returns the number of unsuccessful lookups counted by {@code getEntry} since the last statistics reset.
     */
    long missCount()
    {
        return missCount;
    }

    /**
     * Returns the number of puts that added a new key since the last statistics reset.
     */
    long putAddCount()
    {
        return putAddCount;
    }

    /**
     * Returns the number of puts that replaced the value of an existing key since the last statistics reset.
     */
    long putReplaceCount()
    {
        return putReplaceCount;
    }

    /**
     * Returns the number of entries removed via {@code removeEntry} since the last statistics reset.
     */
    long removeCount()
    {
        return removeCount;
    }

    /**
     * Zeroes all statistic counters of this segment. The entry count ({@code size}) is not a statistic
     * and stays untouched.
     */
    void resetStatistics()
    {
        // access counters
        hitCount = 0L;
        missCount = 0L;

        // modification counters
        putAddCount = 0L;
        putReplaceCount = 0L;
        removeCount = 0L;

        // housekeeping counters
        rehashes = 0L;
        evictedEntries = 0L;
        expiredEntries = 0L;
    }

    /**
     * Returns how often the hash table has been enlarged since the last statistics reset.
     */
    long rehashes()
    {
        return rehashes;
    }

    /**
     * Returns the eviction counter maintained by {@code putEntry} since the last statistics reset.
     */
    long evictedEntries()
    {
        return evictedEntries;
    }

    /**
     * Returns the expired-entries counter.
     * NOTE(review): nothing in this class increments the counter, so this currently always returns 0.
     */
    long expiredEntries()
    {
        return expiredEntries;
    }

    /**
     * Returns the effective load factor (the builder's value, or 0.75 if the builder supplied a value &lt;= 0).
     */
    float loadFactor()
    {
        return loadFactor;
    }

    /**
     * Frees the off-heap chunk memory and the hash table of this segment.
     * <p>
     * The method is idempotent: once the table reference has been cleared, further calls return
     * without touching the already freed buffers. The segment must not be used after it has been released.
     */
    void release()
    {
        boolean wasFirst = lock();
        try
        {
            if (table == null)
                // already released - freeing the chunk memory a second time must be avoided
                return;

            Uns.free(memory);

            table.release();
            table = null;
        }
        finally
        {
            unlock(wasFirst);
        }
    }

    /**
     * Returns the number of bytes not yet consumed by appended entries. The value is reduced by every
     * appended entry and increased again when a chunk is evicted; it is read without taking the lock.
     */
    long freeCapacity()
    {
        return freeCapacity;
    }

    /**
     * Looks up the entry for the given key.
     *
     * @param key             serialized key including its hash
     * @param valueSerializer serializer used to materialize the value, or {@code null} to only test
     *                        whether the key is present
     * @return {@code null} if the key is not present, {@link Boolean#TRUE} if it is present and no
     *         serializer was given, otherwise the deserialized value
     */
    Object getEntry(KeyBuffer key, CacheSerializer<?> valueSerializer)
    {
        ByteBuffer valueCopy;

        boolean wasFirst = lock();
        try
        {
            // walk the bucket chain until a live entry with the same key shows up
            int entryOffset = table.getFirst(key.hash());
            while (entryOffset != 0)
            {
                // a removed entry is skipped - there can be a _new_ entry superseding it
                if (!notSameKey(key, entryOffset) && !isEntryRemoved(entryOffset))
                    break;

                entryOffset = getNext(entryOffset);
            }

            if (entryOffset == 0)
            {
                // not found
                missCount++;
                return null;
            }

            hitCount++;

            if (valueSerializer == null)
                return Boolean.TRUE;

            // mark the chunk of the entry as recently used
            touch(entryOffset);

            int keyLen = getKeyLen(entryOffset);
            int valueLen = getValueLen(entryOffset);
            int valueOffset = entryOffset + Util.entryOffData(isFixedSize()) + keyLen;

            // copy the value to the heap while holding the lock ...
            valueCopy = ByteBuffer.allocate(valueLen);
            Uns.copyMemory(((DirectBuffer)memory).address(), valueOffset, valueCopy.array(), 0, valueLen);
            byteBufferLimit(valueCopy, valueLen);
        }
        finally
        {
            unlock(wasFirst);
        }

        // ... and deserialize it after the lock has been released
        return valueSerializer.deserialize(valueCopy);
    }

    /**
     * Adds a new entry or replaces the value of an existing one.
     * <p>
     * If the key exists and the new value fits into the space reserved for the old value, the value is
     * overwritten in place. Otherwise the old entry (if any) is unlinked and the new entry is appended to
     * the current write chunk. When the write chunk is full, the next unused chunk becomes the write chunk;
     * once all chunks are in use, the least recently used chunk is evicted and reused.
     *
     * @param newHashEntry heap buffer containing the serialized entry; for a conditional replace the
     *                     expected old value is read from this buffer starting at index {@code entryBytes}
     * @param hash         hash of the key
     * @param keyLen       length of the serialized key
     * @param entryBytes   number of bytes of the serialized entry
     * @param ifAbsent     if {@code true}, an existing live entry is left untouched
     * @param oldValueLen  length of the expected old value, {@code 0} to replace unconditionally
     * @return {@code true} if the entry has been written, {@code false} if {@code ifAbsent} was requested
     *         for an existing key or the expected old value did not match
     */
    boolean putEntry(ByteBuffer newHashEntry, long hash, int keyLen, int entryBytes, boolean ifAbsent, int oldValueLen)
    {
        // NOTE(review): an entry larger than one chunk would not fit even into a fresh write chunk;
        // presumably callers reject such entries before calling this method - confirm.
        boolean wasFirst = lock();
        try
        {
            int hashEntryOffset;
            int prevEntryOffset = 0;
            for (hashEntryOffset = table.getFirst(hash);
                 hashEntryOffset != 0;
                 prevEntryOffset = hashEntryOffset, hashEntryOffset = getNext(hashEntryOffset))
            {
                if (notSameKey(newHashEntry, hash, keyLen, hashEntryOffset))
                    continue;

                // replace existing entry

                if (!isEntryRemoved(hashEntryOffset))
                {
                    if (ifAbsent)
                        return false;

                    int valueOffset = Util.entryOffData(isFixedSize()) + keyLen;
                    int hashEntryValueOffset = hashEntryOffset + valueOffset;
                    if (oldValueLen != 0)
                    {
                        // code for replace() operation
                        int valueLen = getValueLen(hashEntryOffset);
                        if (valueLen != oldValueLen ||
                            !compare(hashEntryValueOffset, newHashEntry, entryBytes, oldValueLen))
                            return false;
                    }

                    if (getValueReservedLen(hashEntryOffset) >= getValueLen(newHashEntry))
                    {
                        // just overwrite the old value if it fits

                        if (!isFixedSize())
                            setValueLen(hashEntryOffset, getValueLen(newHashEntry));

                        Uns.copyMemory(newHashEntry.array(), valueOffset,
                                       ((DirectBuffer)memory).address(), hashEntryValueOffset,
                                       entryBytes - valueOffset);

                        putReplaceCount++;

                        return true;
                    }
                }

                // the new value needs more room - unlink the old entry and append a fresh one below
                removeInternal(hashEntryOffset, prevEntryOffset);

                break;
            }

            if (writeChunkFree < entryBytes)
            {
                if (chunksUsed >= chunkCount)
                {
                    evictEldestChunk();
                }
                else
                {
                    // Initially not all chunks have been used.
                    // So use all "virgin" chunks first before starting eviction.
                    initWriteChunk(writeChunk + 1);
                    chunksUsed++;
                }
            }

            // hashEntryOffset is 0 if the loop above did not find the key
            if (hashEntryOffset == 0)
            {
                if (size >= threshold)
                    rehash();

                size++;
                putAddCount++;
            }
            else
            {
                putReplaceCount++;
            }

            // append the entry at the current write position of the write chunk
            hashEntryOffset = chunkOffset(writeChunk) + writeChunkOffset;

            Uns.copyMemory(newHashEntry.array(), newHashEntry.position(),
                           ((DirectBuffer)memory).address(), hashEntryOffset,
                           entryBytes - newHashEntry.position());

            writeChunkOffset += entryBytes;
            writeChunkFree -= entryBytes;

            freeCapacity -= entryBytes;

            entryAdded(writeChunk, entryBytes);

            table.addAsHead(hash, hashEntryOffset);

            return true;
        }
        finally
        {
            unlock(wasFirst);
        }
    }

    /**
     * Evicts the chunk with the oldest "last used" timestamp: unlinks all of its live entries from the
     * hash table, updates the statistics and makes the chunk the new write chunk.
     */
    private void evictEldestChunk()
    {
        // find oldest chunk
        long minTS = Long.MAX_VALUE;
        int eldestChunk = 0;
        for (int i = 0; i < chunkCount; i++)
        {
            long ts = lastUsed(i);
            if (ts < minTS)
            {
                eldestChunk = i;
                minTS = ts;
            }
        }

        int entries = entriesInChunk(eldestChunk);
        int removed = 0;
        int off = chunkOffset(eldestChunk) + Util.CHUNK_OFF_DATA;
        for (int i = 0; i < entries; i++)
        {
            int nextOff = nextHashEntryOffset(off);
            if (!isEntryRemoved(off))
            {
                // the predecessor in the bucket chain is unknown (-1), removeLink() searches for it
                removeInternal(off, -1);
                removed++;
            }
            off = nextOff;
        }

        // record statistics - entries that had been removed before are tombstones and were not evicted
        evictedEntries += removed;
        size -= removed;
        freeCapacity += bytesInChunk(eldestChunk);
        initWriteChunk(eldestChunk);
    }

    /**
     * Removes all entries from this segment.
     * <p>
     * The hash table is zeroed and the header of <em>every</em> chunk is reset. Resetting only the first
     * chunk would leave the entry counters of the other chunks intact, and {@code snapshotIterator},
     * which walks all chunks, would keep returning keys that have been cleared.
     */
    void clear()
    {
        boolean wasFirst = lock();
        try
        {
            size = 0L;
            freeCapacity = capacity;

            table.clear();

            for (int i = 0; i < chunkCount; i++)
                resetChunk(i);

            initWriteChunk(0);
            chunksUsed = 1;
        }
        finally
        {
            unlock(wasFirst);
        }
    }

    /**
     * Makes the given chunk the current write chunk: its header is reset and the write position is
     * placed at the first data byte with the whole chunk payload available.
     */
    private void initWriteChunk(int newWriteChunk)
    {
        resetChunk(newWriteChunk);

        this.writeChunk = newWriteChunk;
        this.writeChunkOffset = Util.CHUNK_OFF_DATA;
        this.writeChunkFree = chunkDataSize;
    }

    /**
     * Removes the entry for the given key.
     *
     * @param key serialized key including its hash
     * @return {@code true} if a live entry has been found and removed, {@code false} otherwise
     */
    boolean removeEntry(KeyBuffer key)
    {
        boolean wasFirst = lock();
        try
        {
            int previous = 0;
            int current = table.getFirst(key.hash());
            while (current != 0)
            {
                // entries already marked as removed are skipped -
                // there can be a _new_ entry superseding the removed one
                if (!notSameKey(key, current) && !isEntryRemoved(current))
                {
                    removeInternal(current, previous);

                    size--;
                    removeCount++;

                    return true;
                }

                previous = current;
                current = getNext(current);
            }

            return false;
        }
        finally
        {
            unlock(wasFirst);
        }
    }

    /**
     * Tells whether the stored entry at {@code hashEntryOffset} has a key different from {@code key}.
     * Hash and key length are checked first, the key bytes are only compared if both match.
     */
    private boolean notSameKey(KeyBuffer key, int hashEntryOffset)
    {
        boolean hashMatches = getHash(hashEntryOffset) == key.hash();
        if (!hashMatches)
            return true;

        boolean lengthMatches = getKeyLen(hashEntryOffset) == key.size();
        if (!lengthMatches)
            return true;

        return !compareKey(hashEntryOffset, key);
    }

    /**
     * Tells whether the stored entry at {@code hashEntryOffset} has a key different from the key of the
     * serialized entry in {@code hashEntry}.
     *
     * @param hashEntry       buffer containing the new serialized entry
     * @param newHash         hash of the new entry's key
     * @param newKeyLen       length of the new entry's key
     * @param hashEntryOffset offset of the stored entry to compare against
     * @return {@code true} if the keys differ
     */
    private boolean notSameKey(ByteBuffer hashEntry, long newHash, int newKeyLen, int hashEntryOffset)
    {
        // Compare against the hash of the STORED entry. Reading the hash from the new entry's own buffer
        // would always equal newHash and never short-cut the byte-wise key comparison below.
        if (getHash(hashEntryOffset) != newHash) return true;

        int serKeyLen = getKeyLen(hashEntryOffset);
        return serKeyLen != newKeyLen
               || !compare(hashEntryOffset + Util.entryOffData(isFixedSize()),
                           hashEntry, Util.entryOffData(isFixedSize()), serKeyLen);
    }

    /**
     * Doubles the size of the hash table and re-links all entries into the new table.
     * <p>
     * Nothing happens if the table already has the maximum size, if the byte size of the doubled table
     * would not fit into an {@code int}, or if the new table cannot be allocated.
     */
    private void rehash()
    {
        Table tab = table;
        int tableSize = tab.size();
        if (tableSize >= TWO_POWER_30)
        {
            // already at max hash table size (doubling 2^30 would overflow an int)
            return;
        }

        int newTableSize = tableSize << 1;
        if ((long) newTableSize * Table.BUCKET_ENTRY_LEN > Integer.MAX_VALUE)
        {
            // the table's byte size must fit into an int
            return;
        }

        Table newTable = createTable(newTableSize, throwOOME);
        if (newTable == null)
            return;
        int next;

        // the bucket index is passed as the "hash" since (index & mask) == index in the old table
        for (int part = 0; part < tableSize; part++)
            for (int hashEntryOffset = tab.getFirst(part);
                 hashEntryOffset != 0;
                 hashEntryOffset = next)
            {
                // remember the successor before the link is overwritten
                next = getNext(hashEntryOffset);

                setNext(hashEntryOffset, 0);

                newTable.addAsHead(getHash(hashEntryOffset), hashEntryOffset);
            }

        // same arithmetic as used for the initial threshold in the constructor
        threshold = (long) ((double) newTable.size() * loadFactor);
        tab.release();
        table = newTable;
        rehashes++;
    }

    /**
     * Returns the number of buckets of the current hash table. The table reference is read without
     * taking the lock.
     */
    int hashTableSize()
    {
        return table.size();
    }

    /**
     * Adds the chain length of every hash bucket to the given histogram while holding the segment lock.
     *
     * @param hist histogram receiving one sample per bucket
     */
    void updateBucketHistogram(EstimatedHistogram hist)
    {
        final boolean acquired = lock();
        try
        {
            table.updateBucketHistogram(hist);
        }
        finally
        {
            unlock(acquired);
        }
    }

    /**
     * Allocates a zeroed hash table with the given number of buckets.
     *
     * @param hashTableSize number of buckets, expected to be a power of two
     * @param throwOOME     passed through to the off-heap allocator
     * @return the new table, or {@code null} if the table's byte size does not fit into an {@code int}
     *         or the allocator returned {@code null}
     */
    private Table createTable(int hashTableSize, boolean throwOOME)
    {
        // compute in long: BUCKET_ENTRY_LEN * hashTableSize silently overflows an int for large tables
        long bytes = (long) Table.BUCKET_ENTRY_LEN * hashTableSize;
        if (bytes > Integer.MAX_VALUE)
            return null;

        ByteBuffer table = Uns.allocate((int) bytes, throwOOME);
        return table == null ? null : new Table(table, hashTableSize);
    }

    /**
     * Off-heap hash table. Every bucket is a 4 byte slot holding the offset (within the chunk memory) of
     * the first entry of the bucket's chain; {@code 0} marks an empty bucket. The entries of a chain are
     * linked via the "next" field stored with each entry.
     */
    private final class Table
    {
        // hashTableSize - 1, used to map a hash to a bucket index
        final int mask;
        final ByteBuffer table;
        private boolean released;

        static final int BUCKET_ENTRY_LEN = 4;

        private Table(ByteBuffer table, int hashTableSize)
        {
            this.table = table;
            this.mask = hashTableSize - 1;
            clear();
        }

        /**
         * Sets all buckets to "empty".
         */
        void clear()
        {
            // It's important to initialize the hash table memory.
            // (uninitialized memory will cause problems - endless loops, JVM crashes, damaged data, etc)
            byteBufferClear(table);
            while (table.remaining() >= 8)
                table.putLong(0L);
            while (table.remaining() > 0)
                table.put((byte) 0);
        }

        /**
         * Frees the off-heap memory of this table; the table must not be used afterwards.
         */
        void release()
        {
            Uns.free(table);
            released = true;
        }

        /**
         * Safety net that frees the off-heap memory if {@link #release()} has not been called.
         */
        @Override
        protected void finalize() throws Throwable
        {
            if (!released)
                Uns.free(table);
            super.finalize();
        }

        /**
         * Returns the offset of the first entry in the bucket for {@code hash}, {@code 0} if it is empty.
         */
        int getFirst(long hash)
        {
            return table.getInt(bucketOffset(hash));
        }

        void setFirst(long hash, int hashEntryOffset)
        {
            table.putInt(bucketOffset(hash), hashEntryOffset);
        }

        private int bucketOffset(long hash)
        {
            return bucketIndexForHash(hash) * BUCKET_ENTRY_LEN;
        }

        private int bucketIndexForHash(long hash)
        {
            return (int) (hash & mask);
        }

        /**
         * Unlinks an entry from the chain of its bucket.
         *
         * @param hash            hash of the entry
         * @param hashEntryOffset offset of the entry to unlink
         * @param prevEntryOffset offset of the entry's predecessor in the chain, {@code 0} if the caller
         *                        found none, or {@code -1} if the predecessor is unknown and has to be
         *                        searched for
         */
        void removeLink(long hash, int hashEntryOffset, int prevEntryOffset)
        {
            int next = getNext(hashEntryOffset);

            int head = getFirst(hash);
            if (head == hashEntryOffset)
            {
                setFirst(hash, next);
            }
            else if (prevEntryOffset != 0)
            {
                if (prevEntryOffset == -1)
                {
                    // predecessor unknown - walk the chain to find it
                    int predecessor = 0;
                    int offset = head;
                    while (offset != 0 && offset != hashEntryOffset)
                    {
                        predecessor = offset;
                        offset = getNext(offset);
                    }

                    if (offset == 0)
                        // The entry is not linked into this bucket. Re-linking anyway would write to the
                        // last entry of the chain (or to offset -1 for an empty bucket) and damage it.
                        return;

                    prevEntryOffset = predecessor;
                }
                setNext(prevEntryOffset, next);
            }
        }

        /**
         * Inserts the entry as the new first element of its bucket's chain.
         */
        void addAsHead(long hash, int hashEntryOffset)
        {
            int head = getFirst(hash);
            setNext(hashEntryOffset, head);
            setFirst(hash, hashEntryOffset);
        }

        /**
         * Returns the number of buckets.
         */
        int size()
        {
            return mask + 1;
        }

        /**
         * Adds one sample per bucket (chain length plus one) to the histogram.
         */
        void updateBucketHistogram(EstimatedHistogram h)
        {
            for (int i = 0; i < size(); i++)
            {
                int len = 0;
                for (int off = getFirst(i); off != 0; off = getNext(off))
                    len++;
                h.add(len + 1);
            }
        }
    }

    /**
     * Unlinks the entry from its hash bucket chain and marks it as removed. The bytes occupied by the
     * entry stay in the chunk.
     *
     * @param hashEntryOffset offset of the entry
     * @param prevEntryOffset predecessor in the bucket chain as expected by {@code Table.removeLink}
     */
    private void removeInternal(int hashEntryOffset, int prevEntryOffset)
    {
        table.removeLink(getHash(hashEntryOffset), hashEntryOffset, prevEntryOffset);
        setEntryRemoved(hashEntryOffset);
    }

    /**
     * Returns the current entry count as a decimal string.
     */
    @Override
    public String toString()
    {
        return Long.toString(size);
    }

    //
    // snapshot iterator
    //

    /**
     * Iterates over the keys of one chunk that has been copied into a heap buffer. Every returned element
     * is the very same buffer with position and limit set to the bounds of the current key. Entries that
     * are marked as removed are skipped, but still count against the limit.
     */
    private final class ChunkSnapshotIterator extends AbstractIterator<ByteBuffer>
    {
        private final ByteBuffer chunkCopy;
        private final int entryCount;
        private final int maxEntries;

        // number of entries examined so far (removed ones included)
        private int visited;
        // offset of the next entry within the chunk copy
        private int cursor = Util.CHUNK_OFF_DATA;

        ChunkSnapshotIterator(int limit, ByteBuffer snaphotBuffer)
        {
            this.chunkCopy = snaphotBuffer;
            this.maxEntries = limit;
            this.entryCount = snaphotBuffer.getInt(Util.CHUNK_OFF_ENTRIES);
        }

        protected ByteBuffer computeNext()
        {
            final boolean fixed = isFixedSize();

            while (visited != entryCount && visited != maxEntries)
            {
                // undo the position/limit of the previously returned key before reading entry headers
                byteBufferClear(chunkCopy);

                final int entryStart = cursor;
                final int keyLen;
                final int reservedLen;
                final boolean removed;
                if (fixed)
                {
                    keyLen = fixedKeySize;
                    reservedLen = fixedValueSize;
                    removed = chunkCopy.getInt(entryStart + Util.ENTRY_OFF_REMOVED) == -1;
                }
                else
                {
                    keyLen = chunkCopy.getInt(entryStart + Util.ENTRY_OFF_KEY_LENGTH);
                    removed = chunkCopy.getInt(entryStart + Util.ENTRY_OFF_VALUE_LENGTH) == -1;
                    reservedLen = chunkCopy.getInt(entryStart + Util.ENTRY_OFF_RESERVED_LENGTH);
                }

                final int keyStart = entryStart + Util.entryOffData(fixed);

                // advance to the following entry
                cursor = entryStart + Util.allocLen(keyLen, reservedLen, fixed);
                visited++;

                if (removed)
                    continue;

                byteBufferPosition(chunkCopy, keyStart);
                byteBufferLimit(chunkCopy, keyStart + keyLen);
                return chunkCopy;
            }

            return endOfData();
        }
    }

    /**
     * Returns an iterator over the keys of this segment. The chunks are visited one after the other;
     * each chunk is copied into {@code snaphotBuffer} while the segment lock is held and is then iterated
     * without the lock.
     *
     * @param keysPerChunk  maximum number of entries examined per chunk
     * @param snaphotBuffer heap buffer receiving the copy of a chunk; it is also the object returned for
     *                      every key, with position and limit set to the key's bounds
     * @return iterator over the serialized keys
     */
    Iterator<ByteBuffer> snapshotIterator(final int keysPerChunk, final ByteBuffer snaphotBuffer)
    {
        return new AbstractIterator<ByteBuffer>()
        {
            // index of the next chunk to copy
            private int nextChunk;

            // keys of the most recently copied chunk
            private Iterator<ByteBuffer> chunkKeys;

            protected ByteBuffer computeNext()
            {
                for (;;)
                {
                    Iterator<ByteBuffer> pending = chunkKeys;
                    if (pending != null && pending.hasNext())
                        return pending.next();

                    if (nextChunk == chunkCount)
                        return endOfData();

                    boolean wasFirst = lock();
                    try
                    {
                        Uns.copyMemory(((DirectBuffer)memory).address(), chunkOffset(nextChunk), snaphotBuffer.array(), 0, chunkFullSize);
                        byteBufferPosition(snaphotBuffer, 0);
                        byteBufferLimit(snaphotBuffer, chunkFullSize);

                        chunkKeys = new ChunkSnapshotIterator(keysPerChunk, snaphotBuffer);
                    }
                    finally
                    {
                        unlock(wasFirst);
                        // move on to the following chunk even if copying this one failed
                        nextChunk++;
                    }
                }
            }
        };
    }

    //
    // memory access utils
    //

    /**
     * Reads the hash of the entry stored at the given offset of the chunk memory.
     */
    private long getHash(int hashEntryOffset)
    {
        return memory.getLong(hashEntryOffset + Util.ENTRY_OFF_HASH);
    }

    /**
     * Reads the hash field of a serialized entry held in the given buffer (absolute read, the buffer's
     * position is not changed).
     */
    private long getHash(ByteBuffer hashEntry)
    {
        return hashEntry.getLong(Util.ENTRY_OFF_HASH);
    }

    /**
     * Returns the offset of the entry following the given one in its bucket chain.
     * An offset of {@code 0} means "no entry" and yields {@code 0} again.
     */
    private int getNext(int hashEntryOffset)
    {
        if (hashEntryOffset == 0)
            return 0;

        return memory.getInt(hashEntryOffset + Util.ENTRY_OFF_NEXT);
    }

    /**
     * Stores {@code nextEntryOffset} as the successor of the entry at {@code hashEntryOffset};
     * {@code 0} terminates the chain.
     */
    private void setNext(int hashEntryOffset, int nextEntryOffset)
    {
        memory.putInt(hashEntryOffset + Util.ENTRY_OFF_NEXT, nextEntryOffset);
    }

    /**
     * Tells whether the entry carries the removed marker ({@code -1}). The marker lives in a dedicated
     * field for fixed-size entries and in the value-length field otherwise.
     */
    private boolean isEntryRemoved(int hashEntryOffset)
    {
        int markerField = isFixedSize() ? Util.ENTRY_OFF_REMOVED : Util.ENTRY_OFF_VALUE_LENGTH;
        int marker = memory.getInt(hashEntryOffset + markerField);
        return marker == -1;
    }

    /**
     * Writes the removed marker ({@code -1}) into the entry: into the dedicated field for fixed-size
     * entries, into the value-length field otherwise.
     */
    private void setEntryRemoved(int hashEntryOffset)
    {
        int markerField = isFixedSize() ? Util.ENTRY_OFF_REMOVED : Util.ENTRY_OFF_VALUE_LENGTH;
        memory.putInt(hashEntryOffset + markerField, -1);
    }

    /**
     * Returns the key length of the stored entry: the configured fixed key size if there is one,
     * otherwise the length recorded with the entry.
     */
    private int getKeyLen(int hashEntryOffset)
    {
        return fixedKeySize > 0
               ? fixedKeySize
               : memory.getInt(hashEntryOffset + Util.ENTRY_OFF_KEY_LENGTH);
    }

    /**
     * Returns the value length of the stored entry: the configured fixed value size in fixed-size mode,
     * otherwise the length recorded with the entry.
     */
    private int getValueLen(int hashEntryOffset)
    {
        return isFixedSize()
               ? fixedValueSize
               : memory.getInt(hashEntryOffset + Util.ENTRY_OFF_VALUE_LENGTH);
    }

    /**
     * Records a new value length in the stored entry. Nothing is written unless the fixed key size is 0.
     */
    private void setValueLen(int hashEntryOffset, int valueLen)
    {
        if (fixedKeySize != 0)
            return;

        memory.putInt(hashEntryOffset + Util.ENTRY_OFF_VALUE_LENGTH, valueLen);
    }

    /**
     * Returns the value length of a serialized entry held in a buffer: the configured fixed value size
     * if there is one, otherwise the length field of the serialized entry.
     */
    private int getValueLen(ByteBuffer hashEntry)
    {
        return fixedValueSize > 0
               ? fixedValueSize
               : hashEntry.getInt(Util.ENTRY_OFF_VALUE_LENGTH);
    }

    /**
     * Returns the number of bytes reserved for the value of the stored entry: the configured fixed value
     * size if there is one, otherwise the reserved length recorded with the entry.
     */
    private int getValueReservedLen(int hashEntryOffset)
    {
        return fixedValueSize > 0
               ? fixedValueSize
               : memory.getInt(hashEntryOffset + Util.ENTRY_OFF_RESERVED_LENGTH);
    }

    /**
     * Compares {@code len} bytes of the chunk memory starting at {@code memoryOffset} with the bytes of
     * {@code otherHashEntry} starting at {@code otherOffset}. The comparison proceeds in strides of
     * 8, 4, 2 and finally 1 byte.
     *
     * @return {@code true} if both ranges are equal
     */
    private boolean compare(int memoryOffset, ByteBuffer otherHashEntry, int otherOffset, int len)
    {
        int done = 0;

        while (done <= len - 8)
        {
            if (memory.getLong(memoryOffset + done) != otherHashEntry.getLong(otherOffset + done))
                return false;
            done += 8;
        }

        while (done <= len - 4)
        {
            if (memory.getInt(memoryOffset + done) != otherHashEntry.getInt(otherOffset + done))
                return false;
            done += 4;
        }

        while (done <= len - 2)
        {
            if (memory.getShort(memoryOffset + done) != otherHashEntry.getShort(otherOffset + done))
                return false;
            done += 2;
        }

        while (done < len)
        {
            if (memory.get(memoryOffset + done) != otherHashEntry.get(otherOffset + done))
                return false;
            done++;
        }

        return true;
    }

    /**
     * Compares the key of the stored entry with the bytes between position and limit of the key's buffer.
     * The comparison proceeds in strides of 8, 4, 2 and finally 1 byte.
     *
     * @return {@code true} if the keys are equal
     */
    private boolean compareKey(int hashEntryOffset, KeyBuffer key)
    {
        ByteBuffer keyBytes = key.buffer();
        int keyPos = keyBytes.position();
        final int keyEnd = keyBytes.limit();

        // absolute position of the stored key within the chunk memory
        int memPos = hashEntryOffset + Util.entryOffData(isFixedSize());

        while (keyPos <= keyEnd - 8)
        {
            if (memory.getLong(memPos) != keyBytes.getLong(keyPos))
                return false;
            keyPos += 8;
            memPos += 8;
        }

        while (keyPos <= keyEnd - 4)
        {
            if (memory.getInt(memPos) != keyBytes.getInt(keyPos))
                return false;
            keyPos += 4;
            memPos += 4;
        }

        while (keyPos <= keyEnd - 2)
        {
            if (memory.getShort(memPos) != keyBytes.getShort(keyPos))
                return false;
            keyPos += 2;
            memPos += 2;
        }

        while (keyPos < keyEnd)
        {
            if (memory.get(memPos) != keyBytes.get(keyPos))
                return false;
            keyPos++;
            memPos++;
        }

        return true;
    }

    /**
     * Returns the offset of the chunk's header within the chunk memory (chunks are laid out back to back,
     * each occupying {@code chunkFullSize} bytes).
     */
    private int chunkOffset(int chunkNum)
    {
        return chunkNum * chunkFullSize;
    }

    /**
     * Re-initializes the header of a chunk: the timestamp is set to the current ticker value, the entry
     * and byte counters are set to zero. The chunk's payload is left as it is.
     */
    private void resetChunk(int chunkNum)
    {
        final int header = chunkOffset(chunkNum);
        final long now = ticker.nanos();

        memory.putLong(header + Util.CHUNK_OFF_TIMESTAMP, now);
        memory.putInt(header + Util.CHUNK_OFF_ENTRIES, 0);
        memory.putInt(header + Util.CHUNK_OFF_BYTES, 0);
    }

    /**
     * Marks the chunk containing the given entry as used right now.
     */
    private void touch(int hashEntryOffset)
    {
        // every chunk occupies chunkFullSize bytes, so the division yields the chunk index
        int owningChunk = hashEntryOffset / chunkFullSize;
        touchChunk(owningChunk);
    }

    /**
     * Returns the index of the chunk that contains the given entry offset.
     */
    private int chunkNum(int hashEntryOffset)
    {
        return hashEntryOffset / chunkFullSize;
    }

    /**
     * Sets the timestamp of the given chunk to the current ticker value.
     */
    private void touchChunk(int chunkNum)
    {
        memory.putLong(chunkOffset(chunkNum) + Util.CHUNK_OFF_TIMESTAMP, ticker.nanos());
    }

    /**
     * Returns the timestamp stored in the header of the given chunk.
     */
    private long lastUsed(int chunkNum)
    {
        return memory.getLong(chunkOffset(chunkNum) + Util.CHUNK_OFF_TIMESTAMP);
    }

    /**
     * Updates the header of a chunk after an entry has been appended to it: the entry counter is
     * incremented and the byte counter is increased by {@code bytes}.
     */
    private void entryAdded(int chunkNum, int bytes)
    {
        final int header = chunkOffset(chunkNum);
        final int entriesField = header + Util.CHUNK_OFF_ENTRIES;
        final int bytesField = header + Util.CHUNK_OFF_BYTES;

        memory.putInt(entriesField, memory.getInt(entriesField) + 1);
        memory.putInt(bytesField, memory.getInt(bytesField) + bytes);
    }

    /**
     * Returns the entry counter from the header of the given chunk. The counter is incremented for each
     * appended entry and is not decremented on removal.
     */
    private int entriesInChunk(int chunkNum)
    {
        return memory.getInt(chunkOffset(chunkNum) + Util.CHUNK_OFF_ENTRIES);
    }

    /**
     * Returns the byte counter from the header of the given chunk, i.e. the sum of the sizes of all
     * entries appended to it.
     */
    private int bytesInChunk(int chunkNum)
    {
        return memory.getInt(chunkOffset(chunkNum) + Util.CHUNK_OFF_BYTES);
    }

    /**
     * Returns the offset of the entry that follows the entry at {@code off} within the same chunk,
     * computed from the entry's key length and reserved value length.
     */
    private int nextHashEntryOffset(int off)
    {
        int keyLen = getKeyLen(off);
        int reservedLen = getValueReservedLen(off);
        return off + Util.allocLen(keyLen, reservedLen, isFixedSize());
    }

    /**
     * Tells whether this map uses the fixed-size entry layout, which is the case when a fixed key size
     * greater than 0 has been configured.
     */
    private boolean isFixedSize()
    {
        return fixedKeySize > 0;
    }

    /**
     * Acquires the segment lock by storing the current thread's ID in the {@code lock} field via CAS,
     * spinning with {@link Thread#yield()} while another thread holds it.
     *
     * @return {@code true} if this call acquired the lock and the caller has to hand the value to
     *         {@code unlock}; {@code false} if locking is disabled or the current thread already holds
     *         the lock (nested call)
     */
    private boolean lock()
    {
        if (unlocked)
            return false;

        long t = Thread.currentThread().getId();

        // nested call by the thread that already owns the lock - the outermost call releases it
        if (t == lockFieldUpdater.get(this))
            return false;
        while (true)
        {
            // 0 means "not locked"
            if (lockFieldUpdater.compareAndSet(this, 0L, t))
                return true;

            // yield control to other thread.
            // Note: we cannot use LockSupport.parkNanos() as that does not
            // provide nanosecond resolution on Windows.
            while(lockFieldUpdater.get(this) != 0L)
                Thread.yield();
        }
    }

    /**
     * Releases the segment lock if the matching {@code lock()} call actually acquired it.
     *
     * @param wasFirst the value returned by the matching {@code lock()} call
     */
    private void unlock(boolean wasFirst)
    {
        if (!unlocked && wasFirst)
        {
            // the lock field must still contain the ID of the current thread
            long owner = Thread.currentThread().getId();
            boolean r = lockFieldUpdater.compareAndSet(this, owner, 0L);
            assert r;
        }
    }
}