SegmentedSliceBlockBuilder.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.orc.writer;

import com.facebook.presto.common.block.AbstractVariableWidthBlockBuilder;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.block.BlockBuilderStatus;
import com.google.common.annotations.VisibleForTesting;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.XxHash64;
import org.openjdk.jol.info.ClassLayout;

import java.util.Arrays;
import java.util.OptionalInt;
import java.util.function.ObjLongConsumer;

import static com.facebook.presto.orc.writer.SegmentedSliceBlockBuilder.Segments.INITIAL_SEGMENTS;
import static com.facebook.presto.orc.writer.SegmentedSliceBlockBuilder.Segments.SEGMENT_SIZE;
import static com.facebook.presto.orc.writer.SegmentedSliceBlockBuilder.Segments.offset;
import static com.facebook.presto.orc.writer.SegmentedSliceBlockBuilder.Segments.segment;
import static io.airlift.slice.SizeOf.sizeOf;
import static java.lang.String.format;

/**
 * Custom Block Builder implementation for use with SliceDictionaryBuilder.
 * Instead of using one large contiguous Slice for storing the unique Strings
 * in String dictionary, this class uses Segmented Slices. The main advantage
 * of this class over VariableWidthBlockBuilder is memory. Non contiguous
 * memory is more likely to be available and hence reduce the chance of OOMs.
 * <p>
 * Why implement a block builder ?
 * SliceDictionaryBuilder takes in a Block and Position to write.
 * 1. It can create a slice for the position and write it. This does not
 * require a block builder. But temporary slice, produces lot of
 * short lived garbage.
 * 2. A block and position can be copied to BlockBuilder using the method
 * Block.writeBytesTo . But this requires implementing the BlockBuilder interface.
 * Most of the methods are going to be unused and left as Unsupported.
 * <p>
 * What's the difference between this class and VariableWidthBlockBuilder?
 * This class is different from VariableWidthBlockBuilder in the following ways
 * 1. It does not support nulls. (So null byte array and management is avoided).
 * 2. Instead of using one contiguous chunk for storing all the entries,
 * they are segmented.
 * <p>
 * How is it implemented ?
 * The Strings from 0 to SEGMENT_SIZE-1 are stored in the first segment.
 * The string from SEGMENT_SIZE to 2 * SEGMENT_SIZE -1 goes to the second.
 * Each segment has Slice(data is concatenated and stored in one slice)
 * and offsets to capture the start offset and length. New slices are appended
 * to the open segment. Once the segment is full the segment is
 * finalized and appended to the closed segments. A new open segment is
 * created for further appends.
 */
public class SegmentedSliceBlockBuilder
        extends AbstractVariableWidthBlockBuilder
{
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(SegmentedSliceBlockBuilder.class).instanceSize();

    private final int expectedBytes;

    private DynamicSliceOutput openSliceOutput;
    private int[][] offsets;
    private Slice[] closedSlices;
    private long closedSlicesRetainedSize;
    private long closedSlicesSizeInBytes;
    private int openSegmentIndex;
    private int openSegmentOffset;

    public SegmentedSliceBlockBuilder(int expectedEntries, int expectedBytes)
    {
        this.expectedBytes = expectedBytes;

        openSliceOutput = new DynamicSliceOutput(expectedBytes);
        int initialSize = Math.max(INITIAL_SEGMENTS, segment(expectedEntries) + 1);
        offsets = new int[initialSize][];
        closedSlices = new Slice[initialSize];
        offsets[0] = new int[SEGMENT_SIZE + 1];
    }

    public void reset()
    {
        // DynamicSliceOutput.reset() does not shrink memory, when dictionary is converted
        // to direct, DynamicSliceOutput needs to give up memory to reduce the memory pressure.
        openSliceOutput = new DynamicSliceOutput(expectedBytes);

        Arrays.fill(closedSlices, null);
        closedSlicesRetainedSize = 0;
        closedSlicesSizeInBytes = 0;

        // Fill the first offset array with 0, and free up the rest of the offsets array.
        Arrays.fill(offsets[0], 0);
        Arrays.fill(offsets, 1, offsets.length, null);
        openSegmentIndex = 0;
        openSegmentOffset = 0;
    }

    @Override
    public int getPositionOffset(int position)
    {
        return getOffset(position);
    }

    @Override
    public int getSliceLength(int position)
    {
        int offset = offset(position);
        int segment = segment(position);
        return offsets[segment][offset + 1] - offsets[segment][offset];
    }

    private Slice getSegmentRawSlice(int segment)
    {
        return segment == openSegmentIndex ? openSliceOutput.getUnderlyingSlice() : closedSlices[segment];
    }

    @Override
    public Slice getRawSlice(int position)
    {
        return getSegmentRawSlice(segment(position));
    }

    @Override
    public int getPositionCount()
    {
        return Segments.getPositions(openSegmentIndex, openSegmentOffset);
    }

    @Override
    public long getSizeInBytes()
    {
        long offsetsSizeInBytes = Integer.BYTES * (long) getPositionCount();
        return openSliceOutput.size() + offsetsSizeInBytes + closedSlicesSizeInBytes;
    }

    @Override
    public OptionalInt fixedSizeInBytesPerPosition()
    {
        return OptionalInt.empty(); // size is variable based on the per element length
    }

    @Override
    public long getRegionSizeInBytes(int position, int length)
    {
        throw new UnsupportedOperationException("getRegionSizeInBytes is not supported by SegmentedSliceBlockBuilder");
    }

    @Override
    public long getPositionsSizeInBytes(boolean[] positions, int usedPositionCount)
    {
        throw new UnsupportedOperationException("getPositionsSizeInBytes is not supported by SegmentedSliceBlockBuilder");
    }

    @Override
    public long getRetainedSizeInBytes()
    {
        long offsetsSize = sizeOf(offsets) + (openSegmentIndex + 1) * sizeOf(offsets[0]);
        long closedSlicesSize = sizeOf(closedSlices) + closedSlicesRetainedSize;
        return INSTANCE_SIZE + openSliceOutput.getRetainedSize() + offsetsSize + closedSlicesSize;
    }

    @Override
    public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
    {
        throw new UnsupportedOperationException("retainedBytesForEachPart is not supported by SegmentedSliceBlockBuilder");
    }

    @Override
    public Block copyPositions(int[] positions, int offset, int length)
    {
        throw new UnsupportedOperationException("copyPositions is not supported by SegmentedSliceBlockBuilder");
    }

    @Override
    public BlockBuilder writeByte(int value)
    {
        throw new UnsupportedOperationException("writeByte is not supported by SegmentedSliceBlockBuilder");
    }

    @Override
    public BlockBuilder writeShort(int value)
    {
        throw new UnsupportedOperationException("writeShort is not supported by SegmentedSliceBlockBuilder");
    }

    @Override
    public BlockBuilder writeInt(int value)
    {
        throw new UnsupportedOperationException("writeInt is not supported by SegmentedSliceBlockBuilder");
    }

    @Override
    public BlockBuilder writeLong(long value)
    {
        throw new UnsupportedOperationException("writeLong is not supported by SegmentedSliceBlockBuilder");
    }

    @Override
    public BlockBuilder writeBytes(Slice source, int sourceIndex, int length)
    {
        initializeNewSegmentIfRequired();
        openSliceOutput.writeBytes(source, sourceIndex, length);
        return this;
    }

    private void initializeNewSegmentIfRequired()
    {
        if (openSegmentOffset == 0) {
            // Expand Segments if necessary.
            if (openSegmentIndex >= offsets.length) {
                int newCapacity = Math.max(openSegmentIndex + 1, (int) (offsets.length * 1.5));
                closedSlices = Arrays.copyOf(closedSlices, newCapacity);
                offsets = Arrays.copyOf(offsets, newCapacity);
            }

            if (offsets[openSegmentIndex] == null) {
                offsets[openSegmentIndex] = new int[SEGMENT_SIZE + 1];
            }
        }
    }

    @Override
    public AbstractVariableWidthBlockBuilder writeBytes(byte[] source, int sourceIndex, int length)
    {
        initializeNewSegmentIfRequired();
        openSliceOutput.writeBytes(source, sourceIndex, length);
        return this;
    }

    @Override
    public BlockBuilder closeEntry()
    {
        openSegmentOffset++;
        offsets[openSegmentIndex][openSegmentOffset] = openSliceOutput.size();
        if (openSegmentOffset == SEGMENT_SIZE) {
            // Copy the content from the openSlice and append it to the closedSlices.
            // Note: openSlice will be reused for next segment, so a copy is required.
            Slice slice = openSliceOutput.copySlice();
            closedSlices[openSegmentIndex] = slice;
            closedSlicesSizeInBytes += slice.length();
            closedSlicesRetainedSize += slice.getRetainedSize();

            // Prepare the open slice for next write.
            openSliceOutput.reset();
            openSegmentOffset = 0;
            openSegmentIndex++;
        }
        return this;
    }

    @Override
    public BlockBuilder appendNull()
    {
        throw new UnsupportedOperationException("appendNull is not supported by SegmentedSliceBlockBuilder");
    }

    @Override
    public BlockBuilder readPositionFrom(SliceInput input)
    {
        throw new UnsupportedOperationException("readPositionFrom is not supported by SegmentedSliceBlockBuilder");
    }

    @Override
    public boolean mayHaveNull()
    {
        return false;
    }

    @Override
    protected boolean isEntryNull(int position)
    {
        return false;
    }

    @Override
    public Block getRegion(int positionOffset, int length)
    {
        throw new UnsupportedOperationException("getRegion is not supported by SegmentedSliceBlockBuilder");
    }

    @Override
    public Block copyRegion(int position, int length)
    {
        throw new UnsupportedOperationException("copyRegion is not supported by SegmentedSliceBlockBuilder");
    }

    @Override
    public Block build()
    {
        // No implementation of Segmented Slice based block exists. There is also no use for this yet.
        throw new UnsupportedOperationException("build is not supported by SegmentedSliceBlockBuilder");
    }

    @Override
    public BlockBuilder newBlockBuilderLike(BlockBuilderStatus blockBuilderStatus)
    {
        return newBlockBuilderLike(blockBuilderStatus, getPositionCount());
    }

    @Override
    public BlockBuilder newBlockBuilderLike(BlockBuilderStatus blockBuilderStatus, int expectedEntries)
    {
        if (blockBuilderStatus != null) {
            throw new UnsupportedOperationException("blockBuilderStatus is not supported by SegmentedSliceBlockBuilder");
        }
        return new SegmentedSliceBlockBuilder(expectedEntries, openSliceOutput.getUnderlyingSlice().length());
    }

    private int getOffset(int position)
    {
        int offset = offset(position);
        int segment = segment(position);
        return offsets[segment][offset];
    }

    @Override
    public String toString()
    {
        return format("SegmentedSliceBlockBuilder(%d){positionCount=%d,size=%d}", hashCode(), getPositionCount(), openSliceOutput.size());
    }

    @Override
    public boolean isNullUnchecked(int internalPosition)
    {
        return false;
    }

    @Override
    public int getOffsetBase()
    {
        return 0;
    }

    public boolean equals(int position, Block block, int blockPosition, int blockLength)
    {
        int segment = segment(position);
        int segmentOffset = offset(position);

        int offset = offsets[segment][segmentOffset];
        int length = offsets[segment][segmentOffset + 1] - offset;
        return blockLength == length && block.bytesEqual(blockPosition, 0, getSegmentRawSlice(segment), offset, length);
    }

    public int compareTo(int left, int right)
    {
        int leftSegment = segment(left);
        int leftSegmentOffset = offset(left);

        int rightSegment = segment(right);
        int rightSegmentOffset = offset(right);

        Slice leftRawSlice = getSegmentRawSlice(leftSegment);
        int leftOffset = offsets[leftSegment][leftSegmentOffset];
        int leftLen = offsets[leftSegment][leftSegmentOffset + 1] - leftOffset;

        Slice rightRawSlice = getSegmentRawSlice(rightSegment);
        int rightOffset = offsets[rightSegment][rightSegmentOffset];
        int rightLen = offsets[rightSegment][rightSegmentOffset + 1] - rightOffset;

        return leftRawSlice.compareTo(leftOffset, leftLen, rightRawSlice, rightOffset, rightLen);
    }

    public long hash(int position)
    {
        int segment = segment(position);
        int segmentOffset = offset(position);

        int offset = offsets[segment][segmentOffset];
        int length = offsets[segment][segmentOffset + 1] - offset;
        // There are several methods which computes hash, Block.hash, BlockBuilder.hash and
        // Slice.hash. There is an expectations that all these methods return the same
        // hash value. For insertion, block.hash is called, but rehash relies on BlockBuilder.hash
        // So changing the hashing algorithm is hard. If a block implements different hashing
        // algorithm, it is going to produce incorrect results after rehashing.
        return XxHash64.hash(getSegmentRawSlice(segment), offset, length);
    }

    @VisibleForTesting
    int getOpenSegmentIndex()
    {
        return openSegmentIndex;
    }

    // This class is copied from com.facebook.presto.orc.array.BigArrays and the
    // Sizes and Initial Segments are tuned for the SliceDictionaryBuilder use case.
    static class Segments
    {
        // DictionaryBuilder allocates first offsets array upfront and grows further offsets array lazily.
        // so 4KB (i.e 2^SEGMENT_SHIFT * INT_SIZE) is allocated upfront per builder. For tables with
        // 100+ String columns, this is 4 MB allocation upfront, which is manageable.
        // The INITIAL_SEGMENTS value does not have that much significance and 32 is guess based on
        // most ORC dictionaries will have less than 32K elements.
        public static final int INITIAL_SEGMENTS = 32;

        public static final int SEGMENT_SHIFT = 10;

        /**
         * Size of a single segment of a BigArray
         */
        public static final int SEGMENT_SIZE = 1 << SEGMENT_SHIFT;

        /**
         * The mask used to compute the offset associated to an index.
         */
        public static final int SEGMENT_MASK = SEGMENT_SIZE - 1;

        /**
         * Computes the segment associated with a given index.
         *
         * @param index an index into a big array.
         * @return the associated segment.
         */
        public static int segment(int index)
        {
            return index >>> SEGMENT_SHIFT;
        }

        /**
         * Computes the offset associated with a given index.
         *
         * @param index an index into a big array.
         * @return the associated offset (in the associated {@linkplain #segment(int) segment}).
         */
        public static int offset(int index)
        {
            return index & SEGMENT_MASK;
        }

        public static int getPositions(int segment, int offset)
        {
            return (segment << SEGMENT_SHIFT) + offset;
        }
    }
}