Timeouts.java
/*
* Copyright (C) 2014 Robert Stupp, Koeln, Germany, robert-stupp.de
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.caffinitas.ohc.linked;
import com.google.common.primitives.Ints;
import org.caffinitas.ohc.Ticker;
/**
* Manages cache entry time-to-live in off-heap memory.
* <p>
* TTL entries are organized in {@code slots} slots. The <i>slot</i> is chosen using
* the entry's {@code expireAt} absolute timestamp value - rounded by the provided
* {@code precision}.
* Technically the index of a slot is calculated using the formula:
* {@code slotIndex = (expireAt >> precisionShift) & slotBitmask}.
* </p>
* <p>
* Each slot maintains an unsorted list of entries. Each entry consists of the
* {@code hashEntryAdr} of the cache's entry and the {@code expireAt} absolute
* timestamp. The size of a slot's list is unbounded, it is resized if necessary -
* both increasing and decreasing.
* </p>
*/
final class Timeouts
{
private final long slotBitmask;
private final int precisionShift;
private final int slotCount;
private final Slot[] slots;
private final Ticker ticker;
Timeouts(Ticker ticker, int slots, long precision)
{
if (slots == 0)
slots = 64;
if (precision == 0)
precision = 128;
if (slots < 1)
throw new IllegalArgumentException("timeouts-slots <= 0");
if (precision < 1)
throw new IllegalArgumentException("precision <= 0");
this.ticker = ticker;
this.slotCount = Ints.checkedCast(Util.roundUpToPowerOf2(Math.max(slots, 16), 1 << 30));
int slotShift = 64 - Util.bitNum(slotCount) - 1;
this.slotBitmask = ((long) slotCount - 1) << slotShift;
this.slots = new Slot[slotCount];
for (int i = 0; i < slotCount; i++)
this.slots[i] = new Slot();
precision = Util.roundUpToPowerOf2(Math.max(precision, 1), 1 << 30);
precisionShift = 64 - Util.bitNum(precision) - 1;
}
/**
* Add a cache entry.
*
* @param hashEntryAdr address of the cache entry
* @param expireAt absolute expiration timestamp
*/
void add(long hashEntryAdr, long expireAt)
{
// just ignore the fact that expireAt can be less than current time
int slotNum = slot(expireAt);
slots[slotNum].add(hashEntryAdr, expireAt);
}
/**
* Remote a cache entry.
*
* @param hashEntryAdr address of the cache entry
* @param expireAt absolute expiration timestamp
*/
void remove(long hashEntryAdr, long expireAt)
{
int slot = slot(expireAt);
slots[slot].remove(hashEntryAdr);
}
int used()
{
int used = 0;
for (Slot slot : slots)
used += slot.used;
return used;
}
/**
* Remove expired entries.
*
* @param expireHandler implementation that will be called for each expired entry.
*/
int removeExpired(TimeoutHandler expireHandler)
{
// ensure the clock never goes backwards
long t = ticker.currentTimeMillis();
int expired = 0;
for (int i = 0; i < slotCount; i++)
{
expired += slots[i].removeExpired(t, expireHandler);
}
return expired;
}
/**
* Releases all allocated off-heap memory.
*/
void release()
{
for (Slot slot : slots)
slot.release();
}
private int slot(long expireAt)
{
expireAt >>= precisionShift;
return (int) (expireAt & slotBitmask);
}
interface TimeoutHandler
{
/**
* Called by {@link #removeExpired(TimeoutHandler)} for each expired entry.
* The implementation must <b>not</b> call {@link #remove(long, long)} since
* removal of the entry in this structure has already taken place.
*
* @param hashEntryAdr cache entry address that has expired
*/
void expired(long hashEntryAdr);
}
private final class Slot
{
// minimum number of entries in a slot
private static final int MIN_LEN = 16;
private static final int ENTRY_SIZE = 8 + 8;
// Each entry in a slot consists of two 8-byte values:
// 1. pointer to hashEntryAdr
// 2. expireAt
private long addr;
private int allocLen;
private int len;
private int used;
private int min0;
void add(long hashEntryAdr, long expireAt)
{
for (int i = min0; i < allocLen; i++)
{
int off = i * ENTRY_SIZE;
long adr = Uns.getLong(addr, off);
if (adr == 0)
{
// reuse empty entry
if (i + 1 > len)
len = i + 1;
min0 = i + 1;
postAdd(hashEntryAdr, expireAt, off);
return;
}
}
resize();
int off = len * ENTRY_SIZE;
len = min0 = len + 1;
postAdd(hashEntryAdr, expireAt, off);
}
private void postAdd(long hashEntryAdr, long expireAt, int off)
{
Uns.putLong(addr, off, hashEntryAdr);
Uns.putLong(addr, off + 8, expireAt);
used++;
}
private void resize()
{
int newAllocLen = Math.max(MIN_LEN, allocLen * 2);
long newAddr = Uns.allocate(newAllocLen * ENTRY_SIZE, true);
if (addr != 0L)
{
Uns.copyMemory(addr, 0, newAddr, 0, len * ENTRY_SIZE);
Uns.setMemory(newAddr, len * ENTRY_SIZE, (newAllocLen - len) * ENTRY_SIZE, (byte) 0);
}
else
Uns.setMemory(newAddr, 0, newAllocLen * ENTRY_SIZE, (byte) 0);
Uns.free(addr);
addr = newAddr;
allocLen = newAllocLen;
}
void remove(long hashEntryAdr)
{
for (int i = 0; i < len; i++)
{
int off = i * ENTRY_SIZE;
long adr = Uns.getLong(addr, off);
if (adr == hashEntryAdr)
{
// reuse empty entry
clearEntry(i, off);
}
}
maybeCompact();
}
int removeExpired(long now, TimeoutHandler expireHandler)
{
int expired = 0;
for (int i = 0; i < len; i++)
{
int off = i * ENTRY_SIZE;
long hashEntryAdr = Uns.getLong(addr, off);
if (hashEntryAdr != 0L)
{
long expireAt = Uns.getLong(addr, off + 8);
if (now >= expireAt)
{
clearEntry(i, off);
expireHandler.expired(hashEntryAdr);
expired++;
}
}
}
maybeCompact();
return expired;
}
private void clearEntry(int idx, int off)
{
if (idx < min0)
min0 = idx;
if (idx == len - 1)
len--;
Uns.putLong(addr, off, 0L);
Uns.putLong(addr, off + 8, 0L);
used--;
}
private void maybeCompact()
{
// compact slot, if too many unused entries in slot
// i.e. if only half of the possible entries are used, but keep 16 at least entries
if (used * 2 <= allocLen)
{
int newAllocLen = Math.max(MIN_LEN, allocLen / 2);
if (newAllocLen < allocLen)
compact(newAllocLen);
}
}
private void compact(int newAllocLen)
{
long newAddr = Uns.allocate(newAllocLen * ENTRY_SIZE, true);
int tOff = 0;
int sOff = 0;
for (int i = 0; i < len; i++, sOff += ENTRY_SIZE)
{
long hashEntryAdr = Uns.getLong(addr, sOff);
if (hashEntryAdr != 0L)
{
long expireAt = Uns.getLong(addr, sOff + 8);
Uns.putLong(newAddr, tOff, hashEntryAdr);
Uns.putLong(newAddr, tOff + 8, expireAt);
tOff += ENTRY_SIZE;
}
}
Uns.setMemory(newAddr, used * ENTRY_SIZE, (newAllocLen - used) * ENTRY_SIZE, (byte) 0);
Uns.free(addr);
len = used;
min0 = used;
addr = newAddr;
allocLen = newAllocLen;
}
void release()
{
Uns.free(addr);
addr = 0L;
}
}
}