/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.Session;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestDistributedQueries;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.util.Map;
import java.util.Optional;

import static com.facebook.presto.hive.HiveSessionProperties.COLLECT_COLUMN_STATISTICS_ON_WRITE;
import static com.facebook.presto.hive.HiveSessionProperties.QUICK_STATS_ENABLED;
import static com.facebook.presto.sql.tree.ExplainType.Type.LOGICAL;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.tpch.TpchTable.getTables;
import static org.testng.Assert.assertEquals;

@Test(singleThreaded = true)
public class TestParquetDistributedQueries
        extends AbstractTestDistributedQueries
{
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
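        // Connector configuration for the test runner: write tables as GZIP-compressed
        // Parquet, resolve Parquet fields by name (which the schema-evolution tests below
        // rely on), and enable dereference/subfield and partial aggregation pushdown so
        // those code paths are exercised.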
        Map<String, String> parquetProperties = ImmutableMap.<String, String>builder()
                .put("hive.storage-format", "PARQUET")
                .put("hive.parquet.use-column-names", "true")
                .put("hive.compression-codec", "GZIP")
                .put("hive.enable-parquet-dereference-pushdown", "true")
                .put("hive.partial_aggregation_pushdown_enabled", "true")
                .put("hive.partial_aggregation_pushdown_for_variable_length_datatypes_enabled", "true")
                .build();
        return HiveQueryRunner.createQueryRunner(
                getTables(),
                ImmutableMap.of(
                        "experimental.pushdown-subfields-enabled", "true",
                        "experimental.pushdown-dereference-enabled", "true"),
                "sql-standard",
                parquetProperties,
                Optional.empty());
    }

    @Test
    public void testQuickStats()
    {
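        // Build a session that skips stats collection on write and enables the quick stats
        // path, so SHOW STATS must derive column stats from file metadata (Parquet footers)
        // at read time. The SQL equivalent would be SET SESSION on the same properties,
        // assuming the names match the HiveSessionProperties constants.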
        Session session = Session.builder(getSession())
                .setCatalogSessionProperty("hive", COLLECT_COLUMN_STATISTICS_ON_WRITE, "false")
                .setCatalogSessionProperty("hive", QUICK_STATS_ENABLED, "true")
                .build();

        getQueryRunner().execute(session, "CREATE TABLE test_quick_stats AS " +
                "SELECT orderkey, linenumber, shipdate," +
                " ARRAY[returnflag, linestatus] as arr," +
                " CAST(ROW(commitdate, receiptdate) AS ROW(date1 DATE,date2 DATE)) as rrow from lineitem");

        try {
            // Since no column stats were collected during write, all column-level stats are
            // null; the row count still comes from the table's basic statistics
            assertQuery("SHOW STATS FOR test_quick_stats",
                    "SELECT * FROM (VALUES " +
                            "   ('orderkey', null, null, null, null, null, null, null), " +
                            "   ('linenumber', null, null, null, null, null, null, null), " +
                            "   ('shipdate', null, null, null, null, null, null, null), " +
                            "   ('arr', null, null, null, null, null, null, null), " +
                            "   ('rrow', null, null, null, null, null, null, null), " +
                            "   (null, null, null, null, 60175.0, null, null, null))");

            // With quick stats enabled, we should get nulls_fraction, low_value and high_value for the non-nested columns
            assertQuery(session, "SHOW STATS FOR test_quick_stats",
                    "SELECT * FROM (VALUES " +
                            "   ('orderkey', null, null, 0.0, null, '1', '60000', null), " +
                            "   ('linenumber', null, null, 0.0, null, '1', '7', null), " +
                            "   ('shipdate', null, null, 0.0, null, '1992-01-04', '1998-11-29', null), " +
                            "   ('arr', null, null, null, null, null, null, null), " +
                            "   ('rrow', null, null, null, null, null, null, null), " +
                            "   (null, null, null, null, 60175.0, null, null, null))");
        }
        finally {
            getQueryRunner().execute("DROP TABLE test_quick_stats");
        }
    }

    @Test
    public void testQuickStatsPartitionedTable()
    {
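        // Same session setup as testQuickStats: no column stats at write time, quick stats
        // enabled at read time.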
        Session session = Session.builder(getSession())
                .setCatalogSessionProperty("hive", COLLECT_COLUMN_STATISTICS_ON_WRITE, "false")
                .setCatalogSessionProperty("hive", QUICK_STATS_ENABLED, "true")
                .build();

        getQueryRunner().execute(session, "CREATE TABLE test_quick_stats_partitioned (" +
                "    \"suppkey\" bigint," +
                "    \"linenumber\" integer," +
                "    \"orderkey\" bigint," +
                "    \"partkey\" bigint" +
                "   )" +
                "   WITH (" +
                "    format = 'PARQUET'," +
                "    partitioned_by = ARRAY['orderkey','partkey']" +
                "   )");

        getQueryRunner().execute(session, "INSERT INTO test_quick_stats_partitioned (suppkey, linenumber, orderkey, partkey)" +
                "VALUES" +
                "(1, 1, 100, 1000)," +
                "(2, 2, 101, 1001)," +
                "(3, 3, 102, 1002)," +
                "(4, 4, 103, 1003)," +
                "(5, 5, 104, 1004)," +
                "(6, 6, 105, 1005)," +
                "(7, 7, 106, 1006)," +
                "(8, 8, 107, 1007)," +
                "(9, 9, 108, 1008)," +
                "(10, 10, 109, 1009)");

        try {
            // Since no stats were collected during write, only the partition key columns have
            // stats, derived from the partition values in the metastore
            assertQuery("SHOW STATS FOR test_quick_stats_partitioned",
                    "SELECT * FROM (VALUES " +
                            "   ('suppkey', null, null, null, null, null, null, null), " +
                            "   ('linenumber', null, null, null, null, null, null, null), " +
                            "   ('orderkey', null, 10.0, 0.0, null, '100', '109', null), " +
                            "   ('partkey', null, 10.0, 0.0, null, '1000', '1009', null), " +
                            "   (null, null, null, null, 10.0, null, null, null))");

            // With quick stats enabled, we should get nulls_fraction, low_value and high_value for all columns
            assertQuery(session, "SHOW STATS FOR test_quick_stats_partitioned",
                    "SELECT * FROM (VALUES " +
                            "   ('suppkey', null, null, 0.0, null, 1, 10, null), " +
                            "   ('linenumber', null, null, 0.0, null, 1, 10, null), " +
                            "   ('orderkey', null, 10.0, 0.0, null, 100, 109, null), " +
                            "   ('partkey', null, 10.0, 0.0, null, 1000, 1009, null), " +
                            "   (null, null, null, null, 10.0, null, null, null))");

            // If a query targets a specific partition, stats are correctly limited to that partition
            assertQuery(session, "show stats for (select * from test_quick_stats_partitioned where partkey = 1009)",
                    "SELECT * FROM (VALUES " +
                            "   ('suppkey', null, null, 0.0, null, 10, 10, null), " +
                            "   ('linenumber', null, null, 0.0, null, 10, 10, null), " +
                            "   ('orderkey', null, 1.0, 0.0, null, 109, 109, null), " +
                            "   ('partkey', null, 1.0, 0.0, null, 1009, 1009, null), " +
                            "   (null, null, null, null, 1.0, null, null, null))");
        }
        finally {
            getQueryRunner().execute("DROP TABLE test_quick_stats_partitioned");
        }
    }

    @Test
    public void testSubfieldPruning()
    {
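        // Create a table with a nested ROW column; the queries below reference only
        // individual subfields, which with subfield pruning enabled should read just those
        // leaf columns from the Parquet file instead of the whole struct.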
        getQueryRunner().execute("CREATE TABLE test_subfield_pruning AS " +
                "SELECT orderkey, linenumber, shipdate, " +
                "   CAST(ROW(orderkey, linenumber, ROW(day(shipdate), month(shipdate), year(shipdate))) " +
                "       AS ROW(orderkey BIGINT, linenumber INTEGER, shipdate ROW(ship_day TINYINT, ship_month TINYINT, ship_year INTEGER))) AS info " +
                "FROM lineitem");

        try {
            assertQuery("SELECT info.orderkey, info.shipdate.ship_month FROM test_subfield_pruning", "SELECT orderkey, month(shipdate) FROM lineitem");

            assertQuery("SELECT orderkey FROM test_subfield_pruning WHERE info.shipdate.ship_month % 2 = 0", "SELECT orderkey FROM lineitem WHERE month(shipdate) % 2 = 0");
        }
        finally {
            getQueryRunner().execute("DROP TABLE test_subfield_pruning");
        }
    }

    @Test
    public void testSubfieldWithSchemaChanges()
    {
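        // Exercise subfield reads across schema evolution; field lookup is by name
        // (hive.parquet.use-column-names), so older files resolve matching fields and
        // return null for fields they do not contain.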
        getQueryRunner().execute("CREATE TABLE test_subfield_multilevel_pruning AS " +
                "SELECT " +
                "   1 as orderkey," +
                "   CAST(ROW('N', ROW(5, 7)) AS ROW(returnflag CHAR(1), shipdate ROW(ship_day INTEGER, ship_month INTEGER))) as nestedColumnLevelUpdates," +
                "   CAST(ROW('N', ROW(5, 7)) AS ROW(returnflag CHAR(1), shipdate ROW(ship_day INTEGER, ship_month INTEGER))) as colLevelUpdates");

        // evolve the nested column's schema: drop it and re-add it with an extra ship_year field
        getQueryRunner().execute("ALTER TABLE test_subfield_multilevel_pruning DROP COLUMN nestedColumnLevelUpdates");
        getQueryRunner().execute("ALTER TABLE test_subfield_multilevel_pruning ADD COLUMN " +
                "nestedColumnLevelUpdates ROW(returnflag CHAR(1), shipdate ROW(ship_day INTEGER, ship_month INTEGER, ship_year INTEGER))");

        // delete the entire struct column (colLevelUpdates) and add a new struct column with different name (colLevelUpdates2)
        getQueryRunner().execute("ALTER TABLE test_subfield_multilevel_pruning DROP COLUMN colLevelUpdates");
        getQueryRunner().execute("ALTER TABLE test_subfield_multilevel_pruning ADD COLUMN " +
                "colLevelUpdates2 ROW(returnflag CHAR(1), shipdate ROW(ship_day INTEGER, ship_month INTEGER, ship_year INTEGER))");

        getQueryRunner().execute("INSERT INTO test_subfield_multilevel_pruning " +
                "SELECT 2, cast(row('Y', ROW(5, 7, 2020)) as ROW(returnflag CHAR(1), shipdate ROW(ship_day INTEGER, ship_month INTEGER, ship_year INTEGER)))," +
                "cast(row('Y', ROW(5, 7, 2020)) as ROW(returnflag CHAR(1), shipdate ROW(ship_day INTEGER, ship_month INTEGER, ship_year INTEGER)))");

        try {
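            // Row 1 was written before ship_year existed, so that subfield reads as null;
            // row 2 was written with the evolved schema and has all three subfields.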
            assertQuery("SELECT orderkey, nestedColumnLevelUpdates.shipdate.ship_day, nestedColumnLevelUpdates.shipdate.ship_month, nestedColumnLevelUpdates.shipdate.ship_year FROM test_subfield_multilevel_pruning",
                    "SELECT * from (VALUES(1, 5, 7, null), (2, 5, 7, 2020))");

            assertQuery("SELECT orderkey, colLevelUpdates2.shipdate.ship_day, colLevelUpdates2.shipdate.ship_month, colLevelUpdates2.shipdate.ship_year FROM test_subfield_multilevel_pruning",
                    "SELECT * from (VALUES (1, null, null, null), (2, 5, 7, 2020))");
        }
        finally {
            getQueryRunner().execute("DROP TABLE test_subfield_multilevel_pruning");
        }
    }

    @Override
    protected boolean supportsNotNullColumns()
    {
        return false;
    }

    @Override
    public void testDelete()
    {
        // Hive connector currently does not support row-by-row delete
    }

    @Override
    public void testUpdate()
    {
        // Updates are not supported by the connector
    }

    @Override
    public void testRenameColumn()
    {
        // Renaming columns is not supported when Parquet fields are resolved by name
        // (hive.parquet.use-column-names=true): files written before the rename still
        // carry the old field name, so the renamed column would read as null
    }

    @Test
    public void testExplainOfCreateTableAs()
    {
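        // EXPLAIN of a CTAS should render exactly the logical plan the query explainer
        // produces for the same statement.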
        String query = "CREATE TABLE copy_orders AS SELECT * FROM orders";
        MaterializedResult result = computeActual("EXPLAIN " + query);
        assertEquals(getOnlyElement(result.getOnlyColumnAsSet()), getExplainPlan("EXPLAIN ", query, LOGICAL));
    }
}