PositionDeleteFilter.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg.delete;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.iceberg.IcebergColumnHandle;
import com.facebook.presto.spi.ConnectorPageSource;
import io.airlift.slice.Slice;
import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider;
import org.roaringbitmap.longlong.LongBitmapDataProvider;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Optional;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public final class PositionDeleteFilter
        implements DeleteFilter
{
    private final ImmutableLongBitmapDataProvider deletedRows;
    @Nullable
    private final String deleteFilePath;

    public PositionDeleteFilter(ImmutableLongBitmapDataProvider deletedRows, @Nullable String deleteFilePath)
    {
        this.deletedRows = requireNonNull(deletedRows, "deletedRows is null");
        this.deleteFilePath = deleteFilePath;
    }

    @Override
    public RowPredicate createPredicate(List<IcebergColumnHandle> columns)
    {
        int filePosChannel = rowPositionChannel(columns);
        return (page, position) -> {
            long filePos = BIGINT.getLong(page.getBlock(filePosChannel), position);
            return !deletedRows.contains(filePos);
        };
    }

    public Optional<String> getDeleteFilePath()
    {
        return Optional.ofNullable(deleteFilePath);
    }

    private static int rowPositionChannel(List<IcebergColumnHandle> columns)
    {
        for (int i = 0; i < columns.size(); i++) {
            if (columns.get(i).isRowPositionColumn()) {
                return i;
            }
        }
        throw new IllegalArgumentException("No row position column");
    }

    public static void readPositionDeletes(ConnectorPageSource pageSource, Slice targetPath, LongBitmapDataProvider deletedRows)
    {
        CachingVarcharComparator comparator = new CachingVarcharComparator(targetPath);

        // Use a linear search since we expect most deletion files to only contain
        // entries for a single path. The comparison cost is minimal if the
        // path values are dictionary encoded, since we only do the comparison once.
        while (!pageSource.isFinished()) {
            Page page = pageSource.getNextPage();
            if (page == null) {
                continue;
            }

            Block pathBlock = page.getBlock(0);
            Block posBlock = page.getBlock(1);

            for (int position = 0; position < page.getPositionCount(); position++) {
                int result = comparator.compare(pathBlock, position);
                if (result > 0) {
                    // deletion files are sorted by path, so we're done
                    return;
                }
                if (result == 0) {
                    deletedRows.addLong(BIGINT.getLong(posBlock, position));
                }
            }
        }
    }

    private static final class CachingVarcharComparator
    {
        private final Slice reference;
        private int result;
        private Slice value;

        public CachingVarcharComparator(Slice reference)
        {
            this.reference = requireNonNull(reference, "reference is null");
        }

        @SuppressWarnings({"ObjectEquality", "ReferenceEquality"})
        public int compare(Block block, int position)
        {
            checkArgument(!block.isNull(position), "position is null");
            Slice next = VARCHAR.getSlice(block, position);
            // The expected case is a dictionary block with many entries for the
            // same path. Only perform a comparison if the object has changed.
            if (value != next) {
                value = next;
                result = value.compareTo(reference);
            }
            return result;
        }
    }
}