SingleRowBlockWriter.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.common.block;

import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import org.openjdk.jol.info.ClassLayout;

import java.util.OptionalInt;
import java.util.function.ObjLongConsumer;

import static java.lang.String.format;

public class SingleRowBlockWriter
        extends AbstractSingleRowBlock
        implements BlockBuilder
{
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(SingleRowBlockWriter.class).instanceSize();

    private final BlockBuilder[] fieldBlockBuilders;

    private boolean isEntryOpen;
    private int currentFieldIndexToWrite;
    private boolean fieldBlockBuilderReturned;

    SingleRowBlockWriter(int rowIndex, BlockBuilder[] fieldBlockBuilders)
    {
        super(rowIndex);
        this.fieldBlockBuilders = fieldBlockBuilders;
    }

    /**
     * Obtains the field {@code BlockBuilder}.
     * <p>
     * This method is used to perform random write to {@code SingleRowBlockWriter}.
     * Each field {@code BlockBuilder} must be written EXACTLY once.
     * <p>
     * Field {@code BlockBuilder} can only be obtained before any sequential write has done.
     * Once obtained, sequential write is no longer allowed.
     */
    public BlockBuilder getFieldBlockBuilder(int fieldIndex)
    {
        if (currentFieldIndexToWrite != 0) {
            throw new IllegalStateException("field block builder can only be obtained before any sequential write has done");
        }
        fieldBlockBuilderReturned = true;
        return fieldBlockBuilders[fieldIndex];
    }

    @Override
    protected Block getRawFieldBlock(int fieldIndex)
    {
        return fieldBlockBuilders[fieldIndex];
    }

    @Override
    public long getSizeInBytes()
    {
        long size = 0;
        if (rowIndex == 0) {
            for (BlockBuilder blockBuilder : fieldBlockBuilders) {
                size += blockBuilder.getSizeInBytes();
            }
            return size;
        }

        int endIndex = currentFieldIndexToWrite + (isEntryOpen ? 1 : 0);
        for (int i = 0; i < endIndex; i++) {
            BlockBuilder builder = fieldBlockBuilders[i];
            size += (builder.getSizeInBytes() - builder.getRegionSizeInBytes(0, rowIndex));
        }
        return size;
    }

    @Override
    public OptionalInt fixedSizeInBytesPerPosition()
    {
        return OptionalInt.empty();
    }

    @Override
    public long getRetainedSizeInBytes()
    {
        long size = INSTANCE_SIZE;
        for (BlockBuilder fieldBlockBuilder : fieldBlockBuilders) {
            size += fieldBlockBuilder.getRetainedSizeInBytes();
        }
        return size;
    }

    @Override
    public void retainedBytesForEachPart(ObjLongConsumer<Object> consumer)
    {
        for (BlockBuilder fieldBlockBuilder : fieldBlockBuilders) {
            consumer.accept(fieldBlockBuilder, fieldBlockBuilder.getRetainedSizeInBytes());
        }
        consumer.accept(this, INSTANCE_SIZE);
    }

    @Override
    public BlockBuilder writeByte(int value)
    {
        checkFieldIndexToWrite();
        fieldBlockBuilders[currentFieldIndexToWrite].writeByte(value);
        return this;
    }

    @Override
    public BlockBuilder writeShort(int value)
    {
        checkFieldIndexToWrite();
        fieldBlockBuilders[currentFieldIndexToWrite].writeShort(value);
        return this;
    }

    @Override
    public BlockBuilder writeInt(int value)
    {
        checkFieldIndexToWrite();
        fieldBlockBuilders[currentFieldIndexToWrite].writeInt(value);
        return this;
    }

    @Override
    public BlockBuilder writeLong(long value)
    {
        checkFieldIndexToWrite();
        fieldBlockBuilders[currentFieldIndexToWrite].writeLong(value);
        return this;
    }

    @Override
    public BlockBuilder writeBytes(Slice source, int sourceIndex, int length)
    {
        checkFieldIndexToWrite();
        fieldBlockBuilders[currentFieldIndexToWrite].writeBytes(source, sourceIndex, length);
        return this;
    }

    @Override
    public BlockBuilder appendStructure(Block block)
    {
        checkFieldIndexToWrite();
        fieldBlockBuilders[currentFieldIndexToWrite].appendStructure(block);
        entryAdded();
        return this;
    }

    @Override
    public BlockBuilder appendStructureInternal(Block block, int position)
    {
        checkFieldIndexToWrite();
        fieldBlockBuilders[currentFieldIndexToWrite].appendStructureInternal(block, position);
        entryAdded();
        return this;
    }

    @Override
    public BlockBuilder beginBlockEntry()
    {
        checkFieldIndexToWrite();
        isEntryOpen = true;
        return fieldBlockBuilders[currentFieldIndexToWrite].beginBlockEntry();
    }

    @Override
    public BlockBuilder appendNull()
    {
        checkFieldIndexToWrite();
        fieldBlockBuilders[currentFieldIndexToWrite].appendNull();
        entryAdded();
        return this;
    }

    @Override
    public BlockBuilder closeEntry()
    {
        checkFieldIndexToWrite();
        fieldBlockBuilders[currentFieldIndexToWrite].closeEntry();
        entryAdded();
        return this;
    }

    @Override
    public BlockBuilder readPositionFrom(SliceInput input)
    {
        boolean isNull = input.readByte() == 0;
        if (isNull) {
            appendNull();
        }
        else {
            checkFieldIndexToWrite();
            fieldBlockBuilders[currentFieldIndexToWrite].readPositionFrom(input);
            entryAdded();
        }
        return this;
    }

    private void entryAdded()
    {
        isEntryOpen = false;
        currentFieldIndexToWrite++;
    }

    @Override
    public int getPositionCount()
    {
        if (fieldBlockBuilderReturned) {
            throw new IllegalStateException("field block builder has been returned");
        }
        return currentFieldIndexToWrite;
    }

    @Override
    public String getEncodingName()
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public Block build()
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public BlockBuilder newBlockBuilderLike(BlockBuilderStatus blockBuilderStatus)
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public BlockBuilder newBlockBuilderLike(BlockBuilderStatus blockBuilderStatus, int expectedEntries)
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public String toString()
    {
        if (!fieldBlockBuilderReturned) {
            return format("SingleRowBlockWriter(%d){numFields=%d, fieldBlockBuilderReturned=false, positionCount=%d}", hashCode(), fieldBlockBuilders.length, getPositionCount());
        }
        else {
            return format("SingleRowBlockWriter(%d){numFields=%d, fieldBlockBuilderReturned=true}", hashCode(), fieldBlockBuilders.length);
        }
    }

    private void checkFieldIndexToWrite()
    {
        if (fieldBlockBuilderReturned) {
            throw new IllegalStateException("cannot do sequential write after getFieldBlockBuilder is called");
        }
        if (currentFieldIndexToWrite >= fieldBlockBuilders.length) {
            throw new IllegalStateException("currentFieldIndexToWrite is not valid");
        }
    }
}