ValueStore.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.operator.aggregation.histogram;

import com.facebook.presto.common.NotSupportedException;
import com.facebook.presto.common.array.IntBigArray;
import com.facebook.presto.common.array.LongBigArray;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.PrestoException;
import com.google.common.annotations.VisibleForTesting;
import org.openjdk.jol.info.ClassLayout;

import static com.facebook.presto.operator.aggregation.histogram.HashUtil.calculateMaxFill;
import static com.facebook.presto.operator.aggregation.histogram.HashUtil.computeBucketCount;
import static com.facebook.presto.operator.aggregation.histogram.HashUtil.nextBucketId;
import static com.facebook.presto.operator.aggregation.histogram.HashUtil.nextProbeLinear;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkState;

/**
 * helper class for {@link GroupedTypedHistogram}
 * May be used for other cases that need a simple hash for values
 * <p>
 * sort of a FlyWeightStore for values--will return unique number for a value. If it exists, you'll get the same number. Class map Value -> number
 * <p>
 * Note it assumes you're storing # -> Value (Type, Block, position, or the result of the ) somewhere else
 */
public class ValueStore
{
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(GroupedTypedHistogram.class).instanceSize();
    private static final float MAX_FILL_RATIO = 0.5f;
    private static final int EMPTY_BUCKET = -1;
    private final BlockBuilder values;
    private int rehashCount;

    private int mask;
    private int bucketCount;
    private IntBigArray buckets;
    private final LongBigArray valueHashes;
    private int maxFill;

    @VisibleForTesting
    public ValueStore(int expectedSize, BlockBuilder values)
    {
        bucketCount = computeBucketCount(expectedSize, MAX_FILL_RATIO);
        mask = bucketCount - 1;
        maxFill = calculateMaxFill(bucketCount, MAX_FILL_RATIO);
        this.values = values;
        buckets = new IntBigArray(-1);
        buckets.ensureCapacity(bucketCount);
        valueHashes = new LongBigArray(-1);
        valueHashes.ensureCapacity(bucketCount);
    }

    /**
     * This will add an item if not already in the system. It returns a pointer that is unique for multiple instances of the value. If item present,
     * returns the pointer into the system
     */
    public int addAndGetPosition(Type type, Block block, int position, long valueHash)
    {
        if (values.getPositionCount() >= maxFill) {
            rehash();
        }

        int bucketId = getBucketId(valueHash, mask);
        int valuePointer;

        // look for an empty slot or a slot containing this key
        int probeCount = 1;
        int originalBucketId = bucketId;
        while (true) {
            checkState(probeCount < bucketCount, "could not find match for value nor empty slot in %s buckets", bucketCount);
            valuePointer = buckets.get(bucketId);

            try {
                if (valuePointer == EMPTY_BUCKET) {
                    valuePointer = values.getPositionCount();
                    valueHashes.set(valuePointer, (int) valueHash);
                    type.appendTo(block, position, values);
                    buckets.set(bucketId, valuePointer);

                    return valuePointer;
                }
                else if (type.equalTo(block, position, values, valuePointer)) {
                    // value at position
                    return valuePointer;
                }
                else {
                    int probe = nextProbe(probeCount);
                    bucketId = nextBucketId(originalBucketId, mask, probe);
                    probeCount++;
                }
            }
            catch (NotSupportedException e) {
                throw new PrestoException(NOT_SUPPORTED, e.getMessage(), e);
            }
        }
    }

    private int getBucketId(long valueHash, int mask)
    {
        return (int) (valueHash & mask);
    }

    @VisibleForTesting
    void rehash()
    {
        ++rehashCount;

        long newBucketCountLong = bucketCount * 2L;

        if (newBucketCountLong > Integer.MAX_VALUE) {
            throw new PrestoException(GENERIC_INSUFFICIENT_RESOURCES, "Size of hash table cannot exceed " + Integer.MAX_VALUE + " entries (" + newBucketCountLong + ")");
        }

        int newBucketCount = (int) newBucketCountLong;
        int newMask = newBucketCount - 1;

        IntBigArray newBuckets = new IntBigArray(-1);

        newBuckets.ensureCapacity(newBucketCount);

        for (int i = 0; i < values.getPositionCount(); i++) {
            long valueHash = valueHashes.get(i);
            int bucketId = getBucketId(valueHash, newMask);
            int probeCount = 1;

            while (newBuckets.get(bucketId) != EMPTY_BUCKET) {
                int probe = nextProbe(probeCount);

                bucketId = nextBucketId(bucketId, newMask, probe);
                probeCount++;
            }

            // record the mapping
            newBuckets.set(bucketId, i);
        }

        buckets = newBuckets;
        // worst case is every bucket has a unique value, so preemptively keep this large enough to have a value for ever bucket
        // TODO: could optimize the growth algorithm to be resize this only when necessary; this wastes memory but guarantees that if every value has a distinct hash, we have space
        valueHashes.ensureCapacity(newBucketCount);
        bucketCount = newBucketCount;
        maxFill = calculateMaxFill(newBucketCount, MAX_FILL_RATIO);
        mask = newMask;
    }

    public int getRehashCount()
    {
        return rehashCount;
    }

    public long getEstimatedSize()
    {
        return INSTANCE_SIZE + buckets.sizeOf();
    }

    private int nextProbe(int probe)
    {
        return nextProbeLinear(probe);
    }
}