OHCacheLinkedImpl.java

/*
 *      Copyright (C) 2014 Robert Stupp, Koeln, Germany, robert-stupp.de
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */
package org.caffinitas.ohc.linked;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.collect.AbstractIterator;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.caffinitas.ohc.CacheLoader;
import org.caffinitas.ohc.CacheSerializer;
import org.caffinitas.ohc.DirectValueAccess;
import org.caffinitas.ohc.CloseableIterator;
import org.caffinitas.ohc.OHCache;
import org.caffinitas.ohc.OHCacheBuilder;
import org.caffinitas.ohc.OHCacheStats;
import org.caffinitas.ohc.PermanentLoadException;
import org.caffinitas.ohc.TemporaryLoadException;
import org.caffinitas.ohc.Ticker;
import org.caffinitas.ohc.histo.EstimatedHistogram;

import static org.caffinitas.ohc.util.ByteBufferCompat.*;

public final class OHCacheLinkedImpl<K, V> implements OHCache<K, V>
{
    private static final Logger LOGGER = LoggerFactory.getLogger(OHCacheLinkedImpl.class);

    // version number expected in the header of serialized key files
    private static final int CURRENT_FILE_VERSION = 2;

    private final CacheSerializer<K> keySerializer;
    private final CacheSerializer<V> valueSerializer;

    // one off-heap map per segment; the segment is selected from the upper bits of the key hash
    private final OffHeapLinkedMap[] maps;
    private final long segmentMask;
    private final int segmentShift;

    private final long maxEntrySize;
    // default time-to-live in milliseconds; values <= 0 mean "entries do not expire by default"
    private final long defaultTTL;

    private long capacity;

    // best-effort statistic; incremented without synchronization, so concurrent increments may be lost
    private volatile long putFailCount;

    // volatile: written by close() and read by polling jobs running on executor threads,
    // which otherwise might never observe that the cache has been closed
    private volatile boolean closed;

    // optional; only required by the getWithLoader*() methods
    private final ScheduledExecutorService executorService;

    private final boolean throwOOME;
    private final Hasher hasher;

    private final Ticker ticker;

    /**
     * Creates a cache instance from the settings held by the given builder.
     * <p>
     * All configuration is validated <em>before</em> the per-segment maps are allocated, so that an invalid
     * max entry size or a missing serializer cannot leave already allocated segment maps unreleased.
     * If creating one of the segment maps fails, the maps created so far are released again.
     *
     * @param builder source of all configuration values
     * @throws IllegalArgumentException if the capacity is not positive, the max entry size exceeds the
     *                                  per-segment capacity or the eviction setting is not supported
     * @throws NullPointerException     if the key or value serializer is missing
     */
    public OHCacheLinkedImpl(OHCacheBuilder<K, V> builder)
    {
        long capacity = builder.getCapacity();
        if (capacity <= 0L)
            throw new IllegalArgumentException("capacity:" + capacity);

        this.capacity = capacity;

        this.ticker = builder.getTicker();

        this.defaultTTL = builder.getDefaultTTLmillis();

        this.throwOOME = builder.isThrowOOME();
        this.hasher = Hasher.create(builder.getHashAlgorighm());

        // number of segments: configured value or twice the CPU count, rounded up to a power of 2
        int segments = builder.getSegmentCount();
        if (segments <= 0)
            segments = Runtime.getRuntime().availableProcessors() * 2;
        segments = Ints.checkedCast(Util.roundUpToPowerOf2(segments, 1 << 30));

        // bit-mask for segment part of hash
        int bitNum = Util.bitNum(segments) - 1;
        this.segmentShift = 64 - bitNum;
        this.segmentMask = ((long) segments - 1) << segmentShift;

        // calculate max entry size
        long maxEntrySize = builder.getMaxEntrySize();
        if (maxEntrySize > capacity / segments)
            throw new IllegalArgumentException("Illegal max entry size " + maxEntrySize);
        else if (maxEntrySize <= 0)
            maxEntrySize = capacity / segments;
        this.maxEntrySize = maxEntrySize;

        this.keySerializer = builder.getKeySerializer();
        if (keySerializer == null)
            throw new NullPointerException("keySerializer == null");
        this.valueSerializer = builder.getValueSerializer();
        if (valueSerializer == null)
            throw new NullPointerException("valueSerializer == null");

        this.executorService = builder.getExecutorService();

        // build segments - done last, since nothing after this point is expected to fail
        maps = new OffHeapLinkedMap[segments];
        for (int i = 0; i < segments; i++)
        {
            try
            {
                maps[i] = makeMap(builder, capacity / segments);
            }
            catch (RuntimeException | Error e)
            {
                // release the maps that have been created so far
                for (int j = i - 1; j >= 0; j--)
                    if (maps[j] != null)
                        maps[j].release();
                throw e;
            }
        }

        if (LOGGER.isDebugEnabled())
            LOGGER.debug("OHC linked instance with {} segments and capacity of {} created.", segments, capacity);
    }

    /**
     * Instantiates the segment map implementation that matches the eviction setting of the builder.
     *
     * @param builder        cache configuration, passed on to the map constructor
     * @param perMapCapacity capacity assigned to the new map
     * @return the newly created segment map
     * @throws IllegalArgumentException if the eviction setting is none of the handled values
     */
    private OffHeapLinkedMap makeMap(OHCacheBuilder<K, V> builder, long perMapCapacity)
    {
        final OffHeapLinkedMap segmentMap;
        switch (builder.getEviction())
        {
            case LRU:
                segmentMap = new OffHeapLinkedLRUMap(builder, perMapCapacity);
                break;
            case W_TINY_LFU:
                segmentMap = new OffHeapLinkedWTinyLFUMap(builder, perMapCapacity);
                break;
            case NONE:
                segmentMap = new OffHeapLinkedPermMap(builder, perMapCapacity);
                break;
            default:
                throw new IllegalArgumentException("Unsupported eviction: " + builder.getEviction());
        }
        return segmentMap;
    }

    //
    // map stuff
    //

    /**
     * Same as {@link #getDirect(Object, boolean)} with {@code updateLRU} set to {@code true}.
     *
     * @param key key to look up, must not be {@code null}
     * @return accessor for the entry or {@code null} if there is no entry for the key
     */
    public DirectValueAccess getDirect(K key)
    {
        final boolean updateLRU = true;
        return getDirect(key, updateLRU);
    }

    /**
     * Looks up the entry for the given key and wraps its address in a {@link DirectValueAccess}.
     *
     * @param key       key to look up
     * @param updateLRU passed through to the segment's {@code getEntry} call
     * @return accessor for the entry or {@code null} if the segment returned no entry
     * @throws NullPointerException if {@code key} is {@code null}
     */
    public DirectValueAccess getDirect(K key, boolean updateLRU)
    {
        if (key == null)
            throw new NullPointerException();

        KeyBuffer lookupKey = keySource(key);
        OffHeapLinkedMap owner = segment(lookupKey.hash());
        long entryAdr = owner.getEntry(lookupKey, true, updateLRU);

        return entryAdr != 0L ? new DirectValueAccessImpl(entryAdr, true) : null;
    }

    /**
     * Returns the deserialized value for the given key.
     *
     * @param key key to look up
     * @return the value or {@code null} if the segment returned no entry for the key
     * @throws NullPointerException if {@code key} is {@code null}
     */
    public V get(K key)
    {
        if (key == null)
            throw new NullPointerException();

        KeyBuffer lookupKey = keySource(key);
        long entryAdr = 0L;
        try
        {
            entryAdr = segment(lookupKey.hash()).getEntry(lookupKey, true, true);

            return entryAdr == 0L
                   ? null
                   : valueSerializer.deserialize(Uns.valueBufferR(entryAdr));
        }
        finally
        {
            // always executed - also with address 0 if no entry was found or the lookup failed
            HashEntries.dereference(entryAdr);
        }
    }

    /**
     * Checks whether the segment responsible for the key returns an entry for it.
     *
     * @param key key to look up
     * @return {@code true} if an entry address was returned for the key
     * @throws NullPointerException if {@code key} is {@code null}
     */
    public boolean containsKey(K key)
    {
        if (key == null)
            throw new NullPointerException();

        KeyBuffer lookupKey = keySource(key);
        long entryAdr = segment(lookupKey.hash()).getEntry(lookupKey, false, true);
        return entryAdr != 0L;
    }

    /**
     * Unconditionally stores the value for the key using the default expiration.
     *
     * @return the result of the internal put operation
     */
    public boolean put(K k, V v)
    {
        final long expiry = defaultExpireAt();
        return putInternal(k, v, false, null, expiry);
    }

    /**
     * Unconditionally stores the value for the key using the given expiration timestamp.
     *
     * @return the result of the internal put operation
     */
    public boolean put(K k, V v, long expireAt)
    {
        final boolean onlyIfAbsent = false;
        return putInternal(k, v, onlyIfAbsent, null, expireAt);
    }

    /**
     * Stores {@code value} for {@code key}, handing {@code old} to the internal put operation as the
     * expected existing value; uses the default expiration.
     *
     * @return the result of the internal put operation
     */
    public boolean addOrReplace(K key, V old, V value)
    {
        final long expiry = defaultExpireAt();
        return putInternal(key, value, false, old, expiry);
    }

    /**
     * Stores {@code value} for {@code key}, handing {@code old} to the internal put operation as the
     * expected existing value; uses the given expiration timestamp.
     *
     * @return the result of the internal put operation
     */
    public boolean addOrReplace(K key, V old, V value, long expireAt)
    {
        final boolean onlyIfAbsent = false;
        return putInternal(key, value, onlyIfAbsent, old, expireAt);
    }

    /**
     * Stores the value with the "if absent" flag set, using the default expiration.
     *
     * @return the result of the internal put operation
     */
    public boolean putIfAbsent(K k, V v)
    {
        final long expiry = defaultExpireAt();
        return putInternal(k, v, true, null, expiry);
    }

    /**
     * Stores the value with the "if absent" flag set, using the given expiration timestamp.
     *
     * @return the result of the internal put operation
     */
    public boolean putIfAbsent(K k, V v, long expireAt)
    {
        final boolean onlyIfAbsent = true;
        return putInternal(k, v, onlyIfAbsent, null, expireAt);
    }

    /**
     * Common implementation of all put variants.
     * <p>
     * Serializes key and value into a newly allocated off-heap entry and hands that entry to the segment
     * selected by the key hash. If {@code old} is given, it is serialized into a temporary off-heap buffer
     * that is passed to the segment as well and freed before this method returns.
     *
     * @param k        key, must not be {@code null}
     * @param v        value, must not be {@code null}
     * @param ifAbsent passed through to the segment's {@code putEntry}
     * @param old      optional expected old value, may be {@code null}
     * @param expireAt expiration timestamp; {@code USE_DEFAULT_EXPIRE_AT} selects the default expiration
     * @return {@code true} if the segment accepted the entry; {@code false} if it did not, if the entry
     *         exceeds the max entry size or if the off-heap allocation returned no memory
     * @throws NullPointerException if key or value is {@code null}
     */
    private boolean putInternal(K k, V v, boolean ifAbsent, V old, long expireAt)
    {
        if (k == null || v == null)
            throw new NullPointerException();

        final int keyLen = keySize(k);
        final int valueLen = valueSize(v);
        final long bytes = Util.allocLen(keyLen, valueLen);

        long expectedAdr = 0L;
        long expectedLen = 0L;

        try
        {
            if (old != null)
            {
                // serialize the expected old value into its own off-heap buffer
                expectedLen = valueSize(old);
                expectedAdr = Uns.allocate(expectedLen, throwOOME);
                if (expectedAdr == 0L)
                    throw new RuntimeException("Unable to allocate " + expectedLen + " bytes in off-heap");
                valueSerializer.serialize(old,  Uns.directBufferFor(expectedAdr, 0, expectedLen, false));
            }

            // do not even try to allocate if the entry is too large
            final boolean oversized = maxEntrySize > 0L && bytes > maxEntrySize;
            final long entryAdr = oversized ? 0L : Uns.allocate(bytes, throwOOME);
            if (entryAdr == 0L)
            {
                // entry too large to be inserted or OS is not able to provide enough memory
                putFailCount++;

                remove(k);

                return false;
            }

            final long hash = serializeForPut(k, v, keyLen, valueLen, entryAdr);

            final long effectiveExpireAt = expireAt == USE_DEFAULT_EXPIRE_AT ? defaultExpireAt() : expireAt;

            // initialize hash entry
            HashEntries.init(hash, keyLen, valueLen, entryAdr, Util.SENTINEL_NOT_PRESENT, effectiveExpireAt);

            final boolean stored = segment(hash).putEntry(entryAdr, hash, keyLen, bytes, ifAbsent, effectiveExpireAt,
                                                          expectedAdr, expectedLen);
            if (!stored)
                Uns.free(entryAdr);
            return stored;
        }
        finally
        {
            Uns.free(expectedAdr);
        }
    }

    /**
     * Asks the key serializer for the serialized size of the key.
     *
     * @return the serialized size, always greater than 0
     * @throws IllegalArgumentException if the serializer reports a size of 0 or less
     */
    private int keySize(K k)
    {
        final int length = keySerializer.serializedSize(k);
        if (length > 0)
            return length;
        throw new IllegalArgumentException("Illegal key length " + length);
    }

    /**
     * Asks the value serializer for the serialized size of the value.
     *
     * @return the serialized size, always greater than 0
     * @throws IllegalArgumentException if the serializer reports a size of 0 or less
     */
    private int valueSize(V v)
    {
        final int length = valueSerializer.serializedSize(v);
        if (length > 0)
            return length;
        throw new IllegalArgumentException("Illegal value length " + length);
    }

    /**
     * Computes the default expiration timestamp.
     *
     * @return current ticker time plus the default TTL, or {@code 0} if the default TTL is not positive
     */
    private long defaultExpireAt()
    {
        if (defaultTTL <= 0L)
            return 0L;
        return ticker.currentTimeMillis() + defaultTTL;
    }

    /**
     * Writes the serialized key and - if present - the serialized value into the given hash entry and
     * computes the hash over the serialized key bytes.
     * <p>
     * If serialization fails, the hash entry is freed and the failure is rethrown.
     *
     * @return hash of the serialized key
     */
    private long serializeForPut(K k, V v, int keyLen, long valueLen, long hashEntryAdr)
    {
        try
        {
            keySerializer.serialize(k, Uns.keyBuffer(hashEntryAdr, keyLen));

            final boolean hasValue = v != null;
            if (hasValue)
                valueSerializer.serialize(v, Uns.valueBuffer(hashEntryAdr, keyLen, valueLen));
        }
        catch (Throwable failure)
        {
            // never returns normally
            freeAndThrow(failure, hashEntryAdr);
        }

        final long keyHash = hasher.hash(hashEntryAdr, Util.ENTRY_OFF_DATA, keyLen);
        return keyHash;
    }

    /**
     * Frees the given hash entry and rethrows the failure. Unchecked exceptions and errors are rethrown
     * as they are, anything else is wrapped in a {@link RuntimeException}. This method never returns normally.
     */
    private static void freeAndThrow(Throwable e, long hashEntryAdr)
    {
        Uns.free(hashEntryAdr);

        if (e instanceof Error)
            throw (Error) e;

        final RuntimeException unchecked = e instanceof RuntimeException
                                           ? (RuntimeException) e
                                           : new RuntimeException(e);
        throw unchecked;
    }

    /**
     * Removes the entry for the given key from the segment responsible for it.
     *
     * @return the result of the segment's {@code removeEntry} call
     * @throws NullPointerException if {@code k} is {@code null}
     */
    public boolean remove(K k)
    {
        if (k == null)
            throw new NullPointerException();

        KeyBuffer lookupKey = keySource(k);
        OffHeapLinkedMap owner = segment(lookupKey.hash());
        return owner.removeEntry(lookupKey);
    }

    /**
     * Blocking variant of {@link #getWithLoaderAsync(Object, CacheLoader)}: waits for the future to complete.
     */
    public V getWithLoader(K key, CacheLoader<K, V> loader) throws InterruptedException, ExecutionException
    {
        Future<V> pending = getWithLoaderAsync(key, loader);
        return pending.get();
    }

    /**
     * Blocking variant of {@link #getWithLoaderAsync(Object, CacheLoader)} that waits at most the given
     * time for the future to complete.
     */
    public V getWithLoader(K key, CacheLoader<K, V> loader, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
    {
        Future<V> pending = getWithLoaderAsync(key, loader);
        return pending.get(timeout, unit);
    }

    /**
     * Same as {@link #getWithLoaderAsync(Object, CacheLoader, long)} using the default expiration.
     */
    public Future<V> getWithLoaderAsync(K key, CacheLoader<K, V> loader)
    {
        final long expiry = defaultExpireAt();
        return getWithLoaderAsync(key, loader, expiry);
    }

    /**
     * Returns the value for the key, loading it via the given loader if there is no entry yet.
     * <p>
     * The first caller for a key ("initial requestor") puts a sentinel entry with status
     * {@code SENTINEL_LOADING} into the segment and submits the load to the executor service. Callers that
     * find an existing entry ("adjacent requestors") either get the value immediately or - if the entry is
     * a sentinel - schedule a job that polls the sentinel status every 10ms and completes the returned
     * future once loading has finished.
     *
     * @param key      key to load, must not be {@code null}
     * @param loader   loader invoked by the initial requestor
     * @param expireAt expiration timestamp for the loaded entry; {@code USE_DEFAULT_EXPIRE_AT} selects
     *                 the default expiration
     * @return future providing the value; the future of the initial requestor yields {@code null} if the
     *         loader returned {@code null} or the entry is already expired
     * @throws NullPointerException  if {@code key} is {@code null}
     * @throws IllegalStateException if there is no usable executor service or this instance is closed
     */
    public Future<V> getWithLoaderAsync(final K key, final CacheLoader<K, V> loader, final long expireAt)
    {
        if (key == null)
            throw new NullPointerException();
        if (executorService == null || executorService.isShutdown() || closed)
            throw new IllegalStateException("OHCache has no executor service - configure one via OHCacheBuilder.executorService()");

        final KeyBuffer keySource = keySource(key);
        final OffHeapLinkedMap segment = segment(keySource.hash());
        long hashEntryAdr = segment.getEntry(keySource, true, true);

        if (hashEntryAdr == 0L)
        {
            // this call is _likely_ the initial requestor for that key since there's no entry for the key

            final int keyLen = keySize(key);

            long bytes = Util.allocLen(keyLen, 0L);

            if ((maxEntrySize > 0L && bytes > maxEntrySize) || (hashEntryAdr = Uns.allocate(bytes, throwOOME)) == 0L)
            {
                // entry too large to be inserted or OS is not able to provide enough memory
                putFailCount++;

                remove(key);

                return Futures.immediateFailedFuture(new RuntimeException("max entry size exceeded or malloc() failed"));
            }

            try
            {
                keySerializer.serialize(key, Uns.keyBuffer(hashEntryAdr, keyLen));
            }
            catch (Throwable e)
            {
                freeAndThrow(e, hashEntryAdr);
            }

            final long hash = hasher.hash(hashEntryAdr, Util.ENTRY_OFF_DATA, keyLen);

            // initialize hash entry
            HashEntries.init(hash, keyLen, 0, hashEntryAdr, Util.SENTINEL_LOADING, 0L);

            if (segment.putEntry(hashEntryAdr, hash, keyLen, bytes, true, 0L, 0L, 0L))
            {
                // this request IS the initial requestor for the key

                final long sentinelHashEntryAdr = hashEntryAdr;
                return executorService.submit(() -> {
                    Exception failure = null;
                    V value = null;
                    boolean derefSentinel = false;

                    try
                    {
                        value = loader.load(key);

                        long entryExpireAt = expireAt;
                        if (value == null || (entryExpireAt > 0L && entryExpireAt <= ticker.currentTimeMillis()))
                        {
                            // The loader could not provide a value or the entry is already expired.
                            // Leave the LOADING state before removing the sentinel - otherwise adjacent
                            // requestors that still reference the sentinel would poll it forever.
                            HashEntries.setSentinel(sentinelHashEntryAdr, Util.SENTINEL_TEMPORARY_FAILURE);

                            segment.removeEntry(sentinelHashEntryAdr);

                            return null;
                        }

                        // not already expired

                        int valueLen = valueSize(value);

                        long bytes1 = Util.allocLen(keyLen, valueLen);

                        long hashEntryAdr1;
                        if ((maxEntrySize > 0L && bytes1 > maxEntrySize) || (hashEntryAdr1 = Uns.allocate(bytes1, throwOOME)) == 0L)
                            throw new RuntimeException("max entry size exceeded or malloc() failed");

                        long hash1 = serializeForPut(key, value, keyLen, valueLen, hashEntryAdr1);
                        if (entryExpireAt == USE_DEFAULT_EXPIRE_AT)
                            entryExpireAt = defaultExpireAt();

                        // initialize hash entry
                        HashEntries.init(hash1, keyLen, valueLen, hashEntryAdr1, Util.SENTINEL_NOT_PRESENT, entryExpireAt);

                        if (!segment.replaceSentinelEntry(hash1, sentinelHashEntryAdr, hashEntryAdr1, bytes1, entryExpireAt))
                            throw new RuntimeException("not enough free capacity");
                        derefSentinel = true;

                        HashEntries.setSentinel(sentinelHashEntryAdr, Util.SENTINEL_SUCCESS);
                        HashEntries.dereference(sentinelHashEntryAdr);
                    }
                    catch (PermanentLoadException e)
                    {
                        // the sentinel is kept, so later requests see the permanent failure status
                        HashEntries.setSentinel(sentinelHashEntryAdr, Util.SENTINEL_PERMANENT_FAILURE);
                        throw e;
                    }
                    catch (Throwable e)
                    {
                        failure = e instanceof Exception ? (Exception) e : new RuntimeException(e);
                        HashEntries.setSentinel(sentinelHashEntryAdr, Util.SENTINEL_TEMPORARY_FAILURE);
                        if (derefSentinel)
                            HashEntries.dereference(sentinelHashEntryAdr);
                        else
                            segment.removeEntry(sentinelHashEntryAdr);
                    }

                    if (failure != null)
                        throw failure;

                    return value;
                });
            }

            // this request IS NOT the initial requestor for the key, so it must
            // free the unneeded but allocated sentinel
            Uns.free(hashEntryAdr);

            // The freed address must not be used any further - continue with the entry that
            // prevented the insert of our own sentinel.
            hashEntryAdr = segment.getEntry(keySource, true, true);
            if (hashEntryAdr == 0L)
            {
                // the sentinel could not be added (putEntry() refused it) and there is no entry for the
                // key either - nothing to wait for
                return Futures.immediateFailedFuture(new TemporaryLoadException());
            }
        }

        // this request IS NOT the initial requestor for the key
        // this request IS an adjacent requestor for the key

        // check if the value is already there (no sentinel) or has permanent failure status
        int sentinelStatus = HashEntries.getSentinel(hashEntryAdr);
        switch (sentinelStatus)
        {
            case Util.SENTINEL_NOT_PRESENT:
                try
                {
                    return Futures.immediateFuture(valueSerializer.deserialize(Uns.valueBufferR(hashEntryAdr)));
                }
                finally
                {
                    HashEntries.dereference(hashEntryAdr);
                }
            case Util.SENTINEL_PERMANENT_FAILURE:
                HashEntries.dereference(hashEntryAdr);
                return Futures.immediateFailedFuture(new PermanentLoadException());
        }

        // handle sentinel

        final SettableFuture<V> future = SettableFuture.create();

        final long sentinelHashEntryAdr = hashEntryAdr;

        // TODO / DOCUMENT: executorService shutdown with ongoing requests with referenced sentinel-entries --> off-heap memory leak
        // Reason:
        // The jobs started for adjacent getWithLoader*() calls are not running (and waiting) continuously.
        // Instead these are scheduled regularly and poll the referenced sentinel hash-entry.
        // The sentinel hash-entry is dereferenced within the following job for adjacent getWithLoader*() calls.
        // So - if the following job is no longer called, the sentinel hash-entry will never be dereferenced and
        // therefore never be deallocated.
        // Workaround is to close the OHCache instance, then wait some time and shutdown the scheduled executor service
        // when there are no more scheduled jobs.


        // The following job basically "spins" on the sentinel field of the sentinel hash-entry without
        // any lock on the segment.
        // It stops rescheduling itself when
        // - the future has been cancelled or this cache instance has been closed, or
        // - the sentinel has left the LOADING state and the outcome has been handed to the future.
        // On every exit path the reference held on the sentinel hash-entry is released exactly once.

        executorService.schedule(new Runnable()
        {
            public void run()
            {
                if (future.isCancelled() || closed)
                {
                    HashEntries.dereference(sentinelHashEntryAdr);
                    return;
                }

                int sentinelStatus = HashEntries.getSentinel(sentinelHashEntryAdr);
                switch (sentinelStatus)
                {
                    // SENTINEL_NOT_PRESENT is an impossible status on the sentinel hash-entry
                    case Util.SENTINEL_SUCCESS:
                        break;
                    case Util.SENTINEL_LOADING:
                        reschedule(0L);
                        return;
                    case Util.SENTINEL_PERMANENT_FAILURE:
                        failure(0L, new PermanentLoadException());
                        return;
                    case Util.SENTINEL_TEMPORARY_FAILURE:
                        failure(0L, new TemporaryLoadException());
                        return;
                    default:
                        failure(0L, new AssertionError("illegal sentinel value " + sentinelStatus));
                        return;
                }

                long hashEntryAdr = segment.getEntry(keySource, true, true);

                if (hashEntryAdr == 0L)
                {
                    // two possibilities that the entry does not exist:
                    // - entry has been evicted (very very unlikely, so ignore this option)
                    // - loader indicated temporary failure (very very likely)
                    // Release the sentinel and stop here - address 0 must not be accessed.
                    failure(0L, new TemporaryLoadException());
                    return;
                }

                if (hashEntryAdr == sentinelHashEntryAdr)
                {
                    // oops, still the sentinel entry
                    // release the additional reference taken by getEntry() - the reference
                    // held by this job is kept for the next run
                    HashEntries.dereference(hashEntryAdr);
                    reschedule(0L);
                    return;
                }

                sentinelStatus = HashEntries.getSentinel(hashEntryAdr);
                switch (sentinelStatus)
                {
                    case Util.SENTINEL_NOT_PRESENT:
                        try
                        {
                            future.set(valueSerializer.deserialize(Uns.valueBufferR(hashEntryAdr)));
                            HashEntries.dereference(hashEntryAdr);
                            HashEntries.dereference(sentinelHashEntryAdr);
                        }
                        catch (Throwable e)
                        {
                            failure(hashEntryAdr, e);
                        }
                        break;
                    case Util.SENTINEL_SUCCESS:
                    case Util.SENTINEL_LOADING:
                        HashEntries.dereference(hashEntryAdr);
                        // the entry has just been dereferenced - must not be dereferenced
                        // a second time if rescheduling fails
                        reschedule(0L);
                        break;
                    case Util.SENTINEL_PERMANENT_FAILURE:
                        failure(hashEntryAdr, new PermanentLoadException());
                        break;
                    case Util.SENTINEL_TEMPORARY_FAILURE:
                        failure(hashEntryAdr, new TemporaryLoadException());
                        break;
                    default:
                        failure(hashEntryAdr, new AssertionError("illegal sentinel value " + sentinelStatus));
                        break;
                }
            }

            // releases the given entry (if any) and the sentinel, then fails the future
            private void failure(long hashEntryAdr, Throwable e)
            {
                if (hashEntryAdr != 0L)
                    HashEntries.dereference(hashEntryAdr);
                HashEntries.dereference(sentinelHashEntryAdr);
                future.setException(e);
            }

            // schedules the next poll; if that is not possible, the job ends with a failure
            private void reschedule(long hashEntryAdr)
            {
                try
                {
                    executorService.schedule(this, 10, TimeUnit.MILLISECONDS);
                }
                catch (Throwable t)
                {
                    failure(hashEntryAdr, t);
                }
            }
        }, 10, TimeUnit.MILLISECONDS);

        return future;
    }

    /**
     * Selects the segment map for a hash: the bits covered by the segment mask, shifted down, form the
     * index into the segment array.
     */
    private OffHeapLinkedMap segment(long hash)
    {
        final long segmentBits = hash & segmentMask;
        final int index = (int) (segmentBits >>> segmentShift);
        return maps[index];
    }

    /**
     * Serializes the key into a new {@link KeyBuffer} and finishes the buffer with this cache's hasher.
     *
     * @param o key to serialize
     * @return the finished key buffer
     */
    private KeyBuffer keySource(K o)
    {
        final int serializedLen = keySize(o);

        final KeyBuffer result = new KeyBuffer(serializedLen);
        final ByteBuffer target = result.byteBuffer();
        keySerializer.serialize(o, target);
        // the serializer must have written exactly the announced number of bytes
        assert(target.position() == target.capacity()) && (target.capacity() == serializedLen);
        return result.finish(hasher);
    }

    //
    // maintenance
    //

    /**
     * Clears every segment map, one after the other.
     */
    public void clear()
    {
        for (int i = 0; i < maps.length; i++)
            maps[i].clear();
    }

    //
    // state
    //

    /**
     * Changes the total capacity. Each segment's free capacity is adjusted by the difference between
     * the new and the previous per-segment share.
     *
     * @param capacity new total capacity
     * @throws IllegalArgumentException if {@code capacity} is negative
     */
    public void setCapacity(long capacity)
    {
        if (capacity < 0L)
            throw new IllegalArgumentException();

        final int segmentCount = segments();
        final long previousShare = this.capacity / segmentCount;
        final long newShare = capacity / segmentCount;
        this.capacity = capacity;

        final long delta = newShare - previousShare;
        for (int i = 0; i < maps.length; i++)
            maps[i].updateFreeCapacity(delta);
    }

    /**
     * Closes this cache: marks it as closed, clears all segments and releases the segment maps.
     * <p>
     * If an executor service is configured, this method first waits 500ms after setting the closed flag;
     * the polling jobs of {@code getWithLoaderAsync} check that flag and stop when it is set.
     * <p>
     * Calling this method again on an already closed instance has no effect. Without that guard a second
     * call would fail with a {@link NullPointerException}, because the segment array only contains
     * {@code null} after the first call.
     */
    public void close()
    {
        if (closed)
            return;

        closed = true;
        if (executorService != null)
        {
            try
            {
                Thread.sleep(500);
            }
            catch (InterruptedException e)
            {
                // not propagated - restore the interrupt status and continue closing
                Thread.currentThread().interrupt();
            }
        }

        clear();

        for (OffHeapLinkedMap map : maps)
            map.release();
        Arrays.fill(maps, null);

        if (LOGGER.isDebugEnabled())
            LOGGER.debug("Closing OHC instance");
    }

    //
    // statistics and related stuff
    //

    /**
     * Resets the statistics of every segment map and the put-failure counter of this instance.
     */
    public void resetStatistics()
    {
        for (int i = 0; i < maps.length; i++)
            maps[i].resetStatistics();
        putFailCount = 0;
    }

    /**
     * Collects the statistics of all segments into one {@link OHCacheStats} object.
     *
     * @return snapshot of the aggregated statistics
     */
    public OHCacheStats stats()
    {
        long totalRehashes = 0L;
        for (int i = 0; i < maps.length; i++)
            totalRehashes += maps[i].rehashes();

        return new OHCacheStats(hitCount(),
                                missCount(),
                                evictedEntries(),
                                expiredEntries(),
                                perSegmentSizes(),
                                size(),
                                capacity(),
                                freeCapacity(),
                                totalRehashes,
                                putAddCount(),
                                putReplaceCount(),
                                putFailCount,
                                removeCount(),
                                Uns.getTotalAllocated(),
                                0L);
    }

    /** Sum of the put-add counters of all segments. */
    private long putAddCount()
    {
        return Arrays.stream(maps)
                     .mapToLong(segmentMap -> segmentMap.putAddCount())
                     .sum();
    }

    /** Sum of the put-replace counters of all segments. */
    private long putReplaceCount()
    {
        return Arrays.stream(maps)
                     .mapToLong(segmentMap -> segmentMap.putReplaceCount())
                     .sum();
    }

    /** Sum of the remove counters of all segments. */
    private long removeCount()
    {
        return Arrays.stream(maps)
                     .mapToLong(segmentMap -> segmentMap.removeCount())
                     .sum();
    }

    /** Sum of the hit counters of all segments. */
    private long hitCount()
    {
        return Arrays.stream(maps)
                     .mapToLong(segmentMap -> segmentMap.hitCount())
                     .sum();
    }

    /** Sum of the miss counters of all segments. */
    private long missCount()
    {
        return Arrays.stream(maps)
                     .mapToLong(segmentMap -> segmentMap.missCount())
                     .sum();
    }

    /**
     * @return the total capacity as set by the constructor or the last {@link #setCapacity(long)} call
     */
    public long capacity()
    {
        return this.capacity;
    }

    /**
     * @return sum of the free capacities reported by all segments
     */
    public long freeCapacity()
    {
        long total = 0L;
        for (int i = 0; i < maps.length; i++)
            total += maps[i].freeCapacity();
        return total;
    }

    /** Sum of the evicted-entries counters of all segments. */
    private long evictedEntries()
    {
        return Arrays.stream(maps)
                     .mapToLong(segmentMap -> segmentMap.evictedEntries())
                     .sum();
    }

    /** Sum of the expired-entries counters of all segments. */
    private long expiredEntries()
    {
        return Arrays.stream(maps)
                     .mapToLong(segmentMap -> segmentMap.expiredEntries())
                     .sum();
    }

    /** Sum of the {@code usedTimeouts()} values of all segments. */
    int usedTimeouts()
    {
        int total = 0;
        for (int i = 0; i < maps.length; i++)
            total += maps[i].usedTimeouts();
        return total;
    }

    /**
     * @return sum of the sizes reported by all segments
     */
    public long size()
    {
        long total = 0L;
        for (int i = 0; i < maps.length; i++)
            total += maps[i].size();
        return total;
    }

    /**
     * @return number of segments, which is the length of the segment map array
     */
    public int segments()
    {
        final int segmentCount = maps.length;
        return segmentCount;
    }

    /**
     * @return the load factor reported by the first segment
     */
    public float loadFactor()
    {
        final OffHeapLinkedMap firstSegment = maps[0];
        return firstSegment.loadFactor();
    }

    /**
     * @return new array with the hash table size of each segment, in segment order
     */
    public int[] hashTableSizes()
    {
        return Arrays.stream(maps)
                     .mapToInt(segmentMap -> segmentMap.hashTableSize())
                     .toArray();
    }

    /**
     * @return new array with the size of each segment, in segment order
     */
    public long[] perSegmentSizes()
    {
        return Arrays.stream(maps)
                     .mapToLong(segmentMap -> segmentMap.size())
                     .toArray();
    }

    /**
     * Builds a histogram from the bucket information of all segments.
     * <p>
     * Every segment map updates one shared {@link EstimatedHistogram}. The resulting offset and bucket
     * arrays are then cut behind the last non-empty bucket, shifted by one position to make room for a
     * leading zero element, and all offsets are decremented by one before the returned histogram is created.
     *
     * @return a new histogram created from the adjusted offsets and buckets
     */
    public EstimatedHistogram getBucketHistogram()
    {
        EstimatedHistogram hist = new EstimatedHistogram();
        for (OffHeapLinkedMap map : maps)
            map.updateBucketHistogram(hist);

        long[] offsets = hist.getBucketOffsets();
        long[] buckets = hist.getBuckets(false);

        // search backwards for the last non-empty bucket (index 0 is not inspected)
        for (int i = buckets.length - 1; i > 0; i--)
        {
            if (buckets[i] != 0L)
            {
                // resize both arrays: offsets to i + 2 elements, buckets to i + 3 elements
                offsets = Arrays.copyOf(offsets, i + 2);
                buckets = Arrays.copyOf(buckets, i + 3);
                // move the content one position towards the end ...
                System.arraycopy(offsets, 0, offsets, 1, i + 1);
                System.arraycopy(buckets, 0, buckets, 1, i + 2);
                // ... and put a zero element in front
                offsets[0] = 0L;
                buckets[0] = 0L;
                break;
            }
        }

        // every offset is reduced by one (applies to the unmodified arrays as well, if all inspected buckets are empty)
        for (int i = 0; i < offsets.length; i++)
            offsets[i]--;

        return new EstimatedHistogram(offsets, buckets);
    }

    //
    // serialization (serialized data cannot be ported between different CPU architectures, if endianess differs)
    //

    /**
     * Reads serialized keys from the given channel.
     * <p>
     * The file header (magic and version) is read and verified immediately; the keys themselves are read
     * lazily by the returned iterator. The iterator uses an off-heap buffer that is freed by
     * {@link CloseableIterator#close()}, so callers should close the iterator when done.
     * I/O errors that occur while iterating are reported as {@link RuntimeException} wrapping the
     * {@link IOException}.
     *
     * @param channel channel to read from
     * @return iterator over the deserialized keys
     * @throws IOException if the header cannot be read completely or is not a valid header of a keys file
     *                     with the current file version
     */
    public CloseableIterator<K> deserializeKeys(final ReadableByteChannel channel) throws IOException
    {
        long headerAddress = Uns.allocateIOException(8, throwOOME);
        try
        {
            ByteBuffer header = Uns.directBufferFor(headerAddress, 0L, 8L, false);
            // do not interpret a partially filled header buffer
            if (!Util.readFully(channel, header))
                throw new EOFException("Unexpected end of data while reading file header");
            byteBufferFlip(header);
            int magic = header.getInt();
            if (magic == Util.HEADER_KEYS_WRONG)
                throw new IOException("File from instance with different CPU architecture cannot be loaded");
            if (magic == Util.HEADER_ENTRIES)
                throw new IOException("File contains entries - expected keys");
            if (magic != Util.HEADER_KEYS)
                throw new IOException("Illegal file header");
            if (header.getInt() != CURRENT_FILE_VERSION)
                throw new IOException("Illegal file version");
        }
        finally
        {
            Uns.free(headerAddress);
        }

        return new CloseableIterator<K>()
        {
            // key that has been read ahead but not yet returned by next()
            private K next;
            // set when the end of the channel has been reached
            private boolean eod;

            private final byte[] keyLenBuf = new byte[Util.SERIALIZED_KEY_LEN_SIZE];
            private final ByteBuffer bb = ByteBuffer.wrap(keyLenBuf);

            // off-heap buffer for one hash entry; bufLen is only non-zero while bufAdr is a valid allocation
            private long bufAdr;
            private long bufLen;

            public void close()
            {
                Uns.free(bufAdr);
                bufAdr = 0L;
                // keep address and length consistent, so a later read cannot use the freed buffer
                bufLen = 0L;
            }

            protected void finalize() throws Throwable
            {
                close();
                super.finalize();
            }

            public boolean hasNext()
            {
                if (eod)
                    return false;
                if (next == null)
                    checkNext();
                return next != null;
            }

            // reads the next key from the channel into 'next' or sets 'eod' at the end of the data
            private void checkNext()
            {
                try
                {
                    byteBufferClear(bb);
                    if (!Util.readFully(channel, bb))
                    {
                        eod = true;
                        return;
                    }

                    int keyLen = Uns.getIntFromByteArray(keyLenBuf, 0);
                    if (keyLen < 0)
                        throw new IOException("Illegal key length " + keyLen);
                    long totalLen = Util.ENTRY_OFF_DATA + keyLen;

                    if (bufLen < totalLen)
                    {
                        Uns.free(bufAdr);
                        bufAdr = 0L;
                        bufLen = 0L;

                        // only record the new length after the allocation succeeded
                        long newLen = Math.max(4096, Util.roundUpToPowerOf2(totalLen, 1 << 30));
                        bufAdr = Uns.allocateIOException(newLen, throwOOME);
                        bufLen = newLen;
                    }

                    if (!Util.readFully(channel, Uns.directBufferFor(bufAdr, Util.ENTRY_OFF_DATA, keyLen, false)))
                    {
                        eod = true;
                        throw new EOFException();
                    }
                    HashEntries.init(0L, keyLen, 0, bufAdr, Util.SENTINEL_NOT_PRESENT, defaultExpireAt());
                    next = keySerializer.deserialize(Uns.directBufferFor( bufAdr + Util.ENTRY_OFF_DATA, 0, keyLen, true));
                }
                catch (IOException e)
                {
                    throw new RuntimeException(e);
                }
            }

            public K next()
            {
                if (eod)
                    throw new NoSuchElementException();

                K r = next;
                if (r == null)
                {
                    checkNext();
                    r = next;
                }
                if (r == null)
                    throw new NoSuchElementException();
                next = null;

                return r;
            }

            public void remove()
            {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * Reads a single serialized entry from {@code channel} and tries to add it to this cache.
     * <p>
     * The entry starts with a fixed-size prefix (hash, value length, key length) followed by the
     * key (padded to a multiple of 8 bytes) and the value.
     *
     * @param channel source to read the serialized entry from
     * @return {@code true} if the entry has been read and stored; {@code false} if the prefix or the
     *         key/value data could not be read completely, if the entry exceeds {@code maxEntrySize},
     *         if no memory could be allocated for it, or if the segment did not accept the entry
     * @throws IOException if reading from (or repositioning) the channel fails
     */
    public boolean deserializeEntry(ReadableByteChannel channel) throws IOException
    {
        // fixed-size prefix: hash, keyLen, valueLen
        byte[] prefix = new byte[Util.SERIALIZED_ENTRY_SIZE];
        ByteBuffer prefixBuffer = ByteBuffer.wrap(prefix);
        if (!Util.readFully(channel, prefixBuffer))
            return false;

        long expireAt = 0L;

        // offsets inside the prefix are relative to the position of the hash field
        long hash = Uns.getLongFromByteArray(prefix,   (int) (Util.ENTRY_OFF_HASH - Util.ENTRY_OFF_HASH));
        int valueLen = Uns.getIntFromByteArray(prefix, (int) (Util.ENTRY_OFF_VALUE_LENGTH - Util.ENTRY_OFF_HASH));
        int keyLen = Uns.getIntFromByteArray(prefix,   (int) (Util.ENTRY_OFF_KEY_LENGTH - Util.ENTRY_OFF_HASH));

        long kvLen = Util.roundUpTo8(keyLen) + valueLen;
        long totalLen = kvLen + Util.ENTRY_OFF_DATA;

        // only try to allocate if the entry is not larger than the configured maximum
        boolean oversized = maxEntrySize > 0L && totalLen > maxEntrySize;
        long hashEntryAdr = oversized ? 0L : Uns.allocate(totalLen, throwOOME);

        if (hashEntryAdr == 0L)
        {
            // entry cannot be stored - move past its key + value bytes
            if (!(channel instanceof SeekableByteChannel))
            {
                // non-seekable channel: consume the bytes through a scratch buffer
                ByteBuffer scratch = ByteBuffer.allocate(8192);
                long remaining = kvLen;
                while (remaining > 0L)
                {
                    byteBufferClear(scratch);
                    if (remaining < scratch.capacity())
                        byteBufferLimit(scratch, Ints.checkedCast(remaining));
                    if (!Util.readFully(channel, scratch))
                        return false;
                    remaining -= scratch.limit();
                }
            }
            else
            {
                SeekableByteChannel seekable = (SeekableByteChannel) channel;
                seekable.position(seekable.position() + kvLen);
            }
            return false;
        }

        HashEntries.init(hash, keyLen, valueLen, hashEntryAdr, Util.SENTINEL_NOT_PRESENT, defaultExpireAt());

        // read key + value directly into the new entry and hand it over to its segment
        boolean stored = Util.readFully(channel, Uns.keyBuffer(hashEntryAdr, kvLen))
                         && segment(hash).putEntry(hashEntryAdr, hash, keyLen, totalLen, false, expireAt, 0L, 0L);

        // the entry is released here when it was not stored
        if (!stored)
            Uns.free(hashEntryAdr);

        return stored;
    }

    /**
     * Looks up the entry for {@code key} and, if one is found, writes it to {@code channel}.
     *
     * @param key     key of the entry to serialize
     * @param channel destination for the serialized entry
     * @return {@code true} if an entry was found and written, {@code false} if the lookup yielded no entry
     * @throws IOException if writing to the channel fails
     */
    public boolean serializeEntry(K key, WritableByteChannel channel) throws IOException
    {
        KeyBuffer lookupKey = keySource(key);
        OffHeapLinkedMap owner = segment(lookupKey.hash());

        long hashEntryAdr = owner.getEntry(lookupKey, true, true);
        if (hashEntryAdr == 0L)
            return false;

        // the address-based overload dereferences the entry once it is done with it
        return serializeEntry(channel, hashEntryAdr);
    }

    /**
     * Reads a file header followed by serialized entries from {@code channel} and adds the entries
     * to this cache.
     * <p>
     * The header consists of two ints: a magic value and the file version. Entries are read via
     * {@link #deserializeEntry(ReadableByteChannel)} until that method returns {@code false}.
     *
     * @param channel source to read from
     * @return number of entries for which {@code deserializeEntry} returned {@code true}
     * @throws EOFException if the channel ends before the 8 byte header has been read completely
     * @throws IOException  if the header has an unexpected magic value or version, or if reading fails
     */
    public int deserializeEntries(ReadableByteChannel channel) throws IOException
    {
        long headerAddress = Uns.allocateIOException(8, throwOOME);
        try
        {
            ByteBuffer header = Uns.directBufferFor(headerAddress, 0L, 8L, false);
            // A short read must not fall through to getInt() - that would surface as an unchecked
            // BufferUnderflowException instead of an IOException.
            if (!Util.readFully(channel, header))
                throw new EOFException("Unexpected end of data while reading file header");
            byteBufferFlip(header);
            int magic = header.getInt();
            if (magic == Util.HEADER_ENTRIES_WRONG)
                throw new IOException("File from instance with different CPU architecture cannot be loaded");
            if (magic == Util.HEADER_KEYS)
                throw new IOException("File contains keys - expected entries");
            if (magic != Util.HEADER_ENTRIES)
                throw new IOException("Illegal file header");
            if (header.getInt() != CURRENT_FILE_VERSION)
                throw new IOException("Illegal file version");
        }
        finally
        {
            // the header buffer is only needed for validation
            Uns.free(headerAddress);
        }

        int count = 0;
        while (deserializeEntry(channel))
            count++;
        return count;
    }

    /**
     * Writes a header and approximately the {@code n} hottest entries (keys and values) to {@code channel}.
     *
     * @param n       approximate number of entries to write
     * @param channel destination channel
     * @return number of entries written
     * @throws IOException if writing fails
     */
    public int serializeHotNEntries(int n, WritableByteChannel channel) throws IOException
    {
        final boolean fullEntries = true;
        return serializeHotN(n, channel, fullEntries);
    }

    /**
     * Writes a header and the keys of approximately the {@code n} hottest entries to {@code channel}.
     *
     * @param n       approximate number of keys to write
     * @param channel destination channel
     * @return number of keys written
     * @throws IOException if writing fails
     */
    public int serializeHotNKeys(int n, WritableByteChannel channel) throws IOException
    {
        final boolean fullEntries = false;
        return serializeHotN(n, channel, fullEntries);
    }

    /**
     * Writes the file header followed by the hottest entries (or only their keys) of every segment.
     *
     * @param n       approximate total number of entries to write
     * @param channel destination channel
     * @param entries {@code true} to write complete entries, {@code false} to write keys only
     * @return number of entries/keys written
     * @throws IOException if allocating the header buffer or writing to the channel fails
     */
    private int serializeHotN(int n, WritableByteChannel channel, boolean entries) throws IOException
    {
        // The result is only a (good) approximation of the hottest 'n': each segment contributes
        // a fraction of 'n'. To keep things simple, slightly more than 'n' results may be written
        // (asking for 5000 and getting e.g. 5015 does not really matter).

        final long hdrAdr = Uns.allocateIOException(8, throwOOME);
        try
        {
            ByteBuffer hdr = Uns.directBufferFor(hdrAdr, 0L, 8L, false);
            if (entries)
                hdr.putInt(Util.HEADER_ENTRIES);
            else
                hdr.putInt(Util.HEADER_KEYS);
            hdr.putInt(CURRENT_FILE_VERSION);
            byteBufferFlip(hdr);
            Util.writeFully(channel, hdr);
        }
        finally
        {
            Uns.free(hdrAdr);
        }

        final int quota = n / maps.length + 1;
        int written = 0;

        for (OffHeapLinkedMap segmentMap : maps)
        {
            final long[] hot = segmentMap.hotN(quota);
            try
            {
                int slot = 0;
                while (slot < hot.length)
                {
                    final int current = slot++;
                    final long adr = hot[current];
                    if (adr == 0L)
                        continue;

                    // The serialize helpers dereference the entry themselves (even on failure),
                    // so the slot is cleared to keep the cleanup below from dereferencing it again.
                    hot[current] = 0L;

                    if (entries)
                        serializeEntry(channel, adr);
                    else
                        serializeKey(channel, adr);

                    written++;
                }
            }
            finally
            {
                // dereference everything that has not been handed to a serialize helper
                for (int k = 0; k < hot.length; k++)
                {
                    if (hot[k] != 0L)
                        HashEntries.dereference(hot[k]);
                }
            }
        }

        return written;
    }

    /**
     * Writes the serialized form of the entry at {@code hashEntryAdr} (hash, key length, value
     * length, padded key and value) to {@code channel} and dereferences the entry afterwards,
     * regardless of whether writing succeeded.
     *
     * @param channel      destination channel
     * @param hashEntryAdr address of the entry to write
     * @return always {@code true}
     * @throws IOException if writing to the channel fails
     */
    private static boolean serializeEntry(WritableByteChannel channel, long hashEntryAdr) throws IOException
    {
        try
        {
            int kLen = HashEntries.getKeyLen(hashEntryAdr);
            int vLen = HashEntries.getValueLen(hashEntryAdr);

            // serialized prefix + key (padded to 8 bytes) + value
            long serializedLen = Util.SERIALIZED_ENTRY_SIZE + Util.roundUpTo8(kLen) + vLen;

            ByteBuffer entryBytes = Uns.directBufferFor(hashEntryAdr, Util.ENTRY_OFF_HASH, serializedLen, true);
            Util.writeFully(channel, entryBytes);
        }
        finally
        {
            HashEntries.dereference(hashEntryAdr);
        }

        return true;
    }

    /**
     * Writes the key length and the key bytes of the entry at {@code hashEntryAdr} to
     * {@code channel} and dereferences the entry afterwards, regardless of whether writing succeeded.
     *
     * @param channel      destination channel
     * @param hashEntryAdr address of the entry whose key is written
     * @return always {@code true}
     * @throws IOException if writing to the channel fails
     */
    private static boolean serializeKey(WritableByteChannel channel, long hashEntryAdr) throws IOException
    {
        try
        {
            long kLen = HashEntries.getKeyLen(hashEntryAdr);

            // serialized key length field + key bytes
            long serializedLen = Util.SERIALIZED_KEY_LEN_SIZE + kLen;

            ByteBuffer keyBytes = Uns.directBufferFor(hashEntryAdr, Util.ENTRY_OFF_KEY_LENGTH, serializedLen, true);
            Util.writeFully(channel, keyBytes);
        }
        finally
        {
            HashEntries.dereference(hashEntryAdr);
        }

        return true;
    }

    //
    // convenience methods
    //

    /**
     * Puts every mapping of {@code m} into this cache, one {@code put} call per mapping.
     *
     * @param m mappings to add
     */
    public void putAll(Map<? extends K, ? extends V> m)
    {
        // Grouping the puts by segment could be faster, but costs heap and complexity - decide later.
        for (Map.Entry<? extends K, ? extends V> mapping : m.entrySet())
        {
            K k = mapping.getKey();
            V v = mapping.getValue();
            put(k, v);
        }
    }

    /**
     * Removes the entry for every key supplied by {@code iterable}, one {@code remove} call per key.
     *
     * @param iterable keys to remove
     */
    public void removeAll(Iterable<K> iterable)
    {
        // Grouping the removes by segment could be faster, but costs heap and complexity - decide later.
        for (K key : iterable)
        {
            remove(key);
        }
    }

    /**
     * Returns the difference between {@code capacity()} and {@code freeCapacity()}.
     *
     * @return capacity currently in use
     */
    public long memUsed()
    {
        long total = capacity();
        long unused = freeCapacity();
        return total - unused;
    }

    //
    // key iterators
    //

    /**
     * Returns an iterator over the deserialized keys of approximately the {@code n} hottest entries.
     *
     * @param n approximate number of keys to return
     * @return closeable iterator yielding keys deserialized with the key serializer
     */
    public CloseableIterator<K> hotKeyIterator(int n)
    {
        return new AbstractHotKeyIterator<K>(n)
        {
            K buildResult(long hashEntryAdr)
            {
                // the key bytes start at the data offset of the entry
                long keyAdr = hashEntryAdr + Util.ENTRY_OFF_DATA;
                int keyLen = HashEntries.getKeyLen(hashEntryAdr);
                ByteBuffer keyBytes = Uns.directBufferFor(keyAdr, 0, keyLen, true);
                return keySerializer.deserialize(keyBytes);
            }
        };
    }

    /**
     * Returns an iterator over buffers covering the key bytes of approximately the {@code n}
     * hottest entries.
     *
     * @param n approximate number of keys to return
     * @return closeable iterator yielding one buffer per key
     */
    public CloseableIterator<ByteBuffer> hotKeyBufferIterator(int n)
    {
        return new AbstractHotKeyIterator<ByteBuffer>(n)
        {
            ByteBuffer buildResult(long hashEntryAdr)
            {
                int keyLen = HashEntries.getKeyLen(hashEntryAdr);
                // buffer over the entry's key bytes, which start at the data offset
                ByteBuffer keyBytes = Uns.directBufferFor(hashEntryAdr, Util.ENTRY_OFF_DATA, keyLen, true);
                return keyBytes;
            }
        };
    }

    /**
     * Returns an iterator over the deserialized keys of all entries in all segments.
     *
     * @return closeable iterator yielding keys deserialized with the key serializer
     */
    public CloseableIterator<K> keyIterator()
    {
        return new AbstractKeyIterator<K>()
        {
            K buildResult(long hashEntryAdr)
            {
                ByteBuffer keyBytes = Uns.keyBufferR(hashEntryAdr);
                return keySerializer.deserialize(keyBytes);
            }
        };
    }

    /**
     * Returns an iterator over buffers covering the key bytes of all entries in all segments.
     *
     * @return closeable iterator yielding one buffer per key
     */
    public CloseableIterator<ByteBuffer> keyBufferIterator()
    {
        return new AbstractKeyIterator<ByteBuffer>()
        {
            ByteBuffer buildResult(long hashEntryAdr)
            {
                int keyLen = HashEntries.getKeyLen(hashEntryAdr);
                // buffer over the entry's key bytes, which start at the data offset
                ByteBuffer keyBytes = Uns.directBufferFor(hashEntryAdr, Util.ENTRY_OFF_DATA, keyLen, true);
                return keyBytes;
            }
        };
    }

    /**
     * Base class for iterators that walk over the entries of all segments.
     * <p>
     * Entry addresses are fetched from the current segment in batches and handed to
     * {@link #buildResult(long)} one by one. The entry behind the most recently returned element
     * is dereferenced as soon as the next element is computed - which also happens on
     * {@link #hasNext()} - or when the iterator is closed. {@link #close()} also dereferences all
     * addresses of the current batch that have not been returned yet.
     *
     * @param <R> type of the elements produced by {@link #buildResult(long)}
     */
    private abstract class AbstractKeyIterator<R> implements CloseableIterator<R>
    {
        // index of the next segment in 'maps' and the segment currently iterated
        private int segmentIndex;
        private OffHeapLinkedMap segment;

        // hash table size of the current segment and the next hash table index to fetch from
        private int mapSegmentCount;
        private int mapSegmentIndex;

        // current batch of entry addresses and the position of the next address to return
        private final LongArrayList hashEntryAdrs = new LongArrayList(1024);
        private int listIndex;

        private boolean eod;
        private R next;

        // segment and address of the entry behind the most recently computed element (0 = none)
        private OffHeapLinkedMap lastSegment;
        private long lastHashEntryAdr;

        /**
         * Dereferences the entry behind the most recently computed element, if there is one.
         */
        private void derefLast()
        {
            if (lastHashEntryAdr != 0L)
            {
                HashEntries.dereference(lastHashEntryAdr);
                lastHashEntryAdr = 0L;
                lastSegment = null;
            }
        }

        /**
         * Dereferences the last returned entry and all not yet returned entries of the current batch.
         */
        public void close()
        {
            derefLast();

            while (listIndex < hashEntryAdrs.size())
            {
                long hashEntryAdr = hashEntryAdrs.getLong(listIndex++);
                HashEntries.dereference(hashEntryAdr);
            }
        }

        public boolean hasNext()
        {
            if (eod)
                return false;

            if (next == null)
                next = computeNext();

            return next != null;
        }

        public R next()
        {
            if (eod)
                throw new NoSuchElementException();

            if (next == null)
                next = computeNext();
            R r = next;
            next = null;
            // computeNext() flags the end of data instead of throwing
            if (!eod)
                return r;
            throw new NoSuchElementException();
        }

        /**
         * Removes the entry behind the element most recently returned by {@link #next()}.
         * <p>
         * Must be called directly after {@link #next()}: an intermediate {@link #hasNext()} call
         * already advances to the following entry and releases the previous one.
         *
         * @throws NoSuchElementException if the end of the iteration has been reached
         * @throws IllegalStateException  if there is no current entry, i.e. {@link #next()} has not
         *                                been called yet or the entry has already been removed or released
         */
        public void remove()
        {
            if (eod)
                throw new NoSuchElementException();

            // without a current entry 'lastSegment' is null - fail as the Iterator contract demands
            // instead of running into a NullPointerException
            if (lastHashEntryAdr == 0L)
                throw new IllegalStateException("no current entry - remove() must directly follow next()");

            lastSegment.removeEntry(lastHashEntryAdr);
            derefLast();
        }

        /**
         * Releases the previous entry and computes the next element.
         *
         * @return the next element, or {@code null} after setting {@code eod} when all segments are exhausted
         */
        private R computeNext()
        {
            derefLast();

            while (true)
            {
                // serve from the current batch first
                if (listIndex < hashEntryAdrs.size())
                {
                    long hashEntryAdr = hashEntryAdrs.getLong(listIndex++);
                    lastSegment = segment;
                    lastHashEntryAdr = hashEntryAdr;
                    return buildResult(hashEntryAdr);
                }

                // current segment fully scanned - advance to the next one, if any
                if (mapSegmentIndex >= mapSegmentCount)
                {
                    if (segmentIndex == maps.length)
                    {
                        eod = true;
                        return null;
                    }
                    segment = maps[segmentIndex++];
                    mapSegmentCount = segment.hashTableSize();
                    mapSegmentIndex = 0;
                }

                // batch exhausted - fetch the addresses for the next 1024 hash table indexes
                if (listIndex == hashEntryAdrs.size())
                {
                    hashEntryAdrs.clear();
                    segment.getEntryAddresses(mapSegmentIndex, 1024, hashEntryAdrs);
                    mapSegmentIndex += 1024;
                    listIndex = 0;
                }
            }
        }

        /**
         * Converts the entry at {@code hashEntryAdr} into the element type of this iterator.
         *
         * @param hashEntryAdr address of the entry
         * @return element to return from {@link #next()}
         */
        abstract R buildResult(long hashEntryAdr);
    }

    /**
     * Base class for iterators over the hottest entries of all segments.
     * <p>
     * Every segment is asked for its {@code perMap} hottest entries; the returned addresses are
     * handed to {@link #buildResult(long)} one by one. The entry behind the most recently returned
     * element is dereferenced when the next element is computed or when the iterator is closed.
     *
     * @param <R> type of the elements produced by {@link #buildResult(long)}
     */
    private abstract class AbstractHotKeyIterator<R> extends AbstractIterator<R> implements CloseableIterator<R>
    {
        // number of entries requested from each segment
        private final int perMap;
        // index of the next segment to query
        int mapIndex;

        // addresses returned by the current segment and the position of the next one to look at
        long[] hotPerMap;
        int subIndex;

        // address of the entry behind the most recently computed element (0 = none)
        long lastHashEntryAdr;

        AbstractHotKeyIterator(int n)
        {
            // The result is only a (good) approximation of the hottest 'n': each segment contributes
            // a fraction of 'n'. To keep things simple, slightly more than 'n' results may be returned
            // (asking for 5000 keys and getting e.g. 5015 does not really matter).

            this.perMap = n / maps.length + 1;
        }

        /**
         * Dereferences the entry behind the most recently computed element, if there is one.
         */
        private void derefLast()
        {
            if (lastHashEntryAdr == 0L)
                return;

            HashEntries.dereference(lastHashEntryAdr);
            lastHashEntryAdr = 0L;
        }

        /**
         * Dereferences the last returned entry and all remaining entries of the current segment's result.
         */
        public void close()
        {
            derefLast();

            if (hotPerMap == null)
                return;

            for (; subIndex < hotPerMap.length; )
            {
                long pending = hotPerMap[subIndex++];
                if (pending != 0L)
                    HashEntries.dereference(pending);
            }
        }

        /**
         * Converts the entry at {@code hashEntryAdr} into the element type of this iterator.
         *
         * @param hashEntryAdr address of the entry
         * @return element to return from the iterator
         */
        abstract R buildResult(long hashEntryAdr);

        protected R computeNext()
        {
            derefLast();

            for (;;)
            {
                boolean hasCandidate = hotPerMap != null && subIndex < hotPerMap.length;
                if (hasCandidate)
                {
                    long candidate = hotPerMap[subIndex++];
                    if (candidate != 0L)
                    {
                        lastHashEntryAdr = candidate;
                        return buildResult(candidate);
                    }
                    // a zero address ends the current segment's result - continue with the next segment
                }

                if (mapIndex == maps.length)
                    return endOfData();

                OffHeapLinkedMap nextMap = maps[mapIndex++];
                hotPerMap = nextMap.hotN(perMap);
                subIndex = 0;
            }
        }
    }

    /**
     * Returns a short description consisting of the simple class name followed by capacity,
     * number of segments, default TTL and maximum entry size in parentheses.
     */
    public String toString()
    {
        return getClass().getSimpleName() +
               "(capacity=" + capacity() +
               " ,segments=" + maps.length +
               " ,defaultTTL=" + defaultTTL +
               " ,maxEntrySize=" + maxEntrySize +
               // close the parenthesis opened before "capacity="
               ')';
    }
}