// TestPrestoSparkQueryRunner.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.spark;

import com.facebook.presto.Session;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import com.google.common.io.Files;
import io.airlift.units.Duration;
import org.apache.hadoop.fs.Path;
import org.testng.annotations.Ignore;
import org.testng.annotations.Test;

import java.io.File;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static com.facebook.presto.SystemSessionProperties.HASH_PARTITION_COUNT;
import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static com.facebook.presto.SystemSessionProperties.PARTIAL_MERGE_PUSHDOWN_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_MEMORY;
import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_MEMORY_PER_NODE;
import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_STAGE_COUNT;
import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_TOTAL_MEMORY_PER_NODE;
import static com.facebook.presto.SystemSessionProperties.VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED;
import static com.facebook.presto.spark.PrestoSparkQueryRunner.METASTORE_CONTEXT;
import static com.facebook.presto.spark.PrestoSparkQueryRunner.createHivePrestoSparkQueryRunner;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.OUT_OF_MEMORY_RETRY_SPARK_CONFIGS;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_BROADCAST_JOIN_MAX_MEMORY_OVERRIDE;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_HASH_PARTITION_COUNT_SCALING_FACTOR_ON_OUT_OF_MEMORY;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_QUERY_EXECUTION_STRATEGIES;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_BROADCAST_JOIN_ENABLED;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.SPARK_SPLIT_ASSIGNMENT_BATCH_SIZE;
import static com.facebook.presto.spark.PrestoSparkSessionProperties.STORAGE_BASED_BROADCAST_JOIN_ENABLED;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy.PUSH_THROUGH_LOW_MEMORY_OPERATORS;
import static com.facebook.presto.tests.QueryAssertions.assertEqualsIgnoreOrder;
import static com.google.common.io.Files.createTempDir;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.airlift.tpch.TpchTable.NATION;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.FileAssert.assertFile;

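/**
 * End-to-end tests for Presto-on-Spark, run against a Hive-backed
 * {@link PrestoSparkQueryRunner} loaded with TPC-H data. The suite covers table writes,
 * bucketed reads and writes, joins and unions, memory limits, and the out-of-memory
 * retry strategies.
 *
 * <p>A minimal sketch of standalone usage, mirroring {@link #testSmileSerialization()}:
 * <pre>{@code
 * try (QueryRunner runner = createHivePrestoSparkQueryRunner(ImmutableList.of(NATION), ImmutableMap.of())) {
 *     MaterializedResult result = runner.execute("SELECT count(*) FROM nation");
 * }
 * }</pre>
 */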
public class TestPrestoSparkQueryRunner
        extends AbstractTestQueryFramework
{
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        return createHivePrestoSparkQueryRunner();
    }

    @Test
    public void testTableWrite()
    {
        // some basic tests
        assertUpdate(
                "CREATE TABLE hive.hive_test.hive_orders AS " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders",
                15000);

        assertUpdate(
                "INSERT INTO hive.hive_test.hive_orders " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders " +
                        "UNION ALL " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders",
                30000);

        assertQuery(
                "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM hive.hive_test.hive_orders",
                "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders " +
                        "UNION ALL " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders " +
                        "UNION ALL " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders");

        // 3-way union all with a potentially non-flattened plan
        // See https://github.com/prestodb/presto/issues/12625
        //
        // CREATE TABLE is not supported yet, so use CREATE TABLE AS SELECT instead
        assertUpdate(
                "CREATE TABLE hive.hive_test.test_table_write_with_union AS " +
                        "SELECT orderkey, 'dummy' AS dummy " +
                        "FROM orders",
                15000);
        assertUpdate(
                "INSERT INTO hive.hive_test.test_table_write_with_union " +
                        "SELECT orderkey, dummy " +
                        "FROM (" +
                        "   SELECT orderkey, 'a' AS dummy FROM orders " +
                        "UNION ALL" +
                        "   SELECT orderkey, 'bb' AS dummy FROM orders " +
                        "UNION ALL" +
                        "   SELECT orderkey, 'ccc' AS dummy FROM orders " +
                        ")",
                45000);
    }

    @Test
    public void testZeroFileCreatorForBucketedTable()
    {
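        // Writing a single row into an 8-bucket table leaves most buckets empty;
        // this presumably exercises the zero-row file creator, which still has to
        // emit a file for every bucket.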
        assertUpdate(
                getSession(),
                "CREATE TABLE hive.hive_test.test_hive_orders_bucketed_join_zero_file WITH (bucketed_by=array['orderkey'], bucket_count=8) AS " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders_bucketed " +
                        "WHERE orderkey = 1",
                1);
    }

    @Test
    public void testBucketedTableWriteSimple()
    {
        // simple write from a bucketed table to a bucketed table
        // same bucket count
        testBucketedTableWriteSimple(getSession(), 8, 8);
        for (Session testSession : getTestCompatibleBucketCountSessions()) {
            // incompatible bucket count
            testBucketedTableWriteSimple(testSession, 3, 13);
            testBucketedTableWriteSimple(testSession, 13, 7);
            // compatible bucket count
            testBucketedTableWriteSimple(testSession, 4, 8);
            testBucketedTableWriteSimple(testSession, 8, 4);
        }
    }

    private void testBucketedTableWriteSimple(Session session, int inputBucketCount, int outputBucketCount)
    {
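        // "$bucket" is the Hive connector's hidden bucket-number column; for a bigint
        // bucketing key the bucket is presumably orderkey mod bucket_count, which is
        // what the expected queries below compute.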
        assertUpdate(
                session,
                format("CREATE TABLE hive.hive_test.test_hive_orders_bucketed_simple_input WITH (bucketed_by=array['orderkey'], bucket_count=%s) AS " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders_bucketed", inputBucketCount),
                15000);
        assertQuery(
                session,
                "SELECT count(*) " +
                        "FROM hive.hive_test.test_hive_orders_bucketed_simple_input " +
                        "WHERE \"$bucket\" = 0",
                format("SELECT count(*) FROM orders WHERE orderkey %% %s = 0", inputBucketCount));

        assertUpdate(
                session,
                format("CREATE TABLE hive.hive_test.test_hive_orders_bucketed_simple_output WITH (bucketed_by=array['orderkey'], bucket_count=%s) AS " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM hive.hive_test.test_hive_orders_bucketed_simple_input", outputBucketCount),
                15000);
        assertQuery(
                session,
                "SELECT count(*) " +
                        "FROM hive.hive_test.test_hive_orders_bucketed_simple_output " +
                        "WHERE \"$bucket\" = 0",
                format("SELECT count(*) FROM orders WHERE orderkey %% %s = 0", outputBucketCount));

        dropTable("hive_test", "test_hive_orders_bucketed_simple_input");
        dropTable("hive_test", "test_hive_orders_bucketed_simple_output");
    }

    @Test
    public void testBucketedTableWriteAggregation()
    {
        // aggregate on a bucket key and write to a bucketed table
        // same bucket count
        testBucketedTableWriteAggregation(getSession(), 8, 8);
        for (Session testSession : getTestCompatibleBucketCountSessions()) {
            // incompatible bucket count
            testBucketedTableWriteAggregation(testSession, 7, 13);
            testBucketedTableWriteAggregation(testSession, 13, 7);
            // compatible bucket count
            testBucketedTableWriteAggregation(testSession, 4, 8);
            testBucketedTableWriteAggregation(testSession, 8, 4);
        }
    }

    private void testBucketedTableWriteAggregation(Session session, int inputBucketCount, int outputBucketCount)
    {
        assertUpdate(
                session,
                format("CREATE TABLE hive.hive_test.test_hive_orders_bucketed_aggregation_input WITH (bucketed_by=array['orderkey'], bucket_count=%s) AS " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders_bucketed", inputBucketCount),
                15000);

        assertUpdate(
                session,
                format("CREATE TABLE hive.hive_test.test_hive_orders_bucketed_aggregation_output WITH (bucketed_by=array['orderkey'], bucket_count=%s) AS " +
                        "SELECT orderkey, sum(totalprice) totalprice " +
                        "FROM hive.hive_test.test_hive_orders_bucketed_aggregation_input " +
                        "GROUP BY orderkey", outputBucketCount),
                15000);
        assertQuery(
                session,
                "SELECT count(*) " +
                        "FROM hive.hive_test.test_hive_orders_bucketed_aggregation_output " +
                        "WHERE \"$bucket\" = 0",
                format("SELECT count(*) FROM orders WHERE orderkey %% %s = 0", outputBucketCount));

        dropTable("hive_test", "test_hive_orders_bucketed_aggregation_input");
        dropTable("hive_test", "test_hive_orders_bucketed_aggregation_output");
    }

    @Test
    public void testBucketedTableWriteJoin()
    {
        // join on a bucket key and write to a bucketed table
        // same bucket count
        testBucketedTableWriteJoin(getSession(), 8, 8, 8);
        for (Session testSession : getTestCompatibleBucketCountSessions()) {
            // incompatible bucket count
            testBucketedTableWriteJoin(testSession, 7, 13, 17);
            testBucketedTableWriteJoin(testSession, 13, 7, 17);
            testBucketedTableWriteJoin(testSession, 7, 7, 17);
            // compatible bucket count
            testBucketedTableWriteJoin(testSession, 4, 4, 8);
            testBucketedTableWriteJoin(testSession, 8, 8, 4);
            testBucketedTableWriteJoin(testSession, 4, 8, 8);
            testBucketedTableWriteJoin(testSession, 8, 4, 8);
            testBucketedTableWriteJoin(testSession, 4, 8, 4);
            testBucketedTableWriteJoin(testSession, 8, 4, 4);
        }
    }

    private void testBucketedTableWriteJoin(Session session, int firstInputBucketCount, int secondInputBucketCount, int outputBucketCount)
    {
        assertUpdate(
                session,
                format("CREATE TABLE hive.hive_test.test_hive_orders_bucketed_join_input_1 WITH (bucketed_by=array['orderkey'], bucket_count=%s) AS " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders_bucketed", firstInputBucketCount),
                15000);

        assertUpdate(
                session,
                format("CREATE TABLE hive.hive_test.test_hive_orders_bucketed_join_input_2 WITH (bucketed_by=array['orderkey'], bucket_count=%s) AS " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders_bucketed", secondInputBucketCount),
                15000);

        assertUpdate(
                session,
                format("CREATE TABLE hive.hive_test.test_hive_orders_bucketed_join_output WITH (bucketed_by=array['orderkey'], bucket_count=%s) AS " +
                                "SELECT  first.orderkey, second.totalprice " +
                                "FROM hive.hive_test.test_hive_orders_bucketed_join_input_1 first " +
                                "INNER JOIN hive.hive_test.test_hive_orders_bucketed_join_input_2 second " +
                                "ON first.orderkey = second.orderkey ",
                        outputBucketCount),
                15000);
        assertQuery(
                session,
                "SELECT count(*) " +
                        "FROM hive.hive_test.test_hive_orders_bucketed_join_output " +
                        "WHERE \"$bucket\" = 0",
                format("SELECT count(*) FROM orders WHERE orderkey %% %s = 0", outputBucketCount));

        dropTable("hive_test", "test_hive_orders_bucketed_join_input_1");
        dropTable("hive_test", "test_hive_orders_bucketed_join_input_2");
        dropTable("hive_test", "test_hive_orders_bucketed_join_output");
    }

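    // Drop the table directly through the Hive metastore rather than via SQL;
    // the trailing "true" asks the metastore to delete the underlying data as well.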
    private void dropTable(String schema, String table)
    {
        ((PrestoSparkQueryRunner) getQueryRunner()).getMetastore().dropTable(METASTORE_CONTEXT, schema, table, true);
    }

    @Test
    public void testAggregation()
    {
        assertQuery("select partkey, count(*) c from lineitem where partkey % 10 = 1 group by partkey having count(*) = 42");
    }

    @Test
    public void testBucketedAggregation()
    {
        assertBucketedQuery("SELECT orderkey, count(*) c FROM lineitem_bucketed WHERE partkey % 10 = 1 GROUP BY orderkey");
    }

    @Test
    public void testJoin()
    {
        assertQuery("SELECT l.orderkey, l.linenumber, p.brand " +
                "FROM lineitem l, part p " +
                "WHERE l.partkey = p.partkey");
    }

    @Test
    public void testBucketedJoin()
    {
        // both tables are bucketed
        assertBucketedQuery("SELECT l.orderkey, l.linenumber, o.orderstatus " +
                "FROM lineitem_bucketed l " +
                "JOIN orders_bucketed o " +
                "ON l.orderkey = o.orderkey " +
                "WHERE l.orderkey % 223 = 42 AND l.linenumber = 4 and o.orderstatus = 'O'");

        // only probe side table is bucketed
        assertBucketedQuery("SELECT l.orderkey, l.linenumber, o.orderstatus " +
                "FROM lineitem_bucketed l " +
                "JOIN orders o " +
                "ON l.orderkey = o.orderkey " +
                "WHERE l.orderkey % 223 = 42 AND l.linenumber = 4 and o.orderstatus = 'O'");

        // only build side table is bucketed
        assertBucketedQuery("SELECT l.orderkey, l.linenumber, o.orderstatus " +
                "FROM lineitem l " +
                "JOIN orders_bucketed o " +
                "ON l.orderkey = o.orderkey " +
                "WHERE l.orderkey % 223 = 42 AND l.linenumber = 4 and o.orderstatus = 'O'");

        // different number of buckets
        assertUpdate("create table if not exists hive.hive_test.bucketed_nation_for_join_4 " +
                        "WITH (bucket_count = 4, bucketed_by = ARRAY['nationkey']) as select * from nation",
                25);
        assertUpdate("create table if not exists hive.hive_test.bucketed_nation_for_join_8 " +
                        "WITH (bucket_count = 8, bucketed_by = ARRAY['nationkey']) as select * from nation",
                25);

        for (Session session : getTestCompatibleBucketCountSessions()) {
            String expected = "SELECT * FROM nation first " +
                    "INNER JOIN nation second " +
                    "ON first.nationkey = second.nationkey";
            assertQuery(
                    session,
                    "SELECT * FROM hive.hive_test.bucketed_nation_for_join_4 first " +
                            "INNER JOIN hive.hive_test.bucketed_nation_for_join_8 second " +
                            "ON first.nationkey = second.nationkey",
                    expected);
            assertQuery(
                    session,
                    "SELECT * FROM hive.hive_test.bucketed_nation_for_join_8 first " +
                            "INNER JOIN hive.hive_test.bucketed_nation_for_join_4 second " +
                            "ON first.nationkey = second.nationkey",
                    expected);

            expected = "SELECT * FROM nation first " +
                    "INNER JOIN nation second " +
                    "ON first.nationkey = second.nationkey " +
                    "INNER JOIN nation third " +
                    "ON second.nationkey = third.nationkey";

            assertQuery(
                    session,
                    "SELECT * FROM hive.hive_test.bucketed_nation_for_join_4 first " +
                            "INNER JOIN hive.hive_test.bucketed_nation_for_join_8 second " +
                            "ON first.nationkey = second.nationkey " +
                            "INNER JOIN nation third " +
                            "ON second.nationkey = third.nationkey",
                    expected);
            assertQuery(
                    session,
                    "SELECT * FROM hive.hive_test.bucketed_nation_for_join_8 first " +
                            "INNER JOIN hive.hive_test.bucketed_nation_for_join_4 second " +
                            "ON first.nationkey = second.nationkey " +
                            "INNER JOIN nation third " +
                            "ON second.nationkey = third.nationkey",
                    expected);
        }
    }

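    // Sessions that enable joins over mismatched bucket counts through two different
    // mechanisms: the partial merge pushdown optimizer strategy and the Hive
    // connector's optimize_mismatched_bucket_count session property.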
    private List<Session> getTestCompatibleBucketCountSessions()
    {
        return ImmutableList.of(
                Session.builder(getSession())
                        .setSystemProperty(PARTIAL_MERGE_PUSHDOWN_STRATEGY, PUSH_THROUGH_LOW_MEMORY_OPERATORS.name())
                        .build(),
                Session.builder(getSession())
                        .setCatalogSessionProperty("hive", "optimize_mismatched_bucket_count", "true")
                        .build());
    }

    @Test
    public void testJoinUnderUnionAll()
    {
        assertUpdate("create table if not exists hive.hive_test.partitioned_nation_10 " +
                        "WITH (bucket_count = 10, bucketed_by = ARRAY['nationkey']) as select * from nation",
                25);
        assertUpdate("create table if not exists hive.hive_test.partitioned_nation_20 " +
                        "WITH (bucket_count = 20, bucketed_by = ARRAY['nationkey']) as select * from nation",
                25);
        assertUpdate("create table if not exists hive.hive_test.partitioned_nation_30 " +
                        "WITH (bucket_count = 30, bucketed_by = ARRAY['nationkey']) as select * from nation",
                25);

        assertQuery("SELECT hive.hive_test.partitioned_nation_10.nationkey " +
                        "FROM hive.hive_test.partitioned_nation_10 " +
                        "JOIN hive.hive_test.partitioned_nation_20 " +
                        "  ON hive.hive_test.partitioned_nation_10.nationkey = hive.hive_test.partitioned_nation_20.nationkey " +
                        "UNION ALL " +
                        "SELECT hive.hive_test.partitioned_nation_10.nationkey " +
                        "FROM hive.hive_test.partitioned_nation_10 " +
                        "JOIN hive.hive_test.partitioned_nation_30 " +
                        "  ON hive.hive_test.partitioned_nation_10.nationkey = hive.hive_test.partitioned_nation_30.nationkey ",
                "SELECT m.nationkey " +
                        "FROM nation m " +
                        "JOIN nation n " +
                        "  ON m.nationkey = n.nationkey " +
                        "UNION ALL " +
                        "SELECT m.nationkey " +
                        "FROM nation m " +
                        "JOIN nation n " +
                        "  ON m.nationkey = n.nationkey");

        assertQuery("SELECT nationkey " +
                        "FROM nation " +
                        "UNION ALL " +
                        "SELECT hive.hive_test.partitioned_nation_10.nationkey " +
                        "FROM hive.hive_test.partitioned_nation_10 " +
                        "JOIN hive.hive_test.partitioned_nation_30 " +
                        "  ON hive.hive_test.partitioned_nation_10.nationkey = hive.hive_test.partitioned_nation_30.nationkey ",
                "SELECT nationkey " +
                        "FROM nation " +
                        "UNION ALL " +
                        "SELECT m.nationkey " +
                        "FROM nation m " +
                        "JOIN nation n " +
                        "  ON m.nationkey = n.nationkey");
    }

    @Test
    public void testAggregationUnderUnionAll()
    {
        assertQuery("SELECT orderkey, 1 FROM orders UNION ALL SELECT orderkey, count(*) FROM orders GROUP BY 1",
                "SELECT orderkey, 1 FROM orders UNION ALL SELECT orderkey, count(*) FROM orders GROUP BY orderkey");

        assertQuery("SELECT " +
                        "   o.regionkey, " +
                        "   l.orderkey " +
                        "FROM (" +
                        "   SELECT " +
                        "       * " +
                        "   FROM lineitem " +
                        "   WHERE" +
                        "       linenumber = 4" +
                        ") l " +
                        "CROSS JOIN (" +
                        "   SELECT" +
                        "       regionkey," +
                        "       1 " +
                        "   FROM nation " +
                        "   UNION ALL " +
                        "   SELECT" +
                        "       regionkey," +
                        "       count(*) " +
                        "   FROM nation " +
                        "       GROUP BY regionkey" +
                        ") o",
                "SELECT " +
                        "   o.regionkey, " +
                        "   l.orderkey " +
                        "FROM (" +
                        "   SELECT " +
                        "       * " +
                        "   FROM lineitem " +
                        "   WHERE" +
                        "       linenumber = 4" +
                        ") l " +
                        "CROSS JOIN (" +
                        "   SELECT" +
                        "       regionkey," +
                        "       1 " +
                        "   FROM nation " +
                        "   UNION ALL " +
                        "   SELECT" +
                        "       regionkey," +
                        "       count(*) " +
                        "   FROM nation " +
                        "       GROUP BY regionkey" +
                        ") o");
    }

    @Test
    public void testCrossJoin()
    {
        assertQuery("" +
                "SELECT o.custkey, l.orderkey " +
                "FROM (SELECT * FROM lineitem WHERE linenumber = 4) l " +
                "CROSS JOIN (SELECT * FROM orders WHERE orderkey = 5) o");
        assertQuery("" +
                "SELECT o.custkey, l.orderkey " +
                "FROM (SELECT * FROM lineitem  WHERE linenumber = 4) l " +
                "CROSS JOIN (" +
                "   SELECT * FROM orders WHERE orderkey = 5 " +
                "   UNION ALL " +
                "   SELECT * FROM orders WHERE orderkey = 5 " +
                ") o");

        assertUpdate("create table if not exists hive.hive_test.partitioned_nation_11 " +
                        "WITH (bucket_count = 11, bucketed_by = ARRAY['nationkey']) as select * from nation",
                25);
        assertUpdate("create table if not exists hive.hive_test.partitioned_nation_22 " +
                        "WITH (bucket_count = 22, bucketed_by = ARRAY['nationkey']) as select * from nation",
                25);
        assertUpdate("create table if not exists hive.hive_test.partitioned_nation_33 " +
                        "WITH (bucket_count = 33, bucketed_by = ARRAY['nationkey']) as select * from nation",
                25);

        // UNION ALL over aggregation
        assertQuery("SELECT o.orderkey, l.orderkey " +
                        "FROM (SELECT * FROM lineitem  WHERE linenumber = 4) l " +
                        "CROSS JOIN (" +
                        "   SELECT orderkey, 1 FROM orders WHERE orderkey = 5 " +
                        "   UNION ALL " +
                        "   SELECT orderkey, count(*) " +
                        "       FROM orders WHERE orderkey = 5 " +
                        "   GROUP BY 1 " +
                        "   ) o",
                "SELECT o.orderkey, l.orderkey " +
                        "FROM (SELECT * FROM lineitem  WHERE linenumber = 4) l " +
                        "CROSS JOIN (" +
                        "   SELECT orderkey, 1 FROM orders WHERE orderkey = 5 " +
                        "   UNION ALL " +
                        "   SELECT orderkey, count(*) " +
                        "       FROM orders WHERE orderkey = 5 " +
                        "   GROUP BY orderkey " +
                        "   ) o");

        // 22 buckets UNION ALL 11 buckets
        assertQuery("SELECT o.regionkey, l.orderkey " +
                        "FROM (SELECT * FROM lineitem  WHERE linenumber = 4) l " +
                        "CROSS JOIN (" +
                        "   SELECT * FROM hive.hive_test.partitioned_nation_22 " +
                        "   UNION ALL " +
                        "   SELECT * FROM hive.hive_test.partitioned_nation_11 " +
                        ") o",
                "SELECT o.regionkey, l.orderkey " +
                        "FROM (SELECT * FROM lineitem  WHERE linenumber = 4) l " +
                        "CROSS JOIN (" +
                        "   SELECT * FROM nation " +
                        "   UNION ALL " +
                        "   SELECT * FROM nation" +
                        ") o");

        // 11 buckets UNION ALL 22 buckets
        assertQuery("SELECT o.regionkey, l.orderkey " +
                        "FROM (SELECT * FROM lineitem  WHERE linenumber = 4) l " +
                        "CROSS JOIN (" +
                        "   SELECT * FROM hive.hive_test.partitioned_nation_11 " +
                        "   UNION ALL " +
                        "   SELECT * FROM hive.hive_test.partitioned_nation_22 " +
                        ") o",
                "SELECT o.regionkey, l.orderkey " +
                        "FROM (SELECT * FROM lineitem  WHERE linenumber = 4) l " +
                        "CROSS JOIN (" +
                        "   SELECT * FROM nation " +
                        "   UNION ALL " +
                        "   SELECT * FROM nation" +
                        ") o");

        // bucketed UNION ALL non-bucketed
        assertQuery("SELECT o.regionkey, l.orderkey " +
                        "FROM (SELECT * FROM lineitem  WHERE linenumber = 4) l " +
                        "CROSS JOIN (" +
                        "   SELECT * FROM hive.hive_test.partitioned_nation_11 " +
                        "   UNION ALL " +
                        "   SELECT * FROM nation " +
                        ") o",
                "SELECT o.regionkey, l.orderkey " +
                        "FROM (SELECT * FROM lineitem  WHERE linenumber = 4) l " +
                        "CROSS JOIN (" +
                        "   SELECT * FROM nation " +
                        "   UNION ALL " +
                        "   SELECT * FROM nation" +
                        ") o");

        // non-bucketed UNION ALL bucketed
        assertQuery("SELECT o.regionkey, l.orderkey " +
                        "FROM (SELECT * FROM lineitem  WHERE linenumber = 4) l " +
                        "CROSS JOIN (" +
                        "   SELECT * FROM nation " +
                        "   UNION ALL " +
                        "   SELECT * FROM hive.hive_test.partitioned_nation_11 " +
                        ") o",
                "SELECT o.regionkey, l.orderkey " +
                        "FROM (SELECT * FROM lineitem  WHERE linenumber = 4) l " +
                        "CROSS JOIN (" +
                        "   SELECT * FROM nation " +
                        "   UNION ALL " +
                        "   SELECT * FROM nation" +
                        ") o");

        // 11 buckets UNION ALL non-bucketed UNION ALL 22 buckets
        assertQuery("SELECT o.regionkey, l.orderkey " +
                        "FROM (SELECT * FROM lineitem  WHERE linenumber = 4) l " +
                        "CROSS JOIN (" +
                        "   SELECT * FROM hive.hive_test.partitioned_nation_11 " +
                        "   UNION ALL " +
                        "   SELECT * FROM nation " +
                        "   UNION ALL " +
                        "   SELECT * FROM hive.hive_test.partitioned_nation_22 " +
                        ") o",
                "SELECT o.regionkey, l.orderkey " +
                        "FROM (SELECT * FROM lineitem  WHERE linenumber = 4) l " +
                        "CROSS JOIN (" +
                        "   SELECT * FROM nation " +
                        "   UNION ALL " +
                        "   SELECT * FROM nation" +
                        "   UNION ALL " +
                        "   SELECT * FROM nation" +
                        ") o");
    }

    @Test
    public void testNWayJoin()
    {
        assertQuery("SELECT l.orderkey, l.linenumber, p1.brand, p2.brand, p3.brand, p4.brand, p5.brand, p6.brand " +
                "FROM lineitem l, part p1, part p2, part p3, part p4, part p5, part p6 " +
                "WHERE l.partkey = p1.partkey " +
                "AND l.partkey = p2.partkey " +
                "AND l.partkey = p3.partkey " +
                "AND l.partkey = p4.partkey " +
                "AND l.partkey = p5.partkey " +
                "AND l.partkey = p6.partkey");
    }

    @Test
    public void testBucketedNWayJoin()
    {
        // all tables are bucketed
        assertBucketedQuery("SELECT l.orderkey, l.linenumber, o1.orderstatus, o2.orderstatus, o3.orderstatus, o4.orderstatus, o5.orderstatus, o6.orderstatus " +
                "FROM lineitem_bucketed l, orders_bucketed o1, orders_bucketed o2, orders_bucketed o3, orders_bucketed o4, orders_bucketed o5, orders_bucketed o6 " +
                "WHERE l.orderkey = o1.orderkey " +
                "AND l.orderkey = o2.orderkey " +
                "AND l.orderkey = o3.orderkey " +
                "AND l.orderkey = o4.orderkey " +
                "AND l.orderkey = o5.orderkey " +
                "AND l.orderkey = o6.orderkey");

        // some tables are bucketed
        assertBucketedQuery("SELECT l.orderkey, l.linenumber, o1.orderstatus, o2.orderstatus, o3.orderstatus, o4.orderstatus, o5.orderstatus, o6.orderstatus " +
                "FROM lineitem_bucketed l, orders o1, orders_bucketed o2, orders o3, orders_bucketed o4, orders o5, orders_bucketed o6 " +
                "WHERE l.orderkey = o1.orderkey " +
                "AND l.orderkey = o2.orderkey " +
                "AND l.orderkey = o3.orderkey " +
                "AND l.orderkey = o4.orderkey " +
                "AND l.orderkey = o5.orderkey " +
                "AND l.orderkey = o6.orderkey");
        assertBucketedQuery("SELECT l.orderkey, l.linenumber, o1.orderstatus, o2.orderstatus, o3.orderstatus, o4.orderstatus, o5.orderstatus, o6.orderstatus " +
                "FROM lineitem l, orders o1, orders_bucketed o2, orders o3, orders_bucketed o4, orders o5, orders_bucketed o6 " +
                "WHERE l.orderkey = o1.orderkey " +
                "AND l.orderkey = o2.orderkey " +
                "AND l.orderkey = o3.orderkey " +
                "AND l.orderkey = o4.orderkey " +
                "AND l.orderkey = o5.orderkey " +
                "AND l.orderkey = o6.orderkey");
    }

    @Test
    public void testUnionAll()
    {
        assertQuery("SELECT * FROM orders UNION ALL SELECT * FROM orders");

        assertBucketedQuery("SELECT * FROM lineitem_bucketed UNION ALL SELECT * FROM lineitem_bucketed");

        assertBucketedQuery("SELECT * FROM lineitem UNION ALL SELECT * FROM lineitem_bucketed");

        assertBucketedQuery("SELECT * FROM lineitem_bucketed UNION ALL SELECT * FROM lineitem");
    }

    @Test
    public void testBucketedUnionAll()
    {
        // all tables bucketed
        assertBucketedQuery("" +
                "SELECT orderkey, count(*) c " +
                "FROM (" +
                "   SELECT * FROM lineitem_bucketed " +
                "   UNION ALL " +
                "   SELECT * FROM lineitem_bucketed" +
                "   UNION ALL " +
                "   SELECT * FROM lineitem_bucketed" +
                ") GROUP BY orderkey");

        // some tables bucketed
        assertBucketedQuery("" +
                "SELECT orderkey, count(*) c " +
                "FROM (" +
                "   SELECT * FROM lineitem_bucketed " +
                "   UNION ALL " +
                "   SELECT * FROM lineitem" +
                "   UNION ALL " +
                "   SELECT * FROM lineitem_bucketed" +
                ") GROUP BY orderkey");
        assertBucketedQuery("" +
                "SELECT orderkey, count(*) c " +
                "FROM (" +
                "   SELECT * FROM lineitem " +
                "   UNION ALL " +
                "   SELECT * FROM lineitem_bucketed" +
                "   UNION ALL " +
                "   SELECT * FROM lineitem" +
                ") GROUP BY orderkey");
        assertBucketedQuery("" +
                "SELECT orderkey, count(*) c " +
                "FROM (" +
                "   SELECT * FROM lineitem " +
                "   UNION ALL " +
                "   SELECT * FROM lineitem_bucketed" +
                ") GROUP BY orderkey");
    }

    @Test
    public void testValues()
    {
        assertQuery("SELECT a, b " +
                "FROM (VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')) t1 (a, b) ");
    }

    @Test
    public void testUnionWithAggregationAndJoin()
    {
        assertQuery(
                "SELECT * FROM ( " +
                        "SELECT orderkey, count(*) FROM (" +
                        "   SELECT orderdate ds, orderkey FROM orders " +
                        "   UNION ALL " +
                        "   SELECT shipdate ds, orderkey FROM lineitem) a " +
                        "GROUP BY orderkey) t " +
                        "JOIN orders o " +
                        "ON (o.orderkey = t.orderkey)");
    }

    @Test
    public void testFailures()
    {
        assertQueryFails("SELECT * FROM orders WHERE custkey / (orderkey - orderkey) = 0", "/ by zero");
        assertQueryFails(
                "CREATE TABLE hive.hive_test.hive_orders_test_failures AS " +
                        "(SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders) " +
                        "UNION ALL " +
                        "(SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders " +
                        "WHERE custkey / (orderkey - orderkey) = 0 )",
                "/ by zero");
    }

    @Test
    public void testSelectFromEmptyTable()
    {
        assertUpdate(
                "CREATE TABLE hive.hive_test.empty_orders AS " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders " +
                        "WITH NO DATA",
                0);

        assertQuery(
                "SELECT count(*) FROM hive.hive_test.empty_orders",
                "SELECT 0");
    }

    @Test
    public void testSelectFromEmptyBucketedTable()
    {
        assertUpdate(
                "CREATE TABLE hive.hive_test.empty_orders_bucketed WITH (bucketed_by=array['orderkey'], bucket_count=11) AS " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders " +
                        "WITH NO DATA",
                0);

        assertQuery(
                "SELECT count(*) FROM (SELECT orderkey, count(*) FROM hive.hive_test.empty_orders_bucketed GROUP BY orderkey)",
                "SELECT 0");
    }

    @Test
    public void testLimit()
    {
        MaterializedResult actual = computeActual("SELECT * FROM orders LIMIT 10");
        assertEquals(actual.getRowCount(), 10);
        actual = computeActual("SELECT 'a' FROM orders LIMIT 10");
        assertEquals(actual.getRowCount(), 10);
    }

    @Test
    public void testTableSampleSystem()
    {
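        // SYSTEM sampling is probabilistic, but at 1 percent of a 15000-row table the
        // sample is all but guaranteed to be smaller than the full table.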
        long totalRows = (Long) computeActual("SELECT count(*) FROM orders").getOnlyValue();
        long sampledRows = (Long) computeActual("SELECT count(*) FROM orders TABLESAMPLE SYSTEM (1)").getOnlyValue();
        assertThat(sampledRows).isLessThan(totalRows);
    }

    @Test(enabled = false)
    public void testTimeouts()
    {
        // Expected run time for this query is ~30s
        String longRunningCrossJoin = "SELECT count(l1.orderkey), count(l2.orderkey) FROM lineitem l1, lineitem l2";

        Session queryMaxRunTimeLimitSession = Session.builder(getSession())
                .setSystemProperty("query_max_run_time", "2s")
                .build();
        assertQueryFails(queryMaxRunTimeLimitSession, longRunningCrossJoin, "Query exceeded maximum time limit of 2.00s");

        Session queryMaxExecutionTimeLimitSession = Session.builder(getSession())
                .setSystemProperty("query_max_run_time", "3s")
                .setSystemProperty("query_max_execution_time", "2s")
                .build();
        assertQueryFails(queryMaxExecutionTimeLimitSession, longRunningCrossJoin, "Query exceeded maximum time limit of 2.00s");

        // Test whether the service wait time is taken into account when computing timeouts.
        // Expected run time for this query is ~30s. We set the testing service wait time to 600s,
        // so the timeout logic should compute the effective deadline as 605s
        // (query_max_execution_time + waitTime) and the query should succeed. This is a somewhat
        // hacky way to verify that the service wait time is added to the deadline when submitting jobs.
        Set<PrestoSparkServiceWaitTimeMetrics> waitTimeMetrics = ((PrestoSparkQueryRunner) getQueryRunner()).getWaitTimeMetrics();
        PrestoSparkTestingServiceWaitTimeMetrics testingServiceWaitTimeMetrics = (PrestoSparkTestingServiceWaitTimeMetrics) waitTimeMetrics.stream()
                .filter(metric -> metric instanceof PrestoSparkTestingServiceWaitTimeMetrics)
                .findFirst().get();
        testingServiceWaitTimeMetrics.setWaitTime(new Duration(600, SECONDS));
        queryMaxRunTimeLimitSession = Session.builder(getSession())
                .setSystemProperty("query_max_execution_time", "5s")
                .build();
        assertQuerySucceeds(queryMaxRunTimeLimitSession, longRunningCrossJoin);
        testingServiceWaitTimeMetrics.setWaitTime(new Duration(0, SECONDS));
    }

    @Test
    public void testDiskBasedBroadcastJoin()
    {
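        // Force broadcast joins and spool the broadcast side through storage instead
        // of holding it in memory.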
        Session session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "BROADCAST")
                .setSystemProperty(STORAGE_BASED_BROADCAST_JOIN_ENABLED, "true")
                .build();

        assertQuery(session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey");

        assertQuery(session,
                "select l.orderkey from lineitem l join orders o on l.orderkey = o.orderkey " +
                        "Union all " +
                        "SELECT m.nationkey FROM nation m JOIN nation n  ON m.nationkey = n.nationkey");

        assertQuery(session,
                "SELECT o.custkey, l.orderkey " +
                        "FROM (SELECT * FROM lineitem WHERE linenumber = 4) l " +
                        "CROSS JOIN (SELECT * FROM orders WHERE orderkey = 5) o");

        assertQuery(session,
                "WITH broadcast_table1 AS ( " +
                        "    SELECT " +
                        "        * " +
                        "    FROM lineitem " +
                        "    WHERE " +
                        "        linenumber = 1 " +
                        ")," +
                        "broadcast_table2 AS ( " +
                        "    SELECT " +
                        "        * " +
                        "    FROM lineitem " +
                        "    WHERE " +
                        "        linenumber = 2 " +
                        ")," +
                        "broadcast_table3 AS ( " +
                        "    SELECT " +
                        "        * " +
                        "    FROM lineitem " +
                        "    WHERE " +
                        "        linenumber = 3 " +
                        ")," +
                        "broadcast_table4 AS ( " +
                        "    SELECT " +
                        "        * " +
                        "    FROM lineitem " +
                        "    WHERE " +
                        "        linenumber = 4 " +
                        ")" +
                        "SELECT " +
                        "    * " +
                        "FROM broadcast_table1 a " +
                        "JOIN broadcast_table2 b " +
                        "    ON a.orderkey = b.orderkey " +
                        "JOIN broadcast_table3 c " +
                        "    ON a.orderkey = c.orderkey " +
                        "JOIN broadcast_table4 d " +
                        "    ON a.orderkey = d.orderkey");
    }

    @Test
    public void testStorageBasedBroadcastJoinMaxThreshold()
    {
        Session session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "BROADCAST")
                .setSystemProperty(STORAGE_BASED_BROADCAST_JOIN_ENABLED, "true")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "1MB")
                .build();

        assertQueryFails(
                session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey",
                "Query exceeded per-node broadcast memory limit of 1MB \\[Broadcast size: .*MB\\]");
    }

    @Test
    public void testStorageBasedBroadcastJoinDeserializedMaxThreshold()
    {
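        // The broadcast-specific 1MB override should trip before the much larger
        // 100MB per-node total memory limit does.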
        Session session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "BROADCAST")
                .setSystemProperty(STORAGE_BASED_BROADCAST_JOIN_ENABLED, "true")
                .setSystemProperty(SPARK_BROADCAST_JOIN_MAX_MEMORY_OVERRIDE, "1MB")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "100MB")
                .build();

        assertQueryFails(
                session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey",
                "Query exceeded per-node broadcast memory limit of 1MB \\[Broadcast size: .*MB\\]");
    }

    @Test
    public void testCorrectErrorMessageWhenSubPlanCreationFails()
    {
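        // Cap the stage count at 2 so that sub-plan creation fails; the reported
        // error must be the stage-count violation rather than a memory error.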
        String query = "with l as (" +
                "select * from lineitem UNION ALL select * from lineitem UNION ALL select * from lineitem" +
                "), " +
                "o as (" +
                "select * from orders UNION ALL select * from orders UNION ALL select * from orders" +
                ") " +
                "select * from l right outer join o on l.orderkey = o.orderkey";

        Session session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "partitioned")
                .setSystemProperty(HASH_PARTITION_COUNT, "1")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "10MB")
                .setSystemProperty(QUERY_MAX_MEMORY, "100MB")
                .setSystemProperty(VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED, "true")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED, "false")
                .setSystemProperty(QUERY_MAX_STAGE_COUNT, "2")
                .build();
        assertQueryFails(session,
                query,
                "Number of stages in the query.* exceeds the allowed maximum.*");
    }

    @Test
    public void testRetryWithHigherHashPartitionCount()
    {
        String query = "with l as (" +
                "select * from lineitem UNION ALL select * from lineitem UNION ALL select * from lineitem" +
                "), " +
                "o as (" +
                "select * from orders UNION ALL select * from orders UNION ALL select * from orders" +
                ") " +
                "select * from l right outer join o on l.orderkey = o.orderkey";

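        // Retry disabled: with a single hash partition the join build side exceeds
        // the 6.5MB per-node limit and the query fails.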
        Session session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "partitioned")
                .setSystemProperty(HASH_PARTITION_COUNT, "1")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "6.5MB")
                .setSystemProperty(QUERY_MAX_MEMORY, "100MB")
                .setSystemProperty(VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED, "true")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED, "false")
                .build();
        assertQueryFails(session,
                query,
                "Query exceeded per-node total memory limit of .*Top Consumers: \\{HashBuilderOperator.*");

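        // Retry enabled, but a scaling factor of 1.0 keeps the partition count
        // unchanged, so the retry fails the same way.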
        session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "partitioned")
                .setSystemProperty(HASH_PARTITION_COUNT, "1")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "6.5MB")
                .setSystemProperty(QUERY_MAX_MEMORY, "100MB")
                .setSystemProperty(VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED, "true")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED, "true")
                .setSystemProperty(SPARK_HASH_PARTITION_COUNT_SCALING_FACTOR_ON_OUT_OF_MEMORY, "1.0")
                .build();
        assertQueryFails(session,
                query,
                "Query exceeded per-node total memory limit of .*Top Consumers: \\{HashBuilderOperator.*");

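        // Retry enabled with the default scaling factor: the retry is expected to run
        // with a higher partition count and succeed.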
        session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "partitioned")
                .setSystemProperty(HASH_PARTITION_COUNT, "1")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "6.5MB")
                .setSystemProperty(QUERY_MAX_MEMORY, "100MB")
                .setSystemProperty(VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED, "true")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED, "true")
                .build();
        assertQuerySucceeds(session, query);

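        // Same, but with an explicit 2x scaling factor.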
        session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "partitioned")
                .setSystemProperty(HASH_PARTITION_COUNT, "1")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "6.5MB")
                .setSystemProperty(QUERY_MAX_MEMORY, "100MB")
                .setSystemProperty(VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED, "true")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED, "true")
                .setSystemProperty(SPARK_HASH_PARTITION_COUNT_SCALING_FACTOR_ON_OUT_OF_MEMORY, "2.0")
                .build();
        assertQuerySucceeds(session, query);
    }

    @Test
    public void testRetryOnOutOfMemoryBroadcastJoin()
    {
        Session session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "BROADCAST")
                .setSystemProperty(STORAGE_BASED_BROADCAST_JOIN_ENABLED, "true")
                .setSystemProperty(SPARK_BROADCAST_JOIN_MAX_MEMORY_OVERRIDE, "10B")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_BROADCAST_JOIN_ENABLED, "false")
                .build();

        // Query should fail with broadcast join OOM
        assertQueryFails(
                session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey",
                "Query exceeded per-node broadcast memory limit of 10B \\[Broadcast size: .*MB\\]");

        session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "BROADCAST")
                .setSystemProperty(STORAGE_BASED_BROADCAST_JOIN_ENABLED, "true")
                .setSystemProperty(SPARK_BROADCAST_JOIN_MAX_MEMORY_OVERRIDE, "10B")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_BROADCAST_JOIN_ENABLED, "true")
                .build();

        // Query should succeed since broadcast join will be disabled on retry
        assertQuery(
                session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey");
    }

    @Test
    public void testRetryOnOutOfMemoryWithIncreasedContainerSize()
    {
        Session session = Session.builder(getSession())
                .setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "false")
                .build();

        // Query should fail with OOM
        assertQueryFails(
                session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey",
                ".*Query exceeded per-node .* memory limit of 2MB.*");

        session = Session.builder(getSession())
                .setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "true")
                .setSystemProperty(OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES, "query_max_memory_per_node=100MB,query_max_total_memory_per_node=100MB")
                .setSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, "spark.executor.memory=1G")
                .build();

        // Query should succeed since memory will be increased on retry
        assertQuery(
                session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey");
    }

    @Test
    public void testRetryOnOutOfMemoryWithIncreasedContainerSizeWithSessionPropertiesProvidedErrorCode()
    {
        Session session = Session.builder(getSession())
                .setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "false")
                .build();

        // Query should fail with OOM
        assertQueryFails(
                session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey",
                ".*Query exceeded per-node .* memory limit of 2MB.*");

        session = Session.builder(getSession())
                .setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "true")
                // Do not provide any error codes; the INCREASE_CONTAINER_SIZE strategy will not be applied
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES, "")
                .setSystemProperty(OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES, "query_max_memory_per_node=100MB,query_max_total_memory_per_node=100MB")
                .setSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, "spark.executor.memory=1G")
                .build();

        // Query should fail with OOM
        assertQueryFails(
                session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey",
                ".*Query exceeded per-node .* memory limit of 2MB.*");

        session = Session.builder(getSession())
                .setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "true")
                // List EXCEEDED_LOCAL_MEMORY_LIMIT as a retryable error code; the INCREASE_CONTAINER_SIZE strategy will be applied
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ERROR_CODES, "EXCEEDED_LOCAL_MEMORY_LIMIT")
                .setSystemProperty(OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES, "query_max_memory_per_node=100MB,query_max_total_memory_per_node=100MB")
                .setSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, "spark.executor.memory=1G")
                .build();

        // Query should succeed since memory will be increased on retry
        assertQuery(
                session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey");
    }

    @Test
    public void testSmileSerialization()
    {
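        // Run the same query with SMILE serialization enabled and disabled; both runs
        // must produce the same rows (ignoring order).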
        String query = "SELECT * FROM nation";
        try (QueryRunner queryRunner = createHivePrestoSparkQueryRunner(ImmutableList.of(NATION), ImmutableMap.of("spark.smile-serialization-enabled", "true"))) {
            MaterializedResult actual = queryRunner.execute(query);
            assertEqualsIgnoreOrder(actual, computeExpected(query, actual.getTypes()));
        }
        try (QueryRunner queryRunner = createHivePrestoSparkQueryRunner(ImmutableList.of(NATION), ImmutableMap.of("spark.smile-serialization-enabled", "false"))) {
            MaterializedResult actual = queryRunner.execute(query);
            assertEqualsIgnoreOrder(actual, computeExpected(query, actual.getTypes()));
        }
    }

    @Test
    public void testIterativeSplitEnumeration()
    {
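        // Vary the split assignment batch size (1, 2, 4, 8) to exercise iterative
        // split enumeration with both tiny and larger batches.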
        for (int batchSize = 1; batchSize <= 8; batchSize *= 2) {
            Session session = Session.builder(getSession())
                    .setSystemProperty(SPARK_SPLIT_ASSIGNMENT_BATCH_SIZE, Integer.toString(batchSize))
                    .build();

            assertQuery(session, "select partkey, count(*) c from lineitem where partkey % 10 = 1 group by partkey having count(*) = 42");

            assertQuery(session, "SELECT l.orderkey, l.linenumber, p.brand " +
                    "FROM lineitem l, part p " +
                    "WHERE l.partkey = p.partkey");
        }
    }

    @Test
    public void testDropTable()
    {
        assertUpdate(
                "CREATE TABLE hive.hive_test.hive_orders1 AS " +
                        "SELECT orderkey, custkey, orderstatus, totalprice, orderdate, orderpriority, clerk, shippriority, comment " +
                        "FROM orders", 15000);
        assertQuery("select count(*) from hive.hive_test.hive_orders1", "select 15000");
        assertQuerySucceeds("DROP TABLE hive.hive_test.hive_orders1");
        assertQueryFails("select count(*) from hive.hive_test.hive_orders1", ".*Table hive.hive_test.hive_orders1 does not exist");
    }

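    // Walk the schema lifecycle: create, reject DROP while non-empty, rename,
    // and finally drop.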
    @Test
    public void testCreateDropSchema()
    {
        assertQuerySucceeds("CREATE SCHEMA hive.hive_test_new");
        assertQuerySucceeds("CREATE TABLE  hive.hive_test_new.test (x bigint)");
        assertQueryFails("DROP SCHEMA hive.hive_test_new", "Schema not empty: hive_test_new");
        assertQuerySucceeds("DROP TABLE hive.hive_test_new.test");
        assertQuerySucceeds("ALTER SCHEMA hive.hive_test_new RENAME TO hive_test_new1");
        assertQueryFails("DROP SCHEMA hive.hive_test_new", ".* Schema 'hive.hive_test_new' does not exist");
        assertQuerySucceeds("DROP SCHEMA hive.hive_test_new1");
    }

    @Test
    public void testCreateAlterTable()
    {
        // create a table with the default storage format (ORC)
        String createTableSql = "CREATE TABLE hive.hive_test.hive_orders_new (\n" +
                "   \"x\" bigint\n" +
                ")\n" +
                "WITH (\n" +
                "   format = 'ORC'\n" +
                ")";
        assertQuerySucceeds(createTableSql);
        MaterializedResult actual = computeActual("SHOW CREATE TABLE hive.hive_test.hive_orders_new");
        assertEquals(actual.getOnlyValue(), createTableSql);
        assertQuerySucceeds("ALTER TABLE hive.hive_test.hive_orders_new RENAME TO hive.hive_test.hive_orders_new1");
        assertQueryFails("DROP TABLE hive.hive_test.hive_orders_new", ".* Table 'hive.hive_test.hive_orders_new' does not exist");
        assertQuerySucceeds("DROP TABLE hive.hive_test.hive_orders_new1");
    }

    @Test
    public void testCreateDropView()
    {
        // create a view over the orders table
        String createViewSql = "CREATE VIEW hive.hive_test.hive_view SECURITY DEFINER AS\n" +
                "SELECT *\n" +
                "FROM\n" +
                "  orders";
        assertQuerySucceeds(createViewSql);
        MaterializedResult actual = computeActual("SHOW CREATE VIEW hive.hive_test.hive_view");
        assertEquals(actual.getOnlyValue(), createViewSql);
        assertQuerySucceeds("DROP VIEW hive.hive_test.hive_view");
    }

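    // Create an external TEXTFILE table over a pre-populated directory and
    // verify that dropping the table leaves the underlying data file intact.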
    @Test
    public void testCreateExternalTable()
            throws Exception
    {
        File tempDir = createTempDir();
        File dataFile = new File(tempDir, "test.txt");
        Files.write("hello\nworld\n", dataFile, UTF_8);

        String createTableSql = format("" +
                        "CREATE TABLE %s.%s.test_create_external (\n" +
                        "   \"name\" varchar\n" +
                        ")\n" +
                        "WITH (\n" +
                        "   external_location = '%s',\n" +
                        "   format = 'TEXTFILE'\n" +
                        ")",
                getSession().getCatalog().get(),
                getSession().getSchema().get(),
                new Path(tempDir.toURI().toASCIIString()).toString());

        assertQuerySucceeds(createTableSql);
        MaterializedResult actual = computeActual("SHOW CREATE TABLE test_create_external");
        assertEquals(actual.getOnlyValue(), createTableSql);

        actual = computeActual("SELECT name FROM test_create_external");
        assertEquals(actual.getOnlyColumnAsSet(), ImmutableSet.of("hello", "world"));

        assertQuerySucceeds("DROP TABLE test_create_external");

        // file should still exist after drop
        assertFile(dataFile);

        deleteRecursively(tempDir.toPath(), ALLOW_INSECURE);
    }

    @Test
    @Ignore("PrsetoSparkQueryRunner was changed to use hive user with admin priviliges. It breaks this tess assumptions." +
            "Update this test to not make assumptions")
    public void testGrants()
    {
        assertQuerySucceeds("CREATE SCHEMA hive.hive_test_new");
        assertQuerySucceeds("CREATE TABLE  hive.hive_test_new.test (x bigint)");

        // Grant user
        assertQuerySucceeds("GRANT SELECT,INSERT,DELETE,UPDATE ON hive.hive_test_new.test to user");
        MaterializedResult actual = computeActual("SHOW GRANTS ON TABLE hive.hive_test_new.test");
        // permissions are in the eighth field
        List<String> grants = actual.getMaterializedRows().stream().map(row -> row.getField(7).toString()).collect(Collectors.toList());
        assertEquals(Ordering.natural().sortedCopy(grants), ImmutableList.of("DELETE", "INSERT", "SELECT", "UPDATE"));

        // Revoke select,insert grants
        assertQuerySucceeds("REVOKE SELECT,INSERT ON hive.hive_test_new.test FROM user");
        actual = computeActual("SHOW GRANTS ON TABLE hive.hive_test_new.test");
        grants = actual.getMaterializedRows().stream().map(row -> row.getField(7).toString()).collect(Collectors.toList());
        assertEquals(Ordering.natural().sortedCopy(grants), ImmutableList.of("DELETE", "UPDATE"));

        assertQuerySucceeds("DROP TABLE hive.hive_test_new.test");
        assertQuerySucceeds("DROP SCHEMA hive.hive_test_new");
    }

    @Test
    @Ignore("PrsetoSparkQueryRunner was changed to use hive user with admin priviliges. It breaks this tess assumptions." +
            "Update this test to not make assumptions")
    public void testRoles()
    {
        assertQuerySucceeds("CREATE ROLE admin");
        assertQuerySucceeds("CREATE ROLE test_role");
        assertQuerySucceeds("GRANT test_role TO USER user");

        // Show Roles
        MaterializedResult actual = computeActual("SHOW ROLES");
        List<String> roles = actual.getMaterializedRows().stream().map(row -> row.getField(0).toString()).collect(Collectors.toList());
        assertEquals(Ordering.natural().sortedCopy(roles), ImmutableList.of("admin", "test_role"));

        // Show roles assigned to user
        actual = computeActual("SHOW ROLE GRANTS");
        roles = actual.getMaterializedRows().stream().map(row -> row.getField(0).toString()).collect(Collectors.toList());
        assertEquals(Ordering.natural().sortedCopy(roles), ImmutableList.of("public", "test_role"));

        // Revokes roles
        assertQuerySucceeds("REVOKE test_role FROM USER user");
        actual = computeActual("SHOW ROLE GRANTS");
        roles = actual.getMaterializedRows().stream().map(row -> row.getField(0).toString()).collect(Collectors.toList());
        assertEquals(Ordering.natural().sortedCopy(roles), ImmutableList.of("public"));

        assertQuerySucceeds("DROP ROLE test_role");
    }

    @Test
    public void testAddColumns()
    {
        assertQuerySucceeds("CREATE TABLE test_add_column (a bigint COMMENT 'test comment AAA')");
        assertQuerySucceeds("ALTER TABLE test_add_column ADD COLUMN b bigint COMMENT 'test comment BBB'");
        assertQueryFails("ALTER TABLE test_add_column ADD COLUMN a varchar", ".* Column 'a' already exists");
        assertQueryFails("ALTER TABLE test_add_column ADD COLUMN c bad_type", ".* Unknown type 'bad_type' for column 'c'");
        assertQuery("SHOW COLUMNS FROM test_add_column", "VALUES ('a', 'bigint', '', 'test comment AAA'), ('b', 'bigint', '', 'test comment BBB')");
        assertQuerySucceeds("DROP TABLE test_add_column");
    }

    @Test
    public void testRenameColumn()
    {
        String createTable = "" +
                "CREATE TABLE test_rename_column\n" +
                "WITH (\n" +
                "  partitioned_by = ARRAY ['orderstatus']\n" +
                ")\n" +
                "AS\n" +
                "SELECT orderkey, orderstatus FROM orders";

        assertUpdate(createTable, "SELECT count(*) FROM orders");
        assertQuerySucceeds("ALTER TABLE test_rename_column RENAME COLUMN orderkey TO new_orderkey");
        assertQuery("SELECT new_orderkey, orderstatus FROM test_rename_column", "SELECT orderkey, orderstatus FROM orders");
        assertQueryFails("ALTER TABLE test_rename_column RENAME COLUMN \"$path\" TO test", ".* Cannot rename hidden column");
        assertQueryFails("ALTER TABLE test_rename_column RENAME COLUMN orderstatus TO new_orderstatus", "Renaming partition columns is not supported");
        assertQuery("SELECT new_orderkey, orderstatus FROM test_rename_column", "SELECT orderkey, orderstatus FROM orders");
        assertQuerySucceeds("DROP TABLE test_rename_column");
    }

    @Test
    public void testDropColumn()
    {
        assertQueryFails("DROP TABLE hive.hive_test.hive_orders_new", ".* Table 'hive.hive_test.hive_orders_new' does not exist");
        String createTable = "" +
                "CREATE TABLE test_drop_column\n" +
                "WITH (\n" +
                "  partitioned_by = ARRAY ['orderstatus']\n" +
                ")\n" +
                "AS\n" +
                "SELECT custkey, orderkey, orderstatus FROM orders";

        assertUpdate(createTable, "SELECT count(*) FROM orders");
        assertQuery("SELECT orderkey, orderstatus FROM test_drop_column", "SELECT orderkey, orderstatus FROM orders");

        assertQueryFails("ALTER TABLE test_drop_column DROP COLUMN \"$path\"", ".* Cannot drop hidden column");
        assertQueryFails("ALTER TABLE test_drop_column DROP COLUMN orderstatus", "Cannot drop partition columns");
        assertQuerySucceeds("ALTER TABLE test_drop_column DROP COLUMN orderkey");
        assertQueryFails("ALTER TABLE test_drop_column DROP COLUMN custkey", "Cannot drop the only non-partition column in a table");
        assertQuery("SELECT * FROM test_drop_column", "SELECT custkey, orderstatus FROM orders");

        assertQuerySucceeds("DROP TABLE test_drop_column");
    }

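    // Exercise user-defined distinct and row types; the unittest.memory prefix
    // is assumed to refer to the in-memory function namespace manager
    // configured by the query runner.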
    @Test
    public void testCreateType()
    {
        assertQuerySucceeds("CREATE TYPE unittest.memory.num AS integer");
        assertQuerySucceeds("CREATE TYPE unittest.memory.pair AS (fst integer, snd integer)");
        assertQuerySucceeds("CREATE TYPE unittest.memory.pair3 AS (fst unittest.memory.pair, snd integer)");
        assertQuery("SELECT p.fst.fst FROM(SELECT CAST(ROW(CAST(ROW(1,2) AS unittest.memory.pair), 3) AS unittest.memory.pair3) AS p)", "SELECT 1");
        assertQuerySucceeds("CREATE TYPE unittest.memory.pair3Alt AS (fst ROW(fst integer, snd integer), snd integer)");
        assertQuery("SELECT p.fst.snd FROM(SELECT CAST(ROW(ROW(1,2), 3) AS  unittest.memory.pair3Alt) AS p)", "SELECT 2");
    }

    @Test
    public void testCreatePartitionedTable()
    {
        String createPartitionedTable = "" +
                "CREATE TABLE test_partition_table\n" +
                "WITH (\n" +
                "  format = 'Parquet', " +
                "  partitioned_by = ARRAY ['orderstatus']\n" +
                ")\n" +
                "AS\n" +
                "SELECT custkey, orderkey, orderstatus FROM orders";

        assertQuerySucceeds(createPartitionedTable);
        MaterializedResult actual = computeActual("SELECT count(*) FROM \"test_partition_table$partitions\"");
        // orders has 3 distinct orderstatus values, so 3 partitions are created
        assertEquals(actual.getOnlyValue().toString(), "3");

        // invoke CALL procedure to add empty partitions
        assertQuerySucceeds(format("CALL system.create_empty_partition('%s', '%s', ARRAY['orderstatus'], ARRAY['%s'])", "tpch", "test_partition_table", "x"));
        assertQuerySucceeds(format("CALL system.create_empty_partition('%s', '%s', ARRAY['orderstatus'], ARRAY['%s'])", "tpch", "test_partition_table", "y"));
        actual = computeActual("SELECT count(*) FROM \"test_partition_table$partitions\"");

        // 2 new partitions added
        assertEquals(actual.getOnlyValue().toString(), "5");
        assertQuerySucceeds("DROP TABLE test_partition_table");
    }

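    // A 10-byte broadcast memory cap guarantees a broadcast join OOM; with the
    // DISABLE_BROADCAST_JOIN strategy the same query falls back to a
    // non-broadcast join and succeeds.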
    @Test
    public void testDisableBroadcastJoinExecutionStrategy()
    {
        Session session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "BROADCAST")
                .setSystemProperty(STORAGE_BASED_BROADCAST_JOIN_ENABLED, "true")
                .setSystemProperty(SPARK_BROADCAST_JOIN_MAX_MEMORY_OVERRIDE, "10B")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_BROADCAST_JOIN_ENABLED, "false")
                .build();

        // Query should fail with broadcast join OOM
        assertQueryFails(
                session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey",
                "Query exceeded per-node broadcast memory limit of 10B \\[Broadcast size: .*MB\\]");

        session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "BROADCAST")
                .setSystemProperty(STORAGE_BASED_BROADCAST_JOIN_ENABLED, "true")
                .setSystemProperty(SPARK_BROADCAST_JOIN_MAX_MEMORY_OVERRIDE, "10B")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_BROADCAST_JOIN_ENABLED, "false")
                .setSystemProperty(SPARK_QUERY_EXECUTION_STRATEGIES, "DISABLE_BROADCAST_JOIN")
                .build();

        // Query should succeed since the broadcast join is disabled by the
        // DISABLE_BROADCAST_JOIN strategy in the spark_query_execution_strategies session property
        assertQuery(
                session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey");
    }

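    // Same OOM scenario as the retry test above, but recovery is driven by
    // explicitly listing the INCREASE_CONTAINER_SIZE execution strategy
    // instead of the retry-on-out-of-memory session toggle.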
    @Test
    public void testIncreaseContainerSizeExecutionStrategy()
    {
        Session session = Session.builder(getSession())
                .setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "false")
                .setSystemProperty(OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES, "query_max_memory_per_node=100MB,query_max_total_memory_per_node=100MB")
                .setSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, "spark.executor.memory=1G")
                .build();

        // Query should fail with OOM
        assertQueryFails(
                session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey",
                ".*Query exceeded per-node .* memory limit of 2MB.*");

        session = Session.builder(getSession())
                .setSystemProperty(QUERY_MAX_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "2MB")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_WITH_INCREASED_MEMORY_SETTINGS_ENABLED, "false")
                .setSystemProperty(OUT_OF_MEMORY_RETRY_PRESTO_SESSION_PROPERTIES, "query_max_memory_per_node=100MB,query_max_total_memory_per_node=100MB")
                .setSystemProperty(OUT_OF_MEMORY_RETRY_SPARK_CONFIGS, "spark.executor.memory=1G")
                .setSystemProperty(SPARK_QUERY_EXECUTION_STRATEGIES, "INCREASE_CONTAINER_SIZE")
                .build();

        // Query should succeed since memory settings are increased by the
        // INCREASE_CONTAINER_SIZE strategy in the spark_query_execution_strategies session property
        assertQuery(
                session,
                "select * from lineitem l join orders o on l.orderkey = o.orderkey");
    }

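    // Tripling lineitem and orders while forcing a single hash partition makes
    // HashBuilderOperator exceed the 6.5MB per-node memory limit; the
    // INCREASE_HASH_PARTITION_COUNT strategy runs the query with a higher
    // partition count so each build side fits in memory.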
    @Test
    public void testIncreaseHashPartitionCountExecutionStrategy()
    {
        String query = "with l as (" +
                "select * from lineitem UNION ALL select * from lineitem UNION ALL select * from lineitem" +
                "), " +
                "o as (" +
                "select * from orders UNION ALL select * from orders UNION ALL select * from orders" +
                ") " +
                "select * from l right outer join o on l.orderkey = o.orderkey";

        Session session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "partitioned")
                .setSystemProperty(HASH_PARTITION_COUNT, "1")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "6.5MB")
                .setSystemProperty(QUERY_MAX_MEMORY, "100MB")
                .setSystemProperty(VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED, "true")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED, "false")
                .build();
        assertQueryFails(session,
                query,
                "Query exceeded per-node total memory limit of .*Top Consumers: \\{HashBuilderOperator.*");

        session = Session.builder(getSession())
                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "partitioned")
                .setSystemProperty(HASH_PARTITION_COUNT, "1")
                .setSystemProperty(QUERY_MAX_TOTAL_MEMORY_PER_NODE, "6.5MB")
                .setSystemProperty(QUERY_MAX_MEMORY, "100MB")
                .setSystemProperty(VERBOSE_EXCEEDED_MEMORY_LIMIT_ERRORS_ENABLED, "true")
                .setSystemProperty(SPARK_RETRY_ON_OUT_OF_MEMORY_HIGHER_PARTITION_COUNT_ENABLED, "false")
                .setSystemProperty(SPARK_QUERY_EXECUTION_STRATEGIES, "INCREASE_HASH_PARTITION_COUNT")
                .build();
        assertQuerySucceeds(session, query);
    }

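    // Runs the given query against a bucketed table and compares the result
    // with the same query against its non-bucketed counterpart (the table
    // name without the "_bucketed" suffix).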
    private void assertBucketedQuery(String sql)
    {
        assertQuery(sql, sql.replaceAll("_bucketed", ""));
    }
}