TestHiveDistributedQueries.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.Session;
import com.facebook.presto.hive.TestHiveEventListenerPlugin.TestingHiveEventListener;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestDistributedQueries;
import com.google.common.collect.ImmutableSet;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import java.util.Set;

import static com.facebook.presto.SystemSessionProperties.CTE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.CTE_PARTITIONING_PROVIDER_CATALOG;
import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_SUBFIELDS_ENABLED;
import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_SUBFIELDS_FOR_MAP_FUNCTIONS;
import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_SUBFIELDS_FROM_LAMBDA_ENABLED;
import static com.facebook.presto.SystemSessionProperties.VERBOSE_OPTIMIZER_INFO_ENABLED;
import static com.facebook.presto.sql.tree.ExplainType.Type.LOGICAL;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.tpch.TpchTable.getTables;
import static java.lang.String.format;
import static java.util.stream.Collectors.joining;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

/**
 * Runs the shared {@link AbstractTestDistributedQueries} suite against a Hive query runner
 * populated with the TPC-H tables, plus a few Hive-specific distributed query checks.
 */
@Test(singleThreaded = true)
public class TestHiveDistributedQueries
        extends AbstractTestDistributedQueries
{
    // Upper bound on how long close() waits for outstanding query completion events to arrive
    private static final long QUERY_EVENTS_TIMEOUT_MILLIS = 10_000;
    // Delay between successive checks of the event listener while waiting
    private static final long QUERY_EVENTS_POLL_INTERVAL_MILLIS = 50;

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        return HiveQueryRunner.createQueryRunner(getTables());
    }

    /**
     * Tells the shared suite not to exercise NOT NULL column tests against this connector.
     */
    @Override
    protected boolean supportsNotNullColumns()
    {
        return false;
    }

    /**
     * Verifies that every query issued by this suite produced a completion event, then shuts down
     * the query runner.
     * <p>
     * Query events may still be propagating when the suite finishes, so the listener is polled
     * until it reports no running queries or {@link #QUERY_EVENTS_TIMEOUT_MILLIS} elapses. The
     * query runner is closed in a {@code finally} block so that a failed check does not leak it.
     */
    @Override
    @AfterClass(alwaysRun = true)
    public void close()
            throws Exception
    {
        try {
            assertFalse(getQueryRunner().getEventListeners().isEmpty());
            EventListener eventListener = getQueryRunner().getEventListeners().get(0);
            assertTrue(eventListener instanceof TestingHiveEventListener, eventListener.getClass().getName());
            Set<QueryId> runningQueryIds = awaitQueryCompletionEvents((TestingHiveEventListener) eventListener);

            assertEquals(
                    runningQueryIds,
                    ImmutableSet.of(),
                    format(
                            "Query completion events not sent for %d queries: %s",
                            runningQueryIds.size(),
                            runningQueryIds.stream().map(QueryId::getId).collect(joining(", "))));
        }
        finally {
            super.close();
        }
    }

    /**
     * Polls {@code listener} until it reports no running queries or the wait timeout elapses.
     *
     * @param listener the testing event listener tracking queries without a completion event
     * @return an immutable snapshot of the query ids still reported as running when polling stopped
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    private static Set<QueryId> awaitQueryCompletionEvents(TestingHiveEventListener listener)
            throws InterruptedException
    {
        long deadline = System.nanoTime() + QUERY_EVENTS_TIMEOUT_MILLIS * 1_000_000L;
        Set<QueryId> runningQueryIds = listener.getRunningQueries();
        // Re-read on every iteration: getRunningQueries() may return a snapshot rather than a live view.
        // Compare via subtraction so the check stays correct if nanoTime() wraps around.
        while (!runningQueryIds.isEmpty() && System.nanoTime() - deadline < 0) {
            Thread.sleep(QUERY_EVENTS_POLL_INTERVAL_MILLIS);
            runningQueryIds = listener.getRunningQueries();
        }
        // Snapshot so the assertion and its failure message observe the same set
        return ImmutableSet.copyOf(runningQueryIds);
    }

    @Override
    public void testDelete()
    {
        // Hive connector currently does not support row-by-row delete
    }

    @Override
    public void testUpdate()
    {
        // Updates are not supported by the connector
    }

    /**
     * Checks that EXPLAIN of a CREATE TABLE AS statement matches the logical plan produced for it.
     */
    @Test
    public void testExplainOfCreateTableAs()
    {
        String query = "CREATE TABLE copy_orders AS SELECT * FROM orders";
        MaterializedResult result = computeActual("EXPLAIN " + query);
        assertEquals(getOnlyElement(result.getOnlyColumnAsSet()), getExplainPlan("EXPLAIN ", query, LOGICAL));
    }

    /**
     * With CTE materialization forced on (strategy ALL, partitioning via the hive catalog) and
     * verbose optimizer info enabled, checks the CTE information reported in the EXPLAIN output
     * for two chained CTEs.
     */
    @Test
    public void testTrackMaterializedCTEs()
    {
        Session materializedSession = Session.builder(getSession())
                .setSystemProperty(VERBOSE_OPTIMIZER_INFO_ENABLED, "true")
                .setSystemProperty(CTE_MATERIALIZATION_STRATEGY, "ALL")
                .setSystemProperty(CTE_PARTITIONING_PROVIDER_CATALOG, "hive")
                .build();

        String query = "with tbl as (select * from lineitem), tbl2 as (select * from tbl) select * from tbl, tbl2";
        MaterializedResult materializedResult = computeActual(materializedSession, "explain " + query);
        String explain = (String) getOnlyElement(materializedResult.getOnlyColumnAsSet());

        // NOTE(review): argument meaning is defined by checkCTEInfo in the parent class
        checkCTEInfo(explain, "tbl", 2, false, true);
        checkCTEInfo(explain, "tbl2", 1, false, true);
    }

    /**
     * Verifies that map_filter / map_subset applied to maps nested inside arrays (via transform
     * lambdas) return the same results with subfield pushdown (including from lambdas) enabled as
     * with it disabled. Covers constant keys, IN lists, contains(), NULL keys and keys that depend
     * on another column.
     */
    @Test
    public void testPushdownSubfieldForMapFunctionsInLambda()
    {
        Session enabled = Session.builder(getSession())
                .setSystemProperty(PUSHDOWN_SUBFIELDS_FOR_MAP_FUNCTIONS, "true")
                .setSystemProperty(PUSHDOWN_SUBFIELDS_ENABLED, "true")
                .setSystemProperty(PUSHDOWN_SUBFIELDS_FROM_LAMBDA_ENABLED, "true")
                .build();
        Session disabled = Session.builder(getSession())
                .setSystemProperty(PUSHDOWN_SUBFIELDS_FOR_MAP_FUNCTIONS, "false")
                .setSystemProperty(PUSHDOWN_SUBFIELDS_ENABLED, "false")
                .setSystemProperty(PUSHDOWN_SUBFIELDS_FROM_LAMBDA_ENABLED, "false")
                .build();
        try {
            getQueryRunner().execute(
                    "CREATE TABLE test_pushdown_subfields AS\n" +
                            "SELECT * FROM (\n" +
                            "  VALUES \n" +
                            "    (3, '2025-01-08', \n" +
                            "      ARRAY[\n" +
                            "        MAP(ARRAY[-2, 1], ARRAY[0.34, 0.92]),\n" +
                            "        MAP(ARRAY[3, 4], ARRAY[0.12, 0.88])\n" +
                            "      ],\n" +
                            "      ARRAY[\n" +
                            "        MAP(ARRAY['a', 'b'], ARRAY[0.56, 0.44]),\n" +
                            "        MAP(ARRAY['c', 'd'], ARRAY[0.90, 0.10])\n" +
                            "      ]\n" +
                            "    ),\n" +
                            "    (1, '2025-01-02', \n" +
                            "      ARRAY[\n" +
                            "        MAP(ARRAY[1, 2], ARRAY[0.23, 0.45]),\n" +
                            "        MAP(ARRAY[5, 6], ARRAY[0.67, 0.89])\n" +
                            "      ],\n" +
                            "      ARRAY[\n" +
                            "        MAP(ARRAY['x', 'y'], ARRAY[0.78, 0.22]),\n" +
                            "        MAP(ARRAY['z', 'w'], ARRAY[0.11, 0.99])\n" +
                            "      ]\n" +
                            "    ),\n" +
                            "    (7, '2025-01-17', \n" +
                            "      ARRAY[\n" +
                            "        MAP(ARRAY[-1, 0], ARRAY[0.60, 0.70]),\n" +
                            "        MAP(ARRAY[2, 3], ARRAY[0.21, 0.79])\n" +
                            "      ],\n" +
                            "      ARRAY[\n" +
                            "        MAP(ARRAY['m', 'n'], ARRAY[0.43, 0.57]),\n" +
                            "        MAP(ARRAY['o', 'p'], ARRAY[0.25, 0.75])\n" +
                            "      ]\n" +
                            "    ),\n" +
                            "    (2, '2025-01-06', \n" +
                            "      ARRAY[\n" +
                            "        MAP(ARRAY[4, 5], ARRAY[0.75, 0.32]),\n" +
                            "        MAP(ARRAY[6, 7], ARRAY[0.19, 0.46])\n" +
                            "      ],\n" +
                            "      ARRAY[\n" +
                            "        MAP(ARRAY['q', 'r'], ARRAY[0.98, 0.02]),\n" +
                            "        MAP(ARRAY['s', 't'], ARRAY[0.49, 0.51])\n" +
                            "      ]\n" +
                            "    ),\n" +
                            "    (5, '2025-01-14', \n" +
                            "      ARRAY[\n" +
                            "        MAP(ARRAY[8, 9], ARRAY[0.88, 0.99]),\n" +
                            "        MAP(ARRAY[10, 11], ARRAY[0.00, 0.33])\n" +
                            "      ],\n" +
                            "      ARRAY[\n" +
                            "        MAP(ARRAY['u', 'v'], ARRAY[0.66, 0.34]),\n" +
                            "        MAP(ARRAY['w', 'x'], ARRAY[0.17, 0.83])\n" +
                            "      ]\n" +
                            "    )\n" +
                            ") t(id, ds, array_of_maps_int, array_of_maps_str)");

            // Constant keys
            @Language("SQL") String sql = "select transform(array_of_maps_int, item -> map_filter(item, (k, v) -> k = 1)), " +
                    "transform(array_of_maps_str, item -> map_filter(item, (k, v) -> k = 'x')) from test_pushdown_subfields";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            sql = "SELECT \n" +
                    "  transform(array_of_maps_int, item -> map_filter(item, (k, v) -> contains(array[-2, 1, 0], k))),\n" +
                    "  transform(array_of_maps_str, item -> map_filter(item, (k, v) -> contains(array['a', 'x'], k)))\n" +
                    "FROM test_pushdown_subfields";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            sql = "SELECT \n" +
                    "  transform(array_of_maps_int, item -> map_subset(item, array[-2, 1, 0])),\n" +
                    "  transform(array_of_maps_str, item -> map_subset(item, array['a', 'x']))\n" +
                    "FROM test_pushdown_subfields";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            sql = "SELECT \n" +
                    "  transform(array_of_maps_int, item -> map_filter(item, (k, v) -> k in (-2, 1, 0))),\n" +
                    "  transform(array_of_maps_str, item -> map_filter(item, (k, v) -> k in ('a', 'x')))\n" +
                    "FROM test_pushdown_subfields";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            sql = "SELECT \n" +
                    "  transform(array_of_maps_int, item -> map_filter(item, (k, v) -> k = 1)),\n" +
                    "  transform(array_of_maps_str, item -> map_filter(item, (k, v) -> k = 'a'))\n" +
                    "FROM test_pushdown_subfields";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            // Keys that reference another column, or include NULL
            sql = "SELECT \n" +
                    "  transform(array_of_maps_int, item -> map_filter(item, (k, v) -> contains(array[-2, 1, id], k))),\n" +
                    "  transform(array_of_maps_str, item -> map_filter(item, (k, v) -> contains(array['a', 'x'], k)))\n" +
                    "FROM test_pushdown_subfields";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            sql = "SELECT \n" +
                    "  transform(array_of_maps_int, item -> map_subset(item, array[-2, 1, id])),\n" +
                    "  transform(array_of_maps_str, item -> map_subset(item, array['a', 'x']))\n" +
                    "FROM test_pushdown_subfields";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            sql = "SELECT \n" +
                    "  transform(array_of_maps_int, item -> map_filter(item, (k, v) -> k in (-2, 1, null))),\n" +
                    "  transform(array_of_maps_str, item -> map_filter(item, (k, v) -> k in ('a', 'x')))\n" +
                    "FROM test_pushdown_subfields";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            sql = "SELECT \n" +
                    "  transform(array_of_maps_int, item -> map_filter(item, (k, v) -> k = id)),\n" +
                    "  transform(array_of_maps_str, item -> map_filter(item, (k, v) -> k = 'a'))\n" +
                    "FROM test_pushdown_subfields";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
        }
        finally {
            getQueryRunner().execute("DROP TABLE IF EXISTS test_pushdown_subfields");
        }
    }

    /**
     * Verifies that map_filter / map_subset on top-level map columns return the same results with
     * subfield pushdown for map functions enabled as with it disabled. Covers constant keys, IN
     * lists, contains(), NULL keys and keys that depend on another column.
     */
    @Test
    public void testPushdownSubfieldForMapFunctions()
    {
        Session enabled = Session.builder(getSession())
                .setSystemProperty(PUSHDOWN_SUBFIELDS_FOR_MAP_FUNCTIONS, "true")
                .setSystemProperty(PUSHDOWN_SUBFIELDS_ENABLED, "true")
                .build();
        Session disabled = Session.builder(getSession())
                .setSystemProperty(PUSHDOWN_SUBFIELDS_FOR_MAP_FUNCTIONS, "false")
                .setSystemProperty(PUSHDOWN_SUBFIELDS_ENABLED, "false")
                .build();
        try {
            getQueryRunner().execute(
                    "CREATE TABLE test_pushdown_subfields_map_functions AS\n" +
                            "SELECT * FROM (\n" +
                            "  VALUES \n" +
                            "    (3, '2025-01-08', MAP(ARRAY[2, 1], ARRAY[0.34, 0.92]), MAP(ARRAY['a', 'b'], ARRAY[0.12, 0.88])),\n" +
                            "    (1, '2025-01-02', MAP(ARRAY[1, 3], ARRAY[0.23, 0.5]), MAP(ARRAY['x', 'y'], ARRAY[0.45, 0.55])),\n" +
                            "    (7, '2025-01-17', MAP(ARRAY[6, 8], ARRAY[0.60, 0.70]), MAP(ARRAY['m', 'n'], ARRAY[0.21, 0.79])),\n" +
                            "    (2, '2025-01-06', MAP(ARRAY[2, 3, 5, 7], ARRAY[0.75, 0.32, 0.19, 0.46]), MAP(ARRAY['p', 'q', 'r'], ARRAY[0.11, 0.22, 0.67])),\n" +
                            "    (5, '2025-01-14', MAP(ARRAY[8, 4, 6], ARRAY[0.88, 0.99, 0.00]), MAP(ARRAY['s', 't', 'u'], ARRAY[0.33, 0.44, 0.23])),\n" +
                            "    (4, '2025-01-12', MAP(ARRAY[7, 3, 2], ARRAY[0.33, 0.44, 0.55]), MAP(ARRAY['v', 'w'], ARRAY[0.66, 0.34])),\n" +
                            "    (8, '2025-01-20', MAP(ARRAY[1, 7, 6], ARRAY[0.35, 0.45, 0.55]), MAP(ARRAY['i', 'j', 'k'], ARRAY[0.78, 0.89, 0.12])),\n" +
                            "    (6, '2025-01-16', MAP(ARRAY[9, 1, 3], ARRAY[0.30, 0.40, 0.50]), MAP(ARRAY['c', 'd'], ARRAY[0.90, 0.10])),\n" +
                            "    (2, '2025-01-05', MAP(ARRAY[3, 4], ARRAY[0.98, 0.21]), MAP(ARRAY['e', 'f'], ARRAY[0.56, 0.44])),\n" +
                            "    (1, '2025-01-04', MAP(ARRAY[1, 2], ARRAY[0.45, 0.67]), MAP(ARRAY['g', 'h'], ARRAY[0.23, 0.77]))\n" +
                            ") AS t(id, ds, feature, extra_feature)");

            // Constant keys
            @Language("SQL") String sql = "select map_filter(feature, (k, v) -> k in (-2, 1, 0)), map_filter(extra_feature, (k, v) -> k in ('a', 'x')) from test_pushdown_subfields_map_functions";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            sql = "select map_filter(feature, (k, v) -> contains(array[-2, 1, 0], k)), map_filter(extra_feature, (k, v) -> contains(array['a', 'x'], k)) from test_pushdown_subfields_map_functions";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            sql = "select map_filter(feature, (k, v) -> k = 0), map_filter(extra_feature, (k, v) -> k = 'a') from test_pushdown_subfields_map_functions";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            sql = "select map_subset(feature, array[-2, 1, 0]), map_subset(extra_feature, array['a', 'x']) from test_pushdown_subfields_map_functions";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);

            // Keys that reference another column, or include NULL
            sql = "select map_filter(feature, (k, v) -> k in (-2, 1, id)), map_filter(extra_feature, (k, v) -> k in ('a', 'x')) from test_pushdown_subfields_map_functions";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            sql = "select map_filter(feature, (k, v) -> contains(array[-2, 1, id], k)), map_filter(extra_feature, (k, v) -> contains(array['a', 'x', null], k)) from test_pushdown_subfields_map_functions";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            sql = "select map_filter(feature, (k, v) -> k = id), map_filter(extra_feature, (k, v) -> k = 'a') from test_pushdown_subfields_map_functions";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
            sql = "select map_subset(feature, array[id]), map_subset(extra_feature, array['a', null]) from test_pushdown_subfields_map_functions";
            assertQueryWithSameQueryRunner(enabled, sql, disabled);
        }
        finally {
            getQueryRunner().execute("DROP TABLE IF EXISTS test_pushdown_subfields_map_functions");
        }
    }

    // Hive specific tests should normally go in TestHiveIntegrationSmokeTest
}