TestHiveLogicalPlanner.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.Session;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.cost.StatsProvider;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.PartitionWithStatistics;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.metastore.file.FileHiveMetastore;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.JoinNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.SemiJoinNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.relation.CallExpression;
import com.facebook.presto.spi.relation.ConstantExpression;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.assertions.ExpectedValueProvider;
import com.facebook.presto.sql.planner.assertions.MatchResult;
import com.facebook.presto.sql.planner.assertions.Matcher;
import com.facebook.presto.sql.planner.assertions.PlanMatchPattern;
import com.facebook.presto.sql.planner.assertions.SymbolAliases;
import com.facebook.presto.sql.relational.FunctionResolution;
import com.facebook.presto.sql.tree.FunctionCall;
import com.facebook.presto.sql.tree.LongLiteral;
import com.facebook.presto.sql.tree.StringLiteral;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.apache.hadoop.fs.Path;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.facebook.presto.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.OPTIMIZE_METADATA_QUERIES;
import static com.facebook.presto.SystemSessionProperties.OPTIMIZE_METADATA_QUERIES_CALL_THRESHOLD;
import static com.facebook.presto.SystemSessionProperties.OPTIMIZE_METADATA_QUERIES_IGNORE_STATS;
import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_DEREFERENCE_ENABLED;
import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_SUBFIELDS_ENABLED;
import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_SUBFIELDS_FOR_MAP_FUNCTIONS;
import static com.facebook.presto.common.function.OperatorType.EQUAL;
import static com.facebook.presto.common.predicate.Domain.create;
import static com.facebook.presto.common.predicate.Domain.multipleValues;
import static com.facebook.presto.common.predicate.Domain.notNull;
import static com.facebook.presto.common.predicate.Domain.singleValue;
import static com.facebook.presto.common.predicate.Range.greaterThan;
import static com.facebook.presto.common.predicate.TupleDomain.withColumnDomains;
import static com.facebook.presto.common.predicate.ValueSet.ofRanges;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.common.type.VarcharType.createVarcharType;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.hive.HiveColumnHandle.isPushedDownSubfield;
import static com.facebook.presto.hive.HiveCommonSessionProperties.RANGE_FILTERS_ON_SUBSCRIPTS_ENABLED;
import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG;
import static com.facebook.presto.hive.HiveSessionProperties.COLLECT_COLUMN_STATISTICS_ON_WRITE;
import static com.facebook.presto.hive.HiveSessionProperties.PARQUET_DEREFERENCE_PUSHDOWN_ENABLED;
import static com.facebook.presto.hive.HiveSessionProperties.PARTIAL_AGGREGATION_PUSHDOWN_ENABLED;
import static com.facebook.presto.hive.HiveSessionProperties.PARTIAL_AGGREGATION_PUSHDOWN_FOR_VARIABLE_LENGTH_DATATYPES_ENABLED;
import static com.facebook.presto.hive.HiveSessionProperties.PUSHDOWN_FILTER_ENABLED;
import static com.facebook.presto.hive.HiveSessionProperties.SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE;
import static com.facebook.presto.hive.TestHiveIntegrationSmokeTest.assertRemoteExchangesCount;
import static com.facebook.presto.hive.metastore.MetastoreUtil.toPartitionValues;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.parquet.ParquetTypeUtils.pushdownColumnNameForSubfield;
import static com.facebook.presto.spi.plan.JoinType.INNER;
import static com.facebook.presto.sql.analyzer.TypeSignatureProvider.fromTypes;
import static com.facebook.presto.sql.planner.assertions.MatchResult.NO_MATCH;
import static com.facebook.presto.sql.planner.assertions.MatchResult.match;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.any;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anySymbol;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.equiJoinClause;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.expression;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.globalAggregation;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.join;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.output;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.strictTableScan;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values;
import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER;
import static com.facebook.presto.testing.assertions.Assert.assertEquals;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.tpch.TpchTable.CUSTOMER;
import static io.airlift.tpch.TpchTable.LINE_ITEM;
import static io.airlift.tpch.TpchTable.NATION;
import static io.airlift.tpch.TpchTable.ORDERS;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertTrue;

@Test(singleThreaded = true)
public class TestHiveLogicalPlanner
        extends AbstractTestQueryFramework
{
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        return HiveQueryRunner.createQueryRunner(
                ImmutableList.of(ORDERS, LINE_ITEM, CUSTOMER, NATION),
                ImmutableMap.of("experimental.pushdown-subfields-enabled", "true",
                        "pushdown-subfields-from-lambda-enabled", "true"),
                Optional.empty());
    }

    @Override
    protected QueryRunner createExpectedQueryRunner()
            throws Exception
    {
        return getQueryRunner();
    }

    @Test
    public void testMetadataQueryOptimizationWithLimit()
    {
        QueryRunner queryRunner = getQueryRunner();
        Session sessionWithOptimizeMetadataQueries = getSessionWithOptimizeMetadataQueries();
        Session defaultSession = queryRunner.getDefaultSession();
        try {
            queryRunner.execute("CREATE TABLE test_metadata_query_optimization_with_limit(a varchar, b int, c int) WITH (partitioned_by = ARRAY['b', 'c'])");
            queryRunner.execute("INSERT INTO test_metadata_query_optimization_with_limit VALUES" +
                    " ('1001', 1, 1), ('1002', 1, 1), ('1003', 1, 1)," +
                    " ('1004', 1, 2), ('1005', 1, 2), ('1006', 1, 2)," +
                    " ('1007', 2, 1), ('1008', 2, 1), ('1009', 2, 1)");

            // Could do metadata optimization when `limit` existing above `aggregation`
            assertQuery(sessionWithOptimizeMetadataQueries, "select distinct b, c from test_metadata_query_optimization_with_limit order by c desc limit 3",
                    "values(1, 2), (1, 1), (2, 1)");
            assertPlan(sessionWithOptimizeMetadataQueries, "select distinct b, c from test_metadata_query_optimization_with_limit order by c desc limit 3",
                    anyTree(values(ImmutableList.of("b", "c"),
                            ImmutableList.of(
                                    ImmutableList.of(new LongLiteral("1"), new LongLiteral("2")),
                                    ImmutableList.of(new LongLiteral("1"), new LongLiteral("1")),
                                    ImmutableList.of(new LongLiteral("2"), new LongLiteral("1"))))));
            // Compare with default session which do not enable metadata optimization
            assertQuery(defaultSession, "select distinct b, c from test_metadata_query_optimization_with_limit order by c desc limit 3",
                    "values(1, 2), (1, 1), (2, 1)");
            assertPlan(defaultSession, "select distinct b, c from test_metadata_query_optimization_with_limit order by c desc limit 3",
                    anyTree(strictTableScan("test_metadata_query_optimization_with_limit", identityMap("b", "c"))));

            // Should not do metadata optimization when `limit` existing below `aggregation`
            assertQuery(sessionWithOptimizeMetadataQueries, "with tt as (select b, c from test_metadata_query_optimization_with_limit order by c desc limit 3) select b, min(c), max(c) from tt group by b",
                    "values(1, 2, 2)");
            assertPlan(sessionWithOptimizeMetadataQueries, "with tt as (select b, c from test_metadata_query_optimization_with_limit order by c desc limit 3) select b, min(c), max(c) from tt group by b",
                    anyTree(strictTableScan("test_metadata_query_optimization_with_limit", identityMap("b", "c"))));
            // Compare with default session which do not enable metadata optimization
            assertQuery(defaultSession, "with tt as (select b, c from test_metadata_query_optimization_with_limit order by c desc limit 3) select b, min(c), max(c) from tt group by b",
                    "values(1, 2, 2)");
            assertPlan(defaultSession, "with tt as (select b, c from test_metadata_query_optimization_with_limit order by c desc limit 3) select b, min(c), max(c) from tt group by b",
                    anyTree(strictTableScan("test_metadata_query_optimization_with_limit", identityMap("b", "c"))));
        }
        finally {
            queryRunner.execute("DROP TABLE test_metadata_query_optimization_with_limit");
        }
    }

    @Test
    public void testRepeatedFilterPushdown()
    {
        QueryRunner queryRunner = getQueryRunner();

        try {
            queryRunner.execute("CREATE TABLE orders_partitioned WITH (partitioned_by = ARRAY['ds']) AS " +
                    "SELECT orderkey, orderpriority, '2019-11-01' as ds FROM orders WHERE orderkey < 1000 " +
                    "UNION ALL " +
                    "SELECT orderkey, orderpriority, '2019-11-02' as ds FROM orders WHERE orderkey < 1000");

            queryRunner.execute("CREATE TABLE lineitem_unpartitioned AS " +
                    "SELECT orderkey, linenumber, shipmode, '2019-11-01' as ds FROM lineitem WHERE orderkey < 1000 " +
                    "UNION ALL " +
                    "SELECT orderkey, linenumber, shipmode, '2019-11-02' as ds FROM lineitem WHERE orderkey < 1000 ");

            TupleDomain<String> ordersDomain = withColumnDomains(ImmutableMap.of(
                    "orderpriority", singleValue(createVarcharType(15), utf8Slice("1-URGENT"))));

            TupleDomain<String> lineitemDomain = withColumnDomains(ImmutableMap.of(
                    "shipmode", singleValue(createVarcharType(10), utf8Slice("MAIL")),
                    "ds", singleValue(createVarcharType(10), utf8Slice("2019-11-02"))));

            assertPlan(pushdownFilterEnabled(),
                    "WITH a AS (\n" +
                            "    SELECT ds, orderkey\n" +
                            "    FROM orders_partitioned\n" +
                            "    WHERE orderpriority = '1-URGENT' AND ds > '2019-11-01'\n" +
                            "),\n" +
                            "b AS (\n" +
                            "    SELECT ds, orderkey, linenumber\n" +
                            "    FROM lineitem_unpartitioned\n" +
                            "    WHERE shipmode = 'MAIL'\n" +
                            ")\n" +
                            "SELECT * FROM a LEFT JOIN b ON a.ds = b.ds",
                    anyTree(node(JoinNode.class,
                            anyTree(tableScan("orders_partitioned", ordersDomain, TRUE_CONSTANT, ImmutableSet.of("orderpriority"))),
                            anyTree(tableScan("lineitem_unpartitioned", lineitemDomain, TRUE_CONSTANT, ImmutableSet.of("shipmode", "ds"))))));
        }
        finally {
            queryRunner.execute("DROP TABLE IF EXISTS orders_partitioned");
            queryRunner.execute("DROP TABLE IF EXISTS lineitem_unpartitioned");
        }
    }

    @Test
    public void testPushdownFilter()
    {
        Session pushdownFilterEnabled = pushdownFilterEnabled();

        // Only domain predicates
        assertPlan("SELECT linenumber FROM lineitem WHERE partkey = 10",
                output(exchange(project(
                        filter("partkey = 10",
                                strictTableScan("lineitem", identityMap("linenumber", "partkey")))))));

        assertPlan(pushdownFilterEnabled, "SELECT linenumber FROM lineitem WHERE partkey = 10",
                output(exchange(
                        strictTableScan("lineitem", identityMap("linenumber")))),
                plan -> assertTableLayout(plan, "lineitem", withColumnDomains(ImmutableMap.of(new Subfield("partkey", ImmutableList.of()), singleValue(BIGINT, 10L))), TRUE_CONSTANT, ImmutableSet.of("partkey")));

        assertPlan(pushdownFilterEnabled, "SELECT partkey, linenumber FROM lineitem WHERE partkey = 10",
                output(exchange(
                        project(
                                strictTableScan("lineitem", identityMap("linenumber"))))),
                plan -> assertTableLayout(plan, "lineitem", withColumnDomains(ImmutableMap.of(new Subfield("partkey", ImmutableList.of()), singleValue(BIGINT, 10L))), TRUE_CONSTANT, ImmutableSet.of("partkey")));

        // Only remaining predicate
        assertPlan("SELECT linenumber FROM lineitem WHERE mod(orderkey, 2) = 1",
                output(exchange(project(
                        filter("mod(orderkey, 2) = 1",
                                strictTableScan("lineitem", identityMap("linenumber", "orderkey")))))));
        // Remaining predicate is NULL
        assertPlan(pushdownFilterEnabled, "SELECT linenumber FROM lineitem WHERE cardinality(NULL) > 0",
                output(values("linenumber")));

        assertPlan(pushdownFilterEnabled, "SELECT linenumber FROM lineitem WHERE orderkey > 10 AND cardinality(NULL) > 0",
                output(values("linenumber")));

        // Remaining predicate is always FALSE
        assertPlan(pushdownFilterEnabled, "SELECT linenumber FROM lineitem WHERE cardinality(ARRAY[1]) > 1",
                output(values("linenumber")));

        assertPlan(pushdownFilterEnabled, "SELECT linenumber FROM lineitem WHERE orderkey > 10 AND cardinality(ARRAY[1]) > 1",
                output(values("linenumber")));

        // TupleDomain predicate is always FALSE
        assertPlan(pushdownFilterEnabled, "SELECT linenumber FROM lineitem WHERE orderkey = 1 AND orderkey = 2",
                output(values("linenumber")));

        assertPlan(pushdownFilterEnabled, "SELECT linenumber FROM lineitem WHERE orderkey = 1 AND orderkey = 2 AND linenumber % 2 = 1",
                output(values("linenumber")));

        FunctionAndTypeManager functionAndTypeManager = getQueryRunner().getMetadata().getFunctionAndTypeManager();
        FunctionResolution functionResolution = new FunctionResolution(functionAndTypeManager.getFunctionAndTypeResolver());
        RowExpression remainingPredicate = new CallExpression(EQUAL.name(),
                functionResolution.comparisonFunction(EQUAL, BIGINT, BIGINT),
                BOOLEAN,
                ImmutableList.of(
                        new CallExpression("mod",
                                functionAndTypeManager.lookupFunction("mod", fromTypes(BIGINT, BIGINT)),
                                BIGINT,
                                ImmutableList.of(
                                        new VariableReferenceExpression(Optional.empty(), "orderkey", BIGINT),
                                        constant(2))),
                        constant(1)));

        assertPlan(pushdownFilterEnabled, "SELECT linenumber FROM lineitem WHERE mod(orderkey, 2) = 1",
                output(exchange(
                        strictTableScan("lineitem", identityMap("linenumber")))),
                plan -> assertTableLayout(plan, "lineitem", TupleDomain.all(), remainingPredicate, ImmutableSet.of("orderkey")));

        assertPlan(pushdownFilterEnabled, "SELECT orderkey, linenumber FROM lineitem WHERE mod(orderkey, 2) = 1",
                output(exchange(
                        strictTableScan("lineitem", identityMap("orderkey", "linenumber")))),
                plan -> assertTableLayout(plan, "lineitem", TupleDomain.all(), remainingPredicate, ImmutableSet.of("orderkey")));

        // A mix of domain and remaining predicates
        assertPlan("SELECT linenumber FROM lineitem WHERE partkey = 10 AND mod(orderkey, 2) = 1",
                output(exchange(project(
                        filter("partkey = 10 AND mod(orderkey, 2) = 1",
                                strictTableScan("lineitem", identityMap("linenumber", "orderkey", "partkey")))))));

        assertPlan(pushdownFilterEnabled, "SELECT linenumber FROM lineitem WHERE partkey = 10 AND mod(orderkey, 2) = 1",
                output(exchange(
                        strictTableScan("lineitem", identityMap("linenumber")))),
                plan -> assertTableLayout(plan, "lineitem", withColumnDomains(ImmutableMap.of(new Subfield("partkey", ImmutableList.of()), singleValue(BIGINT, 10L))), remainingPredicate, ImmutableSet.of("partkey", "orderkey")));

        assertPlan(pushdownFilterEnabled, "SELECT partkey, orderkey, linenumber FROM lineitem WHERE partkey = 10 AND mod(orderkey, 2) = 1",
                output(exchange(
                        project(
                                ImmutableMap.of("expr_8", expression("10")),
                                strictTableScan("lineitem", identityMap("orderkey", "linenumber"))))),
                plan -> assertTableLayout(plan, "lineitem", withColumnDomains(ImmutableMap.of(new Subfield("partkey", ImmutableList.of()), singleValue(BIGINT, 10L))), remainingPredicate, ImmutableSet.of("partkey", "orderkey")));
    }

    @Test
    public void testPartitionPruning()
    {
        QueryRunner queryRunner = getQueryRunner();
        queryRunner.execute("CREATE TABLE test_partition_pruning WITH (partitioned_by = ARRAY['ds']) AS " +
                "SELECT orderkey, CAST(to_iso8601(date_add('DAY', orderkey % 7, date('2019-11-01'))) AS VARCHAR) AS ds FROM orders WHERE orderkey < 1000");

        Session pushdownFilterEnabled = pushdownFilterEnabled();
        try {
            assertPlan(pushdownFilterEnabled, "SELECT * FROM test_partition_pruning WHERE ds = '2019-11-01'",
                    anyTree(tableScanWithConstraint("test_partition_pruning", ImmutableMap.of("ds", singleValue(VARCHAR, utf8Slice("2019-11-01"))))));

            assertPlan(pushdownFilterEnabled, "SELECT * FROM test_partition_pruning WHERE date(ds) = date('2019-11-01')",
                    anyTree(tableScanWithConstraint("test_partition_pruning", ImmutableMap.of("ds", singleValue(VARCHAR, utf8Slice("2019-11-01"))))));

            assertPlan(pushdownFilterEnabled, "SELECT * FROM test_partition_pruning WHERE date(ds) BETWEEN date('2019-11-02') AND date('2019-11-04')",
                    anyTree(tableScanWithConstraint("test_partition_pruning", ImmutableMap.of("ds", multipleValues(VARCHAR, utf8Slices("2019-11-02", "2019-11-03", "2019-11-04"))))));

            assertPlan(pushdownFilterEnabled, "SELECT * FROM test_partition_pruning WHERE ds < '2019-11-05'",
                    anyTree(tableScanWithConstraint("test_partition_pruning", ImmutableMap.of("ds", multipleValues(VARCHAR, utf8Slices("2019-11-01", "2019-11-02", "2019-11-03", "2019-11-04"))))));

            assertPlan(pushdownFilterEnabled, "SELECT * FROM test_partition_pruning WHERE date(ds) > date('2019-11-02')",
                    anyTree(tableScanWithConstraint("test_partition_pruning", ImmutableMap.of("ds", multipleValues(VARCHAR, utf8Slices("2019-11-03", "2019-11-04", "2019-11-05", "2019-11-06", "2019-11-07"))))));

            assertPlan(pushdownFilterEnabled, "SELECT * FROM test_partition_pruning WHERE ds < '2019-11-05' AND date(ds) > date('2019-11-02')",
                    anyTree(tableScanWithConstraint("test_partition_pruning", ImmutableMap.of("ds", multipleValues(VARCHAR, utf8Slices("2019-11-03", "2019-11-04"))))));
        }
        finally {
            queryRunner.execute("DROP TABLE test_partition_pruning");
        }
    }

    @Test
    public void testOptimizeMetadataQueries()
    {
        QueryRunner queryRunner = getQueryRunner();
        Session optimizeMetadataQueries = Session.builder(this.getQueryRunner().getDefaultSession())
                .setSystemProperty(OPTIMIZE_METADATA_QUERIES, Boolean.toString(true))
                .setCatalogSessionProperty(HIVE_CATALOG, PUSHDOWN_FILTER_ENABLED, Boolean.toString(true))
                .build();

        queryRunner.execute(
                "CREATE TABLE test_optimize_metadata_queries WITH (partitioned_by = ARRAY['ds']) AS " +
                        "SELECT orderkey, CAST(to_iso8601(date_add('DAY', orderkey % 7, date('2020-10-01'))) AS VARCHAR) AS ds FROM orders WHERE orderkey < 1000");
        queryRunner.execute(
                "CREATE TABLE test_optimize_metadata_queries_multiple_partition_columns WITH (partitioned_by = ARRAY['ds', 'value']) AS " +
                        "SELECT orderkey, CAST(to_iso8601(date_add('DAY', orderkey % 7, date('2020-10-01'))) AS VARCHAR) AS ds, 1 AS value FROM orders WHERE orderkey < 1000");

        try {
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT DISTINCT ds FROM test_optimize_metadata_queries",
                    anyTree(values(
                            ImmutableList.of("ds"),
                            ImmutableList.of(
                                    ImmutableList.of(new StringLiteral("2020-10-01")),
                                    ImmutableList.of(new StringLiteral("2020-10-02")),
                                    ImmutableList.of(new StringLiteral("2020-10-03")),
                                    ImmutableList.of(new StringLiteral("2020-10-04")),
                                    ImmutableList.of(new StringLiteral("2020-10-05")),
                                    ImmutableList.of(new StringLiteral("2020-10-06")),
                                    ImmutableList.of(new StringLiteral("2020-10-07"))))));
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT DISTINCT ds FROM test_optimize_metadata_queries WHERE ds > '2020-10-04'",
                    anyTree(values(
                            ImmutableList.of("ds"),
                            ImmutableList.of(
                                    ImmutableList.of(new StringLiteral("2020-10-05")),
                                    ImmutableList.of(new StringLiteral("2020-10-06")),
                                    ImmutableList.of(new StringLiteral("2020-10-07"))))));
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT DISTINCT ds FROM test_optimize_metadata_queries WHERE ds = '2020-10-04' AND orderkey > 200",
                    anyTree(tableScan(
                            "test_optimize_metadata_queries",
                            withColumnDomains(ImmutableMap.of("orderkey", Domain.create(ValueSet.ofRanges(Range.greaterThan(BIGINT, 200L)), false))),
                            TRUE_CONSTANT,
                            ImmutableSet.of("orderkey"))));
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT DISTINCT ds FROM test_optimize_metadata_queries WHERE ds = '2020-10-04' AND orderkey > 200",
                    anyTree(tableScan(
                            "test_optimize_metadata_queries",
                            withColumnDomains(ImmutableMap.of("orderkey", Domain.create(ValueSet.ofRanges(Range.greaterThan(BIGINT, 200L)), false))),
                            TRUE_CONSTANT,
                            ImmutableSet.of("orderkey"))));

            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT DISTINCT ds FROM test_optimize_metadata_queries_multiple_partition_columns",
                    anyTree(values(
                            ImmutableList.of("ds"),
                            ImmutableList.of(
                                    ImmutableList.of(new StringLiteral("2020-10-01")),
                                    ImmutableList.of(new StringLiteral("2020-10-02")),
                                    ImmutableList.of(new StringLiteral("2020-10-03")),
                                    ImmutableList.of(new StringLiteral("2020-10-04")),
                                    ImmutableList.of(new StringLiteral("2020-10-05")),
                                    ImmutableList.of(new StringLiteral("2020-10-06")),
                                    ImmutableList.of(new StringLiteral("2020-10-07"))))));
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT DISTINCT ds FROM test_optimize_metadata_queries_multiple_partition_columns WHERE ds > '2020-10-04'",
                    anyTree(values(
                            ImmutableList.of("ds"),
                            ImmutableList.of(
                                    ImmutableList.of(new StringLiteral("2020-10-05")),
                                    ImmutableList.of(new StringLiteral("2020-10-06")),
                                    ImmutableList.of(new StringLiteral("2020-10-07"))))));
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT DISTINCT ds FROM test_optimize_metadata_queries_multiple_partition_columns WHERE ds = '2020-10-04' AND orderkey > 200",
                    anyTree(tableScan(
                            "test_optimize_metadata_queries_multiple_partition_columns",
                            withColumnDomains(ImmutableMap.of("orderkey", Domain.create(ValueSet.ofRanges(Range.greaterThan(BIGINT, 200L)), false))),
                            TRUE_CONSTANT,
                            ImmutableSet.of("orderkey"))));
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT ds, MAX(value) FROM test_optimize_metadata_queries_multiple_partition_columns WHERE ds > '2020-10-04' GROUP BY ds",
                    anyTree(values(
                            ImmutableList.of("ds", "value"),
                            ImmutableList.of(
                                    ImmutableList.of(new StringLiteral("2020-10-05"), new LongLiteral("1")),
                                    ImmutableList.of(new StringLiteral("2020-10-06"), new LongLiteral("1")),
                                    ImmutableList.of(new StringLiteral("2020-10-07"), new LongLiteral("1"))))));
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT MAX(ds), MAX(value) FROM test_optimize_metadata_queries_multiple_partition_columns WHERE ds > '2020-10-04'",
                    anyTree(
                            project(
                                    ImmutableMap.of(
                                            "max", expression("'2020-10-07'"),
                                            "max_2", expression("1")),
                                    any(values()))));
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT MAX(value), MAX(ds) FROM test_optimize_metadata_queries_multiple_partition_columns WHERE ds > '2020-10-04'",
                    anyTree(
                            project(
                                    ImmutableMap.of(
                                            "max", expression("1"),
                                            "max_2", expression("'2020-10-07'")),
                                    any(values()))));
        }
        finally {
            queryRunner.execute("DROP TABLE IF EXISTS test_optimize_metadata_queries");
            queryRunner.execute("DROP TABLE IF EXISTS test_optimize_metadata_queries_multiple_partition_columns");
        }
    }

    @Test
    public void testMetadataAggregationFolding()
    {
        QueryRunner queryRunner = getQueryRunner();
        Session optimizeMetadataQueries = Session.builder(this.getQueryRunner().getDefaultSession())
                .setSystemProperty(OPTIMIZE_METADATA_QUERIES, Boolean.toString(true))
                .build();
        Session shufflePartitionColumns = Session.builder(this.getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty(HIVE_CATALOG, SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE, Boolean.toString(true))
                .build();

        queryRunner.execute(
                shufflePartitionColumns,
                "CREATE TABLE test_metadata_aggregation_folding WITH (partitioned_by = ARRAY['ds']) AS " +
                        "SELECT orderkey, CAST(to_iso8601(date_add('DAY', orderkey % 7, date('2020-07-01'))) AS VARCHAR) AS ds FROM orders WHERE orderkey < 1000");
        queryRunner.execute(
                shufflePartitionColumns,
                "CREATE TABLE test_metadata_aggregation_folding_more_partitions WITH (partitioned_by = ARRAY['ds']) AS " +
                        "SELECT orderkey, CAST(to_iso8601(date_add('DAY', orderkey % 200, date('2020-07-01'))) AS VARCHAR) AS ds FROM orders WHERE orderkey < 1000");
        queryRunner.execute(
                shufflePartitionColumns,
                "CREATE TABLE test_metadata_aggregation_folding_null_partitions WITH (partitioned_by = ARRAY['ds']) AS " +
                        "SELECT orderkey, CAST(to_iso8601(date_add('DAY', orderkey % 7, date('2020-07-01'))) AS VARCHAR) AS ds FROM orders WHERE orderkey < 1000");
        queryRunner.execute(
                shufflePartitionColumns,
                "INSERT INTO test_metadata_aggregation_folding_null_partitions SELECT 0 as orderkey, null AS ds");

        try {
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT * FROM test_metadata_aggregation_folding WHERE ds = (SELECT max(ds) from test_metadata_aggregation_folding)",
                    anyTree(
                            join(INNER, ImmutableList.of(),
                                    tableScan("test_metadata_aggregation_folding", getSingleValueColumnDomain("ds", "2020-07-07"), TRUE_CONSTANT, ImmutableSet.of("ds")),
                                    anyTree(any()))));
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT * FROM test_metadata_aggregation_folding WHERE ds = (SELECT min(ds) from test_metadata_aggregation_folding)",
                    anyTree(
                            join(INNER, ImmutableList.of(),
                                    tableScan("test_metadata_aggregation_folding", getSingleValueColumnDomain("ds", "2020-07-01"), TRUE_CONSTANT, ImmutableSet.of("ds")),
                                    anyTree(any()))));

            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT * FROM test_metadata_aggregation_folding_more_partitions WHERE ds = (SELECT max(ds) from test_metadata_aggregation_folding_more_partitions)",
                    anyTree(
                            join(INNER, ImmutableList.of(),
                                    tableScan("test_metadata_aggregation_folding_more_partitions", getSingleValueColumnDomain("ds", "2021-01-16"), TRUE_CONSTANT, ImmutableSet.of("ds")),
                                    anyTree(any()))));
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT * FROM test_metadata_aggregation_folding_more_partitions WHERE ds = (SELECT min(ds) from test_metadata_aggregation_folding_more_partitions)",
                    anyTree(
                            join(INNER, ImmutableList.of(),
                                    tableScan("test_metadata_aggregation_folding_more_partitions", getSingleValueColumnDomain("ds", "2020-07-01"), TRUE_CONSTANT, ImmutableSet.of("ds")),
                                    anyTree(any()))));

            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT * FROM test_metadata_aggregation_folding WHERE ds = (SELECT max(ds) from test_metadata_aggregation_folding_null_partitions)",
                    anyTree(
                            join(INNER, ImmutableList.of(),
                                    tableScan("test_metadata_aggregation_folding", getSingleValueColumnDomain("ds", "2020-07-07"), TRUE_CONSTANT, ImmutableSet.of("ds")),
                                    anyTree(any()))));
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT * FROM test_metadata_aggregation_folding WHERE ds = (SELECT min(ds) from test_metadata_aggregation_folding_null_partitions)",
                    anyTree(
                            join(INNER, ImmutableList.of(),
                                    tableScan("test_metadata_aggregation_folding", getSingleValueColumnDomain("ds", "2020-07-01"), TRUE_CONSTANT, ImmutableSet.of("ds")),
                                    anyTree(any()))));
        }
        finally {
            queryRunner.execute("DROP TABLE IF EXISTS test_metadata_aggregation_folding");
            queryRunner.execute("DROP TABLE IF EXISTS test_metadata_aggregation_folding_more_partitions");
            queryRunner.execute("DROP TABLE IF EXISTS test_metadata_aggregation_folding_null_partitions");
        }
    }

    @Test
    public void testMetadataAggregationFoldingWithEmptyPartitions()
    {
        QueryRunner queryRunner = getQueryRunner();
        Session optimizeMetadataQueries = Session.builder(this.getQueryRunner().getDefaultSession())
                .setSystemProperty(OPTIMIZE_METADATA_QUERIES, Boolean.toString(true))
                .build();
        Session optimizeMetadataQueriesIgnoreStats = Session.builder(this.getQueryRunner().getDefaultSession())
                .setSystemProperty(OPTIMIZE_METADATA_QUERIES_IGNORE_STATS, Boolean.toString(true))
                .build();
        Session shufflePartitionColumns = Session.builder(this.getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty(HIVE_CATALOG, SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE, Boolean.toString(true))
                .build();

        queryRunner.execute(
                shufflePartitionColumns,
                "CREATE TABLE test_metadata_aggregation_folding_with_empty_partitions WITH (partitioned_by = ARRAY['ds']) AS " +
                        "SELECT orderkey, CAST(to_iso8601(date_add('DAY', orderkey % 2, date('2020-07-01'))) AS VARCHAR) AS ds FROM orders WHERE orderkey < 1000");
        ExtendedHiveMetastore metastore = replicateHiveMetastore((DistributedQueryRunner) queryRunner);
        MetastoreContext metastoreContext = new MetastoreContext(getSession().getUser(), getSession().getQueryId().getId(), Optional.empty(), Collections.emptySet(), Optional.empty(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER, getSession().getWarningCollector(), getSession().getRuntimeStats());
        Table table = metastore.getTable(metastoreContext, getSession().getSchema().get(), "test_metadata_aggregation_folding_with_empty_partitions").get();

        // Add one partition with no statistics.
        String partitionNameNoStats = "ds=2020-07-20";
        Partition partitionNoStats = createDummyPartition(table, partitionNameNoStats);
        metastore.addPartitions(
                metastoreContext,
                table.getDatabaseName(),
                table.getTableName(),
                ImmutableList.of(new PartitionWithStatistics(partitionNoStats, partitionNameNoStats, PartitionStatistics.empty())));

        // Add one partition with statistics indicating that it has no rows.
        String emptyPartitionName = "ds=2020-06-30";
        Partition emptyPartition = createDummyPartition(table, emptyPartitionName);
        metastore.addPartitions(
                metastoreContext,
                table.getDatabaseName(),
                table.getTableName(),
                ImmutableList.of(new PartitionWithStatistics(
                        emptyPartition,
                        emptyPartitionName,
                        PartitionStatistics.builder().setBasicStatistics(new HiveBasicStatistics(1, 0, 0, 0)).build())));
        try {
            // Max ds doesn't have stats. Disable rewrite.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT * FROM test_metadata_aggregation_folding_with_empty_partitions WHERE ds = (SELECT max(ds) from test_metadata_aggregation_folding_with_empty_partitions)",
                    anyTree(
                            anyTree(
                                    PlanMatchPattern.tableScan("test_metadata_aggregation_folding_with_empty_partitions")),
                            anyTree(
                                    PlanMatchPattern.tableScan("test_metadata_aggregation_folding_with_empty_partitions"))));
            // Ignore metastore stats. Enable rewrite.
            assertPlan(
                    optimizeMetadataQueriesIgnoreStats,
                    "SELECT * FROM test_metadata_aggregation_folding_with_empty_partitions WHERE ds = (SELECT max(ds) from test_metadata_aggregation_folding_with_empty_partitions)",
                    anyTree(
                            join(
                                    INNER,
                                    ImmutableList.of(),
                                    tableScan("test_metadata_aggregation_folding_with_empty_partitions", getSingleValueColumnDomain("ds", "2020-07-20"), TRUE_CONSTANT, ImmutableSet.of("ds")),
                                    anyTree(any()))));
            // Max ds matching the filter has stats. Enable rewrite.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT * FROM test_metadata_aggregation_folding_with_empty_partitions WHERE ds = (SELECT max(ds) from test_metadata_aggregation_folding_with_empty_partitions WHERE ds <= '2020-07-02')",
                    anyTree(
                            join(
                                    INNER,
                                    ImmutableList.of(),
                                    tableScan("test_metadata_aggregation_folding_with_empty_partitions", getSingleValueColumnDomain("ds", "2020-07-02"), TRUE_CONSTANT, ImmutableSet.of("ds")),
                                    anyTree(any()))));
            // Min ds partition stats indicates that it is an empty partition. Disable rewrite.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT * FROM test_metadata_aggregation_folding_with_empty_partitions WHERE ds = (SELECT min(ds) from test_metadata_aggregation_folding_with_empty_partitions)",
                    anyTree(
                            anyTree(
                                    PlanMatchPattern.tableScan("test_metadata_aggregation_folding_with_empty_partitions")),
                            anyTree(
                                    PlanMatchPattern.tableScan("test_metadata_aggregation_folding_with_empty_partitions"))));
            // Min ds partition matching the filter has non-empty stats. Enable rewrite.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT * FROM test_metadata_aggregation_folding_with_empty_partitions WHERE ds = (SELECT min(ds) from test_metadata_aggregation_folding_with_empty_partitions WHERE ds >= '2020-07-01')",
                    anyTree(
                            join(
                                    INNER,
                                    ImmutableList.of(),
                                    tableScan("test_metadata_aggregation_folding_with_empty_partitions", getSingleValueColumnDomain("ds", "2020-07-01"), TRUE_CONSTANT, ImmutableSet.of("ds")),
                                    anyTree(any()))));
            // Test the non-reducible code path.
            // Disable rewrite as there are partitions with empty stats.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT DISTINCT ds FROM test_metadata_aggregation_folding_with_empty_partitions",
                    anyTree(tableScanWithConstraint("test_metadata_aggregation_folding_with_empty_partitions", ImmutableMap.of("ds", multipleValues(VARCHAR, utf8Slices("2020-06-30", "2020-07-01", "2020-07-02", "2020-07-20"))))));
            // Enable rewrite as all matching partitions have stats.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT DISTINCT ds FROM test_metadata_aggregation_folding_with_empty_partitions WHERE ds BETWEEN '2020-07-01' AND '2020-07-03'",
                    anyTree(
                            values(
                                    ImmutableList.of("ds"),
                                    ImmutableList.of(
                                            ImmutableList.of(new StringLiteral("2020-07-01")),
                                            ImmutableList.of(new StringLiteral("2020-07-02"))))));
            // One of two resulting partitions doesn't have stats. Disable rewrite.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT MIN(ds), MAX(ds) FROM test_metadata_aggregation_folding_with_empty_partitions WHERE ds BETWEEN '2020-06-30' AND '2020-07-03'",
                    anyTree(tableScanWithConstraint("test_metadata_aggregation_folding_with_empty_partitions", ImmutableMap.of("ds", multipleValues(VARCHAR, utf8Slices("2020-06-30", "2020-07-01", "2020-07-02"))))));
            // Ignore metadata stats. Always enable rewrite.
            assertPlan(
                    optimizeMetadataQueriesIgnoreStats,
                    "SELECT MIN(ds), MAX(ds) FROM test_metadata_aggregation_folding_with_empty_partitions WHERE ds BETWEEN '2020-06-30' AND '2020-07-03'",
                    anyTree(
                            project(
                                    ImmutableMap.of(
                                            "min", expression("'2020-06-30'"),
                                            "max", expression("'2020-07-02'")),
                                    anyTree(values()))));
            // Both resulting partitions have stats. Enable rewrite.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT MIN(ds), MAX(ds) FROM test_metadata_aggregation_folding_with_empty_partitions WHERE ds BETWEEN '2020-07-01' AND '2020-07-03'",
                    anyTree(
                            project(
                                    ImmutableMap.of(
                                            "min", expression("'2020-07-01'"),
                                            "max", expression("'2020-07-02'")),
                                    anyTree(values()))));
        }
        finally {
            queryRunner.execute("DROP TABLE IF EXISTS test_metadata_aggregation_folding_with_empty_partitions");
        }
    }

    @Test
    public void testMetadataAggregationFoldingWithEmptyPartitionsAndMetastoreThreshold()
    {
        QueryRunner queryRunner = getQueryRunner();
        Session optimizeMetadataQueriesWithHighThreshold = Session.builder(this.getQueryRunner().getDefaultSession())
                .setSystemProperty(OPTIMIZE_METADATA_QUERIES, Boolean.toString(true))
                .setSystemProperty(OPTIMIZE_METADATA_QUERIES_CALL_THRESHOLD, Integer.toString(100))
                .build();
        Session optimizeMetadataQueriesWithLowThreshold = Session.builder(this.getQueryRunner().getDefaultSession())
                .setSystemProperty(OPTIMIZE_METADATA_QUERIES, Boolean.toString(true))
                .setSystemProperty(OPTIMIZE_METADATA_QUERIES_CALL_THRESHOLD, Integer.toString(1))
                .build();
        Session optimizeMetadataQueriesIgnoreStatsWithLowThreshold = Session.builder(this.getQueryRunner().getDefaultSession())
                .setSystemProperty(OPTIMIZE_METADATA_QUERIES_IGNORE_STATS, Boolean.toString(true))
                .setSystemProperty(OPTIMIZE_METADATA_QUERIES_CALL_THRESHOLD, Integer.toString(1))
                .build();
        Session shufflePartitionColumns = Session.builder(this.getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty(HIVE_CATALOG, SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE, Boolean.toString(true))
                .build();

        queryRunner.execute(
                shufflePartitionColumns,
                "CREATE TABLE test_metadata_aggregation_folding_with_empty_partitions_with_threshold WITH (partitioned_by = ARRAY['ds']) AS " +
                        "SELECT orderkey, CAST(to_iso8601(date_add('DAY', orderkey % 2, date('2020-07-01'))) AS VARCHAR) AS ds FROM orders WHERE orderkey < 1000");
        ExtendedHiveMetastore metastore = replicateHiveMetastore((DistributedQueryRunner) queryRunner);
        MetastoreContext metastoreContext = new MetastoreContext(getSession().getUser(), getSession().getQueryId().getId(), Optional.empty(), Collections.emptySet(), Optional.empty(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER, getSession().getWarningCollector(), getSession().getRuntimeStats());
        Table table = metastore.getTable(metastoreContext, getSession().getSchema().get(), "test_metadata_aggregation_folding_with_empty_partitions_with_threshold").get();

        // Add one partition with no statistics.
        String partitionNameNoStats = "ds=2020-07-20";
        Partition partitionNoStats = createDummyPartition(table, partitionNameNoStats);
        metastore.addPartitions(
                metastoreContext,
                table.getDatabaseName(),
                table.getTableName(),
                ImmutableList.of(new PartitionWithStatistics(partitionNoStats, partitionNameNoStats, PartitionStatistics.empty())));

        // Add one partition with statistics indicating that it has no rows.
        String emptyPartitionName = "ds=2020-06-30";
        Partition emptyPartition = createDummyPartition(table, emptyPartitionName);
        metastore.addPartitions(
                metastoreContext,
                table.getDatabaseName(),
                table.getTableName(),
                ImmutableList.of(new PartitionWithStatistics(
                        emptyPartition,
                        emptyPartitionName,
                        PartitionStatistics.builder().setBasicStatistics(new HiveBasicStatistics(1, 0, 0, 0)).build())));
        try {
            // Test the non-reducible code path.
            // Enable rewrite as all matching partitions have stats.
            assertPlan(
                    optimizeMetadataQueriesWithHighThreshold,
                    "SELECT DISTINCT ds FROM test_metadata_aggregation_folding_with_empty_partitions_with_threshold WHERE ds BETWEEN '2020-07-01' AND '2020-07-03'",
                    anyTree(
                            values(
                                    ImmutableList.of("ds"),
                                    ImmutableList.of(
                                            ImmutableList.of(new StringLiteral("2020-07-01")),
                                            ImmutableList.of(new StringLiteral("2020-07-02"))))));
            // All matching partitions have stats, Metastore threshold is reached, Disable rewrite
            assertPlan(
                    optimizeMetadataQueriesWithLowThreshold,
                    "SELECT DISTINCT ds FROM test_metadata_aggregation_folding_with_empty_partitions_with_threshold WHERE ds BETWEEN '2020-07-01' AND '2020-07-03'",
                    anyTree(tableScanWithConstraint("test_metadata_aggregation_folding_with_empty_partitions_with_threshold", ImmutableMap.of("ds", multipleValues(VARCHAR, utf8Slices("2020-07-01", "2020-07-02"))))));
            // All matching partitions have stats, Metastore threshold is reached, but IgnoreStats will overwrite, Enable rewrite
            assertPlan(
                    optimizeMetadataQueriesIgnoreStatsWithLowThreshold,
                    "SELECT DISTINCT ds FROM test_metadata_aggregation_folding_with_empty_partitions_with_threshold WHERE ds BETWEEN '2020-07-01' AND '2020-07-03'",
                    anyTree(
                            values(
                                    ImmutableList.of("ds"),
                                    ImmutableList.of(
                                            ImmutableList.of(new StringLiteral("2020-07-01")),
                                            ImmutableList.of(new StringLiteral("2020-07-02"))))));
        }
        finally {
            queryRunner.execute("DROP TABLE IF EXISTS test_metadata_aggregation_folding_with_empty_partitions_with_threshold");
        }
    }

    @Test
    public void testMetadataAggregationFoldingWithTwoPartitionColumns()
    {
        QueryRunner queryRunner = getQueryRunner();
        Session optimizeMetadataQueries = Session.builder(this.getQueryRunner().getDefaultSession())
                .setSystemProperty(OPTIMIZE_METADATA_QUERIES, Boolean.toString(true))
                .build();
        Session shufflePartitionColumns = Session.builder(this.getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty(HIVE_CATALOG, SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE, Boolean.toString(true))
                .build();

        // Create a table with partitions: ds=2020-07-01/status=A, ds=2020-07-01/status=B, ds=2020-07-02/status=A, ds=2020-07-02/status=B
        queryRunner.execute(
                shufflePartitionColumns,
                "CREATE TABLE test_metadata_aggregation_folding_with_two_partitions_columns WITH (partitioned_by = ARRAY['ds', 'status']) AS " +
                        "SELECT orderkey, CAST(to_iso8601(date_add('DAY', orderkey % 2, date('2020-07-01'))) AS VARCHAR) AS ds, IF(orderkey % 2 = 1, 'A', 'B') status " +
                        "FROM orders WHERE orderkey < 1000");
        ExtendedHiveMetastore metastore = replicateHiveMetastore((DistributedQueryRunner) queryRunner);
        MetastoreContext metastoreContext = new MetastoreContext(getSession().getUser(), getSession().getQueryId().getId(), Optional.empty(), Collections.emptySet(), Optional.empty(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER, getSession().getWarningCollector(), getSession().getRuntimeStats());
        Table table = metastore.getTable(metastoreContext, getSession().getSchema().get(), "test_metadata_aggregation_folding_with_two_partitions_columns").get();

        // Add one partition with no statistics.
        String partitionNameNoStats = "ds=2020-07-03/status=C";
        Partition partitionNoStats = createDummyPartition(table, partitionNameNoStats);
        metastore.addPartitions(
                metastoreContext,
                table.getDatabaseName(),
                table.getTableName(),
                ImmutableList.of(new PartitionWithStatistics(partitionNoStats, partitionNameNoStats, PartitionStatistics.empty())));

        try {
            // All matching partitions have stats. Enable rewrite.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT MIN(ds), MAX(ds) FROM test_metadata_aggregation_folding_with_two_partitions_columns WHERE ds BETWEEN '2020-07-01' AND '2020-07-02'",
                    anyTree(
                            project(
                                    ImmutableMap.of(
                                            "min", expression("'2020-07-01'"),
                                            "max", expression("'2020-07-02'")),
                                    anyTree(values()))));
            // All matching partitions have stats. Enable rewrite.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT MIN(status), MAX(ds) FROM test_metadata_aggregation_folding_with_two_partitions_columns WHERE ds BETWEEN '2020-07-01' AND '2020-07-02'",
                    anyTree(
                            project(
                                    ImmutableMap.of(
                                            "min", expression("'A'"),
                                            "max", expression("'2020-07-02'")),
                                    anyTree(values()))));
            // All matching partitions have stats. Enable rewrite.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT MIN(ds) ds, MIN(status) status FROM test_metadata_aggregation_folding_with_two_partitions_columns",
                    anyTree(
                            project(
                                    ImmutableMap.of(
                                            "ds", expression("'2020-07-01'"),
                                            "status", expression("'A'")),
                                    anyTree(values()))));
            // Resulting partition doesn't have stats. Disable rewrite.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT MAX(status) status FROM test_metadata_aggregation_folding_with_two_partitions_columns",
                    anyTree(
                            tableScanWithConstraint(
                                    "test_metadata_aggregation_folding_with_two_partitions_columns",
                                    ImmutableMap.of(
                                            "status", multipleValues(VarcharType.createVarcharType(1), utf8Slices("A", "B", "C")),
                                            "ds", multipleValues(VARCHAR, utf8Slices("2020-07-01", "2020-07-02", "2020-07-03"))))));
        }
        finally {
            queryRunner.execute("DROP TABLE IF EXISTS test_metadata_aggregation_folding_with_two_partitions_columns");
        }
    }

    @DataProvider(name = "optimize_metadata_filter_properties")
    public static Object[][] optimizeMetadataFilterProperties()
    {
        return new Object[][] {
                {true, true},
                {true, false},
                {false, true},
                {false, false}
        };
    }

    @Test(dataProvider = "optimize_metadata_filter_properties")
    public void testMetadataAggregationFoldingWithFilters(boolean pushdownSubfieldsEnabled, boolean pushdownFilterEnabled)
    {
        QueryRunner queryRunner = getQueryRunner();
        Session optimizeMetadataQueries = Session.builder(this.getQueryRunner().getDefaultSession())
                .setSystemProperty(OPTIMIZE_METADATA_QUERIES, Boolean.toString(true))
                .setSystemProperty(PUSHDOWN_SUBFIELDS_ENABLED, Boolean.toString(pushdownSubfieldsEnabled))
                .setCatalogSessionProperty(HIVE_CATALOG, PUSHDOWN_FILTER_ENABLED, Boolean.toString(pushdownFilterEnabled))
                .build();
        Session shufflePartitionColumns = Session.builder(this.getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty(HIVE_CATALOG, SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE, Boolean.toString(true))
                .build();

        queryRunner.execute(
                shufflePartitionColumns,
                "CREATE TABLE test_metadata_aggregation_folding_with_filters WITH (partitioned_by = ARRAY['ds']) AS " +
                        "SELECT orderkey, ARRAY[orderstatus] AS orderstatus, CAST (ROW (orderkey) as ROW(subfield BIGINT)) AS struct, CAST(to_iso8601(date_add('DAY', orderkey % 2, date('2020-07-01'))) AS VARCHAR) AS ds FROM orders WHERE orderkey < 1000");

        try {
            // There is a filter on non-partition column which can be pushed down to the connector. Disable the rewrite.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT max(ds) from test_metadata_aggregation_folding_with_filters WHERE contains(orderstatus, 'F')",
                    anyTree(
                            tableScanWithConstraint(
                                    "test_metadata_aggregation_folding_with_filters",
                                    ImmutableMap.of("ds", multipleValues(VARCHAR, utf8Slices("2020-07-01", "2020-07-02"))))));

            // There is a filter on a subfield of non-partition column which can be pushed down to the connector. Disable the rewrite.
            assertPlan(
                    optimizeMetadataQueries,
                    "SELECT max(ds) from test_metadata_aggregation_folding_with_filters WHERE struct.subfield is null",
                    anyTree(
                            tableScanWithConstraint(
                                    "test_metadata_aggregation_folding_with_filters",
                                    ImmutableMap.of("ds", multipleValues(VARCHAR, utf8Slices("2020-07-01", "2020-07-02"))))));
        }
        finally {
            queryRunner.execute("DROP TABLE IF EXISTS test_metadata_aggregation_folding_with_filters");
        }
    }

    private static Partition createDummyPartition(Table table, String partitionName)
    {
        return Partition.builder()
                .setCatalogName(Optional.of("hive"))
                .setDatabaseName(table.getDatabaseName())
                .setTableName(table.getTableName())
                .setColumns(table.getDataColumns())
                .setValues(toPartitionValues(partitionName))
                .withStorage(storage -> storage
                        .setStorageFormat(fromHiveStorageFormat(HiveStorageFormat.ORC))
                        .setLocation(new Path(table.getStorage().getLocation(), partitionName).toString()))
                .setEligibleToIgnore(true)
                .setSealedPartition(true)
                .build();
    }

    private static TupleDomain<String> getSingleValueColumnDomain(String column, String value)
    {
        return withColumnDomains(ImmutableMap.of(column, singleValue(VARCHAR, utf8Slice(value))));
    }

    static List<Slice> utf8Slices(String... values)
    {
        return Arrays.stream(values).map(Slices::utf8Slice).collect(toImmutableList());
    }

    private static PlanMatchPattern tableScanWithConstraint(String tableName, Map<String, Domain> expectedConstraint)
    {
        return PlanMatchPattern.tableScan(tableName).with(new Matcher()
        {
            @Override
            public boolean shapeMatches(PlanNode node)
            {
                return node instanceof TableScanNode;
            }

            @Override
            public MatchResult detailMatches(PlanNode node, StatsProvider stats, Session session, Metadata metadata, SymbolAliases symbolAliases)
            {
                TableScanNode tableScan = (TableScanNode) node;
                TupleDomain<String> constraint = tableScan.getCurrentConstraint()
                        .transform(HiveColumnHandle.class::cast)
                        .transform(HiveColumnHandle::getName);

                if (!expectedConstraint.equals(constraint.getDomains().get())) {
                    return NO_MATCH;
                }

                return match();
            }
        });
    }

    @Test
    public void testPushdownFilterOnSubfields()
    {
        assertUpdate("CREATE TABLE test_pushdown_filter_on_subfields(" +
                "id bigint, " +
                "a array(bigint), " +
                "b map(varchar, bigint), " +
                "c row(" +
                "a bigint, " +
                "b row(x bigint), " +
                "c array(bigint), " +
                "d map(bigint, bigint), " +
                "e map(varchar, bigint)))");

        assertPushdownFilterOnSubfields("SELECT * FROM test_pushdown_filter_on_subfields WHERE a[1] = 1",
                ImmutableMap.of(new Subfield("a[1]"), singleValue(BIGINT, 1L)));

        assertPushdownFilterOnSubfields("SELECT * FROM test_pushdown_filter_on_subfields where a[1 + 1] = 1",
                ImmutableMap.of(new Subfield("a[2]"), singleValue(BIGINT, 1L)));

        assertPushdownFilterOnSubfields("SELECT *  FROM test_pushdown_filter_on_subfields WHERE b['foo'] = 1",
                ImmutableMap.of(new Subfield("b[\"foo\"]"), singleValue(BIGINT, 1L)));

        assertPushdownFilterOnSubfields("SELECT * FROM test_pushdown_filter_on_subfields WHERE b[concat('f','o', 'o')] = 1",
                ImmutableMap.of(new Subfield("b[\"foo\"]"), singleValue(BIGINT, 1L)));

        assertPushdownFilterOnSubfields("SELECT * FROM test_pushdown_filter_on_subfields WHERE c.a = 1",
                ImmutableMap.of(new Subfield("c.a"), singleValue(BIGINT, 1L)));

        assertPushdownFilterOnSubfields("SELECT * FROM test_pushdown_filter_on_subfields WHERE c.b.x = 1",
                ImmutableMap.of(new Subfield("c.b.x"), singleValue(BIGINT, 1L)));

        assertPushdownFilterOnSubfields("SELECT * FROM test_pushdown_filter_on_subfields WHERE c.c[5] = 1",
                ImmutableMap.of(new Subfield("c.c[5]"), singleValue(BIGINT, 1L)));

        assertPushdownFilterOnSubfields("SELECT * FROM test_pushdown_filter_on_subfields WHERE c.d[5] = 1",
                ImmutableMap.of(new Subfield("c.d[5]"), singleValue(BIGINT, 1L)));

        assertPushdownFilterOnSubfields("SELECT * FROM test_pushdown_filter_on_subfields WHERE c.e[concat('f', 'o', 'o')] = 1",
                ImmutableMap.of(new Subfield("c.e[\"foo\"]"), singleValue(BIGINT, 1L)));

        assertPushdownFilterOnSubfields("SELECT * FROM test_pushdown_filter_on_subfields WHERE c.e['foo'] = 1",
                ImmutableMap.of(new Subfield("c.e[\"foo\"]"), singleValue(BIGINT, 1L)));

        assertPushdownFilterOnSubfields("SELECT * FROM test_pushdown_filter_on_subfields WHERE c.a IS NOT NULL AND c.c IS NOT NULL",
                ImmutableMap.of(new Subfield("c.a"), notNull(BIGINT), new Subfield("c.c"), notNull(new ArrayType(BIGINT))));

        // TupleDomain predicate is always FALSE
        assertPlan(pushdownFilterEnabled(), "SELECT id FROM test_pushdown_filter_on_subfields WHERE c.a = 1 AND c.a = 2",
                output(values("id")));

        assertUpdate("DROP TABLE test_pushdown_filter_on_subfields");
    }

    @Test
    public void testPushdownArraySubscripts()
    {
        assertUpdate("CREATE TABLE test_pushdown_array_subscripts(id bigint, " +
                "a array(bigint), " +
                "b array(array(varchar)), " +
                "y array(row(a bigint, b varchar, c double, d row(d1 bigint, d2 double))), " +
                "z array(array(row(p bigint, e row(e1 bigint, e2 varchar)))))");

        assertPushdownSubscripts("test_pushdown_array_subscripts");

        // Unnest
        assertPushdownSubfields("SELECT t.b, a[1] FROM test_pushdown_array_subscripts CROSS JOIN UNNEST(b) as t(b)", "test_pushdown_array_subscripts",
                ImmutableMap.of("a", toSubfields("a[1]")));

        assertPushdownSubfields("SELECT t.b, a[1] FROM test_pushdown_array_subscripts CROSS JOIN UNNEST(b[1]) as t(b)", "test_pushdown_array_subscripts",
                ImmutableMap.of("a", toSubfields("a[1]"), "b", toSubfields("b[1]")));

        assertPushdownSubfields("SELECT t.b[2], a[1] FROM test_pushdown_array_subscripts CROSS JOIN UNNEST(b) as t(b)", "test_pushdown_array_subscripts",
                ImmutableMap.of("a", toSubfields("a[1]"), "b", toSubfields("b[*][2]")));

        assertPushdownSubfields("SELECT id, grouping(index), sum(length(b[1][2])) FROM test_pushdown_array_subscripts CROSS JOIN UNNEST(a) as t(index) GROUP BY grouping sets ((index, id), (index))", "test_pushdown_array_subscripts",
                ImmutableMap.of("b", toSubfields("b[1][2]")));

        assertPushdownSubfields("SELECT id, b[1] FROM test_pushdown_array_subscripts CROSS JOIN UNNEST(a) as t(unused)", "test_pushdown_array_subscripts",
                ImmutableMap.of("b", toSubfields("b[1]")));

        // No subfield pruning
        assertPushdownSubfields("SELECT array_sort(a)[1] FROM test_pushdown_array_subscripts", "test_pushdown_array_subscripts",
                ImmutableMap.of());

        assertPushdownSubfields("SELECT id FROM test_pushdown_array_subscripts CROSS JOIN UNNEST(a) as t(index) WHERE a[1] > 10 AND cardinality(b[index]) = 2", "test_pushdown_array_subscripts",
                ImmutableMap.of());

        assertUpdate("DROP TABLE test_pushdown_array_subscripts");
    }

    @Test
    public void testPushdownMapSubscripts()
    {
        assertUpdate("CREATE TABLE test_pushdown_map_subscripts(id bigint, " +
                "a map(bigint, bigint), " +
                "b map(bigint, map(bigint, varchar)), " +
                "c map(varchar, bigint), \n" +
                "y map(bigint, row(a bigint, b varchar, c double, d row(d1 bigint, d2 double)))," +
                "z map(bigint, map(bigint, row(p bigint, e row(e1 bigint, e2 varchar)))))");

        assertPushdownSubscripts("test_pushdown_map_subscripts");

        // Unnest
        assertPushdownSubfields("SELECT t.b, a[1] FROM test_pushdown_map_subscripts CROSS JOIN UNNEST(b) as t(k, b)", "test_pushdown_map_subscripts",
                ImmutableMap.of("a", toSubfields("a[1]")));

        assertPushdownSubfields("SELECT t.b, a[1] FROM test_pushdown_map_subscripts CROSS JOIN UNNEST(b[1]) as t(k, b)", "test_pushdown_map_subscripts",
                ImmutableMap.of("a", toSubfields("a[1]"), "b", toSubfields("b[1]")));

        assertPushdownSubfields("SELECT t.b[2], a[1] FROM test_pushdown_map_subscripts CROSS JOIN UNNEST(b) as t(k, b)", "test_pushdown_map_subscripts",
                ImmutableMap.of("a", toSubfields("a[1]"), "b", toSubfields("b[*][2]")));

        assertPushdownSubfields("SELECT id, b[1] FROM test_pushdown_map_subscripts CROSS JOIN UNNEST(a) as t(unused_k, unused_v)", "test_pushdown_map_subscripts",
                ImmutableMap.of("b", toSubfields("b[1]")));

        // Map with varchar keys
        assertPushdownSubfields("SELECT c['cat'] FROM test_pushdown_map_subscripts", "test_pushdown_map_subscripts",
                ImmutableMap.of("c", toSubfields("c[\"cat\"]")));

        assertPushdownSubfields("SELECT c[JSON_EXTRACT_SCALAR(JSON_PARSE('{}'),'$.a')] FROM test_pushdown_map_subscripts", "test_pushdown_map_subscripts",
                ImmutableMap.of());

        assertPushdownSubfields("SELECT mod(c['cat'], 2) FROM test_pushdown_map_subscripts WHERE c['dog'] > 10", "test_pushdown_map_subscripts",
                ImmutableMap.of("c", toSubfields("c[\"cat\"]", "c[\"dog\"]")));

        // No subfield pruning
        assertPushdownSubfields("SELECT map_keys(a)[1] FROM test_pushdown_map_subscripts", "test_pushdown_map_subscripts",
                ImmutableMap.of());

        assertUpdate("DROP TABLE test_pushdown_map_subscripts");
    }

    private void assertPushdownSubscripts(String tableName)
    {
        // Filter and project
        assertPushdownSubfields(format("SELECT a[1] FROM %s", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]")));

        assertPushdownSubfields(format("SELECT a[1] + 10 FROM %s", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]")));

        assertPushdownSubfields(format("SELECT a[1] + mod(a[2], 3) FROM %s", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]", "a[2]")));

        assertPushdownSubfields(format("SELECT a[1] FROM %s WHERE a[2] > 10", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]", "a[2]")));

        assertPushdownSubfields(format("SELECT a[1] FROM %s WHERE mod(a[2], 3) = 1", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]", "a[2]")));

        assertPushdownSubfields(format("SELECT a[1], b[2][3] FROM %s", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]"), "b", toSubfields("b[2][3]")));

        assertPushdownSubfields(format("SELECT cardinality(b[1]), b[1][2] FROM %s", tableName), tableName,
                ImmutableMap.of("b", toSubfields("b[1]")));

        assertPushdownSubfields(format("CREATE TABLE x AS SELECT id, a[1] as a1 FROM %s", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]")));

        assertPushdownSubfields(format("CREATE TABLE x AS SELECT id FROM %s WHERE a[1] > 10", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]")));

        assertPushdownSubfields(format("SELECT a[1] FROM %s ORDER BY id LIMIT 1", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]")));

        // Sort
        assertPushdownSubfields(format("SELECT a[1] FROM %s ORDER BY a[2]", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]", "a[2]")));

        // Join
        assertPlan(format("SELECT l.orderkey, a.a[1] FROM lineitem l, %s a WHERE l.linenumber = a.id", tableName),
                anyTree(
                        node(JoinNode.class,
                                anyTree(tableScan("lineitem", ImmutableMap.of())),
                                anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[1]")))))));

        assertPlan(format("SELECT l.orderkey, a.a[1] FROM lineitem l, %s a WHERE l.linenumber = a.id AND a.a[2] > 10", tableName),
                anyTree(
                        node(JoinNode.class,
                                anyTree(tableScan("lineitem", ImmutableMap.of())),
                                anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[1]", "a[2]")))))));

        // Semi join
        assertPlan(format("SELECT a[1] FROM %s WHERE a[2] IN (SELECT a[3] FROM %s)", tableName, tableName),
                anyTree(node(SemiJoinNode.class,
                        anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[1]", "a[2]")))),
                        anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[3]")))))));

        // Aggregation
        assertPushdownSubfields(format("SELECT id, min(a[1]) FROM %s GROUP BY 1", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]")));

        assertPushdownSubfields(format("SELECT id, min(a[1]) FROM %s GROUP BY 1, a[2]", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]", "a[2]")));

        assertPushdownSubfields(format("SELECT id, min(a[1]) FROM %s GROUP BY 1 HAVING max(a[2]) > 10", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]", "a[2]")));

        assertPushdownSubfields(format("SELECT id, min(mod(a[1], 3)) FROM %s GROUP BY 1", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]")));

        assertPushdownSubfields(format("SELECT id, min(a[1]) FILTER (WHERE a[2] > 10) FROM %s GROUP BY 1", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]", "a[2]")));

        assertPushdownSubfields(format("SELECT id, min(a[1] + length(b[2][3])) * avg(a[4]) FROM %s GROUP BY 1", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]", "a[4]"), "b", toSubfields("b[2][3]")));

        assertPushdownSubfields(format("SELECT min(a[1]) FROM %s GROUP BY id", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]")));

        assertPushdownSubfields(format("SELECT arbitrary(y[1]).a FROM %s GROUP BY id", tableName), tableName,
                ImmutableMap.of("y", toSubfields("y[1].a")));

        assertPushdownSubfields(format("SELECT arbitrary(y[1]).d.d1 FROM %s GROUP BY id", tableName), tableName,
                ImmutableMap.of("y", toSubfields("y[1].d.d1")));

        assertPushdownSubfields(format("SELECT arbitrary(y[2].d).d1 FROM %s GROUP BY id", tableName), tableName,
                ImmutableMap.of("y", toSubfields("y[2].d.d1")));

        assertPushdownSubfields(format("SELECT arbitrary(y[3].d.d1) FROM %s GROUP BY id", tableName), tableName,
                ImmutableMap.of("y", toSubfields("y[3].d.d1")));

        assertPushdownSubfields(format("SELECT arbitrary(z[1][2]).e.e1 FROM %s GROUP BY id", tableName), tableName,
                ImmutableMap.of("z", toSubfields("z[1][2].e.e1")));

        assertPushdownSubfields(format("SELECT arbitrary(z[2][3].e).e2 FROM %s GROUP BY id", tableName), tableName,
                ImmutableMap.of("z", toSubfields("z[2][3].e.e2")));

        // Union
        assertPlan(format("SELECT a[1] FROM %s UNION ALL SELECT a[2] FROM %s", tableName, tableName),
                anyTree(exchange(
                        anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[1]")))),
                        anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[2]")))))));

        assertPlan(format("SELECT a[1] FROM (SELECT * FROM %s UNION ALL SELECT * FROM %s)", tableName, tableName),
                anyTree(exchange(
                        anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[1]")))),
                        anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[1]")))))));

        assertPlan(format("SELECT a[1] FROM (SELECT * FROM %s WHERE a[2] > 10 UNION ALL SELECT * FROM %s)", tableName, tableName),
                anyTree(exchange(
                        anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[1]", "a[2]")))),
                        anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[1]")))))));

        // Except
        assertPlan(format("SELECT a[1] FROM %s EXCEPT SELECT a[2] FROM %s", tableName, tableName),
                anyTree(exchange(
                        anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[1]")))),
                        anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[2]")))))));

        // Intersect
        assertPlan(format("SELECT a[1] FROM %s INTERSECT SELECT a[2] FROM %s", tableName, tableName),
                anyTree(exchange(
                        anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[1]")))),
                        anyTree(tableScan(tableName, ImmutableMap.of("a", toSubfields("a[2]")))))));

        // Window function
        assertPushdownSubfields(format("SELECT id, first_value(a[1]) over (partition by a[2] order by b[1][2]) FROM %s", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]", "a[2]"), "b", toSubfields("b[1][2]")));

        assertPushdownSubfields(format("SELECT count(*) over (partition by a[1] order by a[2] rows between a[3] preceding and a[4] preceding) FROM %s", tableName), tableName,
                ImmutableMap.of("a", toSubfields("a[1]", "a[2]", "a[3]", "a[4]")));

        // no subfield pruning
        assertPushdownSubfields(format("SELECT a[id] FROM %s", tableName), tableName,
                ImmutableMap.of());

        assertPushdownSubfields(format("SELECT a[1] FROM (SELECT DISTINCT * FROM %s) LIMIT 10", tableName), tableName,
                ImmutableMap.of());

        // No pass through subfield pruning
        assertPushdownSubfields(format("SELECT id, min(y[1]).a FROM %s GROUP BY 1", tableName), tableName,
                ImmutableMap.of("y", toSubfields("y[1]")));
        assertPushdownSubfields(format("SELECT id, min(y[1]).a, min(y[1].d).d1 FROM %s GROUP BY 1", tableName), tableName,
                ImmutableMap.of("y", toSubfields("y[1]")));
        assertPushdownSubfields(format("SELECT id, min(z[1][2]).e.e1 FROM %s GROUP BY 1", tableName), tableName,
                ImmutableMap.of("z", toSubfields("z[1][2]")));
    }

    @Test
    public void testPushDownSubfieldsFromLambdas()
    {
        final String tableName = "test_pushdown_subfields_from_array_lambda";
        try {
            assertUpdate("CREATE TABLE " + tableName + "(id bigint, " +
                    "a array(bigint), " +
                    "b array(array(varchar)), " +
                    "mi map(int,array(row(a1 bigint, a2 double))), " +
                    "mv map(varchar,array(row(a1 bigint, a2 double))), " +
                    "m1 map(int,row(a1 bigint, a2 double)), " +
                    "m2 map(int,row(a1 bigint, a2 double)), " +
                    "m3 map(int,row(a1 bigint, a2 double)), " +
                    "r row(a array(row(a1 bigint, a2 double)), i bigint, d row(d1 bigint, d2 double)), " +
                    "y array(row(a bigint, b varchar, c double, d row(d1 bigint, d2 double))), " +
                    "yy array(row(a bigint, b varchar, c double, d row(d1 bigint, d2 double))), " +
                    "yyy array(row(a bigint, b varchar, c double, d row(d1 bigint, d2 double))), " +
                    "am array(map(int,row(a1 bigint, a2 double))), " +
                    "aa array(array(row(a1 bigint, a2 double))), " +
                    "z array(array(row(p bigint, e row(e1 bigint, e2 varchar)))))");
            // transform
            assertPushdownSubfields("SELECT TRANSFORM(y, x -> x.d.d1) FROM " + tableName, tableName,
                    ImmutableMap.of("y", toSubfields("y[*].d.d1")));

            // map_zip_with
            assertPushdownSubfields("SELECT MAP_ZIP_WITH(m1, m2, (k, v1, v2) -> v1.a1 + v2.a2) FROM " + tableName, tableName,
                    ImmutableMap.of("m1", toSubfields("m1[*].a1"), "m2", toSubfields("m2[*].a2")));

            // transform_values
            assertPushdownSubfields("SELECT TRANSFORM_VALUES(m1, (k, v) -> v.a1 * 1000) FROM " + tableName, tableName,
                    ImmutableMap.of("m1", toSubfields("m1[*].a1")));

            // transform
            assertPushdownSubfields("SELECT TRANSFORM(y, x -> ROW(x.a, x.d.d1)) FROM " + tableName, tableName,
                    ImmutableMap.of("y", toSubfields("y[*].a", "y[*].d.d1")));

            assertPushdownSubfields("SELECT TRANSFORM(r.a, x -> x.a1) FROM " + tableName, tableName,
                    ImmutableMap.of("r", toSubfields("r.a[*].a1")));

            // zip_with
            assertPushdownSubfields("SELECT ZIP_WITH(y, yy, (x, xx) -> ROW(x.a, xx.d.d1)) FROM " + tableName, tableName,
                    ImmutableMap.of("y", toSubfields("y[*].a"), "yy", toSubfields("yy[*].d.d1")));

            // functions that outputing all subfields and accept functional parameter

            //filter
            assertPushdownSubfields("SELECT FILTER(y, x -> x.a > 0) FROM " + tableName, tableName,
                    ImmutableMap.of("y", toSubfields()));

            // flatten
            assertPushdownSubfields("SELECT TRANSFORM(FLATTEN(z), x -> x.p) FROM " + tableName, tableName,
                    ImmutableMap.of("z", toSubfields("z[*][*].p")));

            // concat
            assertPushdownSubfields("SELECT TRANSFORM(y || yy,  x -> x.d.d1) FROM " + tableName, tableName,
                    ImmutableMap.of("y", toSubfields("y[*].d.d1"), "yy", toSubfields("yy[*].d.d1")));

            assertPushdownSubfields("SELECT TRANSFORM(CONCAT(y, yy)  ,  x -> x.d.d1) FROM " + tableName, tableName,
                    ImmutableMap.of("y", toSubfields("y[*].d.d1"), "yy", toSubfields("yy[*].d.d1")));

            assertPushdownSubfields("SELECT TRANSFORM(CONCAT(y, yy, yyy)  ,  x -> x.d.d1) FROM " + tableName, tableName,
                    ImmutableMap.of("y", toSubfields("y[*].d.d1"), "yy", toSubfields("yy[*].d.d1"), "yyy", toSubfields("yyy[*].d.d1")));

            assertPushdownSubfields("SELECT TRANSFORM(y, x -> COALESCE(x, row(1, '', 1.0, row(1, 1.0)))) FROM " + tableName, tableName,
                    ImmutableMap.of("y", toSubfields())); // 'coalesce' effectively includes the entire subfield to returned values

            // row_construction
            assertPushdownSubfields("SELECT TRANSFORM(y, x -> ROW(x)) FROM " + tableName, tableName,
                    ImmutableMap.of("y", toSubfields())); // entire struct of the element of 'y' was included to the output

            assertPushdownSubfields("SELECT ZIP_WITH(y, yy, (x, xx) -> ROW(x,xx)) FROM " + tableName, tableName,
                    ImmutableMap.of("y", toSubfields(), "yy", toSubfields())); // entire struct of the elements of 'y' and 'yy' was included to the output

            // switch
            assertPushdownSubfields("SELECT TRANSFORM(y, x -> CASE x WHEN row(1, '', 1.0, row(1, 1.0)) THEN true ELSE false END) FROM " + tableName, tableName,
                    ImmutableMap.of("y", toSubfields())); // entire struct of the element of 'y' was accessed

            // if
            assertPushdownSubfields("SELECT TRANSFORM(y, x -> IF(x = row(1, '', 1.0, row(1, 1.0)), 1, 0)) FROM " + tableName, tableName,
                    ImmutableMap.of("y", toSubfields())); // entire struct of the element of 'y' was accessed
        }
        finally {
            assertUpdate("DROP TABLE IF EXISTS " + tableName);
        }
    }

    @Test
    public void testPushdownSubfields()
    {
        assertUpdate("CREATE TABLE test_pushdown_struct_subfields(id bigint, x row(a bigint, b varchar, c double, d row(d1 bigint, d2 double)), y array(row(a bigint, b varchar, c double, d row(d1 bigint, d2 double))))");

        assertPushdownSubfields("SELECT t.a, t.d.d1, x.a FROM test_pushdown_struct_subfields CROSS JOIN UNNEST(y) as t(a, b, c, d)", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a"), "y", toSubfields("y[*].a", "y[*].d.d1")));

        assertPushdownSubfields("SELECT x.a, mod(x.d.d1, 2) FROM test_pushdown_struct_subfields", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a", "x.d.d1")));

        assertPushdownSubfields("SELECT x.d, mod(x.d.d1, 2), x.d.d2 FROM test_pushdown_struct_subfields", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.d")));

        assertPushdownSubfields("SELECT x.a FROM test_pushdown_struct_subfields WHERE x.b LIKE 'abc%'", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a", "x.b")));

        assertPushdownSubfields("SELECT x.a FROM test_pushdown_struct_subfields WHERE x.a > 10 AND x.b LIKE 'abc%'", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a", "x.b")));

        assertQuery("SELECT struct.b FROM (SELECT CUSTOM_STRUCT_WITH_PASSTHROUGH(x) AS struct FROM test_pushdown_struct_subfields)");
        assertQuery("SELECT struct.b FROM (SELECT CUSTOM_STRUCT_WITHOUT_PASSTHROUGH(x) AS struct FROM test_pushdown_struct_subfields)");
        assertQuery("SELECT struct FROM (SELECT CUSTOM_STRUCT_WITH_PASSTHROUGH(x) AS struct FROM test_pushdown_struct_subfields)");
        assertQuery("SELECT struct FROM (SELECT CUSTOM_STRUCT_WITHOUT_PASSTHROUGH(x) AS struct FROM test_pushdown_struct_subfields)");

        assertPushdownSubfields("SELECT struct.b FROM (SELECT x AS struct FROM test_pushdown_struct_subfields)", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.b")));
        assertPushdownSubfields("SELECT struct.b FROM (SELECT CUSTOM_STRUCT_WITH_PASSTHROUGH(x) AS struct FROM test_pushdown_struct_subfields)", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.b")));
        assertPushdownSubfields("SELECT struct.b FROM (SELECT CUSTOM_STRUCT_WITHOUT_PASSTHROUGH(x) AS struct FROM test_pushdown_struct_subfields)", "test_pushdown_struct_subfields",
                ImmutableMap.of());

        assertPushdownSubfields("SELECT struct FROM (SELECT x AS struct FROM test_pushdown_struct_subfields)", "test_pushdown_struct_subfields",
                ImmutableMap.of());
        assertPushdownSubfields("SELECT struct FROM (SELECT CUSTOM_STRUCT_WITH_PASSTHROUGH(x) AS struct FROM test_pushdown_struct_subfields)", "test_pushdown_struct_subfields",
                ImmutableMap.of());
        assertPushdownSubfields("SELECT struct FROM (SELECT CUSTOM_STRUCT_WITHOUT_PASSTHROUGH(x) AS struct FROM test_pushdown_struct_subfields)", "test_pushdown_struct_subfields",
                ImmutableMap.of());

        // Join
        assertPlan("SELECT l.orderkey, x.a, mod(x.d.d1, 2) FROM lineitem l, test_pushdown_struct_subfields a WHERE l.linenumber = a.id",
                anyTree(
                        node(JoinNode.class,
                                anyTree(tableScan("lineitem", ImmutableMap.of())),
                                anyTree(tableScan("test_pushdown_struct_subfields", ImmutableMap.of("x", toSubfields("x.a", "x.d.d1")))))));

        assertPlan("SELECT l.orderkey, x.a, mod(x.d.d1, 2) FROM lineitem l, test_pushdown_struct_subfields a WHERE l.linenumber = a.id AND x.a > 10",
                anyTree(
                        node(JoinNode.class,
                                anyTree(tableScan("lineitem", ImmutableMap.of())),
                                anyTree(tableScan("test_pushdown_struct_subfields", ImmutableMap.of("x", toSubfields("x.a", "x.d.d1")))))));

        // Aggregation
        assertPushdownSubfields("SELECT id, min(x.a) FROM test_pushdown_struct_subfields GROUP BY 1", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a")));

        assertPushdownSubfields("SELECT id, min(mod(x.a, 3)) FROM test_pushdown_struct_subfields GROUP BY 1", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a")));

        assertPushdownSubfields("SELECT id, min(x.a) FILTER (WHERE x.b LIKE 'abc%') FROM test_pushdown_struct_subfields GROUP BY 1", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a", "x.b")));

        assertPushdownSubfields("SELECT id, min(x.a + length(y[2].b)) * avg(x.d.d1) FROM test_pushdown_struct_subfields GROUP BY 1", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a", "x.d.d1"), "y", toSubfields("y[2].b")));

        assertPushdownSubfields("SELECT id, arbitrary(x.a) FROM test_pushdown_struct_subfields GROUP BY 1", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a")));

        assertPushdownSubfields("SELECT id, arbitrary(x).a FROM test_pushdown_struct_subfields GROUP BY 1", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a")));

        assertPushdownSubfields("SELECT id, arbitrary(x).d.d1 FROM test_pushdown_struct_subfields GROUP BY 1", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.d.d1")));

        assertPushdownSubfields("SELECT id, arbitrary(x.d).d1 FROM test_pushdown_struct_subfields GROUP BY 1", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.d.d1")));

        assertPushdownSubfields("SELECT id, arbitrary(x.d.d2) FROM test_pushdown_struct_subfields GROUP BY 1", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.d.d2")));

        // Unnest
        assertPushdownSubfields("SELECT t.a, t.d.d1, x.a FROM test_pushdown_struct_subfields CROSS JOIN UNNEST(y) as t(a, b, c, d)", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a"), "y", toSubfields("y[*].a", "y[*].d.d1")));

        assertPushdownSubfields("SELECT t.*, x.a FROM test_pushdown_struct_subfields CROSS JOIN UNNEST(y) as t(a, b, c, d)", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a"), "y", toSubfields("y[*].a", "y[*].b", "y[*].c", "y[*].d")));

        assertPushdownSubfields("SELECT id, x.a FROM test_pushdown_struct_subfields CROSS JOIN UNNEST(y) as t(a, b, c, d)", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a")));

        // Legacy unnest
        Session legacyUnnest = Session.builder(getSession()).setSystemProperty("legacy_unnest", "true").build();
        assertPushdownSubfields(legacyUnnest, "SELECT t.y.a, t.y.d.d1, x.a FROM test_pushdown_struct_subfields CROSS JOIN UNNEST(y) as t(y)", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a"), "y", toSubfields("y[*].a", "y[*].d.d1")));

        assertPushdownSubfields(legacyUnnest, "SELECT t.*, x.a FROM test_pushdown_struct_subfields CROSS JOIN UNNEST(y) as t(y)", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a")));

        assertPushdownSubfields(legacyUnnest, "SELECT id, x.a FROM test_pushdown_struct_subfields CROSS JOIN UNNEST(y) as t(y)", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a")));

        // Case sensitivity
        assertPushdownSubfields("SELECT x.a, x.b, x.A + 2 FROM test_pushdown_struct_subfields WHERE x.B LIKE 'abc%'", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.a", "x.b")));

        // No pass-through subfield pruning
        assertPushdownSubfields("SELECT id, min(x.d).d1 FROM test_pushdown_struct_subfields GROUP BY 1", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.d")));

        assertPushdownSubfields("SELECT id, min(x.d).d1, min(x.d.d2) FROM test_pushdown_struct_subfields GROUP BY 1", "test_pushdown_struct_subfields",
                ImmutableMap.of("x", toSubfields("x.d")));

        assertUpdate("DROP TABLE test_pushdown_struct_subfields");
    }

    @Test
    public void testPushdownNegativeSubfiels()
    {
        assertUpdate("CREATE TABLE test_pushdown_subfields_negative_key(id bigint, arr array(bigint), mp map(integer, varchar))");
        assertPushdownSubfields("select element_at(arr, -1) from test_pushdown_subfields_negative_key", "test_pushdown_subfields_negative_key", ImmutableMap.of("arr", toSubfields()));
        assertPushdownSubfields("select element_at(mp, -1) from test_pushdown_subfields_negative_key", "test_pushdown_subfields_negative_key", ImmutableMap.of("mp", toSubfields("mp[-1]")));
        assertPushdownSubfields("select element_at(arr, -1), element_at(arr, 2) from test_pushdown_subfields_negative_key", "test_pushdown_subfields_negative_key", ImmutableMap.of("arr", toSubfields()));
        assertPushdownSubfields("select element_at(mp, -1), element_at(mp, 2) from test_pushdown_subfields_negative_key", "test_pushdown_subfields_negative_key", ImmutableMap.of("mp", toSubfields("mp[-1]", "mp[2]")));

        assertUpdate("DROP TABLE test_pushdown_subfields_negative_key");
    }

    @Test
    public void testPushdownSubfieldsForMapSubset()
    {
        Session mapSubset = Session.builder(getSession()).setSystemProperty(PUSHDOWN_SUBFIELDS_FOR_MAP_FUNCTIONS, "true").build();
        assertUpdate("CREATE TABLE test_pushdown_map_subfields(id integer, x map(integer, double))");
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_subset(x, array[1, 2, 3]) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[1]", "x[2]", "x[3]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_subset(x, array[-1, -2, 3]) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[-1]", "x[-2]", "x[3]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_subset(x, array[1, 2, null]) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_subset(x, array[1, 2, 3, id]) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_subset(x, array[id]) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertUpdate("DROP TABLE test_pushdown_map_subfields");

        assertUpdate("CREATE TABLE test_pushdown_map_subfields(id integer, x array(map(integer, double)))");
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_subset(mp, array[1, 2, 3])) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[*][1]", "x[*][2]", "x[*][3]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_subset(mp, array[1, 2, null])) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_subset(mp, array[1, 2, id])) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertUpdate("DROP TABLE test_pushdown_map_subfields");

        assertUpdate("CREATE TABLE test_pushdown_map_subfields(id varchar, x map(varchar, double))");
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_subset(x, array['ab', 'c', 'd']) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[\"ab\"]", "x[\"c\"]", "x[\"d\"]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_subset(x, array['ab', 'c', null]) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_subset(x, array['ab', 'c', 'd', id]) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_subset(x, array[id]) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertUpdate("DROP TABLE test_pushdown_map_subfields");
    }

    @Test
    public void testPushdownSubfieldsForMapFilter()
    {
        Session mapSubset = Session.builder(getSession()).setSystemProperty(PUSHDOWN_SUBFIELDS_FOR_MAP_FUNCTIONS, "true").build();
        assertUpdate("CREATE TABLE test_pushdown_map_subfields(id integer, x map(integer, double))");
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k in (1, 2, 3)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[1]", "x[2]", "x[3]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> v in (1, 2, 3)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> contains(array[1, 2, 3], k)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[1]", "x[2]", "x[3]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> contains(array[1, 2, 3], v)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k = 1) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[1]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> v = 1) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> 1 = k) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[1]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> 1 = v) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k in (-1, -2, 3)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[-1]", "x[-2]", "x[3]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> contains(array[-1, -2, 3], k)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[-1]", "x[-2]", "x[3]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k=-2) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[-2]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> -2=k) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[-2]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k in (1, 2, null)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k is null) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> contains(array[1, 2, null], k)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k in (1, 2, 3, id)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> contains(array[1, 2, 3, id], k)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k = id) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> id = k) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k in (id)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> contains(array[id], k)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertUpdate("DROP TABLE test_pushdown_map_subfields");

        assertUpdate("CREATE TABLE test_pushdown_map_subfields(id integer, x array(map(integer, double)))");
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> k in (1, 2, 3))) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[*][1]", "x[*][2]", "x[*][3]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> v in (1, 2, 3))) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> contains(array[1, 2, 3], k))) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[*][1]", "x[*][2]", "x[*][3]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> contains(array[1, 2, 3], v))) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> k=2)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[*][2]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> v=2)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> 2=k)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[*][2]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> 2=v)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> k in (1, 2, null))) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> contains(array[1, 2, null], k))) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> k in (1, 2, id))) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> k=id)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> id=k)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, transform(x, mp -> map_filter(mp, (k, v) -> contains(array[1, 2, id], k))) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertUpdate("DROP TABLE test_pushdown_map_subfields");

        assertUpdate("CREATE TABLE test_pushdown_map_subfields(id varchar, x map(varchar, varchar))");
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k in ('ab', 'c', 'd')) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[\"ab\"]", "x[\"c\"]", "x[\"d\"]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> contains(array['ab', 'c', 'd'], k)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[\"ab\"]", "x[\"c\"]", "x[\"d\"]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k='d') FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[\"d\"]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> 'd'=k) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields("x[\"d\"]")));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> v in ('ab', 'c', 'd')) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> contains(array['ab', 'c', 'd'], v)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> v='d') FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> 'd'=v) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k in ('ab', 'c', null)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> contains(array['ab', 'c', null], k)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k in ('ab', 'c', 'd', id)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k = id) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> id = k) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> contains(array['ab', 'c', 'd', id], k)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> k in (id)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertPushdownSubfields(mapSubset, "SELECT t.id, map_filter(x, (k, v) -> contains(array[id], k)) FROM test_pushdown_map_subfields t", "test_pushdown_map_subfields",
                ImmutableMap.of("x", toSubfields()));
        assertUpdate("DROP TABLE test_pushdown_map_subfields");
    }

    @Test
    public void testPushdownSubfieldsAssorted()
    {
        assertUpdate("CREATE TABLE test_pushdown_subfields(" +
                "id bigint, " +
                "a array(bigint), " +
                "b map(bigint, bigint), " +
                "c map(varchar, bigint), " +
                "d row(d1 bigint, d2 array(bigint), d3 map(bigint, bigint), d4 row(x double, y double)), " +
                "w array(array(row(p bigint, e row(e1 bigint, e2 varchar)))), " +
                "x row(a bigint, b varchar, c double, d row(d1 bigint, d2 double)), " +
                "y array(row(a bigint, b varchar, c double, d row(d1 bigint, d2 double))), " +
                "z row(a bigint, b varchar, c double))");

        assertPushdownSubfields("SELECT id, a[1], mod(a[2], 3), b[10], c['cat'] + c['dog'], d.d1 * d.d2[5] / d.d3[2], d.d4.x FROM test_pushdown_subfields", "test_pushdown_subfields",
                ImmutableMap.of(
                        "a", toSubfields("a[1]", "a[2]"),
                        "b", toSubfields("b[10]"),
                        "c", toSubfields("c[\"cat\"]", "c[\"dog\"]"),
                        "d", toSubfields("d.d1", "d.d2[5]", "d.d3[2]", "d.d4.x")));

        assertPushdownSubfields("SELECT count(*) FROM test_pushdown_subfields WHERE a[1] > a[2] AND b[1] * c['cat'] = 5 AND d.d4.x IS NULL", "test_pushdown_subfields",
                ImmutableMap.of(
                        "a", toSubfields("a[1]", "a[2]"),
                        "b", toSubfields("b[1]"),
                        "c", toSubfields("c[\"cat\"]"),
                        "d", toSubfields("d.d4.x")));

        assertPushdownSubfields("SELECT a[1], cardinality(b), map_keys(c), k, v, d.d3[5] FROM test_pushdown_subfields CROSS JOIN UNNEST(c) as t(k, v)", "test_pushdown_subfields",
                ImmutableMap.of(
                        "a", toSubfields("a[1]"),
                        "d", toSubfields("d.d3[5]")));

        // Subfield pruning should pass-through arbitrary() function
        assertPushdownSubfields("SELECT id, " +
                        "arbitrary(x.a), " +
                        "arbitrary(x).a, " +
                        "arbitrary(x).d.d1, " +
                        "arbitrary(x.d).d1, " +
                        "arbitrary(x.d.d2), " +
                        "arbitrary(y[1]).a, " +
                        "arbitrary(y[1]).d.d1, " +
                        "arbitrary(y[2]).d.d1, " +
                        "arbitrary(y[3].d.d1), " +
                        "arbitrary(z).c, " +
                        "arbitrary(w[1][2]).e.e1, " +
                        "arbitrary(w[2][3].e.e2) " +
                        "FROM test_pushdown_subfields " +
                        "GROUP BY 1", "test_pushdown_subfields",
                ImmutableMap.of("x", toSubfields("x.a", "x.d.d1", "x.d.d2"),
                        "y", toSubfields("y[1].a", "y[1].d.d1", "y[2].d.d1", "y[3].d.d1"),
                        "z", toSubfields("z.c"),
                        "w", toSubfields("w[1][2].e.e1", "w[2][3].e.e2")));

        // Subfield pruning should not pass-through other aggregate functions e.g. min() function
        assertPushdownSubfields("SELECT id, " +
                        "min(x.d).d1, " +
                        "min(x.d.d2), " +
                        "min(z).c, " +
                        "min(z.b), " +
                        "min(y[1]).a, " +
                        "min(y[1]).d.d1, " +
                        "min(y[2].d.d1), " +
                        "min(w[1][2]).e.e1, " +
                        "min(w[2][3].e.e2) " +
                        "FROM test_pushdown_subfields " +
                        "GROUP BY 1", "test_pushdown_subfields",
                ImmutableMap.of("x", toSubfields("x.d"),
                        "y", toSubfields("y[1]", "y[2].d.d1"),
                        "w", toSubfields("w[1][2]", "w[2][3].e.e2")));

        assertUpdate("DROP TABLE test_pushdown_subfields");
    }

    @Test
    public void testPushdownFilterAndSubfields()
    {
        assertUpdate("CREATE TABLE test_pushdown_filter_and_subscripts(id bigint, a array(bigint), b array(array(varchar)))");

        Session pushdownFilterEnabled = Session.builder(getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty(HIVE_CATALOG, PUSHDOWN_FILTER_ENABLED, "true")
                .build();

        assertPushdownSubfields("SELECT a[1] FROM test_pushdown_filter_and_subscripts WHERE a[2] > 10", "test_pushdown_filter_and_subscripts",
                ImmutableMap.of("a", toSubfields("a[1]", "a[2]")));

        assertPushdownSubfields(pushdownFilterEnabled, "SELECT a[1] FROM test_pushdown_filter_and_subscripts WHERE a[2] > 10", "test_pushdown_filter_and_subscripts",
                ImmutableMap.of("a", toSubfields("a[1]")));

        assertUpdate("DROP TABLE test_pushdown_filter_and_subscripts");
    }

    @Test
    public void testVirtualBucketing()
    {
        try {
            assertUpdate("CREATE TABLE test_virtual_bucket(a bigint, b bigint)");
            Session virtualBucketEnabled = Session.builder(getSession())
                    .setCatalogSessionProperty(HIVE_CATALOG, "virtual_bucket_count", "2")
                    .build();

            assertPlan(
                    virtualBucketEnabled,
                    "SELECT COUNT(DISTINCT(\"$path\")) FROM test_virtual_bucket",
                    anyTree(
                            exchange(REMOTE_STREAMING, GATHER, anyTree(
                                    tableScan("test_virtual_bucket", ImmutableMap.of())))),
                    assertRemoteExchangesCount(1, getSession(), (DistributedQueryRunner) getQueryRunner()));
        }
        finally {
            assertUpdate("DROP TABLE IF EXISTS test_virtual_bucket");
        }
    }

    // Make sure subfield pruning doesn't interfere with cost-based optimizer
    @Test
    public void testPushdownSubfieldsAndJoinReordering()
    {
        Session collectStatistics = Session.builder(getSession())
                .setCatalogSessionProperty(HIVE_CATALOG, COLLECT_COLUMN_STATISTICS_ON_WRITE, "true")
                .build();

        getQueryRunner().execute(collectStatistics, "CREATE TABLE orders_ex AS SELECT orderkey, custkey, array[custkey] as keys FROM orders");

        try {
            Session joinReorderingOn = Session.builder(pushdownFilterEnabled())
                    .setSystemProperty(JOIN_REORDERING_STRATEGY, FeaturesConfig.JoinReorderingStrategy.AUTOMATIC.name())
                    .build();

            Session joinReorderingOff = Session.builder(pushdownFilterEnabled())
                    .setSystemProperty(JOIN_REORDERING_STRATEGY, FeaturesConfig.JoinReorderingStrategy.ELIMINATE_CROSS_JOINS.name())
                    .build();

            assertPlan(joinReorderingOff, "SELECT sum(custkey) FROM orders_ex o, lineitem l WHERE o.orderkey = l.orderkey",
                    anyTree(join(INNER, ImmutableList.of(equiJoinClause("o_orderkey", "l_orderkey")),
                            anyTree(PlanMatchPattern.tableScan("orders_ex", ImmutableMap.of("o_orderkey", "orderkey"))),
                            anyTree(PlanMatchPattern.tableScan("lineitem", ImmutableMap.of("l_orderkey", "orderkey"))))));

            assertPlan(joinReorderingOff, "SELECT sum(keys[1]) FROM orders_ex o, lineitem l WHERE o.orderkey = l.orderkey",
                    anyTree(join(INNER, ImmutableList.of(equiJoinClause("o_orderkey", "l_orderkey")),
                            anyTree(PlanMatchPattern.tableScan("orders_ex", ImmutableMap.of("o_orderkey", "orderkey"))),
                            anyTree(PlanMatchPattern.tableScan("lineitem", ImmutableMap.of("l_orderkey", "orderkey"))))));

            assertPlan(joinReorderingOn, "SELECT sum(custkey) FROM orders_ex o, lineitem l WHERE o.orderkey = l.orderkey",
                    anyTree(join(INNER, ImmutableList.of(equiJoinClause("l_orderkey", "o_orderkey")),
                            anyTree(PlanMatchPattern.tableScan("lineitem", ImmutableMap.of("l_orderkey", "orderkey"))),
                            anyTree(PlanMatchPattern.tableScan("orders_ex", ImmutableMap.of("o_orderkey", "orderkey"))))));

            assertPlan(joinReorderingOn, "SELECT sum(keys[1]) FROM orders_ex o, lineitem l WHERE o.orderkey = l.orderkey",
                    anyTree(join(INNER, ImmutableList.of(equiJoinClause("l_orderkey", "o_orderkey")),
                            anyTree(PlanMatchPattern.tableScan("lineitem", ImmutableMap.of("l_orderkey", "orderkey"))),
                            anyTree(PlanMatchPattern.tableScan("orders_ex", ImmutableMap.of("o_orderkey", "orderkey"))))));

            assertPlan(joinReorderingOff, "SELECT l.discount, l.orderkey, o.totalprice FROM lineitem l, orders o WHERE l.orderkey = o.orderkey AND l.quantity < 2 AND o.totalprice BETWEEN 0 AND 200000",
                    anyTree(
                            node(JoinNode.class,
                                    anyTree(tableScan("lineitem", ImmutableMap.of())),
                                    anyTree(tableScan("orders", ImmutableMap.of())))));

            assertPlan(joinReorderingOn, "SELECT l.discount, l.orderkey, o.totalprice FROM lineitem l, orders o WHERE l.orderkey = o.orderkey AND l.quantity < 2 AND o.totalprice BETWEEN 0 AND 200000",
                    anyTree(
                            node(JoinNode.class,
                                    anyTree(tableScan("orders", ImmutableMap.of())),
                                    anyTree(tableScan("lineitem", ImmutableMap.of())))));

            assertPlan(joinReorderingOff, "SELECT keys[1] FROM orders_ex o, lineitem l WHERE o.orderkey = l.orderkey AND o.keys[1] > 0",
                    anyTree(join(INNER, ImmutableList.of(equiJoinClause("o_orderkey", "l_orderkey")),
                            anyTree(PlanMatchPattern.tableScan("orders_ex", ImmutableMap.of("o_orderkey", "orderkey"))),
                            anyTree(PlanMatchPattern.tableScan("lineitem", ImmutableMap.of("l_orderkey", "orderkey"))))));

            assertPlan(joinReorderingOn, "SELECT keys[1] FROM orders_ex o, lineitem l WHERE o.orderkey = l.orderkey AND o.keys[1] > 0",
                    anyTree(join(INNER, ImmutableList.of(equiJoinClause("l_orderkey", "o_orderkey")),
                            anyTree(PlanMatchPattern.tableScan("lineitem", ImmutableMap.of("l_orderkey", "orderkey"))),
                            anyTree(PlanMatchPattern.tableScan("orders_ex", ImmutableMap.of("o_orderkey", "orderkey"))))));
        }
        finally {
            assertUpdate("DROP TABLE orders_ex");
        }
    }

    @Test
    public void testParquetDereferencePushDown()
    {
        assertUpdate("CREATE TABLE test_pushdown_nestedcolumn_parquet(" +
                "id bigint, " +
                "x row(a bigint, b varchar, c double, d row(d1 bigint, d2 double))," +
                "y array(row(a bigint, b varchar, c double, d row(d1 bigint, d2 double)))) " +
                "with (format = 'PARQUET')");

        assertParquetDereferencePushDown("SELECT x.a FROM test_pushdown_nestedcolumn_parquet",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a"));

        assertParquetDereferencePushDown("SELECT x.a, mod(x.d.d1, 2) FROM test_pushdown_nestedcolumn_parquet",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a", "x.d.d1"));

        assertParquetDereferencePushDown("SELECT x.d, mod(x.d.d1, 2), x.d.d2 FROM test_pushdown_nestedcolumn_parquet",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.d"));

        assertParquetDereferencePushDown("SELECT x.a FROM test_pushdown_nestedcolumn_parquet WHERE x.b LIKE 'abc%'",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a", "x.b"));

        assertParquetDereferencePushDown("SELECT x.a FROM test_pushdown_nestedcolumn_parquet WHERE x.a > 10 AND x.b LIKE 'abc%'",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a", "x.b"),
                withColumnDomains(ImmutableMap.of(pushdownColumnNameForSubfield(nestedColumn("x.a")), create(ofRanges(greaterThan(BIGINT, 10L)), false))),
                ImmutableSet.of(pushdownColumnNameForSubfield(nestedColumn("x.a"))),
                TRUE_CONSTANT);

        // Join
        assertPlan(withParquetDereferencePushDownEnabled(), "SELECT l.orderkey, x.a, mod(x.d.d1, 2) FROM lineitem l, test_pushdown_nestedcolumn_parquet a WHERE l.linenumber = a.id",
                anyTree(
                        node(JoinNode.class,
                                anyTree(tableScan("lineitem", ImmutableMap.of())),
                                anyTree(tableScanParquetDeferencePushDowns("test_pushdown_nestedcolumn_parquet", nestedColumnMap("x.a", "x.d.d1"))))));

        assertPlan(withParquetDereferencePushDownEnabled(), "SELECT l.orderkey, x.a, mod(x.d.d1, 2) FROM lineitem l, test_pushdown_nestedcolumn_parquet a WHERE l.linenumber = a.id AND x.a > 10",
                anyTree(
                        node(JoinNode.class,
                                anyTree(tableScan("lineitem", ImmutableMap.of())),
                                anyTree(tableScanParquetDeferencePushDowns(
                                        "test_pushdown_nestedcolumn_parquet",
                                        nestedColumnMap("x.a", "x.d.d1"),
                                        withColumnDomains(ImmutableMap.of(pushdownColumnNameForSubfield(nestedColumn("x.a")), create(ofRanges(greaterThan(BIGINT, 10L)), false))),
                                        ImmutableSet.of(pushdownColumnNameForSubfield(nestedColumn("x.a"))),
                                        TRUE_CONSTANT)))));

        // Self-Join and Table scan assignments
        assertPlan(withParquetDereferencePushDownEnabled(), "SELECT t1.x.a, t2.x.a FROM test_pushdown_nestedcolumn_parquet t1, test_pushdown_nestedcolumn_parquet t2 where t1.id = t2.id",
                anyTree(
                        node(JoinNode.class,
                                anyTree(tableScanParquetDeferencePushDowns("test_pushdown_nestedcolumn_parquet", nestedColumnMap("x.a"))),
                                anyTree(tableScanParquetDeferencePushDowns("test_pushdown_nestedcolumn_parquet", nestedColumnMap("x.a"))))));

        // Aggregation
        assertParquetDereferencePushDown("SELECT id, min(x.a) FROM test_pushdown_nestedcolumn_parquet GROUP BY 1",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a"));

        assertParquetDereferencePushDown("SELECT id, min(mod(x.a, 3)) FROM test_pushdown_nestedcolumn_parquet GROUP BY 1",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a"));

        assertParquetDereferencePushDown("SELECT id, min(x.a) FILTER (WHERE x.b LIKE 'abc%') FROM test_pushdown_nestedcolumn_parquet GROUP BY 1",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a", "x.b"));

        assertParquetDereferencePushDown("SELECT id, min(x.a + 1) * avg(x.d.d1) FROM test_pushdown_nestedcolumn_parquet GROUP BY 1",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a", "x.d.d1"));

        assertParquetDereferencePushDown("SELECT id, arbitrary(x.a) FROM test_pushdown_nestedcolumn_parquet GROUP BY 1",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a"));

        // Dereference can't be pushed down, but the subfield pushdown will help in pruning the number of columns to read
        assertPushdownSubfields("SELECT id, arbitrary(x.a) FROM test_pushdown_nestedcolumn_parquet GROUP BY 1",
                "test_pushdown_nestedcolumn_parquet",
                ImmutableMap.of("x", toSubfields("x.a")));

        // Dereference can't be pushed down, but the subfield pushdown will help in pruning the number of columns to read
        assertPushdownSubfields("SELECT id, arbitrary(x).d.d1 FROM test_pushdown_nestedcolumn_parquet GROUP BY 1",
                "test_pushdown_nestedcolumn_parquet",
                ImmutableMap.of("x", toSubfields("x.d.d1")));

        assertParquetDereferencePushDown("SELECT id, arbitrary(x.d).d1 FROM test_pushdown_nestedcolumn_parquet GROUP BY 1",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.d"));

        assertParquetDereferencePushDown("SELECT id, arbitrary(x.d.d2) FROM test_pushdown_nestedcolumn_parquet GROUP BY 1",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.d.d2"));

        // Unnest
        assertParquetDereferencePushDown("SELECT t.a, t.d.d1, x.a FROM test_pushdown_nestedcolumn_parquet CROSS JOIN UNNEST(y) as t(a, b, c, d)",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a"));

        assertParquetDereferencePushDown("SELECT t.*, x.a FROM test_pushdown_nestedcolumn_parquet CROSS JOIN UNNEST(y) as t(a, b, c, d)",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a"));

        assertParquetDereferencePushDown("SELECT id, x.a FROM test_pushdown_nestedcolumn_parquet CROSS JOIN UNNEST(y) as t(a, b, c, d)",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a"));

        // Legacy unnest
        Session legacyUnnest = Session.builder(withParquetDereferencePushDownEnabled())
                .setSystemProperty("legacy_unnest", "true")
                .build();
        assertParquetDereferencePushDown(legacyUnnest, "SELECT t.y.a, t.y.d.d1, x.a FROM test_pushdown_nestedcolumn_parquet CROSS JOIN UNNEST(y) as t(y)",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a"));

        assertParquetDereferencePushDown(legacyUnnest, "SELECT t.*, x.a FROM test_pushdown_nestedcolumn_parquet CROSS JOIN UNNEST(y) as t(y)",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a"));

        assertParquetDereferencePushDown(legacyUnnest, "SELECT id, x.a FROM test_pushdown_nestedcolumn_parquet CROSS JOIN UNNEST(y) as t(y)",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a"));

        // Case sensitivity
        assertParquetDereferencePushDown("SELECT x.a, x.b, x.A + 2 FROM test_pushdown_nestedcolumn_parquet WHERE x.B LIKE 'abc%'",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.a", "x.b"));

        // No pass-through nested column pruning
        assertParquetDereferencePushDown("SELECT id, min(x.d).d1 FROM test_pushdown_nestedcolumn_parquet GROUP BY 1",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.d"));

        assertParquetDereferencePushDown("SELECT id, min(x.d).d1, min(x.d.d2) FROM test_pushdown_nestedcolumn_parquet GROUP BY 1",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.d"));

        // Test pushdown of filters on dereference columns
        assertParquetDereferencePushDown("SELECT id, x.d.d1 FROM test_pushdown_nestedcolumn_parquet WHERE x.d.d1 = 1",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.d.d1"),
                withColumnDomains(ImmutableMap.of(
                        pushdownColumnNameForSubfield(nestedColumn("x.d.d1")), singleValue(BIGINT, 1L))),
                ImmutableSet.of(pushdownColumnNameForSubfield(nestedColumn("x.d.d1"))),
                TRUE_CONSTANT);

        assertParquetDereferencePushDown("SELECT id, x.d.d1 FROM test_pushdown_nestedcolumn_parquet WHERE x.d.d1 = 1 and x.d.d2 = 5.0",
                "test_pushdown_nestedcolumn_parquet",
                nestedColumnMap("x.d.d1", "x.d.d2"),
                withColumnDomains(ImmutableMap.of(
                        pushdownColumnNameForSubfield(nestedColumn("x.d.d1")), singleValue(BIGINT, 1L),
                        pushdownColumnNameForSubfield(nestedColumn("x.d.d2")), singleValue(DOUBLE, 5.0))),
                ImmutableSet.of(
                        pushdownColumnNameForSubfield(nestedColumn("x.d.d1")),
                        pushdownColumnNameForSubfield(nestedColumn("x.d.d2"))),
                TRUE_CONSTANT);

        assertUpdate("DROP TABLE test_pushdown_nestedcolumn_parquet");
    }

    @Test
    public void testBucketPruning()
    {
        QueryRunner queryRunner = getQueryRunner();
        queryRunner.execute("CREATE TABLE orders_bucketed WITH (bucket_count = 11, bucketed_by = ARRAY['orderkey']) AS " +
                "SELECT * FROM orders");

        try {
            assertPlan(getSession(), "SELECT * FROM orders_bucketed WHERE orderkey = 100",
                    anyTree(PlanMatchPattern.tableScan("orders_bucketed")),
                    plan -> assertBucketFilter(plan, "orders_bucketed", 11, ImmutableSet.of(1)));

            assertPlan(getSession(), "SELECT * FROM orders_bucketed WHERE orderkey = 100 OR orderkey = 101",
                    anyTree(PlanMatchPattern.tableScan("orders_bucketed")),
                    plan -> assertBucketFilter(plan, "orders_bucketed", 11, ImmutableSet.of(1, 2)));

            assertPlan(getSession(), "SELECT * FROM orders_bucketed WHERE orderkey IN (100, 101, 133)",
                    anyTree(PlanMatchPattern.tableScan("orders_bucketed")),
                    plan -> assertBucketFilter(plan, "orders_bucketed", 11, ImmutableSet.of(1, 2)));

            assertPlan(getSession(), "SELECT * FROM orders_bucketed",
                    anyTree(PlanMatchPattern.tableScan("orders_bucketed")),
                    plan -> assertNoBucketFilter(plan, "orders_bucketed", 11));

            assertPlan(getSession(), "SELECT * FROM orders_bucketed WHERE orderkey > 100",
                    anyTree(PlanMatchPattern.tableScan("orders_bucketed")),
                    plan -> assertNoBucketFilter(plan, "orders_bucketed", 11));

            assertPlan(getSession(), "SELECT * FROM orders_bucketed WHERE orderkey != 100",
                    anyTree(PlanMatchPattern.tableScan("orders_bucketed")),
                    plan -> assertNoBucketFilter(plan, "orders_bucketed", 11));
        }
        finally {
            queryRunner.execute("DROP TABLE orders_bucketed");
        }

        queryRunner.execute("CREATE TABLE orders_bucketed WITH (bucket_count = 11, bucketed_by = ARRAY['orderkey', 'custkey']) AS " +
                "SELECT * FROM orders");

        try {
            assertPlan(getSession(), "SELECT * FROM orders_bucketed WHERE orderkey = 101 AND custkey = 280",
                    anyTree(PlanMatchPattern.tableScan("orders_bucketed")),
                    plan -> assertBucketFilter(plan, "orders_bucketed", 11, ImmutableSet.of(1)));

            assertPlan(getSession(), "SELECT * FROM orders_bucketed WHERE orderkey IN (101, 71) AND custkey = 280",
                    anyTree(PlanMatchPattern.tableScan("orders_bucketed")),
                    plan -> assertBucketFilter(plan, "orders_bucketed", 11, ImmutableSet.of(1, 6)));

            assertPlan(getSession(), "SELECT * FROM orders_bucketed WHERE orderkey IN (101, 71) AND custkey IN (280, 34)",
                    anyTree(PlanMatchPattern.tableScan("orders_bucketed")),
                    plan -> assertBucketFilter(plan, "orders_bucketed", 11, ImmutableSet.of(1, 2, 6, 8)));

            assertPlan(getSession(), "SELECT * FROM orders_bucketed WHERE orderkey = 101 AND custkey = 280 AND orderstatus <> '0'",
                    anyTree(PlanMatchPattern.tableScan("orders_bucketed")),
                    plan -> assertBucketFilter(plan, "orders_bucketed", 11, ImmutableSet.of(1)));

            assertPlan(getSession(), "SELECT * FROM orders_bucketed WHERE orderkey = 101",
                    anyTree(PlanMatchPattern.tableScan("orders_bucketed")),
                    plan -> assertNoBucketFilter(plan, "orders_bucketed", 11));

            assertPlan(getSession(), "SELECT * FROM orders_bucketed WHERE custkey = 280",
                    anyTree(PlanMatchPattern.tableScan("orders_bucketed")),
                    plan -> assertNoBucketFilter(plan, "orders_bucketed", 11));

            assertPlan(getSession(), "SELECT * FROM orders_bucketed WHERE orderkey = 101 AND custkey > 280",
                    anyTree(PlanMatchPattern.tableScan("orders_bucketed")),
                    plan -> assertNoBucketFilter(plan, "orders_bucketed", 11));
        }
        finally {
            queryRunner.execute("DROP TABLE orders_bucketed");
        }
    }

    @Test
    public void testAddRequestedColumnsToLayout()
    {
        String tableName = "test_add_requested_columns_to_layout";
        assertUpdate(format("CREATE TABLE %s(" +
                "id bigint, " +
                "a row(d1 bigint, d2 array(bigint), d3 map(bigint, bigint), d4 row(x double, y double)), " +
                "b varchar )", tableName));

        try {
            assertPlan(getSession(), format("SELECT b FROM %s", tableName),
                    anyTree(PlanMatchPattern.tableScan(tableName)),
                    plan -> assertRequestedColumnsInLayout(plan, tableName, ImmutableSet.of("b")));

            assertPlan(getSession(), format("SELECT id, b FROM %s", tableName),
                    anyTree(PlanMatchPattern.tableScan(tableName)),
                    plan -> assertRequestedColumnsInLayout(plan, tableName, ImmutableSet.of("id", "b")));

            assertPlan(getSession(), format("SELECT id, a FROM %s", tableName),
                    anyTree(PlanMatchPattern.tableScan(tableName)),
                    plan -> assertRequestedColumnsInLayout(plan, tableName, ImmutableSet.of("id", "a")));

            assertPlan(getSession(), format("SELECT a.d1, a.d4.x FROM %s", tableName),
                    anyTree(PlanMatchPattern.tableScan(tableName)),
                    plan -> assertRequestedColumnsInLayout(plan, tableName, ImmutableSet.of("a.d1", "a.d4.x")));
        }
        finally {
            assertUpdate(format("DROP TABLE %s", tableName));
        }
    }

    @Test
    public void testPartialAggregatePushdown()
    {
        QueryRunner queryRunner = getQueryRunner();
        try {
            queryRunner.execute("CREATE TABLE orders_partitioned_parquet WITH (partitioned_by = ARRAY['ds'], format='PARQUET') AS " +
                    "SELECT orderkey, orderpriority, comment, '2019-11-01' as ds FROM orders WHERE orderkey < 1000 " +
                    "UNION ALL " +
                    "SELECT orderkey, orderpriority, comment, '2019-11-02' as ds FROM orders WHERE orderkey < 1000");

            Map<Optional<String>, ExpectedValueProvider<FunctionCall>> aggregations = ImmutableMap.of(Optional.of("count"),
                    PlanMatchPattern.functionCall("count", false, ImmutableList.of(anySymbol())));

            assertPlan(partialAggregatePushdownEnabled(),
                    "select count(*) from orders_partitioned_parquet",
                    anyTree(aggregation(globalAggregation(), aggregations, ImmutableMap.of(), Optional.empty(), AggregationNode.Step.FINAL,
                            exchange(LOCAL, GATHER,
                                    new PlanMatchPattern[] {exchange(REMOTE_STREAMING, GATHER,
                                            new PlanMatchPattern[] {tableScan("orders_partitioned_parquet", ImmutableMap.of())})}))));

            aggregations = ImmutableMap.of(
                    Optional.of("count_1"),
                    PlanMatchPattern.functionCall("count", false, ImmutableList.of(anySymbol())),
                    Optional.of("max"),
                    PlanMatchPattern.functionCall("max", false, ImmutableList.of(anySymbol())),
                    Optional.of("min"),
                    PlanMatchPattern.functionCall("max", false, ImmutableList.of(anySymbol())));

            // Negative tests
            assertPlan(partialAggregatePushdownEnabled(),
                    "select count(orderkey), max(orderpriority), min(ds) from orders_partitioned_parquet",
                    anyTree(PlanMatchPattern.tableScan("orders_partitioned_parquet")),
                    plan -> assertNoAggregatedColumns(plan, "orders_partitioned_parquet"));
            assertPlan(partialAggregatePushdownEnabled(),
                    "select count(orderkey), max(orderpriority), min(ds) from orders_partitioned_parquet where orderkey = 100",
                    anyTree(PlanMatchPattern.tableScan("orders_partitioned_parquet")),
                    plan -> assertNoAggregatedColumns(plan, "orders_partitioned_parquet"));

            assertPlan(partialAggregatePushdownEnabled(),
                    "select count(orderkey), arbitrary(orderpriority), min(ds) from orders_partitioned_parquet",
                    anyTree(PlanMatchPattern.tableScan("orders_partitioned_parquet")),
                    plan -> assertNoAggregatedColumns(plan, "orders_partitioned_parquet"));

            assertPlan(partialAggregatePushdownEnabled(),
                    "select count(orderkey), max(orderpriority), min(ds) from orders_partitioned_parquet where ds = '2019-11-01' and orderkey = 100",
                    anyTree(PlanMatchPattern.tableScan("orders_partitioned_parquet")),
                    plan -> assertNoAggregatedColumns(plan, "orders_partitioned_parquet"));
            assertPlan(partialAggregatePushdownEnabled(),
                    "SELECT min(ds) from orders_partitioned_parquet",
                    anyTree(PlanMatchPattern.tableScan("orders_partitioned_parquet")),
                    plan -> assertNoAggregatedColumns(plan, "orders_partitioned_parquet"));

            Session session = Session.builder(getQueryRunner().getDefaultSession())
                    .setCatalogSessionProperty(HIVE_CATALOG, PARTIAL_AGGREGATION_PUSHDOWN_ENABLED, "true")
                    .build();
            queryRunner.execute("CREATE TABLE variable_length_table(col1 varchar, col2 varchar(100), col3 varbinary) with (format='PARQUET')");
            queryRunner.execute("INSERT INTO variable_length_table values ('foo','bar',cast('baz' as varbinary))");
            assertPlan(session,
                    "select min(col1) from variable_length_table",
                    anyTree(PlanMatchPattern.tableScan("variable_length_table")),
                    plan -> assertNoAggregatedColumns(plan, "variable_length_table"));
            assertPlan(session,
                    "select max(col3) from variable_length_table",
                    anyTree(PlanMatchPattern.tableScan("variable_length_table")),
                    plan -> assertNoAggregatedColumns(plan, "variable_length_table"));
        }
        finally {
            queryRunner.execute("DROP TABLE IF EXISTS orders_partitioned_parquet");
            queryRunner.execute("DROP TABLE IF EXISTS variable_length_table");
        }
    }

    private static Set<Subfield> toSubfields(String... subfieldPaths)
    {
        return Arrays.stream(subfieldPaths)
                .map(Subfield::new)
                .collect(toImmutableSet());
    }

    private void assertPushdownSubfields(String query, String tableName, Map<String, Set<Subfield>> requiredSubfields)
    {
        assertPlan(query, anyTree(tableScan(tableName, requiredSubfields)));
    }

    private void assertPushdownSubfields(Session session, String query, String tableName, Map<String, Set<Subfield>> requiredSubfields)
    {
        assertPlan(session, query, anyTree(tableScan(tableName, requiredSubfields)));
    }

    private static PlanMatchPattern tableScan(String expectedTableName, Map<String, Set<Subfield>> expectedRequiredSubfields)
    {
        return PlanMatchPattern.tableScan(expectedTableName).with(new HiveTableScanMatcher(expectedRequiredSubfields));
    }

    private static PlanMatchPattern tableScanParquetDeferencePushDowns(String expectedTableName, Map<String, Subfield> expectedDeferencePushDowns)
    {
        return PlanMatchPattern.tableScan(expectedTableName).with(new HiveParquetDereferencePushdownMatcher(expectedDeferencePushDowns, TupleDomain.all(), ImmutableSet.of(), TRUE_CONSTANT));
    }

    private static PlanMatchPattern tableScanParquetDeferencePushDowns(String expectedTableName, Map<String, Subfield> expectedDeferencePushDowns,
            TupleDomain<String> domainPredicate, Set<String> predicateColumns, RowExpression remainingPredicate)
    {
        return PlanMatchPattern.tableScan(expectedTableName).with(new HiveParquetDereferencePushdownMatcher(expectedDeferencePushDowns, domainPredicate, predicateColumns, remainingPredicate));
    }

    private static boolean isTableScanNode(PlanNode node, String tableName)
    {
        return node instanceof TableScanNode && ((HiveTableHandle) ((TableScanNode) node).getTable().getConnectorHandle()).getTableName().equals(tableName);
    }

    private void assertPushdownFilterOnSubfields(String query, Map<Subfield, Domain> predicateDomains)
    {
        String tableName = "test_pushdown_filter_on_subfields";
        assertPlan(pushdownFilterAndNestedColumnFilterEnabled(), query,
                output(exchange(PlanMatchPattern.tableScan(tableName))),
                plan -> assertTableLayout(
                        plan,
                        tableName,
                        withColumnDomains(predicateDomains),
                        TRUE_CONSTANT,
                        predicateDomains.keySet().stream().map(Subfield::getRootName).collect(toImmutableSet())));
    }

    private void assertParquetDereferencePushDown(String query, String tableName, Map<String, Subfield> expectedDeferencePushDowns)
    {
        assertParquetDereferencePushDown(withParquetDereferencePushDownEnabled(), query, tableName, expectedDeferencePushDowns);
    }

    private void assertParquetDereferencePushDown(String query, String tableName, Map<String, Subfield> expectedDeferencePushDowns, TupleDomain<String> domainPredicate,
            Set<String> predicateColumns, RowExpression remainingPredicate)
    {
        assertPlan(withParquetDereferencePushDownEnabled(), query,
                anyTree(tableScanParquetDeferencePushDowns(tableName, expectedDeferencePushDowns, domainPredicate, predicateColumns, remainingPredicate)));
    }

    private void assertParquetDereferencePushDown(Session session, String query, String tableName, Map<String, Subfield> expectedDeferencePushDowns)
    {
        assertPlan(session, query, anyTree(tableScanParquetDeferencePushDowns(tableName, expectedDeferencePushDowns)));
    }

    protected Session getSessionWithOptimizeMetadataQueries()
    {
        return Session.builder(getQueryRunner().getDefaultSession())
                .setSystemProperty(OPTIMIZE_METADATA_QUERIES, "true")
                .build();
    }

    private Session pushdownFilterEnabled()
    {
        return Session.builder(getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty(HIVE_CATALOG, PUSHDOWN_FILTER_ENABLED, "true")
                .build();
    }

    private Session pushdownFilterAndNestedColumnFilterEnabled()
    {
        return Session.builder(getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty(HIVE_CATALOG, PUSHDOWN_FILTER_ENABLED, "true")
                .setCatalogSessionProperty(HIVE_CATALOG, RANGE_FILTERS_ON_SUBSCRIPTS_ENABLED, "true")
                .build();
    }

    private Session withParquetDereferencePushDownEnabled()
    {
        return Session.builder(getQueryRunner().getDefaultSession())
                .setSystemProperty(PUSHDOWN_DEREFERENCE_ENABLED, "true")
                .setCatalogSessionProperty(HIVE_CATALOG, PARQUET_DEREFERENCE_PUSHDOWN_ENABLED, "true")
                .build();
    }

    private Session partialAggregatePushdownEnabled()
    {
        return Session.builder(getQueryRunner().getDefaultSession())
                .setCatalogSessionProperty(HIVE_CATALOG, PARTIAL_AGGREGATION_PUSHDOWN_ENABLED, "true")
                .setCatalogSessionProperty(HIVE_CATALOG, PARTIAL_AGGREGATION_PUSHDOWN_FOR_VARIABLE_LENGTH_DATATYPES_ENABLED, "true")
                .build();
    }

    private RowExpression constant(long value)
    {
        return new ConstantExpression(value, BIGINT);
    }

    private static Map<String, String> identityMap(String... values)
    {
        return Arrays.stream(values).collect(toImmutableMap(Functions.identity(), Functions.identity()));
    }

    private void assertTableLayout(Plan plan, String tableName, TupleDomain<Subfield> domainPredicate, RowExpression remainingPredicate, Set<String> predicateColumnNames)
    {
        TableScanNode tableScan = searchFrom(plan.getRoot())
                .where(node -> isTableScanNode(node, tableName))
                .findOnlyElement();

        assertTrue(tableScan.getTable().getLayout().isPresent());
        HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) tableScan.getTable().getLayout().get();

        assertEquals(layoutHandle.getPredicateColumns().keySet(), predicateColumnNames);
        assertEquals(layoutHandle.getDomainPredicate(), domainPredicate);
        assertEquals(layoutHandle.getRemainingPredicate(), remainingPredicate);
        assertEquals(layoutHandle.getRemainingPredicate(), remainingPredicate);
    }

    private void assertBucketFilter(Plan plan, String tableName, int readBucketCount, Set<Integer> bucketsToKeep)
    {
        TableScanNode tableScan = searchFrom(plan.getRoot())
                .where(node -> isTableScanNode(node, tableName))
                .findOnlyElement();

        assertTrue(tableScan.getTable().getLayout().isPresent());
        HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) tableScan.getTable().getLayout().get();

        assertTrue(layoutHandle.getBucketHandle().isPresent());
        assertTrue(layoutHandle.getBucketFilter().isPresent());
        assertEquals(layoutHandle.getBucketHandle().get().getReadBucketCount(), readBucketCount);
        assertEquals(layoutHandle.getBucketFilter().get().getBucketsToKeep(), bucketsToKeep);
    }

    private void assertNoBucketFilter(Plan plan, String tableName, int readBucketCount)
    {
        TableScanNode tableScan = searchFrom(plan.getRoot())
                .where(node -> isTableScanNode(node, tableName))
                .findOnlyElement();

        assertTrue(tableScan.getTable().getLayout().isPresent());
        HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) tableScan.getTable().getLayout().get();

        assertEquals(layoutHandle.getBucketHandle().get().getReadBucketCount(), readBucketCount);
        assertFalse(layoutHandle.getBucketFilter().isPresent());
    }

    private void assertNoAggregatedColumns(Plan plan, String tableName)
    {
        TableScanNode tableScan = searchFrom(plan.getRoot())
                .where(node -> isTableScanNode(node, tableName))
                .findOnlyElement();

        for (ColumnHandle columnHandle : tableScan.getAssignments().values()) {
            assertTrue(columnHandle instanceof HiveColumnHandle);
            HiveColumnHandle hiveColumnHandle = (HiveColumnHandle) columnHandle;
            assertNotSame(hiveColumnHandle.getColumnType(), HiveColumnHandle.ColumnType.AGGREGATED);
            assertFalse(hiveColumnHandle.getPartialAggregation().isPresent());
        }
    }

    private void assertRequestedColumnsInLayout(Plan plan, String tableName, Set<String> expectedRequestedColumns)
    {
        TableScanNode tableScan = searchFrom(plan.getRoot())
                .where(node -> isTableScanNode(node, tableName))
                .findOnlyElement();

        assertTrue(tableScan.getTable().getLayout().isPresent());
        HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) tableScan.getTable().getLayout().get();

        assertTrue(layoutHandle.getRequestedColumns().isPresent());
        Set<HiveColumnHandle> requestedColumns = layoutHandle.getRequestedColumns().get();

        List<String> actualRequestedColumns = new ArrayList<>();
        for (HiveColumnHandle column : requestedColumns) {
            if (!column.getRequiredSubfields().isEmpty()) {
                column.getRequiredSubfields().stream().map(Subfield::serialize).forEach(actualRequestedColumns::add);
            }
            else {
                actualRequestedColumns.add(column.getName());
            }
        }

        Set<String> requestedColumnsSet = ImmutableSet.copyOf(actualRequestedColumns);
        assertEquals(requestedColumnsSet.size(), actualRequestedColumns.size(), "There should be no duplicates in the requested column list");
        assertEquals(requestedColumnsSet, expectedRequestedColumns);
    }

    static ExtendedHiveMetastore replicateHiveMetastore(DistributedQueryRunner queryRunner)
    {
        URI dataDirectory = queryRunner.getCoordinator().getDataDirectory().resolve("hive_data").toUri();
        HiveClientConfig hiveClientConfig = new HiveClientConfig();
        MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
        HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig), ImmutableSet.of(), hiveClientConfig);
        HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
        return new FileHiveMetastore(hdfsEnvironment, dataDirectory.toString(), "test");
    }

    private static PlanMatchPattern tableScan(String tableName, TupleDomain<String> domainPredicate, RowExpression remainingPredicate, Set<String> predicateColumnNames)
    {
        return PlanMatchPattern.tableScan(tableName).with(new Matcher()
        {
            @Override
            public boolean shapeMatches(PlanNode node)
            {
                return node instanceof TableScanNode;
            }

            @Override
            public MatchResult detailMatches(PlanNode node, StatsProvider stats, Session session, Metadata metadata, SymbolAliases symbolAliases)
            {
                TableScanNode tableScan = (TableScanNode) node;

                Optional<ConnectorTableLayoutHandle> layout = tableScan.getTable().getLayout();

                if (!layout.isPresent()) {
                    return NO_MATCH;
                }

                HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) layout.get();

                if (!Objects.equals(layoutHandle.getPredicateColumns().keySet(), predicateColumnNames) ||
                        !Objects.equals(layoutHandle.getDomainPredicate(), domainPredicate.transform(Subfield::new)) ||
                        !Objects.equals(layoutHandle.getRemainingPredicate(), remainingPredicate)) {
                    return NO_MATCH;
                }

                return match();
            }
        });
    }

    private static final class HiveTableScanMatcher
            implements Matcher
    {
        private final Map<String, Set<Subfield>> requiredSubfields;

        private HiveTableScanMatcher(Map<String, Set<Subfield>> requiredSubfields)
        {
            this.requiredSubfields = requireNonNull(requiredSubfields, "requiredSubfields is null");
        }

        @Override
        public boolean shapeMatches(PlanNode node)
        {
            return node instanceof TableScanNode;
        }

        @Override
        public MatchResult detailMatches(PlanNode node, StatsProvider stats, Session session, Metadata metadata, SymbolAliases symbolAliases)
        {
            TableScanNode tableScan = (TableScanNode) node;
            for (ColumnHandle column : tableScan.getAssignments().values()) {
                HiveColumnHandle hiveColumn = (HiveColumnHandle) column;
                String columnName = hiveColumn.getName();
                if (requiredSubfields.containsKey(columnName)) {
                    if (!requiredSubfields.get(columnName).equals(ImmutableSet.copyOf(hiveColumn.getRequiredSubfields()))) {
                        return NO_MATCH;
                    }
                }
                else {
                    if (!hiveColumn.getRequiredSubfields().isEmpty()) {
                        return NO_MATCH;
                    }
                }
            }

            return match();
        }

        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("requiredSubfields", requiredSubfields)
                    .toString();
        }
    }

    private static final class HiveParquetDereferencePushdownMatcher
            implements Matcher
    {
        private final Map<String, Subfield> dereferenceColumns;
        private final TupleDomain<String> domainPredicate;
        private final Set<String> predicateColumns;
        private final RowExpression remainingPredicate;

        private HiveParquetDereferencePushdownMatcher(
                Map<String, Subfield> dereferenceColumns,
                TupleDomain<String> domainPredicate,
                Set<String> predicateColumns,
                RowExpression remainingPredicate)
        {
            this.dereferenceColumns = requireNonNull(dereferenceColumns, "dereferenceColumns is null");
            this.domainPredicate = requireNonNull(domainPredicate, "domainPredicate is null");
            this.predicateColumns = requireNonNull(predicateColumns, "predicateColumns is null");
            this.remainingPredicate = requireNonNull(remainingPredicate, "remainingPredicate is null");
        }

        @Override
        public boolean shapeMatches(PlanNode node)
        {
            return node instanceof TableScanNode;
        }

        @Override
        public MatchResult detailMatches(PlanNode node, StatsProvider stats, Session session, Metadata metadata, SymbolAliases symbolAliases)
        {
            TableScanNode tableScan = (TableScanNode) node;
            for (ColumnHandle column : tableScan.getAssignments().values()) {
                HiveColumnHandle hiveColumn = (HiveColumnHandle) column;
                String columnName = hiveColumn.getName();
                if (dereferenceColumns.containsKey(columnName)) {
                    if (hiveColumn.getColumnType() != SYNTHESIZED ||
                            hiveColumn.getRequiredSubfields().size() != 1 ||
                            !hiveColumn.getRequiredSubfields().get(0).equals(dereferenceColumns.get(columnName))) {
                        return NO_MATCH;
                    }
                    dereferenceColumns.remove(columnName);
                }
                else {
                    if (isPushedDownSubfield(hiveColumn)) {
                        return NO_MATCH;
                    }
                }
            }

            if (!dereferenceColumns.isEmpty()) {
                return NO_MATCH;
            }

            Optional<ConnectorTableLayoutHandle> layout = tableScan.getTable().getLayout();

            if (!layout.isPresent()) {
                return NO_MATCH;
            }

            HiveTableLayoutHandle layoutHandle = (HiveTableLayoutHandle) layout.get();

            if (!Objects.equals(layoutHandle.getPredicateColumns().keySet(), predicateColumns) ||
                    !Objects.equals(layoutHandle.getDomainPredicate(), domainPredicate.transform(Subfield::new)) ||
                    !Objects.equals(layoutHandle.getRemainingPredicate(), remainingPredicate)) {
                return NO_MATCH;
            }

            return match();
        }

        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("dereferenceColumns", dereferenceColumns)
                    .add("domainPredicate", domainPredicate)
                    .add("predicateColumns", predicateColumns)
                    .add("remainingPredicate", remainingPredicate)
                    .toString();
        }
    }

    private static Map<String, Subfield> nestedColumnMap(String... columns)
    {
        return Arrays.stream(columns).collect(Collectors.toMap(
                column -> pushdownColumnNameForSubfield(nestedColumn(column)),
                column -> nestedColumn(column)));
    }

    private static Subfield nestedColumn(String column)
    {
        return new Subfield(column);
    }
}