HiveTableLayoutHandle.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.plan.PlanCanonicalizationStrategy;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.TupleDomain.ColumnDomain;
import com.facebook.presto.hive.HiveBucketing.HiveBucketFilter;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.relation.RowExpression;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.expressions.CanonicalRowExpressionRewriter.canonicalizeRowExpression;
import static com.facebook.presto.hive.HiveColumnHandle.isRowIdColumnHandle;
import static com.facebook.presto.hive.MetadataUtils.createPredicate;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

/**
 * Hive implementation of a table layout handle.
 *
 * <p>Besides the serializable layout state (table identity, columns, predicates, bucketing), this
 * handle may carry two coordinator-only properties: the list of {@link HivePartition}s and the
 * originating {@link HiveTableHandle}. Neither is part of the JSON form; the {@link JsonCreator}
 * constructor sets both to {@code Optional.empty()}, so they are absent after deserialization.
 */
public class HiveTableLayoutHandle
        extends BaseHiveTableLayoutHandle
{
    private final SchemaTableName schemaTableName;
    private final String tablePath;
    private final List<Column> dataColumns;
    private final Map<String, String> tableParameters;
    private final Map<String, HiveColumnHandle> predicateColumns;
    private final Optional<HiveBucketHandle> bucketHandle;
    private final Optional<HiveBucketFilter> bucketFilter;
    private final String layoutString;
    private final Optional<Set<HiveColumnHandle>> requestedColumns;
    private final boolean partialAggregationsPushedDown;
    private final boolean appendRowNumberEnabled;
    // Derived in the constructor from requestedColumns / predicateColumns; not serialized.
    private final boolean appendRowId;
    private final boolean footerStatsUnreliable;

    // coordinator-only properties (not part of the JSON form, empty after deserialization)
    private final Optional<List<HivePartition>> partitions;
    private final Optional<HiveTableHandle> hiveTableHandle;

    /**
     * Creates a layout handle from its JSON form. The coordinator-only properties
     * ({@code partitions} and {@code hiveTableHandle}) are not serialized and are set to
     * {@code Optional.empty()}.
     *
     * @param partitionColumns partition key columns of the table
     * @param dataColumns all columns in the table
     * @param predicateColumns columns used in a WHERE or HAVING clause, keyed by column name
     * @param requestedColumns columns read by the query
     */
    @JsonCreator
    public HiveTableLayoutHandle(
            @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
            @JsonProperty("tablePath") String tablePath,
            @JsonProperty("partitionColumns") List<HiveColumnHandle> partitionColumns,
            @JsonProperty("dataColumns") List<Column> dataColumns,
            @JsonProperty("tableParameters") Map<String, String> tableParameters,
            @JsonProperty("domainPredicate") TupleDomain<Subfield> domainPredicate,
            @JsonProperty("remainingPredicate") RowExpression remainingPredicate,
            @JsonProperty("predicateColumns") Map<String, HiveColumnHandle> predicateColumns,
            @JsonProperty("partitionColumnPredicate") TupleDomain<ColumnHandle> partitionColumnPredicate,
            @JsonProperty("bucketHandle") Optional<HiveBucketHandle> bucketHandle,
            @JsonProperty("bucketFilter") Optional<HiveBucketFilter> bucketFilter,
            @JsonProperty("pushdownFilterEnabled") boolean pushdownFilterEnabled,
            @JsonProperty("layoutString") String layoutString,
            @JsonProperty("requestedColumns") Optional<Set<HiveColumnHandle>> requestedColumns,
            @JsonProperty("partialAggregationsPushedDown") boolean partialAggregationsPushedDown,
            @JsonProperty("appendRowNumber") boolean appendRowNumberEnabled,
            @JsonProperty("footerStatsUnreliable") boolean footerStatsUnreliable)
    {
        this(
                schemaTableName,
                tablePath,
                // Null-check before dereferencing so the failure names the offending argument.
                requireNonNull(partitionColumns, "partitionColumns is null").stream()
                        .map(BaseHiveColumnHandle.class::cast)
                        .collect(toList()),
                dataColumns,
                tableParameters,
                domainPredicate,
                remainingPredicate,
                predicateColumns,
                partitionColumnPredicate,
                bucketHandle,
                bucketFilter,
                pushdownFilterEnabled,
                layoutString,
                requestedColumns,
                partialAggregationsPushedDown,
                appendRowNumberEnabled,
                Optional.empty(),
                footerStatsUnreliable,
                Optional.empty());
    }

    /**
     * Full constructor, including the coordinator-only {@code partitions} and
     * {@code hiveTableHandle}. Prefer {@link Builder} for new call sites.
     *
     * @throws NullPointerException if any of the reference-typed arguments checked here is null
     */
    protected HiveTableLayoutHandle(
            SchemaTableName schemaTableName,
            String tablePath,
            List<BaseHiveColumnHandle> partitionColumns,
            List<Column> dataColumns,
            Map<String, String> tableParameters,
            TupleDomain<Subfield> domainPredicate,
            RowExpression remainingPredicate,
            Map<String, HiveColumnHandle> predicateColumns,
            TupleDomain<ColumnHandle> partitionColumnPredicate,
            Optional<HiveBucketHandle> bucketHandle,
            Optional<HiveBucketFilter> bucketFilter,
            boolean pushdownFilterEnabled,
            String layoutString,
            Optional<Set<HiveColumnHandle>> requestedColumns,
            boolean partialAggregationsPushedDown,
            boolean appendRowNumberEnabled,
            Optional<List<HivePartition>> partitions,
            boolean footerStatsUnreliable,
            Optional<HiveTableHandle> hiveTableHandle)
    {
        super(
                partitionColumns,
                domainPredicate,
                remainingPredicate,
                pushdownFilterEnabled,
                partitionColumnPredicate,
                partitions);

        this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
        this.tablePath = requireNonNull(tablePath, "tablePath is null");
        this.dataColumns = ImmutableList.copyOf(requireNonNull(dataColumns, "dataColumns is null"));
        this.tableParameters = ImmutableMap.copyOf(requireNonNull(tableParameters, "tableParameters is null"));
        // Defensive copy, consistent with dataColumns / tableParameters, so callers cannot mutate it later.
        this.predicateColumns = ImmutableMap.copyOf(requireNonNull(predicateColumns, "predicateColumns is null"));
        this.bucketHandle = requireNonNull(bucketHandle, "bucketHandle is null");
        this.bucketFilter = requireNonNull(bucketFilter, "bucketFilter is null");
        this.layoutString = requireNonNull(layoutString, "layoutString is null");
        this.requestedColumns = requireNonNull(requestedColumns, "requestedColumns is null");
        this.partialAggregationsPushedDown = partialAggregationsPushedDown;
        // The row ID must be appended when it is either read by the query or referenced by a predicate.
        this.appendRowId = requestedColumns
                .map(columns -> columns.stream().anyMatch(column -> isRowIdColumnHandle(column)))
                .orElse(false)
                || this.predicateColumns.values().stream().anyMatch(column -> isRowIdColumnHandle(column));
        this.appendRowNumberEnabled = appendRowNumberEnabled;
        this.partitions = requireNonNull(partitions, "partitions is null");
        this.footerStatsUnreliable = footerStatsUnreliable;
        this.hiveTableHandle = requireNonNull(hiveTableHandle, "hiveTableHandle is null");
    }

    @JsonProperty
    public SchemaTableName getSchemaTableName()
    {
        return schemaTableName;
    }

    @JsonProperty
    public String getTablePath()
    {
        return tablePath;
    }

    @JsonProperty
    public List<Column> getDataColumns()
    {
        return dataColumns;
    }

    @JsonProperty
    public Map<String, String> getTableParameters()
    {
        return tableParameters;
    }

    /**
     * HiveTableHandle is dropped when HiveTableLayoutHandle is serialized.
     *
     * @return HiveTableHandle if available, {@code Optional.empty()} if dropped
     */
    @JsonIgnore
    public Optional<HiveTableHandle> getHiveTableHandle()
    {
        return hiveTableHandle;
    }

    @JsonProperty
    public Map<String, HiveColumnHandle> getPredicateColumns()
    {
        return predicateColumns;
    }

    @JsonProperty
    public Optional<HiveBucketHandle> getBucketHandle()
    {
        return bucketHandle;
    }

    @JsonProperty
    public Optional<HiveBucketFilter> getBucketFilter()
    {
        return bucketFilter;
    }

    @JsonProperty
    public String getLayoutString()
    {
        return layoutString;
    }

    @JsonProperty
    public Optional<Set<HiveColumnHandle>> getRequestedColumns()
    {
        return requestedColumns;
    }

    @Override
    public String toString()
    {
        return layoutString;
    }

    @JsonProperty
    public boolean isPartialAggregationsPushedDown()
    {
        return partialAggregationsPushedDown;
    }

    @JsonProperty
    public boolean isAppendRowNumberEnabled()
    {
        return appendRowNumberEnabled;
    }

    @JsonProperty
    public boolean isFooterStatsUnreliable()
    {
        return footerStatsUnreliable;
    }

    /**
     * Builds an identifier describing which data this layout produces.
     *
     * <p>When a {@link HiveSplit} is supplied, domains on the split's redundant columns are removed
     * from the domain predicate before canonicalization.
     *
     * @param split optional split whose runtime information refines the identifier
     * @param canonicalizationStrategy how aggressively constants are stripped
     * @return an immutable map keyed by schemaTableName, domainPredicate, remainingPredicate,
     *         constraint and bucketFilter
     */
    @Override
    public Object getIdentifier(Optional<ConnectorSplit> split, PlanCanonicalizationStrategy canonicalizationStrategy)
    {
        TupleDomain<Subfield> domainPredicate = this.getDomainPredicate();

        // If split is provided, we would update the identifier based on split runtime information.
        if (split.isPresent() && (split.get() instanceof HiveSplit) && domainPredicate.getColumnDomains().isPresent()) {
            HiveSplit hiveSplit = (HiveSplit) split.get();
            Set<Subfield> subfields = hiveSplit.getRedundantColumnDomains().stream()
                    .map(column -> new Subfield(((HiveColumnHandle) column).getName()))
                    .collect(toImmutableSet());
            List<ColumnDomain<Subfield>> columnDomains = domainPredicate.getColumnDomains().get().stream()
                    .filter(columnDomain -> !subfields.contains(columnDomain.getColumn()))
                    .collect(toImmutableList());
            domainPredicate = TupleDomain.fromColumnDomains(Optional.of(columnDomains));
        }

        // Identifier is used to identify if the table layout is providing the same set of data.
        // To achieve this, we need table name, data column predicates and bucket filter.
        // We did not include other fields because they are either table metadata or partition column predicate,
        // which is unrelated to identifier purpose, or has already been applied as the boundary of split.
        return ImmutableMap.builder()
                .put("schemaTableName", schemaTableName)
                .put("domainPredicate", canonicalizeDomainPredicate(domainPredicate, getPredicateColumns(), canonicalizationStrategy))
                .put("remainingPredicate", canonicalizeRowExpression(this.getRemainingPredicate(), false))
                .put("constraint", getConstraint(canonicalizationStrategy))
                // TODO: Decide what to do with bucketFilter when canonicalizing
                .put("bucketFilter", bucketFilter)
                .build();
    }

    /**
     * Returns the canonicalized constraint for the identifier. The constraint combines the
     * partition predicate with top-level domain predicates.
     *
     * @throws IllegalStateException for a non-DEFAULT strategy when the coordinator-only
     *         partitions are not available (e.g. on a deserialized handle)
     */
    private TupleDomain<ColumnHandle> getConstraint(PlanCanonicalizationStrategy canonicalizationStrategy)
    {
        if (canonicalizationStrategy == PlanCanonicalizationStrategy.DEFAULT) {
            return TupleDomain.all();
        }
        // partitions is coordinator-only and empty after deserialization; fail with a descriptive
        // message rather than a bare NoSuchElementException from Optional.get().
        List<HivePartition> hivePartitions = partitions.orElseThrow(() -> new IllegalStateException(
                "partitions are required to canonicalize the constraint with strategy " + canonicalizationStrategy));

        // Canonicalize constraint by removing constants when column is a partition key. This assumes
        // all partitions are similar, and will have similar statistics like size, cardinality etc.
        // Constants are only removed from point checks, and not range checks. Example:
        // `x = 1` is equivalent to `x = 1000`
        // `x > 1` is NOT equivalent to `x > 1000`
        TupleDomain<ColumnHandle> constraint = createPredicate(ImmutableList.copyOf(getPartitionColumns()), hivePartitions);
        // Only top-level subfields (empty path) are mapped back to their column handles; nested
        // subfields are mapped to null and are expected to be excluded by the transform.
        constraint = getDomainPredicate()
                .transform(subfield -> subfield.getPath().isEmpty() ? subfield.getRootName() : null)
                .transform(getPredicateColumns()::get)
                .transform(ColumnHandle.class::cast)
                .intersect(constraint);

        if (canonicalizationStrategy == PlanCanonicalizationStrategy.IGNORE_SCAN_CONSTANTS) {
            return constraint.canonicalize(ignored -> true);
        }
        return constraint.canonicalize(HiveTableLayoutHandle::isPartitionKey);
    }

    /**
     * Canonicalizes a domain predicate for use in the layout identifier.
     *
     * <p>For {@code DEFAULT}, the predicate is canonicalized as-is. Otherwise, top-level subfields
     * whose column is in {@code predicateColumns} are mapped to {@code null} (and so expected to be
     * excluded by the transform) when the column is a partition key, or for every such column under
     * {@code IGNORE_SCAN_CONSTANTS}. The stripped constants are covered by the separate constraint
     * entry of the identifier.
     */
    @VisibleForTesting
    static TupleDomain<Subfield> canonicalizeDomainPredicate(TupleDomain<Subfield> domainPredicate, Map<String, HiveColumnHandle> predicateColumns, PlanCanonicalizationStrategy strategy)
    {
        if (strategy == PlanCanonicalizationStrategy.DEFAULT) {
            return domainPredicate.canonicalize(ignored -> false);
        }
        return domainPredicate
                .transform(subfield -> {
                    if (!subfield.getPath().isEmpty() || !predicateColumns.containsKey(subfield.getRootName())) {
                        return subfield;
                    }
                    return isPartitionKey(predicateColumns.get(subfield.getRootName())) || strategy == PlanCanonicalizationStrategy.IGNORE_SCAN_CONSTANTS ? null : subfield;
                })
                .canonicalize(ignored -> false);
    }

    /** Returns true if the handle is a {@link HiveColumnHandle} that is a partition key. */
    private static boolean isPartitionKey(ColumnHandle columnHandle)
    {
        return columnHandle instanceof HiveColumnHandle && ((HiveColumnHandle) columnHandle).isPartitionKey();
    }

    /**
     * Loads the metastore table for this layout. Uses the attached {@link HiveTableHandle} when
     * present, otherwise looks the table up by schema and table name.
     *
     * @throws TableNotFoundException if the metastore returns no table
     */
    public Table getTable(SemiTransactionalHiveMetastore metastore, MetastoreContext metastoreContext)
    {
        Optional<Table> table;
        if (hiveTableHandle.isPresent()) {
            table = metastore.getTable(metastoreContext, hiveTableHandle.get());
        }
        else {
            table = metastore.getTable(metastoreContext, schemaTableName.getSchemaName(), schemaTableName.getTableName());
        }
        return table.orElseThrow(() -> new TableNotFoundException(schemaTableName));
    }

    /**
     * Returns a {@link Builder} pre-populated with every constructor argument of this handle,
     * including the coordinator-only properties. {@code appendRowId} is not a builder property;
     * it is re-derived when the builder is built.
     */
    public Builder builder()
    {
        return new Builder()
                .setSchemaTableName(getSchemaTableName())
                .setTablePath(getTablePath())
                .setPartitionColumns(getPartitionColumns())
                .setDataColumns(getDataColumns())
                .setTableParameters(getTableParameters())
                .setDomainPredicate(getDomainPredicate())
                .setRemainingPredicate(getRemainingPredicate())
                .setPredicateColumns(getPredicateColumns())
                .setPartitionColumnPredicate(getPartitionColumnPredicate())
                .setBucketHandle(getBucketHandle())
                .setBucketFilter(getBucketFilter())
                .setPushdownFilterEnabled(isPushdownFilterEnabled())
                .setLayoutString(getLayoutString())
                .setRequestedColumns(getRequestedColumns())
                .setPartialAggregationsPushedDown(isPartialAggregationsPushedDown())
                .setAppendRowNumberEnabled(isAppendRowNumberEnabled())
                .setPartitions(getPartitions())
                .setFooterStatsUnreliable(isFooterStatsUnreliable())
                .setHiveTableHandle(getHiveTableHandle());
    }

    /** Returns true if a requested column or a predicate column is the row-ID column. */
    boolean isAppendRowId()
    {
        return this.appendRowId;
    }

    /**
     * Mutable builder for {@link HiveTableLayoutHandle}. Only {@code hiveTableHandle} has a default
     * ({@code Optional.empty()}). Many other reference-typed properties are null-checked by the
     * constructor, so they must be set before {@link #build()}.
     */
    public static class Builder
    {
        private SchemaTableName schemaTableName;
        private String tablePath;
        private List<BaseHiveColumnHandle> partitionColumns;
        private List<Column> dataColumns;
        private Map<String, String> tableParameters;
        private TupleDomain<Subfield> domainPredicate;
        private RowExpression remainingPredicate;
        private Map<String, HiveColumnHandle> predicateColumns;
        private TupleDomain<ColumnHandle> partitionColumnPredicate;
        private Optional<HiveBucketHandle> bucketHandle;
        private Optional<HiveBucketFilter> bucketFilter;
        private boolean pushdownFilterEnabled;
        private String layoutString;
        private Optional<Set<HiveColumnHandle>> requestedColumns;
        private boolean partialAggregationsPushedDown;
        private boolean appendRowNumberEnabled;
        private boolean footerStatsUnreliable;

        private Optional<List<HivePartition>> partitions;
        private Optional<HiveTableHandle> hiveTableHandle = Optional.empty();

        public Builder setSchemaTableName(SchemaTableName schemaTableName)
        {
            this.schemaTableName = schemaTableName;
            return this;
        }

        public Builder setTablePath(String tablePath)
        {
            this.tablePath = tablePath;
            return this;
        }

        public Builder setPartitionColumns(List<BaseHiveColumnHandle> partitionColumns)
        {
            this.partitionColumns = partitionColumns;
            return this;
        }

        public Builder setDataColumns(List<Column> dataColumns)
        {
            this.dataColumns = dataColumns;
            return this;
        }

        public Builder setTableParameters(Map<String, String> tableParameters)
        {
            this.tableParameters = tableParameters;
            return this;
        }

        public Builder setDomainPredicate(TupleDomain<Subfield> domainPredicate)
        {
            this.domainPredicate = domainPredicate;
            return this;
        }

        public Builder setRemainingPredicate(RowExpression remainingPredicate)
        {
            this.remainingPredicate = remainingPredicate;
            return this;
        }

        public Builder setPredicateColumns(Map<String, HiveColumnHandle> predicateColumns)
        {
            this.predicateColumns = predicateColumns;
            return this;
        }

        public Builder setPartitionColumnPredicate(TupleDomain<ColumnHandle> partitionColumnPredicate)
        {
            this.partitionColumnPredicate = partitionColumnPredicate;
            return this;
        }

        public Builder setBucketHandle(Optional<HiveBucketHandle> bucketHandle)
        {
            this.bucketHandle = bucketHandle;
            return this;
        }

        public Builder setBucketFilter(Optional<HiveBucketFilter> bucketFilter)
        {
            this.bucketFilter = bucketFilter;
            return this;
        }

        public Builder setPushdownFilterEnabled(boolean pushdownFilterEnabled)
        {
            this.pushdownFilterEnabled = pushdownFilterEnabled;
            return this;
        }

        public Builder setLayoutString(String layoutString)
        {
            this.layoutString = layoutString;
            return this;
        }

        public Builder setRequestedColumns(Optional<Set<HiveColumnHandle>> requestedColumns)
        {
            this.requestedColumns = requestedColumns;
            return this;
        }

        public Builder setPartialAggregationsPushedDown(boolean partialAggregationsPushedDown)
        {
            this.partialAggregationsPushedDown = partialAggregationsPushedDown;
            return this;
        }

        public Builder setAppendRowNumberEnabled(boolean appendRowNumberEnabled)
        {
            this.appendRowNumberEnabled = appendRowNumberEnabled;
            return this;
        }

        public Builder setPartitions(List<HivePartition> partitions)
        {
            requireNonNull(partitions, "partitions is null");
            return setPartitions(Optional.of(partitions));
        }

        public Builder setPartitions(Optional<List<HivePartition>> partitions)
        {
            requireNonNull(partitions, "partitions is null");
            this.partitions = partitions;
            return this;
        }

        public Builder setFooterStatsUnreliable(boolean footerStatsUnreliable)
        {
            this.footerStatsUnreliable = footerStatsUnreliable;
            return this;
        }

        public Builder setHiveTableHandle(Optional<HiveTableHandle> hiveTableHandle)
        {
            this.hiveTableHandle = requireNonNull(hiveTableHandle, "hiveTableHandle is null");
            return this;
        }

        public Builder setHiveTableHandle(HiveTableHandle hiveTableHandle)
        {
            requireNonNull(hiveTableHandle, "hiveTableHandle is null");
            this.hiveTableHandle = Optional.of(hiveTableHandle);
            return this;
        }

        /** Creates the layout handle from the properties set on this builder. */
        public HiveTableLayoutHandle build()
        {
            return new HiveTableLayoutHandle(
                    schemaTableName,
                    tablePath,
                    partitionColumns,
                    dataColumns,
                    tableParameters,
                    domainPredicate,
                    remainingPredicate,
                    predicateColumns,
                    partitionColumnPredicate,
                    bucketHandle,
                    bucketFilter,
                    pushdownFilterEnabled,
                    layoutString,
                    requestedColumns,
                    partialAggregationsPushedDown,
                    appendRowNumberEnabled,
                    partitions,
                    footerStatsUnreliable,
                    hiveTableHandle);
        }
    }
}