TestHiveCanonicalPlanGenerator.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;
import com.facebook.airlift.json.JsonObjectMapperProvider;
import com.facebook.presto.Session;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.TestingBlockEncodingSerde;
import com.facebook.presto.common.block.TestingBlockJsonSerde;
import com.facebook.presto.common.plan.PlanCanonicalizationStrategy;
import com.facebook.presto.common.type.TestingTypeDeserializer;
import com.facebook.presto.common.type.TestingTypeManager;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.sql.planner.CanonicalPlan;
import com.facebook.presto.sql.planner.CanonicalPlanFragment;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.SubPlan;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import org.testng.annotations.Test;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static com.facebook.presto.SystemSessionProperties.REWRITE_EXPRESSION_WITH_CONSTANT_EXPRESSION;
import static com.facebook.presto.common.plan.PlanCanonicalizationStrategy.CONNECTOR;
import static com.facebook.presto.common.plan.PlanCanonicalizationStrategy.IGNORE_SAFE_CONSTANTS;
import static com.facebook.presto.common.plan.PlanCanonicalizationStrategy.IGNORE_SCAN_CONSTANTS;
import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG;
import static com.facebook.presto.hive.HiveSessionProperties.PUSHDOWN_FILTER_ENABLED;
import static com.facebook.presto.sql.planner.CanonicalPlanGenerator.generateCanonicalPlan;
import static com.facebook.presto.sql.planner.CanonicalPlanGenerator.generateCanonicalPlanFragment;
import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.tpch.TpchTable.LINE_ITEM;
import static io.airlift.tpch.TpchTable.ORDERS;
import static java.lang.String.format;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
public class TestHiveCanonicalPlanGenerator
extends AbstractTestQueryFramework
{
// JSON mapper passed to the canonical plan generator and used by the assertion helpers to serialize
// canonical plans for comparison. Assigned once in the constructor, hence final.
private final ObjectMapper objectMapper;

/**
 * Builds the JSON mapper used by the tests: the default mapper from {@link JsonObjectMapperProvider},
 * extended with the testing (de)serializers for {@link Type} and {@link Block}.
 */
public TestHiveCanonicalPlanGenerator()
{
    TestingTypeManager typeManager = new TestingTypeManager();
    TestingBlockEncodingSerde blockEncodingSerde = new TestingBlockEncodingSerde();
    this.objectMapper = new JsonObjectMapperProvider().get()
            .registerModule(new SimpleModule()
                    .addDeserializer(Type.class, new TestingTypeDeserializer(typeManager))
                    .addSerializer(Block.class, new TestingBlockJsonSerde.Serializer(blockEncodingSerde))
                    .addDeserializer(Block.class, new TestingBlockJsonSerde.Deserializer(blockEncodingSerde)))
            // Sort map entries by key so the serialized form does not depend on map iteration order.
            .configure(ORDER_MAP_ENTRIES_BY_KEYS, true);
}
/**
 * Creates the query runner used by every test in this class by delegating to
 * {@link HiveQueryRunner#createQueryRunner}, passing the TPC-H ORDERS and LINE_ITEM tables.
 * NOTE(review): presumably those tables are loaded into the Hive catalog so the tests can
 * select from {@code orders}; confirm against HiveQueryRunner.
 *
 * @return the query runner returned by HiveQueryRunner
 * @throws Exception if HiveQueryRunner fails to create the runner
 */
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
return HiveQueryRunner.createQueryRunner(ImmutableList.of(ORDERS, LINE_ITEM));
}
/**
 * Checks, for the CONNECTOR, IGNORE_SAFE_CONSTANTS and IGNORE_SCAN_CONSTANTS strategies, which pairs of
 * single-leaf queries over a table partitioned by {@code ds} and {@code ts} are expected to produce the
 * same canonical plan and which are expected to differ. All queries are planned with Hive filter
 * pushdown enabled. The table is dropped afterwards regardless of the outcome.
 */
@Test
public void testCanonicalizationStrategies()
        throws Exception
{
    QueryRunner runner = getQueryRunner();
    try {
        runner.execute("CREATE TABLE test_orders WITH (partitioned_by = ARRAY['ds', 'ts']) AS " +
                "SELECT orderkey, orderpriority, comment, custkey, '2020-09-01' as ds, '00:01' as ts FROM orders WHERE orderkey < 1000 " +
                "UNION ALL " +
                "SELECT orderkey, orderpriority, comment, custkey, '2020-09-02' as ds, '00:02' as ts FROM orders WHERE orderkey < 1000");

        Session session = pushdownFilterEnabled();

        // Query pairs that are reused across several strategies below.
        String plainScan = "SELECT orderkey from test_orders";
        String differentRangeLeft = "SELECT orderkey from test_orders where ds = '2020-09-01' AND orderkey < 10";
        String differentRangeRight = "SELECT orderkey from test_orders where ds = '2020-09-02' AND orderkey < 20";
        String sameRangeLeft = "SELECT orderkey, CAST('1' AS VARCHAR) from test_orders where ds = '2020-09-01' AND orderkey < 10 AND ts >= '00:01'";
        String sameRangeRight = "SELECT orderkey, CAST('11' AS VARCHAR) from test_orders where ds = '2020-09-02' AND orderkey < 10 AND ts >= '00:02'";
        String differentEqualityLeft = "SELECT orderkey, CAST('1' AS VARCHAR) from test_orders where ds = '2020-09-01' AND orderkey = 10";
        String differentEqualityRight = "SELECT orderkey, CAST('11' AS VARCHAR) from test_orders where ds = '2020-09-02' AND orderkey = 20";

        // CONNECTOR: only partition-column predicates may differ.
        assertSameCanonicalLeafPlan(session, plainScan, plainScan, CONNECTOR);
        assertSameCanonicalLeafPlan(
                session,
                "SELECT orderkey from test_orders where ds > '2020-09-01'",
                "SELECT orderkey from test_orders where ds = '2020-09-02'",
                CONNECTOR);
        assertDifferentCanonicalLeafPlan(session, differentRangeLeft, differentRangeRight, CONNECTOR);

        // IGNORE_SAFE_CONSTANTS: projected constants may differ, data-column filter constants may not.
        assertDifferentCanonicalLeafPlan(session, differentRangeLeft, differentRangeRight, IGNORE_SAFE_CONSTANTS);
        assertSameCanonicalLeafPlan(session, sameRangeLeft, sameRangeRight, IGNORE_SAFE_CONSTANTS);
        assertDifferentCanonicalLeafPlan(session, differentEqualityLeft, differentEqualityRight, IGNORE_SAFE_CONSTANTS);

        // IGNORE_SCAN_CONSTANTS: differing range bounds still differ, differing equality constants do not.
        assertDifferentCanonicalLeafPlan(session, differentRangeLeft, differentRangeRight, IGNORE_SCAN_CONSTANTS);
        assertSameCanonicalLeafPlan(session, sameRangeLeft, sameRangeRight, IGNORE_SCAN_CONSTANTS);
        assertSameCanonicalLeafPlan(session, differentEqualityLeft, differentEqualityRight, IGNORE_SCAN_CONSTANTS);
    }
    finally {
        runner.execute("DROP TABLE IF EXISTS test_orders");
    }
}
/**
 * Checks how partition-column and data-column predicates affect the canonical leaf plan fragment of
 * queries over a table partitioned by {@code ds}. The table is dropped afterwards regardless of the
 * outcome.
 */
@Test
public void testColumnPredicates()
        throws Exception
{
    QueryRunner runner = getQueryRunner();
    try {
        runner.execute("CREATE TABLE test_column_predicates WITH (partitioned_by = ARRAY['ds']) AS " +
                "SELECT orderkey, orderpriority, comment, custkey, '2020-09-01' as ds FROM orders WHERE orderkey < 1000 " +
                "UNION ALL " +
                "SELECT orderkey, orderpriority, comment, custkey, '2020-09-02' as ds FROM orders WHERE orderkey < 1000");

        Session pushdownSession = pushdownFilterEnabled();

        String allPartitionsExplicit = "SELECT * FROM test_column_predicates WHERE ds IN ('2020-09-01', '2020-09-02')";
        String allPartitionsImplicit = "SELECT * FROM test_column_predicates";
        String regexFoo = "SELECT * FROM test_column_predicates WHERE ds = '2020-09-01' AND regexp_like(comment, '.*foo.*')";
        String regexBar = "SELECT * FROM test_column_predicates WHERE ds = '2020-09-01' AND regexp_like(comment, '.*bar.*')";
        String orderkeyBelow50 = "SELECT * FROM test_column_predicates WHERE ds = '2020-09-01' AND orderkey < 50";

        // Without filter pushdown (default session) the partition predicate is expected to change the plan.
        assertDifferentCanonicalLeafSubPlan(getSession(), allPartitionsExplicit, allPartitionsImplicit);

        // With filter pushdown enabled, the partition column predicate is extracted from domainPredicate,
        // which makes partition column predicates irrelevant for the canonical plan.
        assertSameCanonicalLeafSubPlan(pushdownSession, allPartitionsExplicit, allPartitionsImplicit);
        assertSameCanonicalLeafSubPlan(
                pushdownFilterEnabledConstantPullUpDisabled(),
                "SELECT * FROM test_column_predicates WHERE ds = '2020-09-01'",
                "SELECT * FROM test_column_predicates WHERE ds = '2020-09-02'");

        // A query with a remaining (non-domain) predicate still matches itself ...
        assertSameCanonicalLeafSubPlan(pushdownSession, regexFoo);
        // ... but not a query whose remaining predicate differs.
        assertDifferentCanonicalLeafSubPlan(pushdownSession, regexFoo, regexBar);

        // Data-column predicates with a different bound or on a different column produce different plans.
        assertDifferentCanonicalLeafSubPlan(
                pushdownSession,
                orderkeyBelow50,
                "SELECT * FROM test_column_predicates WHERE ds = '2020-09-01' AND orderkey < 100");
        assertDifferentCanonicalLeafSubPlan(
                pushdownSession,
                orderkeyBelow50,
                "SELECT * FROM test_column_predicates WHERE ds = '2020-09-01' AND custkey < 50");
    }
    finally {
        runner.execute("DROP TABLE IF EXISTS test_column_predicates");
    }
}
/**
 * Checks that, on a table bucketed by {@code orderkey}, a query filtering on one orderkey value matches
 * itself but differs from the same query filtering on another orderkey value. The table is dropped
 * afterwards regardless of the outcome.
 */
@Test
public void testBucketFilter()
        throws Exception
{
    QueryRunner runner = getQueryRunner();
    try {
        runner.execute("CREATE TABLE test_bucket_filter WITH (partitioned_by = ARRAY['ds'], bucketed_by = ARRAY['orderkey'], bucket_count = 11) AS " +
                "SELECT orderkey, orderpriority, comment, '2020-09-01' as ds FROM orders WHERE orderkey < 1000 " +
                "UNION ALL " +
                "SELECT orderkey, orderpriority, comment, '2020-09-02' as ds FROM orders WHERE orderkey < 1000");

        Session session = pushdownFilterEnabled();
        String orderkey50 = "SELECT * FROM test_bucket_filter WHERE ds = '2020-09-01' AND orderkey = 50";
        String orderkey60 = "SELECT * FROM test_bucket_filter WHERE ds = '2020-09-01' AND orderkey = 60";

        assertSameCanonicalLeafSubPlan(session, orderkey50);
        assertDifferentCanonicalLeafSubPlan(session, orderkey50, orderkey60);
    }
    finally {
        runner.execute("DROP TABLE IF EXISTS test_bucket_filter");
    }
}
/**
 * Returns a session built from the query runner's default session with the Hive catalog session
 * property {@code PUSHDOWN_FILTER_ENABLED} set to {@code "true"}.
 */
private Session pushdownFilterEnabled()
{
    Session defaultSession = getQueryRunner().getDefaultSession();
    return Session.builder(defaultSession)
            .setCatalogSessionProperty(HIVE_CATALOG, PUSHDOWN_FILTER_ENABLED, "true")
            .build();
}
/**
 * Returns a session built from the query runner's default session with the Hive catalog session
 * property {@code PUSHDOWN_FILTER_ENABLED} set to {@code "true"} and the system property
 * {@code REWRITE_EXPRESSION_WITH_CONSTANT_EXPRESSION} set to {@code "false"}.
 */
private Session pushdownFilterEnabledConstantPullUpDisabled()
{
    Session defaultSession = getQueryRunner().getDefaultSession();
    return Session.builder(defaultSession)
            .setCatalogSessionProperty(HIVE_CATALOG, PUSHDOWN_FILTER_ENABLED, "true")
            .setSystemProperty(REWRITE_EXPRESSION_WITH_CONSTANT_EXPRESSION, "false")
            .build();
}
/**
 * Collects the leaves of the given sub-plan tree, i.e. every sub-plan that has no children.
 * A sub-plan without children is returned as a single-element list containing itself.
 *
 * @param subPlan root of the sub-plan tree to traverse
 * @return immutable list of leaf sub-plans, in depth-first child order
 */
private static List<SubPlan> getLeafSubPlans(SubPlan subPlan)
{
    boolean hasChildren = !subPlan.getChildren().isEmpty();
    if (hasChildren) {
        // Recurse into each child and concatenate the leaves found beneath it.
        return subPlan.getChildren().stream()
                .flatMap(child -> getLeafSubPlans(child).stream())
                .collect(toImmutableList());
    }
    return ImmutableList.of(subPlan);
}
/**
 * Asserts that {@code sql} produces the same canonical leaf plan fragment when it appears on both
 * sides of a UNION ALL with itself.
 */
private void assertSameCanonicalLeafSubPlan(Session session, String sql)
        throws Exception
{
    assertSameCanonicalLeafSubPlan(session, sql, sql);
}

/**
 * Plans {@code ( sql1 ) UNION ALL ( sql2 )} and asserts that its two leaf fragments serialize to the
 * same canonical plan fragment JSON. The provided queries must not introduce sub-plan fan-out
 * (for example a JOIN), because exactly two leaf fragments are expected.
 *
 * @param session session used for planning and canonicalization
 * @param sql1 query placed on the left side of the UNION ALL
 * @param sql2 query placed on the right side of the UNION ALL
 */
private void assertSameCanonicalLeafSubPlan(Session session, String sql1, String sql2)
        throws Exception
{
    SubPlan subplan = subplan(format("( %s ) UNION ALL ( %s )", sql1, sql2), session);
    List<Optional<CanonicalPlanFragment>> leafCanonicalPlans = getLeafSubPlans(subplan).stream()
            .map(SubPlan::getFragment)
            .map(fragment -> generateCanonicalPlanFragment(fragment.getRoot(), fragment.getPartitioningScheme(), objectMapper, session))
            .collect(Collectors.toList());
    assertEquals(leafCanonicalPlans.size(), 2);

    // Fail with an assertion (not a NoSuchElementException) when a leaf could not be canonicalized.
    Optional<CanonicalPlanFragment> first = leafCanonicalPlans.get(0);
    Optional<CanonicalPlanFragment> second = leafCanonicalPlans.get(1);
    assertTrue(first.isPresent(), "no canonical plan fragment generated for first leaf of: " + sql1 + " UNION ALL " + sql2);
    assertTrue(second.isPresent(), "no canonical plan fragment generated for second leaf of: " + sql1 + " UNION ALL " + sql2);

    String s1 = objectMapper.writeValueAsString(first.get());
    String s2 = objectMapper.writeValueAsString(second.get());
    assertEquals(s1, s2);
}
/**
 * Plans both queries separately, takes the single leaf fragment of each, and asserts that their
 * canonical plan fragments serialize to different JSON.
 *
 * @param session session used for planning and canonicalization
 * @param sql1 first query; must plan to exactly one leaf sub-plan
 * @param sql2 second query; must plan to exactly one leaf sub-plan
 */
private void assertDifferentCanonicalLeafSubPlan(Session session, String sql1, String sql2)
        throws Exception
{
    PlanFragment fragment1 = getOnlyElement(getLeafSubPlans(subplan(sql1, session))).getFragment();
    PlanFragment fragment2 = getOnlyElement(getLeafSubPlans(subplan(sql2, session))).getFragment();
    Optional<CanonicalPlanFragment> canonicalPlan1 = generateCanonicalPlanFragment(fragment1.getRoot(), fragment1.getPartitioningScheme(), objectMapper, session);
    Optional<CanonicalPlanFragment> canonicalPlan2 = generateCanonicalPlanFragment(fragment2.getRoot(), fragment2.getPartitioningScheme(), objectMapper, session);
    assertTrue(canonicalPlan1.isPresent(), "no canonical plan fragment generated for: " + sql1);
    assertTrue(canonicalPlan2.isPresent(), "no canonical plan fragment generated for: " + sql2);
    // Compare the unwrapped plans so the result does not depend on how the mapper renders Optional.
    assertNotEquals(
            objectMapper.writeValueAsString(canonicalPlan1.get()),
            objectMapper.writeValueAsString(canonicalPlan2.get()));
}
/**
 * Plans both queries separately, takes the single leaf fragment of each, and asserts that their
 * canonical plans under the given strategy serialize to different JSON.
 *
 * @param session session used for planning and canonicalization
 * @param sql1 first query; must plan to exactly one leaf sub-plan
 * @param sql2 second query; must plan to exactly one leaf sub-plan
 * @param strategy canonicalization strategy passed to the canonical plan generator
 */
private void assertDifferentCanonicalLeafPlan(Session session, String sql1, String sql2, PlanCanonicalizationStrategy strategy)
        throws Exception
{
    PlanFragment fragment1 = getOnlyElement(getLeafSubPlans(subplan(sql1, session))).getFragment();
    PlanFragment fragment2 = getOnlyElement(getLeafSubPlans(subplan(sql2, session))).getFragment();
    Optional<CanonicalPlan> canonicalPlan1 = generateCanonicalPlan(fragment1.getRoot(), strategy, objectMapper, session);
    Optional<CanonicalPlan> canonicalPlan2 = generateCanonicalPlan(fragment2.getRoot(), strategy, objectMapper, session);
    assertTrue(canonicalPlan1.isPresent(), "no canonical plan generated with " + strategy + " for: " + sql1);
    assertTrue(canonicalPlan2.isPresent(), "no canonical plan generated with " + strategy + " for: " + sql2);
    // Compare the unwrapped plans so the result does not depend on how the mapper renders Optional.
    assertNotEquals(
            objectMapper.writeValueAsString(canonicalPlan1.get()),
            objectMapper.writeValueAsString(canonicalPlan2.get()));
}
/**
 * Plans both queries separately, takes the single leaf fragment of each, and asserts that their
 * canonical plans under the given strategy serialize to identical JSON.
 *
 * @param session session used for planning and canonicalization
 * @param sql1 first query; must plan to exactly one leaf sub-plan
 * @param sql2 second query; must plan to exactly one leaf sub-plan
 * @param strategy canonicalization strategy passed to the canonical plan generator
 */
private void assertSameCanonicalLeafPlan(Session session, String sql1, String sql2, PlanCanonicalizationStrategy strategy)
        throws Exception
{
    PlanFragment fragment1 = getOnlyElement(getLeafSubPlans(subplan(sql1, session))).getFragment();
    PlanFragment fragment2 = getOnlyElement(getLeafSubPlans(subplan(sql2, session))).getFragment();
    Optional<CanonicalPlan> canonicalPlan1 = generateCanonicalPlan(fragment1.getRoot(), strategy, objectMapper, session);
    Optional<CanonicalPlan> canonicalPlan2 = generateCanonicalPlan(fragment2.getRoot(), strategy, objectMapper, session);
    assertTrue(canonicalPlan1.isPresent(), "no canonical plan generated with " + strategy + " for: " + sql1);
    assertTrue(canonicalPlan2.isPresent(), "no canonical plan generated with " + strategy + " for: " + sql2);
    // Compare the unwrapped plans: serializing the Optional wrappers could make this equality pass
    // vacuously if the mapper rendered Optional itself rather than its content.
    assertEquals(
            objectMapper.writeValueAsString(canonicalPlan1.get()),
            objectMapper.writeValueAsString(canonicalPlan2.get()));
}
}