BinaryNestedBatchReader.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.parquet.batchreader;

import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import com.facebook.presto.common.block.VariableWidthBlock;
import com.facebook.presto.parquet.RichColumnDescriptor;
import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.BinaryValuesDecoder;
import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.BinaryValuesDecoder.ValueBuffer;
import com.facebook.presto.parquet.reader.ColumnChunk;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class BinaryNestedBatchReader
        extends AbstractNestedBatchReader
{
    public BinaryNestedBatchReader(RichColumnDescriptor columnDescriptor)
    {
        super(columnDescriptor);
    }

    @Override
    protected ColumnChunk readNestedWithNull()
            throws IOException
    {
        int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
        RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(nextBatchSize);
        DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);

        int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
        int newBatchSize = 0;
        int batchNonNullCount = 0;
        for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
            int nonNullCount = 0;
            int valueCount = 0;
            for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
                nonNullCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
                valueCount += (definitionLevels[i] >= maxDefinitionLevel - 1 ? 1 : 0);
            }
            batchNonNullCount += nonNullCount;
            newBatchSize += valueCount;
            valuesDecoderContext.setNonNullCount(nonNullCount);
            valuesDecoderContext.setValueCount(valueCount);
        }

        if (batchNonNullCount == 0) {
            Block block = RunLengthEncodedBlock.create(field.getType(), null, newBatchSize);
            return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
        }

        List<ValueBuffer> valueBuffers = new ArrayList<>();
        int bufferSize = 0;
        for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
            ValueBuffer valueBuffer = ((BinaryValuesDecoder) valuesDecoderContext.getValuesDecoder()).readNext(valuesDecoderContext.getNonNullCount());
            bufferSize += valueBuffer.getBufferSize();
            valueBuffers.add(valueBuffer);
        }

        byte[] byteBuffer = new byte[bufferSize];
        int[] offsets = new int[newBatchSize + 1];
        int i = 0;
        int bufferIndex = 0;
        int offsetIndex = 0;
        for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
            ValueBuffer value = valueBuffers.get(i);
            bufferIndex = ((BinaryValuesDecoder) valuesDecoderContext.getValuesDecoder()).readIntoBuffer(byteBuffer, bufferIndex, offsets, offsetIndex, value);
            offsetIndex += valuesDecoderContext.getValueCount();
            i++;
        }

        boolean[] isNull = new boolean[newBatchSize];
        int offset = 0;
        for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
            int destinationIndex = offset + valuesDecoderContext.getValueCount() - 1;
            int sourceIndex = offset + valuesDecoderContext.getNonNullCount() - 1;
            int definitionLevelIndex = valuesDecoderContext.getEnd() - 1;

            offsets[destinationIndex + 1] = offsets[sourceIndex + 1];
            while (destinationIndex >= offset) {
                if (definitionLevels[definitionLevelIndex] == maxDefinitionLevel) {
                    offsets[destinationIndex--] = offsets[sourceIndex--];
                }
                else if (definitionLevels[definitionLevelIndex] == maxDefinitionLevel - 1) {
                    offsets[destinationIndex] = offsets[sourceIndex + 1];
                    isNull[destinationIndex] = true;
                    destinationIndex--;
                }
                definitionLevelIndex--;
            }
            offset += valuesDecoderContext.getValueCount();
        }

        Slice buffer = Slices.wrappedBuffer(byteBuffer, 0, bufferSize);
        boolean hasNoNull = batchNonNullCount == newBatchSize;
        Block block = new VariableWidthBlock(newBatchSize, buffer, offsets, hasNoNull ? Optional.empty() : Optional.of(isNull));
        return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
    }

    @Override
    protected ColumnChunk readNestedNoNull()
            throws IOException
    {
        int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
        RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(nextBatchSize);

        DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);

        int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
        int newBatchSize = 0;
        for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
            int valueCount = 0;
            for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
                valueCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
            }
            newBatchSize += valueCount;
            valuesDecoderContext.setNonNullCount(valueCount);
            valuesDecoderContext.setValueCount(valueCount);
        }

        List<ValueBuffer> valueBuffers = new ArrayList<>();
        int bufferSize = 0;
        for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
            ValueBuffer valueBuffer = ((BinaryValuesDecoder) valuesDecoderContext.getValuesDecoder()).readNext(valuesDecoderContext.getNonNullCount());
            bufferSize += valueBuffer.getBufferSize();
            valueBuffers.add(valueBuffer);
        }

        byte[] byteBuffer = new byte[bufferSize];
        int[] offsets = new int[newBatchSize + 1];
        int i = 0;
        int bufferIndex = 0;
        int offsetIndex = 0;
        for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
            ValueBuffer value = valueBuffers.get(i);
            bufferIndex = ((BinaryValuesDecoder) valuesDecoderContext.getValuesDecoder()).readIntoBuffer(byteBuffer, bufferIndex, offsets, offsetIndex, value);
            offsetIndex += valuesDecoderContext.getValueCount();
            i++;
        }

        Slice buffer = Slices.wrappedBuffer(byteBuffer, 0, bufferSize);
        Block block = new VariableWidthBlock(newBatchSize, buffer, offsets, Optional.empty());
        return new ColumnChunk(block, definitionLevels, repetitionLevelDecodingContext.getRepetitionLevels());
    }

    @Override
    protected void seek()
            throws IOException
    {
        if (readOffset == 0) {
            return;
        }
        int maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();
        RepetitionLevelDecodingContext repetitionLevelDecodingContext = readRepetitionLevels(readOffset);
        DefinitionLevelDecodingContext definitionLevelDecodingContext = readDefinitionLevels(repetitionLevelDecodingContext.getDLValuesDecoderContexts(), repetitionLevelDecodingContext.getRepetitionLevels().length);

        int[] definitionLevels = definitionLevelDecodingContext.getDefinitionLevels();
        for (ValuesDecoderContext valuesDecoderContext : definitionLevelDecodingContext.getValuesDecoderContexts()) {
            int valueCount = 0;
            for (int i = valuesDecoderContext.getStart(); i < valuesDecoderContext.getEnd(); i++) {
                valueCount += (definitionLevels[i] == maxDefinitionLevel ? 1 : 0);
            }
            BinaryValuesDecoder binaryValuesDecoder = ((BinaryValuesDecoder) valuesDecoderContext.getValuesDecoder());
            binaryValuesDecoder.skip(valueCount);
        }
    }
}