TestLambdaSubfieldPruning.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.Session;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.tpch.TpchTable;
import org.testng.annotations.Test;

import java.util.Optional;

import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG;
import static com.facebook.presto.hive.HiveSessionProperties.PUSHDOWN_FILTER_ENABLED;

@Test(singleThreaded = true)
public class TestLambdaSubfieldPruning
        extends AbstractTestQueryFramework
{
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        DistributedQueryRunner queryRunner = HiveQueryRunner.createQueryRunner(
                ImmutableList.of(TpchTable.LINE_ITEM),
                ImmutableMap.of(
                        "experimental.pushdown-subfields-enabled", "true",
                        "pushdown-subfields-from-lambda-enabled", "true",
                        "experimental.pushdown-dereference-enabled", "true"),
                "sql-standard",
                ImmutableMap.<String, String>builder()
                        .put("hive.pushdown-filter-enabled", "true")
                        .put("hive.parquet.pushdown-filter-enabled", "false") // Parquet does not support selective reader yet.
                        .build(),
                Optional.empty());

        return createLineItemExTable(queryRunner);
    }

    @Override
    protected QueryRunner createExpectedQueryRunner()
            throws Exception
    {
        DistributedQueryRunner queryRunner = HiveQueryRunner.createQueryRunner(
                ImmutableList.of(TpchTable.LINE_ITEM),
                ImmutableMap.of(
                        "experimental.pushdown-subfields-enabled", "false",
                        "pushdown-subfields-from-lambda-enabled", "false",
                        "experimental.pushdown-dereference-enabled", "true"),
                "sql-standard",
                ImmutableMap.<String, String>builder()
                        .put("hive.pushdown-filter-enabled", "true")
                        .put("hive.parquet.pushdown-filter-enabled", "false")
                        .build(),
                Optional.empty());

        return createLineItemExTable(queryRunner);
    }

    private static DistributedQueryRunner createLineItemExTable(DistributedQueryRunner queryRunner)
    {
        for (String fileFormatName : ImmutableSet.of("ORC", "DWRF", "PARQUET")) {
            queryRunner.execute(noPushdownFilter(queryRunner.getDefaultSession()),
                    "CREATE TABLE lineitem_ex_" + fileFormatName + " ( \n" +
                            "    array_of_varchar_keys, \n" +
                            "    array_of_rows, \n" +
                            "    array_of_non_null_rows, \n" +
                            "    array_of_array_of_rows, \n" +
                            "    row_with_array_of_rows, \n" +
                            "    row_with_map_varchar_key_row_value, \n" +
                            "    map_varchar_key_row_value, \n" +
                            "    map_varchar_key_array_of_row_value, \n" +
                            "    array_of_map_entries_varchar_key_row_value, \n" +
                            "    END_OF_LIST \n" +
                            ") WITH (format = '" + fileFormatName + "') AS \n" +
                            "SELECT \n" +
                            "   ARRAY['orderkey', 'linenumber', 'partkey'] AS array_of_varchar_keys, \n" +
                            "   IF (orderkey % 49 = 0, NULL, CAST(ARRAY[ \n" +
                            "        ROW(IF (orderkey % 17 = 0, NULL, orderkey),  comment), \n" +
                            "        ROW(IF (linenumber % 7 = 0, NULL, linenumber),  upper(comment)), \n" +
                            "        ROW(IF (partkey % 5 = 0, NULL, partkey),  shipmode)  \n" +
                            "     ] AS ARRAY(ROW(itemdata BIGINT, comment VARCHAR)))) AS array_of_rows, \n" +
                            "   CAST(ARRAY[ \n" +
                            "        ROW(orderkey,  comment), \n" +
                            "        ROW(linenumber,  upper(comment)), \n" +
                            "        ROW(partkey,  shipmode)  \n" +
                            "     ] AS ARRAY(ROW(itemdata BIGINT, comment VARCHAR))) AS array_of_non_null_rows, \n" +
                            "   IF (orderkey % 49 = 0, NULL, CAST(ARRAY[ARRAY[ \n" +
                            "        ROW(IF (orderkey % 17 = 0, NULL, orderkey),  comment), \n" +
                            "        ROW(IF (linenumber % 7 = 0, NULL, linenumber),  upper(comment)), \n" +
                            "        ROW(IF (partkey % 5 = 0, NULL, partkey),  shipmode) \n" +
                            "     ]] AS ARRAY(ARRAY(ROW(itemdata BIGINT, comment VARCHAR))))) AS array_of_array_of_rows, \n" +
                            "   CAST(ROW(ARRAY[ROW(orderkey, comment)]) AS ROW(array_of_rows ARRAY(ROW(itemdata BIGINT, comment VARCHAR)))) row_with_array_of_rows, \n" +
                            "   CAST(ROW(MAP_FROM_ENTRIES(ARRAY[ \n" +
                            "         ROW('orderdata',    ROW(IF (orderkey % 17 = 0, 1, orderkey), linenumber, partkey)), \n" +
                            "         ROW('orderdata_ex', ROW(orderkey + 100, linenumber + 100, partkey + 100))])) \n" +
                            "      AS ROW(map_varchar_key_row_value MAP(VARCHAR, ROW(orderkey BIGINT, linenumber BIGINT, partkey BIGINT)))) row_with_map_varchar_key_row_value, \n" +
                            "   CAST(MAP_FROM_ENTRIES(ARRAY[ \n" +
                            "         ROW('orderdata',    ROW(IF (orderkey % 17 = 0, 1, orderkey), linenumber, partkey)), \n" +
                            "         ROW('orderdata_ex', ROW(orderkey + 100, linenumber + 100, partkey + 100))]) \n" +
                            "      AS MAP(VARCHAR, ROW(orderkey BIGINT, linenumber BIGINT, partkey BIGINT))) AS map_varchar_key_row_value, \n" +
                            "   CAST(MAP_FROM_ENTRIES(ARRAY[ \n" +
                            "         ROW('orderdata',    IF (orderkey % 13 = 0, NULL, ARRAY[ROW( IF (orderkey % 17 = 0, NULL, orderkey), linenumber, partkey)])), \n" +
                            "         ROW('orderdata_ex', ARRAY[ ROW(orderkey + 100, linenumber + 100, partkey + 100)])]) \n" +
                            "      AS MAP(VARCHAR, ARRAY(ROW(orderkey BIGINT, linenumber BIGINT, partkey BIGINT)))) AS map_varchar_key_array_of_row_value, \n" +
                            "   CAST(ARRAY[ \n" +
                            "         ROW('orderdata',     IF (orderkey % 13 = 0, NULL, ROW( IF (orderkey % 17 = 0, NULL, orderkey), linenumber, partkey))), \n" +
                            "         ROW('orderdata_ex',  ROW(orderkey + 100, linenumber + 100, partkey + 100))] \n" +
                            "      AS ARRAY(ROW(key VARCHAR, value ROW(orderkey BIGINT, linenumber BIGINT, partkey BIGINT)))) AS array_of_map_entries_varchar_key_row_value, \n" +
                            "   true AS END_OF_LIST \n" +

                            "FROM lineitem  \n");
        }
        return queryRunner;
    }

    private static Session noPushdownFilter(Session session)
    {
        return Session.builder(session)
                .setCatalogSessionProperty(HIVE_CATALOG, PUSHDOWN_FILTER_ENABLED, "false")
                .build();
    }

    @Test
    public void testPushDownSubfieldsFromLambdas()
    {
        for (String fileFormatName : ImmutableSet.of("ORC", "DWRF", "PARQUET")) {
            testPushDownSubfieldsFromLambdas("lineitem_ex_" + fileFormatName);
        }
    }

    private void testPushDownSubfieldsFromLambdas(String tableName)
    {
        assertQuery("SELECT TRANSFORM(array_of_rows, x -> x.itemdata > 0) FROM " + tableName);
        assertQuery("SELECT TRANSFORM(row_with_array_of_rows.array_of_rows, x -> CAST(ROW(x.comment, x.comment) AS ROW(d1 VARCHAR, d2 VARCHAR))) FROM " + tableName);
        assertQuery("SELECT TRANSFORM_VALUES(map_varchar_key_row_value, (k,v) -> v.orderkey) FROM " + tableName);
        assertQuery("SELECT ZIP_WITH(array_of_rows, row_with_array_of_rows.array_of_rows, (x, y) -> CAST(ROW(x.itemdata, y.comment) AS ROW(d1 BIGINT, d2 VARCHAR))) FROM " + tableName);
        assertQuery("SELECT MAP_ZIP_WITH(map_varchar_key_row_value, row_with_map_varchar_key_row_value.map_varchar_key_row_value, (k, v1, v2) -> v1.orderkey + v2.orderkey) FROM " + tableName);

        // functions that outputing all subfields and accept functional parameter
        assertQuery("SELECT FILTER(array_of_rows, x -> POSITION('T' IN x.comment) > 0) FROM " + tableName);
        assertQuery("SELECT TRANSFORM(FLATTEN(array_of_array_of_rows), x -> x.itemdata) FROM " + tableName);
        assertQuery("SELECT TRANSFORM(CONCAT(array_of_rows, row_with_array_of_rows.array_of_rows), x -> x.itemdata) FROM " + tableName);
        assertQuery("SELECT TRANSFORM(array_of_rows || row_with_array_of_rows.array_of_rows, x -> x.itemdata) FROM " + tableName);
        assertQuery("SELECT TRANSFORM_VALUES(MAP_CONCAT(map_varchar_key_row_value, row_with_map_varchar_key_row_value.map_varchar_key_row_value), (k,v) -> v.orderkey) FROM " + tableName);
        assertQuery("SELECT TRANSFORM_VALUES(MAP_REMOVE_NULL_VALUES(row_with_map_varchar_key_row_value.map_varchar_key_row_value), (k,v) -> v.orderkey) FROM " + tableName);
        assertQuery("SELECT TRANSFORM_VALUES(MAP_SUBSET(row_with_map_varchar_key_row_value.map_varchar_key_row_value, ARRAY['orderdata_ex']), (k,v) -> v.orderkey) FROM " + tableName);
    }
}