ColumnIndexFilterUtils.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.parquet.reader;

import com.facebook.presto.parquet.ParquetDataSource;
import com.facebook.presto.parquet.RichColumnDescriptor;
import com.facebook.presto.parquet.predicate.Predicate;
import com.facebook.presto.parquet.predicate.TupleDomainParquetPredicate;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.openjdk.jol.info.ClassLayout;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Formatter;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static io.airlift.slice.SizeOf.sizeOf;

public class ColumnIndexFilterUtils
{
    private ColumnIndexFilterUtils() {}

    static class OffsetRange
    {
        private final long offset;
        private long length;

        public OffsetRange(long offset, int length)
        {
            this.offset = offset;
            this.length = length;
        }

        long getOffset()
        {
            return offset;
        }

        long getLength()
        {
            return length;
        }

        private boolean extendWithCheck(long offset, int length)
        {
            if (this.offset + this.length == offset) {
                this.length += length;
                return true;
            }
            else {
                return false;
            }
        }

        public void extendLength(long length)
        {
            this.length += length;
        }

        public long endPos()
        {
            return offset + length;
        }
    }

    private static class FilteredOffsetIndex
            implements OffsetIndex
    {
        private static final int INSTANCE_SIZE = ClassLayout.parseClass(FilteredOffsetIndex.class).instanceSize();

        private final OffsetIndex offsetIndex;
        private final int[] indices;

        private FilteredOffsetIndex(OffsetIndex offsetIndex, int[] indices)
        {
            this.offsetIndex = offsetIndex;
            this.indices = indices;
        }

        @Override
        public int getPageCount()
        {
            return indices.length;
        }

        @Override
        public long getOffset(int pageIndex)
        {
            return offsetIndex.getOffset(indices[pageIndex]);
        }

        @Override
        public int getCompressedPageSize(int pageIndex)
        {
            return offsetIndex.getCompressedPageSize(indices[pageIndex]);
        }

        @Override
        public long getFirstRowIndex(int pageIndex)
        {
            return offsetIndex.getFirstRowIndex(indices[pageIndex]);
        }

        @Override
        public long getLastRowIndex(int pageIndex, long totalRowCount)
        {
            int nextIndex = indices[pageIndex] + 1;
            return (nextIndex >= offsetIndex.getPageCount() ? totalRowCount : offsetIndex.getFirstRowIndex(nextIndex)) - 1;
        }

        @Override
        public String toString()
        {
            try (Formatter formatter = new Formatter()) {
                formatter.format("%-12s  %20s  %16s  %20s%n", "", "offset", "compressed size", "first row index");
                for (int i = 0, n = offsetIndex.getPageCount(); i < n; ++i) {
                    int index = Arrays.binarySearch(indices, i);
                    boolean isHidden = index < 0;
                    formatter.format("%spage-%-5d  %20d  %16d  %20d%n",
                            isHidden ? "- " : "  ",
                            isHidden ? i : index,
                            offsetIndex.getOffset(i),
                            offsetIndex.getCompressedPageSize(i),
                            offsetIndex.getFirstRowIndex(i));
                }
                return formatter.toString();
            }
        }

        public long getRetainedSizeInBytes()
        {
            return INSTANCE_SIZE + sizeOf(indices);
        }
    }

    /*
     * Returns the filtered offset index containing only the pages which are overlapping with rowRanges.
     */
    static OffsetIndex filterOffsetIndex(OffsetIndex offsetIndex, RowRanges rowRanges, long totalRowCount)
    {
        IntList indices = new IntArrayList();
        for (int i = 0; i < offsetIndex.getPageCount(); i++) {
            long from = offsetIndex.getFirstRowIndex(i);
            if (rowRanges.isOverlapping(from, offsetIndex.getLastRowIndex(i, totalRowCount))) {
                indices.add(i);
            }
        }
        return new FilteredOffsetIndex(offsetIndex, indices.toIntArray());
    }

    static List<OffsetRange> calculateOffsetRanges(OffsetIndex offsetIndex, ColumnChunkMetaData columnChunkMetadata, long firstPageOffset, long startingPosition)
    {
        List<OffsetRange> ranges = new ArrayList<>();
        int pageCount = offsetIndex.getPageCount();
        if (pageCount > 0) {
            OffsetRange currentRange = null;

            // Add a range for the dictionary page if required
            long rowGroupOffset = columnChunkMetadata.getStartingPos();
            if (rowGroupOffset < firstPageOffset) {
                // We need to adjust the offset by startingPosition for presto because dataSource.readFully() started at startingPosition
                currentRange = new OffsetRange(rowGroupOffset - startingPosition, (int) (firstPageOffset - rowGroupOffset));
                ranges.add(currentRange);
            }

            for (int i = 0; i < pageCount; i++) {
                long offset = offsetIndex.getOffset(i);
                int length = offsetIndex.getCompressedPageSize(i);
                // We need to adjust the offset by startingPosition for presto because dataSource.readFully() started at startingPosition
                if (currentRange == null || !currentRange.extendWithCheck(offset - startingPosition, length)) {
                    currentRange = new OffsetRange(offset - startingPosition, length);
                    ranges.add(currentRange);
                }
            }
        }
        return ranges;
    }

    public static Optional<ColumnIndexStore> getColumnIndexStore(Predicate parquetPredicate, ParquetDataSource dataSource, BlockMetaData blockMetadata, Map<List<String>, RichColumnDescriptor> descriptorsByPath, boolean columnIndexFilterEnabled)
    {
        if (!columnIndexFilterEnabled || parquetPredicate == null || !(parquetPredicate instanceof TupleDomainParquetPredicate)) {
            return Optional.empty();
        }

        for (ColumnChunkMetaData column : blockMetadata.getColumns()) {
            if (column.getColumnIndexReference() != null && column.getOffsetIndexReference() != null) {
                Set<ColumnPath> paths = new HashSet<>();
                for (List<String> path : descriptorsByPath.keySet()) {
                    paths.add(ColumnPath.get(path.toArray(new String[0])));
                }
                return Optional.of(ParquetColumnIndexStore.create(dataSource, blockMetadata, paths));
            }
        }
        return Optional.empty();
    }
}