/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive.rule;

import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.hive.HiveBucketHandle;
import com.facebook.presto.hive.HiveBucketing;
import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.hive.HiveMetadata;
import com.facebook.presto.hive.HivePartitionManager;
import com.facebook.presto.hive.HivePartitionResult;
import com.facebook.presto.hive.HiveStorageFormat;
import com.facebook.presto.hive.HiveTableHandle;
import com.facebook.presto.hive.HiveTableLayoutHandle;
import com.facebook.presto.hive.HiveTransactionManager;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPlanOptimizer;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.VariableAllocator;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.relation.DomainTranslator;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.google.common.base.Functions;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.HiveSessionProperties.isParquetPushdownFilterEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isPushdownFilterEnabled;
import static com.facebook.presto.hive.HiveTableProperties.getHiveStorageFormat;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getMetastoreHeaders;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isUserDefinedTypeEncodingEnabled;
import static com.facebook.presto.hive.rule.FilterPushdownUtils.getDomainPredicate;
import static com.facebook.presto.hive.rule.FilterPushdownUtils.getPredicateColumnNames;
import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Objects.requireNonNull;

/**
 * Runs during both the logical and physical phases of connector-aided plan optimization.
 * In most cases filter pushdown occurs during the logical phase. However, when a new
 * filter is added between the logical and physical phases, e.g. a filter on a join key
 * from one side of a join is added to the other side, the new filter is merged with
 * the one already pushed down.
 */
public class HiveFilterPushdown
        implements ConnectorPlanOptimizer
{
    private final RowExpressionService rowExpressionService;
    private final StandardFunctionResolution functionResolution;
    private final FunctionMetadataManager functionMetadataManager;
    protected final HiveTransactionManager transactionManager;
    private final HivePartitionManager partitionManager;

    public HiveFilterPushdown(
            RowExpressionService rowExpressionService,
            StandardFunctionResolution functionResolution,
            FunctionMetadataManager functionMetadataManager,
            HiveTransactionManager transactionManager,
            HivePartitionManager partitionManager)
    {
        this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
        this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
        this.functionMetadataManager = requireNonNull(functionMetadataManager, "functionMetadataManager is null");
        this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
        this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
    }

    @Override
    public PlanNode optimize(PlanNode maxSubplan, ConnectorSession session, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator)
    {
        return rewriteWith(
                new SubfieldExtractionRewriter(
                        session,
                        idAllocator,
                        rowExpressionService,
                        functionResolution,
                        functionMetadataManager,
                        transactionManager,
                        partitionManager,
                        tableHandle -> getConnectorMetadata(transactionManager, tableHandle)),
                maxSubplan);
    }

    public static class SubfieldExtractionRewriter
            extends BaseSubfieldExtractionRewriter
    {
        private final HivePartitionManager partitionManager;

        public SubfieldExtractionRewriter(
                ConnectorSession session,
                PlanNodeIdAllocator idAllocator,
                RowExpressionService rowExpressionService,
                StandardFunctionResolution functionResolution,
                FunctionMetadataManager functionMetadataManager,
                HiveTransactionManager transactionManager,
                HivePartitionManager partitionManager,
                Function<TableHandle, ConnectorMetadata> transactionToMetadata)
        {
            super(session, idAllocator, rowExpressionService, functionResolution, functionMetadataManager, transactionToMetadata);

            this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
        }

        @Override
        public ConnectorPushdownFilterResult getConnectorPushdownFilterResult(
                Map<String, ColumnHandle> columnHandles,
                ConnectorMetadata metadata,
                ConnectorSession session,
                RemainingExpressions remainingExpressions,
                DomainTranslator.ExtractionResult<Subfield> decomposedFilter,
                RowExpression optimizedRemainingExpression,
                Constraint<ColumnHandle> constraint,
                Optional<ConnectorTableLayoutHandle> currentLayoutHandle,
                ConnectorTableHandle tableHandle)
        {
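            // Ask the partition manager to prune partitions using the given constraint;
            // whatever it cannot enforce through partition pruning is returned as the
            // unenforced constraint and must be applied when the data is read.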
            HivePartitionResult hivePartitionResult = partitionManager.getPartitions(
                    ((HiveMetadata) metadata).getMetastore(), tableHandle, constraint, session);

            TupleDomain<ColumnHandle> unenforcedConstraint = hivePartitionResult.getUnenforcedConstraint();

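            // Merge the unenforced constraint with the subfield domains extracted from
            // the decomposed filter into a single domain predicate.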
            TupleDomain<Subfield> domainPredicate = getDomainPredicate(decomposedFilter, unenforcedConstraint);

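            // Collect the names of all columns referenced by either the remaining
            // expression or the domain predicate; these are the columns the scan must
            // still be able to evaluate predicates on.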
            Set<String> predicateColumnNames = getPredicateColumnNames(optimizedRemainingExpression, domainPredicate);

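            // Resolve the referenced column names back to their HiveColumnHandles.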
            Map<String, HiveColumnHandle> predicateColumns = predicateColumnNames.stream()
                    .map(columnHandles::get)
                    .map(HiveColumnHandle.class::cast)
                    .collect(toImmutableMap(HiveColumnHandle::getName, Functions.identity()));

            HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle;
            SchemaTableName tableName = hiveTableHandle.getSchemaTableName();

            SemiTransactionalHiveMetastore metastore = ((HiveMetadata) metadata).getMetastore();

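            // Build a metastore context from the session and fetch the table; a table
            // that no longer exists fails with TableNotFoundException.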
            MetastoreContext context = new MetastoreContext(
                    session.getIdentity(),
                    session.getQueryId(),
                    session.getClientInfo(),
                    session.getClientTags(),
                    session.getSource(),
                    getMetastoreHeaders(session),
                    isUserDefinedTypeEncodingEnabled(session),
                    metastore.getColumnConverterProvider(),
                    session.getWarningCollector(),
                    session.getRuntimeStats());
            Table table = metastore.getTable(context, hiveTableHandle)
                    .orElseThrow(() -> new TableNotFoundException(tableName));

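            // Produce a concise, human-readable description of the pushed-down layout
            // (buckets, filters, domains) for use in plan output.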
            String layoutString = createTableLayoutString(
                    session,
                    rowExpressionService,
                    tableName,
                    hivePartitionResult.getBucketHandle(),
                    hivePartitionResult.getBucketFilter(),
                    remainingExpressions.getRemainingExpression(),
                    domainPredicate);

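            // Preserve the requested columns and the append-row-number setting from a
            // previously computed layout, if one exists.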
            Optional<Set<HiveColumnHandle>> requestedColumns = currentLayoutHandle
                    .map(layout -> ((HiveTableLayoutHandle) layout).getRequestedColumns())
                    .orElse(Optional.empty());

            boolean appendRowNumber = currentLayoutHandle
                    .map(layout -> ((HiveTableLayoutHandle) layout).isAppendRowNumberEnabled())
                    .orElse(false);

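            // Rebuild the table layout with the pushed-down predicates; the dynamic
            // filter portion of the remaining expression is returned alongside it.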
            return new ConnectorPushdownFilterResult(
                    metadata.getTableLayout(
                            session,
                            new HiveTableLayoutHandle.Builder()
                                    .setSchemaTableName(tableName)
                                    .setTablePath(table.getStorage().getLocation())
                                    .setPartitionColumns(hivePartitionResult.getPartitionColumns())
                                    .setDataColumns(pruneColumnComments(hivePartitionResult.getDataColumns()))
                                    .setTableParameters(hivePartitionResult.getTableParameters())
                                    .setDomainPredicate(domainPredicate)
                                    .setRemainingPredicate(remainingExpressions.getRemainingExpression())
                                    .setPredicateColumns(predicateColumns)
                                    .setPartitionColumnPredicate(hivePartitionResult.getEnforcedConstraint())
                                    .setPartitions(hivePartitionResult.getPartitions())
                                    .setBucketHandle(hivePartitionResult.getBucketHandle())
                                    .setBucketFilter(hivePartitionResult.getBucketFilter())
                                    .setPushdownFilterEnabled(true)
                                    .setLayoutString(layoutString)
                                    .setRequestedColumns(requestedColumns)
                                    .setPartialAggregationsPushedDown(false)
                                    .setAppendRowNumberEnabled(appendRowNumber)
                                    .setHiveTableHandle(hiveTableHandle)
                                    .build()),
                    remainingExpressions.getDynamicFilterExpression());
        }

        @Override
        protected boolean isPushdownFilterSupported(ConnectorSession session, TableHandle tableHandle)
        {
            checkArgument(tableHandle.getConnectorHandle() instanceof HiveTableHandle, "pushdownFilter is never supported on a non-Hive TableHandle");
            boolean pushdownFilterEnabled = isPushdownFilterEnabled(session);
            if (pushdownFilterEnabled) {
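                // Filter pushdown is only supported by the ORC and DWRF readers, and by
                // the Parquet reader when explicitly enabled via session property.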
                HiveStorageFormat hiveStorageFormat = getHiveStorageFormat(
                        transactionToMetadata.apply(tableHandle).getTableMetadata(session, tableHandle.getConnectorHandle()).getProperties());
                return hiveStorageFormat == HiveStorageFormat.ORC
                        || hiveStorageFormat == HiveStorageFormat.DWRF
                        || (hiveStorageFormat == HiveStorageFormat.PARQUET && isParquetPushdownFilterEnabled(session));
            }
            return false;
        }
    }

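    // Resolves the ConnectorMetadata for the handle's transaction and verifies it is
    // HiveMetadata, since the pushdown logic downcasts it.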
    public static ConnectorMetadata getConnectorMetadata(HiveTransactionManager transactionManager, TableHandle tableHandle)
    {
        requireNonNull(transactionManager, "transactionManager is null");
        ConnectorMetadata metadata = transactionManager.get(tableHandle.getTransaction());
        checkState(metadata instanceof HiveMetadata, "metadata must be HiveMetadata");
        return metadata;
    }

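    // Column comments are not needed for planning, so drop them before storing the
    // data columns in the layout handle.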
    private static List<Column> pruneColumnComments(List<Column> columns)
    {
        return columns.stream()
                .map(column -> new Column(column.getName(), column.getType(), Optional.empty(), column.getTypeMetadata()))
                .collect(toImmutableList());
    }

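    // Formats the pushed-down bucketing and predicate information into a compact string;
    // trivially-true filters and all-inclusive domains are omitted.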
    private static String createTableLayoutString(
            ConnectorSession session,
            RowExpressionService rowExpressionService,
            SchemaTableName tableName,
            Optional<HiveBucketHandle> bucketHandle,
            Optional<HiveBucketing.HiveBucketFilter> bucketFilter,
            RowExpression remainingPredicate,
            TupleDomain<Subfield> domainPredicate)
    {
        return toStringHelper(tableName.toString())
                .omitNullValues()
                .add("buckets", bucketHandle.map(HiveBucketHandle::getReadBucketCount).orElse(null))
                .add("bucketsToKeep", bucketFilter.map(HiveBucketing.HiveBucketFilter::getBucketsToKeep).orElse(null))
                .add("filter", TRUE_CONSTANT.equals(remainingPredicate) ? null : rowExpressionService.formatRowExpression(session, remainingPredicate))
                .add("domains", domainPredicate.isAll() ? null : domainPredicate.toString(session.getSqlFunctionProperties()))
                .toString();
    }
}