AbstractBlockEncodingBuffer.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.operator.repartition;

import com.facebook.presto.common.block.ArrayAllocator;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.ByteArrayBlock;
import com.facebook.presto.common.block.ColumnarArray;
import com.facebook.presto.common.block.ColumnarMap;
import com.facebook.presto.common.block.ColumnarRow;
import com.facebook.presto.common.block.DictionaryBlock;
import com.facebook.presto.common.block.Int128ArrayBlock;
import com.facebook.presto.common.block.IntArrayBlock;
import com.facebook.presto.common.block.LongArrayBlock;
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import com.facebook.presto.common.block.ShortArrayBlock;
import com.facebook.presto.common.block.VariableWidthBlock;
import com.google.common.annotations.VisibleForTesting;
import io.airlift.slice.SliceOutput;

import javax.annotation.Nullable;

import java.util.Arrays;

import static com.facebook.presto.common.array.Arrays.ExpansionFactor.LARGE;
import static com.facebook.presto.common.array.Arrays.ExpansionFactor.SMALL;
import static com.facebook.presto.common.array.Arrays.ExpansionOption.INITIALIZE;
import static com.facebook.presto.common.array.Arrays.ExpansionOption.NONE;
import static com.facebook.presto.common.array.Arrays.ExpansionOption.PRESERVE;
import static com.facebook.presto.common.array.Arrays.ensureCapacity;
import static com.facebook.presto.operator.MoreByteArrays.fill;
import static com.facebook.presto.operator.UncheckedByteArrays.setByteUnchecked;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Verify.verify;
import static io.airlift.slice.SizeOf.SIZE_OF_BYTE;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

public abstract class AbstractBlockEncodingBuffer
        implements BlockEncodingBuffer
{
    @VisibleForTesting
    public static final double GRACE_FACTOR_FOR_MAX_BUFFER_CAPACITY = 1.2f;

    // The allocator for internal buffers
    protected final ArrayAllocator bufferAllocator;

    // Boolean indicating whether this is a buffer for a nested level block.
    protected final boolean isNested;

    // The block after peeling off the Dictionary or RLE wrappings.
    protected Block decodedBlock;

    // The number of positions (rows) to be copied from the incoming block.
    protected int positionCount;

    // The number of positions (rows) to be copied from the incoming block within current batch.
    protected int batchSize;

    // The offset into positions/mappedPositions array that the current batch starts copying.
    // I.e. in each batch we copy the values of rows from positions[positionsOffset] to positions[positionsOffset + batchSize]
    protected int positionsOffset;

    // The number of positions (rows) buffered so far
    protected int bufferedPositionCount;

    // Whether the positions array has already been mapped to mappedPositions
    protected boolean positionsMapped;

    // Whether the buffers have been flushed
    protected boolean flushed;

    // The positions (row numbers) of the incoming Block to be copied to this buffer.
    // All top level AbstractBlockEncodingBuffer for the same partition share the same positions array.
    private int[] positions;

    // The mapped positions of the incoming Dictionary or RLE Block. For other blocks, mappedPosition
    // is either null or meaningless.
    //
    // For example, a VARCHAR column with 5 rows - ['b', 'a', 'b', 'a', 'c'] is represented as
    // a DictionaryBlock with dictionary ['a', 'b', 'c'] and ids [1, 0, 1, 0, 2]. The mappedPositions
    // array for positions [0, 1, 4] for this VARCHAR column will be [1, 0, 2].
    @Nullable
    private int[] mappedPositions;

    @Nullable
    private byte[] nullsBuffer;

    // The address offset in the nullsBuffer if new values are to be added.
    private int nullsBufferIndex;

    // The target size for nullsBuffer
    private int estimatedNullsBufferMaxCapacity;

    // For each batch we put the nulls values up to a multiple of 8 into nullsBuffer. The rest is kept in remainingNulls.
    private final boolean[] remainingNulls = new boolean[Byte.SIZE];

    // Number of null values not put into nullsBuffer from last batch
    private int remainingNullsCount;

    // Boolean indicating whether there are any null values in the nullsBuffer. It is possible that all values are non-null.
    private boolean hasEncodedNulls;

    protected abstract void setupDecodedBlockAndMapPositions(DecodedBlockNode decodedBlockNode, int partitionBufferCapacity, double decodedBlockPageSizeFraction);

    protected AbstractBlockEncodingBuffer(ArrayAllocator bufferAllocator, boolean isNested)
    {
        this.bufferAllocator = requireNonNull(bufferAllocator, "bufferAllocator is null");
        this.isNested = isNested;
    }

    public static BlockEncodingBuffer createBlockEncodingBuffers(DecodedBlockNode decodedBlockNode, ArrayAllocator bufferAllocator, boolean isNested)
    {
        requireNonNull(decodedBlockNode, "decodedBlockNode is null");
        requireNonNull(bufferAllocator, "bufferAllocator is null");

        // decodedBlock could be a block or ColumnarArray/Map/Row
        Object decodedBlock = decodedBlockNode.getDecodedBlock();

        // Skip the Dictionary/RLE block node. The mapping info is not needed when creating buffers.
        // This is because the AbstractBlockEncodingBuffer is only created once, while position mapping for Dictionary/RLE blocks
        // need to be done for every incoming block.
        if (decodedBlock instanceof DictionaryBlock) {
            decodedBlockNode = decodedBlockNode.getChildren().get(0);
            decodedBlock = decodedBlockNode.getDecodedBlock();
        }
        else if (decodedBlock instanceof RunLengthEncodedBlock) {
            decodedBlockNode = decodedBlockNode.getChildren().get(0);
            decodedBlock = decodedBlockNode.getDecodedBlock();
        }

        verify(!(decodedBlock instanceof DictionaryBlock), "Nested RLEs and dictionaries are not supported");
        verify(!(decodedBlock instanceof RunLengthEncodedBlock), "Nested RLEs and dictionaries are not supported");

        if (decodedBlock instanceof LongArrayBlock) {
            return new LongArrayBlockEncodingBuffer(bufferAllocator, isNested);
        }

        if (decodedBlock instanceof Int128ArrayBlock) {
            return new Int128ArrayBlockEncodingBuffer(bufferAllocator, isNested);
        }

        if (decodedBlock instanceof IntArrayBlock) {
            return new IntArrayBlockEncodingBuffer(bufferAllocator, isNested);
        }

        if (decodedBlock instanceof ShortArrayBlock) {
            return new ShortArrayBlockEncodingBuffer(bufferAllocator, isNested);
        }

        if (decodedBlock instanceof ByteArrayBlock) {
            return new ByteArrayBlockEncodingBuffer(bufferAllocator, isNested);
        }

        if (decodedBlock instanceof VariableWidthBlock) {
            return new VariableWidthBlockEncodingBuffer(bufferAllocator, isNested);
        }

        if (decodedBlock instanceof ColumnarArray) {
            return new ArrayBlockEncodingBuffer(decodedBlockNode, bufferAllocator, isNested);
        }

        if (decodedBlock instanceof ColumnarMap) {
            return new MapBlockEncodingBuffer(decodedBlockNode, bufferAllocator, isNested);
        }

        if (decodedBlock instanceof ColumnarRow) {
            return new RowBlockEncodingBuffer(decodedBlockNode, bufferAllocator, isNested);
        }

        throw new IllegalArgumentException("Unsupported encoding: " + decodedBlock.getClass().getSimpleName());
    }

    @Override
    public void setupDecodedBlocksAndPositions(DecodedBlockNode decodedBlockNode, int[] positions, int positionCount, int partitionBufferCapacity, long estimatedSerializedPageSize)
    {
        requireNonNull(decodedBlockNode, "decodedBlockNode is null");
        requireNonNull(positions, "positions is null");

        this.positions = positions;
        this.positionCount = positionCount;
        this.positionsOffset = 0;
        this.positionsMapped = false;

        double decodedBlockPageSizeFraction = (decodedBlockNode.getEstimatedSerializedSizeInBytes()) / ((double) estimatedSerializedPageSize);

        setupDecodedBlockAndMapPositions(decodedBlockNode, partitionBufferCapacity, decodedBlockPageSizeFraction);
    }

    @Override
    public void setNextBatch(int positionsOffset, int batchSize)
    {
        this.positionsOffset = positionsOffset;
        this.batchSize = batchSize;
        this.flushed = false;
    }

    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("isNested", isNested)
                .add("decodedBlock", mappedPositions == null ? "null" : decodedBlock)
                .add("positionCount", positionCount)
                .add("batchSize", batchSize)
                .add("positionsOffset", positionsOffset)
                .add("bufferedPositionCount", bufferedPositionCount)
                .add("positionsMapped", positionsMapped)
                .add("flushed", flushed)
                .add("positionsCapacity", positions == null ? 0 : positions.length)
                .add("mappedPositionsCapacity", mappedPositions == null ? 0 : mappedPositions.length)
                .add("estimatedNullsBufferMaxCapacity", estimatedNullsBufferMaxCapacity)
                .add("nullsBufferCapacity", nullsBuffer == null ? 0 : nullsBuffer.length)
                .add("nullsBufferIndex", nullsBufferIndex)
                .add("remainingNullsCount", remainingNullsCount)
                .add("hasEncodedNulls", hasEncodedNulls)
                .toString();
    }

    @VisibleForTesting
    abstract int getEstimatedValueBufferMaxCapacity();

    @VisibleForTesting
    int getEstimatedNullsBufferMaxCapacity()
    {
        return estimatedNullsBufferMaxCapacity;
    }

    protected void setEstimatedNullsBufferMaxCapacity(int estimatedNullsBufferMaxCapacity)
    {
        this.estimatedNullsBufferMaxCapacity = estimatedNullsBufferMaxCapacity;
    }

    protected static int getEstimatedBufferMaxCapacity(double targetBufferSize, int unitSize, int positionSize)
    {
        return (int) (targetBufferSize * unitSize / positionSize * GRACE_FACTOR_FOR_MAX_BUFFER_CAPACITY);
    }

    /**
     * Map the positions for DictionaryBlock to its nested dictionaryBlock, and positions for RunLengthEncodedBlock
     * to its nested value block. For example, positions [0, 2, 5] on DictionaryBlock with dictionary block ['a', 'b', 'c']
     * and ids [1, 1, 2, 0, 2, 1] will be mapped to [1, 2, 1], and the copied data will be ['b', 'c', 'b'].
     */
    protected DecodedBlockNode mapPositionsToNestedBlock(DecodedBlockNode decodedBlockNode)
    {
        Object decodedObject = decodedBlockNode.getDecodedBlock();

        if (decodedObject instanceof DictionaryBlock) {
            DictionaryBlock dictionaryBlock = (DictionaryBlock) decodedObject;
            mappedPositions = ensureCapacity(mappedPositions, positionCount, SMALL, NONE, bufferAllocator);

            for (int i = 0; i < positionCount; i++) {
                mappedPositions[i] = dictionaryBlock.getId(positions[i]);
            }
            positionsMapped = true;
            return decodedBlockNode.getChildren().get(0);
        }

        if (decodedObject instanceof RunLengthEncodedBlock) {
            mappedPositions = ensureCapacity(mappedPositions, positionCount, SMALL, INITIALIZE, bufferAllocator);
            positionsMapped = true;
            return decodedBlockNode.getChildren().get(0);
        }

        positionsMapped = false;
        return decodedBlockNode;
    }

    protected void ensurePositionsCapacity(int capacity)
    {
        positions = ensureCapacity(positions, capacity, SMALL, NONE, bufferAllocator);
    }

    protected void appendPositionRange(int offset, int length)
    {
        for (int i = 0; i < length; i++) {
            positions[positionCount++] = offset + i;
        }
    }

    /**
     * Called by accumulateSerializedRowSizes(int[] serializedRowSizes) for composite types to add row sizes for nested blocks.
     * <p>
     * For example, a AbstractBlockEncodingBuffer for an array(int) column with 3 rows - [{1, 2}, {3}, {4, 5, 6}] - will call accumulateSerializedRowSizes
     * with positionOffsets = [0, 2, 3, 6] and serializedRowSizes = [5, 5, 5] (5 = 4 + 1 where 4 bytes for offset and 1 for null flag).
     * After the method returns, serializedRowSizes will be [15, 10, 20].
     *
     * @param positionOffsets The offsets of the top level rows
     * @param positionCount Number of top level rows
     */
    protected abstract void accumulateSerializedRowSizes(int[] positionOffsets, int positionCount, int[] serializedRowSizes);

    protected void resetPositions()
    {
        positionsOffset = 0;
        positionCount = 0;
        positionsMapped = false;
    }

    protected int[] getPositions()
    {
        if (positionsMapped) {
            verify(mappedPositions != null);
            return mappedPositions;
        }

        return positions;
    }

    protected void appendNulls()
    {
        if (decodedBlock.mayHaveNull()) {
            // Write to nullsBuffer if there is a possibility to have nulls. It is possible that the
            // decodedBlock contains nulls, but rows that go into this partition don't. Write to nullsBuffer anyway.
            nullsBuffer = ensureCapacity(nullsBuffer, (bufferedPositionCount + batchSize) / Byte.SIZE + 1, estimatedNullsBufferMaxCapacity, LARGE, PRESERVE, bufferAllocator);

            int bufferedNullsCount = nullsBufferIndex * Byte.SIZE + remainingNullsCount;
            if (bufferedPositionCount > bufferedNullsCount) {
                // There are no nulls in positions (bufferedNullsCount, bufferedPositionCount]
                encodeNonNullsAsBits(bufferedPositionCount - bufferedNullsCount);
            }

            // Append this batch
            encodeNullsAsBits(decodedBlock);
        }
        else if (containsNull()) {
            // There were nulls in previously buffered rows, but for this batch there can't be any nulls.
            // Any how we need to append 0's for this batch.
            nullsBuffer = ensureCapacity(nullsBuffer, (bufferedPositionCount + batchSize) / Byte.SIZE + 1, estimatedNullsBufferMaxCapacity, LARGE, PRESERVE, bufferAllocator);

            encodeNonNullsAsBits(batchSize);
        }
    }

    @Override
    public void noMoreBatches()
    {
        // Only the positions for nested level need to be recycled.
        if (isNested && positions != null) {
            bufferAllocator.returnArray(positions);
            positions = null;
        }

        if (mappedPositions != null) {
            bufferAllocator.returnArray(mappedPositions);
            mappedPositions = null;
        }

        // Only when this is the last batch in the page and it filled the buffer, the buffers can be recycled.
        if (flushed && nullsBuffer != null) {
            bufferAllocator.returnArray(nullsBuffer);
            nullsBuffer = null;
        }

        // Recycle the decodedBlock. It will be set by setupDecodedBlocksAndPositions in the next call.
        decodedBlock = null;
    }

    protected void serializeNullsTo(SliceOutput output)
    {
        encodeRemainingNullsAsBits();

        if (hasEncodedNulls) {
            output.writeBoolean(true);
            output.appendBytes(nullsBuffer, 0, nullsBufferIndex);
        }
        else {
            output.writeBoolean(false);
        }
    }

    protected static void writeLengthPrefixedString(SliceOutput output, String value)
    {
        byte[] bytes = value.getBytes(UTF_8);
        output.writeInt(bytes.length);
        output.writeBytes(bytes);
    }

    protected void resetNullsBuffer()
    {
        nullsBufferIndex = 0;
        remainingNullsCount = 0;
        hasEncodedNulls = false;
    }

    protected long getNullsBufferSerializedSizeInBytes()
    {
        long size = SIZE_OF_BYTE;       // nulls uses 1 byte for mayHaveNull
        if (containsNull()) {
            size += nullsBufferIndex +  // nulls buffer
                    (remainingNullsCount > 0 ? SIZE_OF_BYTE : 0);   // The remaining nulls not serialized yet. We have to add it here because at the time of calling this function, the remaining nulls was not put into the nullsBuffer yet.
        }
        return size;
    }

    private void encodeNullsAsBits(Block block)
    {
        int[] positions = getPositions();

        if (remainingNullsCount + batchSize < Byte.SIZE) {
            // just put all of this batch to remainingNulls
            for (int i = 0; i < batchSize; i++) {
                remainingNulls[remainingNullsCount++] = block.isNull(positions[positionsOffset + i]);
            }

            return;
        }

        // Process the remaining nulls from last batch
        int offset = positionsOffset;

        if (remainingNullsCount > 0) {
            byte value = 0;

            for (int i = 0; i < remainingNullsCount; i++) {
                value |= remainingNulls[i] ? 0b1000_0000 >>> i : 0;
            }

            // process a few more nulls to make up one byte
            for (int i = remainingNullsCount; i < Byte.SIZE; i++) {
                value |= block.isNull(positions[offset++]) ? 0b1000_0000 >>> i : 0;
            }

            hasEncodedNulls |= (value != 0);
            nullsBufferIndex = setByteUnchecked(nullsBuffer, nullsBufferIndex, value);

            remainingNullsCount = 0;
        }

        // Process the next Byte.SIZE * n positions. We now have processed (offset - positionsOffset) positions
        int remainingPositions = batchSize - (offset - positionsOffset);
        int positionsToEncode = remainingPositions & ~0b111;
        for (int i = 0; i < positionsToEncode; i += Byte.SIZE) {
            byte value = 0;
            value |= block.isNull(positions[offset]) ? 0b1000_0000 : 0;
            value |= block.isNull(positions[offset + 1]) ? 0b0100_0000 : 0;
            value |= block.isNull(positions[offset + 2]) ? 0b0010_0000 : 0;
            value |= block.isNull(positions[offset + 3]) ? 0b0001_0000 : 0;
            value |= block.isNull(positions[offset + 4]) ? 0b0000_1000 : 0;
            value |= block.isNull(positions[offset + 5]) ? 0b0000_0100 : 0;
            value |= block.isNull(positions[offset + 6]) ? 0b0000_0010 : 0;
            value |= block.isNull(positions[offset + 7]) ? 0b0000_0001 : 0;

            hasEncodedNulls |= (value != 0);
            nullsBufferIndex = setByteUnchecked(nullsBuffer, nullsBufferIndex, value);
            offset += Byte.SIZE;
        }

        // Process the remaining positions
        remainingNullsCount = remainingPositions & 0b111;
        for (int i = 0; i < remainingNullsCount; i++) {
            remainingNulls[i] = block.isNull(positions[offset++]);
        }
    }

    private void encodeNonNullsAsBits(int count)
    {
        if (remainingNullsCount + count < Byte.SIZE) {
            // just put all of this batch to remainingNulls
            for (int i = 0; i < count; i++) {
                remainingNulls[remainingNullsCount++] = false;
            }
            return;
        }

        int remainingPositions = count - encodeRemainingNullsAsBits();

        nullsBufferIndex = fill(nullsBuffer, nullsBufferIndex, remainingPositions >>> 3, (byte) 0);

        remainingNullsCount = remainingPositions & 0b111;
        Arrays.fill(remainingNulls, false);
    }

    private int encodeRemainingNullsAsBits()
    {
        if (remainingNullsCount == 0) {
            return 0;
        }

        byte value = 0;
        for (int i = 0; i < remainingNullsCount; i++) {
            value |= remainingNulls[i] ? 0b1000_0000 >>> i : 0;
        }

        hasEncodedNulls |= (value != 0);
        nullsBufferIndex = setByteUnchecked(nullsBuffer, nullsBufferIndex, value);

        int padding = Byte.SIZE - remainingNullsCount;
        remainingNullsCount = 0;
        return padding;
    }

    private boolean containsNull()
    {
        if (hasEncodedNulls) {
            return true;
        }

        for (int i = 0; i < remainingNullsCount; i++) {
            if (remainingNulls[i]) {
                return true;
            }
        }
        return false;
    }

    @VisibleForTesting
    void checkValidPositions()
    {
        verify(decodedBlock != null, "decodedBlock should have been set up");

        int positionCount = decodedBlock.getPositionCount();

        int[] positions = getPositions();
        for (int i = 0; i < this.positionCount; i++) {
            if (positions[i] < 0 || positions[i] >= positionCount) {
                throw new IndexOutOfBoundsException(format("Invalid position %d for decodedBlock with %d positions", positions[i], positionCount));
            }
        }
    }
}