NativeQueryRunnerUtils.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.nativeworker;

import com.facebook.presto.Session;
import com.facebook.presto.testing.QueryRunner;
import com.google.common.collect.ImmutableMap;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import static com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils.ICEBERG_DEFAULT_STORAGE_FORMAT;

public class NativeQueryRunnerUtils
{
    // Holder for static helpers only; the private constructor keeps it from being instantiated.
    private NativeQueryRunnerUtils() {}

    /**
     * Returns the fixed Hive catalog properties for native workers.
     *
     * @return an immutable map setting {@code hive.parquet.pushdown-filter-enabled=true},
     *         {@code hive.orc-compression-codec=ZSTD} and {@code hive.storage-format=DWRF}
     */
    public static Map<String, String> getNativeWorkerHiveProperties()
    {
        ImmutableMap.Builder<String, String> hiveProperties = ImmutableMap.builder();
        hiveProperties.put("hive.parquet.pushdown-filter-enabled", "true");
        hiveProperties.put("hive.orc-compression-codec", "ZSTD");
        hiveProperties.put("hive.storage-format", "DWRF");
        return hiveProperties.build();
    }

    /**
     * Returns the fixed Iceberg catalog properties for native workers.
     *
     * @return an immutable map setting {@code iceberg.pushdown-filter-enabled=true}
     *         and {@code iceberg.catalog.type=HIVE}
     */
    public static Map<String, String> getNativeWorkerIcebergProperties()
    {
        ImmutableMap.Builder<String, String> icebergProperties = ImmutableMap.builder();
        icebergProperties.put("iceberg.pushdown-filter-enabled", "true");
        icebergProperties.put("iceberg.catalog.type", "HIVE");
        return icebergProperties.build();
    }

    /**
     * Returns the fixed system (server-level) properties for native workers.
     *
     * @return an immutable map with the six property settings listed in the method body
     */
    public static Map<String, String> getNativeWorkerSystemProperties()
    {
        ImmutableMap.Builder<String, String> systemProperties = ImmutableMap.builder();
        systemProperties.put("native-execution-enabled", "true");
        systemProperties.put("optimizer.optimize-hash-generation", "false");
        systemProperties.put("regex-library", "RE2J");
        systemProperties.put("offset-clause-enabled", "true");
        // Presto normally expands some functions (e.g. array_duplicates()) into their SQL equivalent.
        // With Velox the function must be kept as-is, so SQL function inlining is switched off.
        systemProperties.put("inline-sql-functions", "false");
        systemProperties.put("use-alternative-function-signatures", "true");
        return systemProperties.build();
    }

    /**
     * Returns the fixed properties used when a native sidecar is enabled.
     *
     * @return an immutable map setting {@code coordinator-sidecar-enabled=true},
     *         {@code exclude-invalid-worker-session-properties=true} and
     *         {@code presto.default-namespace=native.default}
     */
    public static Map<String, String> getNativeSidecarProperties()
    {
        return ImmutableMap.of(
                "coordinator-sidecar-enabled", "true",
                "exclude-invalid-worker-session-properties", "true",
                "presto.default-namespace", "native.default");
    }

    /**
     * Returns the fixed TPC-DS catalog properties for native workers.
     *
     * @return an immutable single-entry map setting {@code tpcds.use-varchar-type=true}
     */
    public static Map<String, String> getNativeWorkerTpcdsProperties()
    {
        return ImmutableMap.of("tpcds.use-varchar-type", "true");
    }

    /**
     * Creates all tables for local testing, except for bench tables.
     * Date columns of {@code lineitem} and {@code orders} are cast to varchar.
     *
     * @param queryRunner runner used to execute the table-creation statements
     */
    public static void createAllTables(QueryRunner queryRunner)
    {
        final boolean castDateToVarchar = true;
        createAllTables(queryRunner, castDateToVarchar);
    }

    /**
     * Creates all tables for local testing, except for bench tables.
     *
     * @param queryRunner runner used to execute the table-creation statements
     * @param castDateToVarchar passed to {@code createLineitem} and {@code createOrders};
     *        when true their date columns are stored as varchar
     */
    public static void createAllTables(QueryRunner queryRunner, boolean castDateToVarchar)
    {
        // lineitem and orders are always dropped and re-created; the remaining tables are only
        // created when they do not exist yet.
        createLineitem(queryRunner, castDateToVarchar);
        createOrders(queryRunner, castDateToVarchar);
        createOrdersEx(queryRunner);
        createOrdersHll(queryRunner);
        createNation(queryRunner);
        createPartitionedNation(queryRunner);
        createBucketedCustomer(queryRunner);
        createCustomer(queryRunner);
        createPart(queryRunner);
        createPartSupp(queryRunner);
        // Keep createRegion before createTableToTestHiddenColumns: the latter inserts rows selected from "region".
        createRegion(queryRunner);
        createTableToTestHiddenColumns(queryRunner);
        createSupplier(queryRunner);
        createEmptyTable(queryRunner);
        createBucketedLineitemAndOrders(queryRunner);
    }

    /**
     * Creates all iceberg tables for local testing.
     * Every table is created through the query runner's default session; {@code nation} uses
     * {@code ICEBERG_DEFAULT_STORAGE_FORMAT}.
     *
     * @param queryRunner runner used to execute the table-creation statements
     */
    public static void createAllIcebergTables(QueryRunner queryRunner)
    {
        Session defaultSession = queryRunner.getDefaultSession();

        createLineitemStandard(defaultSession, queryRunner);
        // Order dates are cast to varchar, exactly as createOrders(queryRunner) does.
        createOrders(defaultSession, queryRunner, true);
        createNationWithFormat(defaultSession, queryRunner, ICEBERG_DEFAULT_STORAGE_FORMAT);
        createCustomer(defaultSession, queryRunner);
        createPart(defaultSession, queryRunner);
        createPartSupp(defaultSession, queryRunner);
        createRegion(defaultSession, queryRunner);
        createSupplier(defaultSession, queryRunner);
    }

    /**
     * Re-creates the {@code lineitem} table with its date columns cast to varchar.
     *
     * @param queryRunner runner used to execute the statements
     */
    public static void createLineitem(QueryRunner queryRunner)
    {
        final boolean castDateToVarchar = true;
        createLineitem(queryRunner, castDateToVarchar);
    }

    /**
     * Drops {@code lineitem} if present and re-creates it from {@code tpch.tiny.lineitem}.
     * Besides the source columns, the table gets the derived columns {@code is_open},
     * {@code is_returned}, {@code tax_as_real}, {@code discount_as_real},
     * {@code linenumber_as_smallint} and {@code linenumber_as_tinyint}.
     *
     * @param queryRunner runner used to execute the statements
     * @param castDateToVarchar when true, {@code shipdate}, {@code commitdate} and
     *        {@code receiptdate} are stored as varchar; otherwise they keep their source type
     */
    public static void createLineitem(QueryRunner queryRunner, boolean castDateToVarchar)
    {
        queryRunner.execute("DROP TABLE IF EXISTS lineitem");

        final String shipDateColumn;
        final String commitDateColumn;
        final String receiptDateColumn;
        if (castDateToVarchar) {
            shipDateColumn = "cast(shipdate as varchar) as shipdate";
            commitDateColumn = "cast(commitdate as varchar) as commitdate";
            receiptDateColumn = "cast(receiptdate as varchar) as receiptdate";
        }
        else {
            shipDateColumn = "shipdate";
            commitDateColumn = "commitdate";
            receiptDateColumn = "receiptdate";
        }

        String createTableSql = "CREATE TABLE lineitem AS " +
                "SELECT orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, " +
                "   returnflag, linestatus, " + shipDateColumn + ", " + commitDateColumn + ", " + receiptDateColumn + ", " +
                "   shipinstruct, shipmode, comment, " +
                "   linestatus = 'O' as is_open, returnflag = 'R' as is_returned, " +
                "   cast(tax as real) as tax_as_real, cast(discount as real) as discount_as_real, " +
                "   cast(linenumber as smallint) as linenumber_as_smallint, " +
                "   cast(linenumber as tinyint) as linenumber_as_tinyint " +
                "FROM tpch.tiny.lineitem";
        queryRunner.execute(createTableSql);
    }

    /**
     * Creates the standard {@code lineitem} table using the query runner's default session.
     *
     * @param queryRunner runner whose default session is used
     */
    public static void createLineitemStandard(QueryRunner queryRunner)
    {
        Session defaultSession = queryRunner.getDefaultSession();
        createLineitemStandard(defaultSession, queryRunner);
    }

    /**
     * Creates {@code lineitem} from {@code tpch.tiny.lineitem} with only the source columns
     * (no derived columns) and the three date columns cast to varchar. Does nothing when a
     * table named {@code lineitem} already exists for the session.
     *
     * @param session session used for the existence check and the CREATE TABLE statement
     * @param queryRunner runner used to execute the statement
     */
    public static void createLineitemStandard(Session session, QueryRunner queryRunner)
    {
        if (queryRunner.tableExists(session, "lineitem")) {
            return;
        }

        String createTableSql = "CREATE TABLE lineitem AS " +
                "SELECT orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, " +
                "   returnflag, linestatus, cast(shipdate as varchar) as shipdate, cast(commitdate as varchar) as commitdate, " +
                "   cast(receiptdate as varchar) as receiptdate, shipinstruct, shipmode, comment " +
                "FROM tpch.tiny.lineitem";
        queryRunner.execute(session, createTableSql);
    }

    /**
     * Re-creates the {@code orders} table with {@code orderdate} cast to varchar.
     *
     * @param queryRunner runner used to execute the statements
     */
    public static void createOrders(QueryRunner queryRunner)
    {
        final boolean castDateToVarchar = true;
        createOrders(queryRunner, castDateToVarchar);
    }

    /**
     * Re-creates the {@code orders} table using the query runner's default session.
     *
     * @param queryRunner runner whose default session is used
     * @param castDateToVarchar when true, {@code orderdate} is stored as varchar
     */
    public static void createOrders(QueryRunner queryRunner, boolean castDateToVarchar)
    {
        Session defaultSession = queryRunner.getDefaultSession();
        createOrders(defaultSession, queryRunner, castDateToVarchar);
    }

    /**
     * Drops {@code orders} if present and re-creates it from {@code tpch.tiny.orders}.
     *
     * @param session session used for both statements
     * @param queryRunner runner used to execute the statements
     * @param castDateToVarchar when true, {@code orderdate} is stored as varchar; otherwise it
     *        keeps its source type
     */
    public static void createOrders(Session session, QueryRunner queryRunner, boolean castDateToVarchar)
    {
        queryRunner.execute(session, "DROP TABLE IF EXISTS orders");

        final String orderDateColumn;
        if (castDateToVarchar) {
            orderDateColumn = "cast(orderdate as varchar) as orderdate";
        }
        else {
            orderDateColumn = "orderdate";
        }

        String createTableSql = "CREATE TABLE orders AS " +
                "SELECT orderkey, custkey, orderstatus, totalprice, " + orderDateColumn + ", " +
                "   orderpriority, clerk, shippriority, comment " +
                "FROM tpch.tiny.orders";
        queryRunner.execute(session, createTableSql);
    }

    /**
     * Creates {@code orders_ex} unless it already exists in the default session. The table has
     * one row per {@code orderkey} of {@code tpch.tiny.lineitem}, with an array of quantities
     * and a map from line number to quantity.
     *
     * @param queryRunner runner used for the existence check and the CREATE TABLE statement
     */
    public static void createOrdersEx(QueryRunner queryRunner)
    {
        if (queryRunner.tableExists(queryRunner.getDefaultSession(), "orders_ex")) {
            return;
        }

        String createTableSql = "CREATE TABLE orders_ex AS " +
                "SELECT orderkey, array_agg(quantity) as quantities, map_agg(linenumber, quantity) as quantity_by_linenumber " +
                "FROM tpch.tiny.lineitem " +
                "GROUP BY 1";
        queryRunner.execute(createTableSql);
    }

    /**
     * Creates {@code orders_hll} unless it already exists in the default session. Rows of
     * {@code tpch.tiny.orders} are grouped by {@code orderkey % 23}; each group stores the
     * {@code approx_set} of its order dates (as varchar), cast to varbinary.
     *
     * @param queryRunner runner used for the existence check and the CREATE TABLE statement
     */
    public static void createOrdersHll(QueryRunner queryRunner)
    {
        if (queryRunner.tableExists(queryRunner.getDefaultSession(), "orders_hll")) {
            return;
        }

        String createTableSql = "CREATE TABLE orders_hll AS " +
                "SELECT orderkey % 23 as key, cast(approx_set(cast(orderdate as varchar)) as varbinary) as hll " +
                "FROM tpch.tiny.orders " +
                "GROUP BY 1";
        queryRunner.execute(createTableSql);
    }

    /**
     * Creates the {@code nation}, {@code nation_json} and {@code nation_text} tables as copies
     * of {@code tpch.tiny.nation}, skipping every table that already exists in the default session.
     * <ul>
     * <li>{@code nation} is created without a FORMAT clause;</li>
     * <li>{@code nation_json} is created with {@code FORMAT = 'JSON'};</li>
     * <li>{@code nation_text} is created with {@code FORMAT = 'TEXTFILE'}.</li>
     * </ul>
     *
     * @param queryRunner runner used for the existence checks and the CREATE TABLE statements
     */
    public static void createNation(QueryRunner queryRunner)
    {
        // An ORC variant cannot be created here under the same name: once "nation" exists (it is
        // created right below when missing), a second existence check on "nation" can never pass.
        // The unreachable duplicate check has therefore been dropped.
        if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "nation")) {
            queryRunner.execute("CREATE TABLE nation AS SELECT * FROM tpch.tiny.nation");
        }
        if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "nation_json")) {
            queryRunner.execute("CREATE TABLE nation_json WITH (FORMAT = 'JSON') AS SELECT * FROM tpch.tiny.nation");
        }
        if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "nation_text")) {
            queryRunner.execute("CREATE TABLE nation_text WITH (FORMAT = 'TEXTFILE') AS SELECT * FROM tpch.tiny.nation");
        }
    }

    /**
     * Creates the nation table for the given storage format using the query runner's default session.
     *
     * @param queryRunner runner whose default session is used
     * @param storageFormat one of {@code PARQUET}, {@code ORC}, {@code JSON} or {@code TEXTFILE}
     */
    public static void createNationWithFormat(QueryRunner queryRunner, String storageFormat)
    {
        Session defaultSession = queryRunner.getDefaultSession();
        createNationWithFormat(defaultSession, queryRunner, storageFormat);
    }

    /**
     * Drops {@code nation} if present, then creates one table from {@code tpch.tiny.nation}
     * depending on {@code storageFormat}:
     * <ul>
     * <li>{@code PARQUET}: {@code nation}, created without a FORMAT clause;</li>
     * <li>{@code ORC}: {@code nation} with {@code FORMAT = 'ORC'};</li>
     * <li>{@code JSON}: {@code nation_json} with {@code FORMAT = 'JSON'}, only if it does not exist;</li>
     * <li>{@code TEXTFILE}: {@code nation_text} with {@code FORMAT = 'TEXTFILE'}, only if it does not exist.</li>
     * </ul>
     * Any other value only performs the drop. Note that {@code nation} is dropped for every format,
     * including the ones that create a differently named table.
     *
     * @param session session used for all statements and existence checks
     * @param queryRunner runner used to execute the statements
     * @param storageFormat storage format name, compared case-sensitively; must not be null
     */
    public static void createNationWithFormat(Session session, QueryRunner queryRunner, String storageFormat)
    {
        queryRunner.execute(session, "DROP TABLE IF EXISTS nation");

        switch (storageFormat) {
            case "PARQUET":
                if (!queryRunner.tableExists(session, "nation")) {
                    queryRunner.execute(session, "CREATE TABLE nation AS SELECT * FROM tpch.tiny.nation");
                }
                break;
            case "ORC":
                if (!queryRunner.tableExists(session, "nation")) {
                    queryRunner.execute(session, "CREATE TABLE nation WITH (FORMAT = 'ORC') AS SELECT * FROM tpch.tiny.nation");
                }
                break;
            case "JSON":
                if (!queryRunner.tableExists(session, "nation_json")) {
                    queryRunner.execute(session, "CREATE TABLE nation_json WITH (FORMAT = 'JSON') AS SELECT * FROM tpch.tiny.nation");
                }
                break;
            case "TEXTFILE":
                if (!queryRunner.tableExists(session, "nation_text")) {
                    queryRunner.execute(session, "CREATE TABLE nation_text WITH (FORMAT = 'TEXTFILE') AS SELECT * FROM tpch.tiny.nation");
                }
                break;
            default:
                // Unrecognized formats create nothing.
                break;
        }
    }

    /**
     * Creates two partitioned copies of {@code tpch.tiny.nation}, each only when it does not
     * exist yet in the default session:
     * <ul>
     * <li>{@code nation_partitioned}, partitioned by {@code regionkey} (stored as varchar);</li>
     * <li>{@code nation_partitioned_ds}, partitioned by {@code ds}, holding the full nation data
     *     once for {@code 2022-04-09} and once for {@code 2022-03-18}.</li>
     * </ul>
     *
     * @param queryRunner runner used for the existence checks and the statements
     */
    public static void createPartitionedNation(QueryRunner queryRunner)
    {
        Session session = queryRunner.getDefaultSession();

        if (!queryRunner.tableExists(session, "nation_partitioned")) {
            queryRunner.execute("CREATE TABLE nation_partitioned(nationkey BIGINT, name VARCHAR, comment VARCHAR, regionkey VARCHAR) WITH (partitioned_by = ARRAY['regionkey'])");
            queryRunner.execute("INSERT INTO nation_partitioned SELECT nationkey, name, comment, cast(regionkey as VARCHAR) FROM tpch.tiny.nation");
        }

        if (!queryRunner.tableExists(session, "nation_partitioned_ds")) {
            queryRunner.execute("CREATE TABLE nation_partitioned_ds(nationkey BIGINT, name VARCHAR, comment VARCHAR, regionkey VARCHAR, ds VARCHAR) WITH (partitioned_by = ARRAY['ds'])");
            // One full copy of the nation data per partition value, inserted in this order.
            String[] partitionDates = {"2022-04-09", "2022-03-18"};
            for (String ds : partitionDates) {
                queryRunner.execute("INSERT INTO nation_partitioned_ds SELECT nationkey, name, comment, cast(regionkey as VARCHAR), '" + ds + "' FROM tpch.tiny.nation");
            }
        }
    }

    /**
     * Creates {@code customer_bucketed} unless it already exists in the default session. The
     * table is bucketed by {@code name} into 10 buckets and partitioned by {@code ds}; the
     * partitions {@code 2021-01-01}, {@code 2021-01-02} and {@code 2021-01-03} receive 10, 20
     * and 30 rows from {@code tpch.tiny.customer} respectively.
     *
     * @param queryRunner runner used for the existence check and the statements
     */
    public static void createBucketedCustomer(QueryRunner queryRunner)
    {
        if (queryRunner.tableExists(queryRunner.getDefaultSession(), "customer_bucketed")) {
            return;
        }

        queryRunner.execute("CREATE TABLE customer_bucketed(acctbal DOUBLE, custkey BIGINT, name VARCHAR, ds VARCHAR) WITH (bucket_count = 10, bucketed_by = ARRAY['name'], partitioned_by = ARRAY['ds'])");

        String[] partitionDates = {"2021-01-01", "2021-01-02", "2021-01-03"};
        for (int i = 0; i < partitionDates.length; i++) {
            // The n-th partition (1-based) is filled with n * 10 rows.
            int rowLimit = 10 * (i + 1);
            queryRunner.execute("INSERT INTO customer_bucketed SELECT acctbal, custkey, cast(name as VARCHAR), '" + partitionDates[i] + "' FROM tpch.tiny.customer limit " + rowLimit);
        }
    }

    /**
     * Creates the PrestoBench tables that can be useful for Velox testing. PrestoBench leverages the TPC-H data
     * generator but is more representative of modern data warehouse workloads than TPC-H. Main highlights:
     * <ul>
     * <li>It is based on 4 tables which are a denormalized version of the TPC-H tables.</li>
     * <li>It supports complex types like arrays and maps, which are not covered by TPC-H.</li>
     * <li>The TPC-H data model does not have nulls; PrestoBench covers this gap.</li>
     * <li>Some of the PrestoBench tables are partitioned and bucketed.</li>
     * </ul>
     * The customer and orders variants select from the unqualified {@code customer}, {@code orders} and
     * {@code lineitem} tables, which this method does not create.
     *
     * @param queryRunner runner used to execute the table-creation statements
     */
    public static void createPrestoBenchTables(QueryRunner queryRunner)
    {
        // Create the 4 PrestoBench tables; each one is skipped when it already exists.
        createPrestoBenchNation(queryRunner);
        createPrestoBenchPart(queryRunner);
        createPrestoBenchCustomer(queryRunner);
        createPrestoBenchOrders(queryRunner);
    }

    /**
     * Creates the {@code customer} table using the query runner's default session.
     *
     * @param queryRunner runner whose default session is used
     */
    public static void createCustomer(QueryRunner queryRunner)
    {
        Session defaultSession = queryRunner.getDefaultSession();
        createCustomer(defaultSession, queryRunner);
    }

    /**
     * Creates {@code customer} from {@code tpch.tiny.customer} unless a table named
     * {@code customer} already exists for the session.
     *
     * @param session session used for the existence check and the CREATE TABLE statement
     * @param queryRunner runner used to execute the statement
     */
    public static void createCustomer(Session session, QueryRunner queryRunner)
    {
        if (queryRunner.tableExists(session, "customer")) {
            return;
        }

        String createTableSql = "CREATE TABLE customer AS " +
                "SELECT custkey, name, address, nationkey, phone, acctbal, comment, mktsegment " +
                "FROM tpch.tiny.customer";
        queryRunner.execute(session, createTableSql);
    }

    /**
     * Creates {@code prestobench_nation} unless it already exists in the default session.
     * The TPC-H nation and region tables are consolidated into one table by adding the region
     * name ({@code regionname}) as a new field. This table is not bucketed or partitioned.
     *
     * @param queryRunner runner used for the existence check and the CREATE TABLE statement
     */
    public static void createPrestoBenchNation(QueryRunner queryRunner)
    {
        if (queryRunner.tableExists(queryRunner.getDefaultSession(), "prestobench_nation")) {
            return;
        }

        String createTableSql = "CREATE TABLE prestobench_nation as SELECT nation.nationkey, nation.name, region.name as regionname,nation.comment " +
                "FROM tpch.tiny.nation, tpch.tiny.region  WHERE nation.regionkey = region.regionkey";
        queryRunner.execute(createTableSql);
    }

    /**
     * Creates {@code prestobench_part} unless it already exists in the default session.
     * TPC-H part, supplier and part supplier are consolidated into the part table.
     * <ul>
     * <li>Complex types: the suppliers of each part are captured in the {@code suppliers} column, a map
     *     keyed by supplier key ({@code suppkey}). Each value is an array of rows with the fields
     *     suppkey, suppname, availqty, suppcost, address, nationkey, phone and acctbal.</li>
     * <li>Partitioning: {@code size} (50 values).</li>
     * <li>Bucketing: none, to exercise non bucketed joins.</li>
     * </ul>
     *
     * @param queryRunner runner used for the existence check and the CREATE TABLE statement
     */
    public static void createPrestoBenchPart(QueryRunner queryRunner)
    {
        if (queryRunner.tableExists(queryRunner.getDefaultSession(), "prestobench_part")) {
            return;
        }

        String createTableSql = "CREATE TABLE prestobench_part " +
                "with (partitioned_by = array['size']) " +
                "as WITH part_suppliers as (SELECT part.partkey, supplier.suppkey, " +
                "                                  array_agg(cast(row(supplier.suppkey, supplier.name, availqty, supplycost, address, nationkey, phone, acctbal) as " +
                "                                                 row(suppkey integer, suppname varchar(25), availqty double, suppcost double, address varchar(40), " +
                "                                                     nationkey integer, phone varchar(15), acctbal double))) suppliers " +
                "                            FROM tpch.tiny.part part, tpch.tiny.supplier supplier, tpch.tiny.partsupp partsupp " +
                "                            WHERE supplier.suppkey = partsupp.suppkey and partsupp.partkey = part.partkey GROUP BY 1, 2 " +
                "                          ), " +
                "                          part_agg_suppliers as (SELECT partkey, map_agg(suppkey, suppliers) suppliers FROM part_suppliers GROUP BY partkey) " +
                "SELECT part.partkey, part.name, part.mfgr, part.brand, part.type, part.container, part.retailprice, part.comment, suppliers, part.size " +
                "FROM tpch.tiny.part part, part_agg_suppliers " +
                "WHERE part_agg_suppliers.partkey = part.partkey";
        queryRunner.execute(createTableSql);
    }

    /**
     * Creates {@code prestobench_orders} unless it already exists in the default session.
     * Orders and line items are merged to form the orders table. The query reads the unqualified
     * {@code orders} and {@code lineitem} tables, so those are expected to exist already.
     * <ul>
     * <li>Complex types: the new orders table has all the line items of an order as a map
     *     ({@code lineitems_map}) with key = line item number and value = array of ROW(partkey, suppkey,
     *     quantity, extendedprice, discount, tax, returnflag, linestatus, shipdate, commitdate,
     *     receiptdate, shipinstruct, shipmode).</li>
     * <li>Bucketing: the table is bucketed by customer key (16 buckets) to help joins with the customer table.</li>
     * <li>Partitioning: order date is commonly used in the workload and would be a good partitioning
     *     field, but it would produce too many partitions. The table is instead partitioned by the year of
     *     the order date (7 values), which needs its own field ({@code orderdate_year}) since Presto only
     *     allows partitioning by fields.</li>
     * <li>Nulls: 10% of custkey values are made null. This is useful for join keys with nulls.</li>
     * <li>Skew: columns with few values, like order status, can be used for skewed (hot task) aggregations.
     *     There are three values with these distributions: 'F' with 49%, 'O' with 49% and 'P' with 2%.</li>
     * </ul>
     *
     * @param queryRunner runner used for the existence check and the CREATE TABLE statement
     */
    public static void createPrestoBenchOrders(QueryRunner queryRunner)
    {
        if (queryRunner.tableExists(queryRunner.getDefaultSession(), "prestobench_orders")) {
            return;
        }

        String createTableSql = "CREATE TABLE prestobench_orders " +
                "with (partitioned_by = array['orderdate_year'], bucketed_by = array['custkey'], bucket_count = 16) " +
                "as WITH order_lineitems as (SELECT o.orderkey, l.linenumber, " +
                "                                   array_agg(cast(row(partkey, suppkey, quantity, extendedprice, discount, tax, returnflag, linestatus, " +
                "                                                      shipdate, commitdate, receiptdate, shipinstruct, shipmode ) as " +
                "                                                  row(partkey integer, suppkey integer, quantity integer, extendedprice double, discount double, " +
                "                                                      tax double , returnflag varchar(1), linestatus varchar(1), " +
                "                                                      shipdate varchar, commitdate varchar, receiptdate varchar, shipinstruct varchar(25) , shipmode varchar(10)) " +
                "                                             ) " +
                "                                            ) lineitems " +
                "                            FROM orders o, lineitem l " +
                "                            WHERE o.orderkey = l.orderkey " +
                "                            GROUP BY 1, 2 " +
                "                           ), " +
                "        order_lineitems_agg as (SELECT orderkey, map_agg(linenumber, lineitems) lineitems_map FROM order_lineitems GROUP BY orderkey) " +
                "SELECT o.orderkey, if(custkey % 10 = 0,null,custkey) as custkey,cast(orderstatus as varchar(1)) as orderstatus," +
                "       totalprice,cast(orderpriority as varchar(15)) as orderpriority," +
                "       cast(clerk as varchar(15)) as clerk,shippriority,cast(comment as varchar(79)) as comment,lineitems_map, " +
                "       cast(orderdate as varchar) orderdate, cast(substr(cast(orderdate as varchar),1,4) as integer) orderdate_year " +
                "FROM orders o, order_lineitems_agg " +
                "WHERE order_lineitems_agg.orderkey = o.orderkey";
        queryRunner.execute(createTableSql);
    }

    /**
     * Creates {@code prestobench_customer} unless it already exists in the default session.
     * The table represents the original TPC-H customer table with additional fields of complex types.
     * The query reads the unqualified {@code customer}, {@code orders} and {@code lineitem} tables, so
     * those are expected to exist already.
     * <ul>
     * <li>Complex types: two map fields are added:
     *     {@code year_parts} (key = year of ship date, value = array of parts info for the customer's
     *     orders in that year) and {@code year_cost} (key = year of ship date, value = total spending
     *     by the customer on orders for that year).</li>
     * <li>Bucketing: customer key (64 buckets).</li>
     * <li>Partitioning: {@code mktsegment}.
     *     NOTE(review): the earlier comment claimed 150 values; TPC-H presumably defines only 5 market
     *     segments - confirm.</li>
     * </ul>
     *
     * @param queryRunner runner used for the existence check and the CREATE TABLE statement
     */
    public static void createPrestoBenchCustomer(QueryRunner queryRunner)
    {
        if (queryRunner.tableExists(queryRunner.getDefaultSession(), "prestobench_customer")) {
            return;
        }

        String createTableSql = "CREATE TABLE prestobench_customer " +
                "with (partitioned_by = array['mktsegment'], bucketed_by = array['custkey'], bucket_count = 64) " +
                "as WITH customer_yearly_summary as (SELECT custkey, map_agg(y, parts) AS year_parts, map_agg(y, total_cost) AS year_cost " +
                "                                    FROM (SELECT c.custkey,  cast(substr(cast(shipdate as varchar),1,4) as integer) y, " +
                "                                                 ARRAY_AGG( CAST(ROW (partkey, extendedprice, quantity) AS " +
                "                                                                 ROW (pk BIGINT, ep DOUBLE, qt DOUBLE))) AS parts, " +
                "                                                 SUM(extendedprice) AS total_cost " +
                "                                          FROM customer c LEFT OUTER JOIN orders o ON o.custkey = c.custkey " +
                "                                               LEFT OUTER JOIN lineitem l " +
                "                                               ON l.orderkey = o.orderkey " +
                "                                               GROUP BY c.custkey, cast(substr(cast(shipdate as varchar),1,4) as integer) " +
                "                                         ) GROUP BY custkey" +
                "                                   ) " +
                "SELECT customer.custkey, cast(name as varchar) as name, " +
                "       cast(address as varchar) as address, nationkey, " +
                "       cast(phone as varchar) as phone, acctbal, " +
                "       cast(comment as varchar) as comment, year_parts, year_cost, " +
                "       cast(mktsegment as varchar) as mktsegment " +
                "FROM customer, customer_yearly_summary " +
                "WHERE customer_yearly_summary.custkey=customer.custkey";
        queryRunner.execute(createTableSql);
    }

    /**
     * Creates the {@code part} table using the query runner's default session.
     *
     * @param queryRunner runner whose default session is used
     */
    public static void createPart(QueryRunner queryRunner)
    {
        Session defaultSession = queryRunner.getDefaultSession();
        createPart(defaultSession, queryRunner);
    }

    /**
     * Creates {@code part} as a copy of {@code tpch.tiny.part} unless a table named
     * {@code part} already exists for the session.
     *
     * @param session session used for the existence check and the CREATE TABLE statement
     * @param queryRunner runner used to execute the statement
     */
    public static void createPart(Session session, QueryRunner queryRunner)
    {
        if (queryRunner.tableExists(session, "part")) {
            return;
        }
        queryRunner.execute(session, "CREATE TABLE part AS SELECT * FROM tpch.tiny.part");
    }

    /**
     * Creates the {@code partsupp} table using the query runner's default session.
     *
     * @param queryRunner runner whose default session is used
     */
    public static void createPartSupp(QueryRunner queryRunner)
    {
        Session defaultSession = queryRunner.getDefaultSession();
        createPartSupp(defaultSession, queryRunner);
    }

    /**
     * Creates {@code partsupp} as a copy of {@code tpch.tiny.partsupp} unless a table named
     * {@code partsupp} already exists for the session.
     *
     * @param session session used for the existence check and the CREATE TABLE statement
     * @param queryRunner runner used to execute the statement
     */
    public static void createPartSupp(Session session, QueryRunner queryRunner)
    {
        if (queryRunner.tableExists(session, "partsupp")) {
            return;
        }
        queryRunner.execute(session, "CREATE TABLE partsupp AS SELECT * FROM tpch.tiny.partsupp");
    }

    /**
     * Creates the {@code region} table using the query runner's default session.
     *
     * @param queryRunner runner whose default session is used
     */
    public static void createRegion(QueryRunner queryRunner)
    {
        Session defaultSession = queryRunner.getDefaultSession();
        createRegion(defaultSession, queryRunner);
    }

    /**
     * Creates {@code region} as a copy of {@code tpch.tiny.region} unless a table named
     * {@code region} already exists for the session.
     *
     * @param session session used for the existence check and the CREATE TABLE statement
     * @param queryRunner runner used to execute the statement
     */
    public static void createRegion(Session session, QueryRunner queryRunner)
    {
        if (queryRunner.tableExists(session, "region")) {
            return;
        }
        queryRunner.execute(session, "CREATE TABLE region AS SELECT * FROM tpch.tiny.region");
    }

    /**
     * Creates {@code test_hidden_columns} unless it already exists in the default session and
     * fills it with two rows taken from the unqualified {@code region} table (region keys 0 and 1),
     * so {@code region} is expected to exist before this method is called.
     * <p>
     * The two rows are inserted by separate statements roughly two seconds apart so that the
     * file written by each insert gets a different modified timestamp.
     * <p>
     * If the calling thread is interrupted during the pause, the second insert is still attempted
     * and the thread's interrupt status is restored for the caller.
     *
     * @param queryRunner runner used for the existence check and the statements
     */
    public static void createTableToTestHiddenColumns(QueryRunner queryRunner)
    {
        if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "test_hidden_columns")) {
            queryRunner.execute("CREATE TABLE test_hidden_columns (regionkey bigint, name varchar(25), comment varchar(152))");

            // Inserting two rows with 2 seconds delay to have a different modified timestamp for each file.
            queryRunner.execute("INSERT INTO test_hidden_columns SELECT * FROM region where regionkey = 0");
            try {
                TimeUnit.SECONDS.sleep(2);
            }
            catch (InterruptedException e) {
                // Stay best-effort (the second row is still inserted), but do not swallow the
                // interruption: restore the flag so callers up the stack can observe it.
                Thread.currentThread().interrupt();
            }
            queryRunner.execute("INSERT INTO test_hidden_columns SELECT * FROM region where regionkey = 1");
        }
    }

    /**
     * Creates the {@code supplier} table using the query runner's default session.
     *
     * @param queryRunner runner whose default session is used
     */
    public static void createSupplier(QueryRunner queryRunner)
    {
        Session defaultSession = queryRunner.getDefaultSession();
        createSupplier(defaultSession, queryRunner);
    }

    /**
     * Creates {@code supplier} as a copy of {@code tpch.tiny.supplier} unless a table named
     * {@code supplier} already exists for the session.
     *
     * @param session session used for the existence check and the CREATE TABLE statement
     * @param queryRunner runner used to execute the statement
     */
    public static void createSupplier(Session session, QueryRunner queryRunner)
    {
        if (queryRunner.tableExists(session, "supplier")) {
            return;
        }
        queryRunner.execute(session, "CREATE TABLE supplier AS SELECT * FROM tpch.tiny.supplier");
    }

    /**
     * Creates {@code empty_table} with the columns {@code orderkey BIGINT} and
     * {@code shipmodes array(varchar)} unless it already exists in the default session.
     * No rows are inserted.
     *
     * @param queryRunner runner used for the existence check and the CREATE TABLE statement
     */
    public static void createEmptyTable(QueryRunner queryRunner)
    {
        Session defaultSession = queryRunner.getDefaultSession();
        if (queryRunner.tableExists(defaultSession, "empty_table")) {
            return;
        }
        queryRunner.execute("CREATE TABLE empty_table (orderkey BIGINT, shipmodes array(varchar))");
    }

    /**
     * Creates two tables bucketed by {@code orderkey}, so that a bucketed execution join query
     * can be run on them. Each table is only created when it does not exist yet in the default session.
     * <p>
     * Both {@code lineitem_bucketed} and {@code orders_bucketed} use 10 buckets, are sorted by
     * {@code orderkey}, are partitioned by {@code ds} and get the partitions {@code 2021-12-20}
     * and {@code 2021-12-21}. In {@code lineitem_bucketed} the second partition stores
     * {@code quantity + 10}.
     *
     * @param queryRunner runner used for the existence checks and the statements
     */
    public static void createBucketedLineitemAndOrders(QueryRunner queryRunner)
    {
        Session session = queryRunner.getDefaultSession();

        if (!queryRunner.tableExists(session, "lineitem_bucketed")) {
            String createLineitemSql = "CREATE TABLE lineitem_bucketed(orderkey BIGINT, partkey BIGINT, suppkey BIGINT, linenumber INTEGER, quantity DOUBLE, ds VARCHAR) " +
                    "WITH (bucket_count = 10, bucketed_by = ARRAY['orderkey'], sorted_by = ARRAY['orderkey'], partitioned_by = ARRAY['ds'])";
            queryRunner.execute(createLineitemSql);
            queryRunner.execute("INSERT INTO lineitem_bucketed SELECT orderkey, partkey, suppkey, linenumber, quantity, '2021-12-20' FROM tpch.tiny.lineitem");
            queryRunner.execute("INSERT INTO lineitem_bucketed SELECT orderkey, partkey, suppkey, linenumber, quantity+10, '2021-12-21' FROM tpch.tiny.lineitem");
        }

        if (!queryRunner.tableExists(session, "orders_bucketed")) {
            String createOrdersSql = "CREATE TABLE orders_bucketed (orderkey BIGINT, custkey BIGINT, orderstatus VARCHAR, ds VARCHAR) " +
                    "WITH (bucket_count = 10, bucketed_by = ARRAY['orderkey'], sorted_by = ARRAY['orderkey'], partitioned_by = ARRAY['ds'])";
            queryRunner.execute(createOrdersSql);
            // Same order rows in both partitions, inserted in this order.
            String[] partitionDates = {"2021-12-20", "2021-12-21"};
            for (String ds : partitionDates) {
                queryRunner.execute("INSERT INTO orders_bucketed SELECT orderkey, custkey, orderstatus, '" + ds + "' FROM tpch.tiny.orders");
            }
        }
    }
}