IcebergPartitionInsertingPageSource.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.RunLengthEncodedBlock;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.HivePartitionKey;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.PrestoException;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.IntStream;

import static com.facebook.presto.common.Utils.nativeValueToBlock;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static com.facebook.presto.iceberg.IcebergUtil.deserializePartitionValue;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

/**
 * A {@link ConnectorPageSource} that narrows the set of columns handed to a
 * delegate page source (the one that performs IO against on-disk files) to
 * only those which are not partition or metadata fields.
 * <p>
 * Columns that are excluded from the delegate have a single constant value
 * for the whole source. That value is materialized once, at construction
 * time, and when a new page is requested it is inserted into the output as a
 * {@link RunLengthEncodedBlock} sized to the page, which saves memory and IO
 * cost at runtime.
 * <p>
 * Output pages always contain one block per entry of the column list passed
 * to the constructor, in the same order.
 */
public class IcebergPartitionInsertingPageSource
        implements ConnectorPageSource
{
    // Indexed by output column position. Despite the name, this holds column
    // handles: the handle of the column when it is read by the delegate, or
    // null when the column is a partition/metadata column filled in locally.
    private final IcebergColumnHandle[] nonPartitionColumnIndexes;
    private final ConnectorPageSourceWithRowPositions delegateWithPositions;
    private final ConnectorPageSource delegate;
    // Indexed by output column position. Holds the constant value block for
    // columns that are filled in locally, or null for delegate-read columns.
    private final Block[] partitionValueBlocks;
    // Sum of the retained sizes of the non-null entries of partitionValueBlocks.
    private final long partitionValuesMemoryUsage;
    // maps output array index to index of input page from delegate provider.
    // Only meaningful at positions where nonPartitionColumnIndexes is non-null.
    private final int[] outputIndexes;

    /**
     * Creates the page source and, through {@code delegateSupplier}, the
     * delegate that reads the remaining columns.
     *
     * @param fullColumnList every column the output pages must contain, in output order
     * @param metadataValues values for metadata columns, keyed by column id
     * @param partitionKeys partition values, keyed by column id
     * @param delegateSupplier invoked exactly once with the columns (in output order)
     *         that are neither partition nor metadata columns; must return the
     *         source that reads those columns
     * @throws NullPointerException if any argument, or the value returned by
     *         {@code delegateSupplier}, is null
     */
    public IcebergPartitionInsertingPageSource(
            List<IcebergColumnHandle> fullColumnList,
            Map<Integer, Object> metadataValues,
            Map<Integer, HivePartitionKey> partitionKeys,
            Function<List<IcebergColumnHandle>, ConnectorPageSourceWithRowPositions> delegateSupplier)
    {
        // Fail fast with a descriptive message instead of a bare NullPointerException
        // raised from deep inside a helper or a stream lambda.
        requireNonNull(fullColumnList, "fullColumnList is null");
        requireNonNull(metadataValues, "metadataValues is null");
        requireNonNull(partitionKeys, "partitionKeys is null");
        requireNonNull(delegateSupplier, "delegateSupplier is null");

        this.nonPartitionColumnIndexes = new IcebergColumnHandle[fullColumnList.size()];
        this.outputIndexes = new int[fullColumnList.size()];
        // Must run before generatePartitionValueBlocks, which reads nonPartitionColumnIndexes.
        populateIndexes(fullColumnList, partitionKeys);
        this.partitionValueBlocks = generatePartitionValueBlocks(fullColumnList, metadataValues, partitionKeys);
        this.partitionValuesMemoryUsage = Arrays.stream(partitionValueBlocks)
                .filter(Objects::nonNull)
                .mapToLong(Block::getRetainedSizeInBytes)
                .sum();

        // Dropping the null slots keeps the remaining handles in output order, which is
        // the order in which populateIndexes handed out delegate indexes.
        List<IcebergColumnHandle> delegateColumns = Arrays.stream(nonPartitionColumnIndexes)
                .filter(Objects::nonNull)
                .collect(toImmutableList());

        this.delegateWithPositions = requireNonNull(delegateSupplier.apply(delegateColumns), "delegateSupplier returned null");
        this.delegate = delegateWithPositions.getDelegate();
    }

    /**
     * Classifies every column of {@code fullColumnList} and fills in
     * {@code nonPartitionColumnIndexes} and {@code outputIndexes}.
     * <p>
     * A column is left out of the delegate (its slot stays null) when its
     * column type is {@code PARTITION_KEY}, when {@code partitionKeys} has an
     * entry for its id, or when its id is a metadata column id. Every other
     * column is recorded at its output position and assigned the next
     * consecutive delegate index, starting from zero.
     */
    private void populateIndexes(List<IcebergColumnHandle> fullColumnList, Map<Integer, HivePartitionKey> partitionKeys)
    {
        int delegateIndex = 0;
        // generate array of non-partition column indexes
        for (int i = 0; i < fullColumnList.size(); i++) {
            IcebergColumnHandle handle = fullColumnList.get(i);
            if (handle.getColumnType() == PARTITION_KEY ||
                    partitionKeys.containsKey(handle.getId()) ||
                    isMetadataColumnId(handle.getId())) {
                // is partition key or metadata column, don't include for delegate supplier
                continue;
            }
            nonPartitionColumnIndexes[i] = handle;
            outputIndexes[i] = delegateIndex;
            delegateIndex++;
        }
    }

    /**
     * Builds the constant value block for every column that is not read by the
     * delegate. Requires {@link #populateIndexes} to have run first.
     *
     * @return an array indexed by output column position; the entry is null for
     *         columns read by the delegate and a value block otherwise
     */
    private Block[] generatePartitionValueBlocks(
            List<IcebergColumnHandle> fullColumnList,
            Map<Integer, Object> metadataValues,
            Map<Integer, HivePartitionKey> partitionKeys)
    {
        return IntStream.range(0, fullColumnList.size())
                .mapToObj(index -> {
                    IcebergColumnHandle column = fullColumnList.get(index);
                    if (nonPartitionColumnIndexes[index] != null) {
                        // Read by the delegate; nothing to pre-compute.
                        return null;
                    }

                    Type type = column.getType();
                    if (partitionKeys.containsKey(column.getId())) {
                        HivePartitionKey icebergPartition = partitionKeys.get(column.getId());
                        // An absent partition value is passed on as null.
                        Object prefilledValue = deserializePartitionValue(type, icebergPartition.getValue().orElse(null), column.getName());
                        return nativeValueToBlock(type, prefilledValue);
                    }
                    if (column.getColumnType() == PARTITION_KEY) {
                        // Partition key with no value. This can happen after partition evolution
                        return nativeValueToBlock(type, null);
                    }
                    if (isMetadataColumnId(column.getId())) {
                        // A metadata column without an entry in metadataValues yields a null value.
                        return nativeValueToBlock(type, metadataValues.get(column.getColumnIdentity().getId()));
                    }

                    // Not expected to be reached: populateIndexes leaves a slot null only
                    // when one of the three conditions checked above holds.
                    return null;
                })
                .toArray(Block[]::new);
    }

    /**
     * Returns the object produced by the delegate supplier, i.e. the wrapper
     * from which the delegate page source was obtained.
     */
    public ConnectorPageSourceWithRowPositions getRowPositionDelegate()
    {
        return delegateWithPositions;
    }

    /** Reports the delegate's completed bytes. */
    @Override
    public long getCompletedBytes()
    {
        return delegate.getCompletedBytes();
    }

    /** Reports the delegate's completed positions. */
    @Override
    public long getCompletedPositions()
    {
        return delegate.getCompletedPositions();
    }

    /** Reports the delegate's read time in nanoseconds. */
    @Override
    public long getReadTimeNanos()
    {
        return delegate.getReadTimeNanos();
    }

    /** This source is finished exactly when the delegate is. */
    @Override
    public boolean isFinished()
    {
        return delegate.isFinished();
    }

    /**
     * Fetches the next page from the delegate and widens it to the full output
     * column list by inserting the pre-computed constant columns.
     *
     * @return the widened page, or null when the delegate returned null
     * @throws PrestoException if the delegate or the page assembly fails with a
     *         runtime exception; a {@code PrestoException} is rethrown as is, any
     *         other runtime exception is wrapped with {@code ICEBERG_BAD_DATA}.
     *         This source is closed before the exception propagates.
     */
    @Override
    public Page getNextPage()
    {
        try {
            Page dataPage = delegate.getNextPage();
            if (dataPage == null) {
                return null;
            }
            int batchSize = dataPage.getPositionCount();
            Block[] blocks = new Block[nonPartitionColumnIndexes.length];
            for (int i = 0; i < nonPartitionColumnIndexes.length; i++) {
                // A null constant block means the column comes from the delegate page;
                // otherwise the constant block is repeated for every row of this page.
                blocks[i] = partitionValueBlocks[i] == null ?
                        dataPage.getBlock(outputIndexes[i]) :
                        new RunLengthEncodedBlock(partitionValueBlocks[i], batchSize);
            }

            return new Page(batchSize, blocks);
        }
        catch (RuntimeException e) {
            closeWithSuppression(e);
            throwIfInstanceOf(e, PrestoException.class);
            throw new PrestoException(ICEBERG_BAD_DATA, e);
        }
    }

    /**
     * Closes this source while an exception is already in flight. A runtime
     * exception raised by {@link #close()} is attached to {@code throwable} as
     * a suppressed exception rather than replacing it.
     *
     * @param throwable the primary failure; must not be null
     */
    protected void closeWithSuppression(Throwable throwable)
    {
        requireNonNull(throwable, "throwable is null");
        try {
            close();
        }
        catch (RuntimeException e) {
            // Self-suppression not permitted
            if (throwable != e) {
                throwable.addSuppressed(e);
            }
        }
    }

    /**
     * Reports the delegate's system memory usage plus the retained size of the
     * constant value blocks held by this source.
     */
    @Override
    public long getSystemMemoryUsage()
    {
        return delegate.getSystemMemoryUsage() + partitionValuesMemoryUsage;
    }

    /**
     * Closes the delegate.
     *
     * @throws UncheckedIOException wrapping any {@link IOException} thrown by the delegate
     */
    @Override
    public void close()
    {
        try {
            delegate.close();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}