TestHiveDistributedJoinQueriesWithDynamicFiltering.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;
import com.facebook.presto.Session;
import com.facebook.presto.operator.OperatorStats;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestJoinQueries;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tests.ResultWithQueryId;
import com.google.common.collect.MoreCollectors;
import org.testng.annotations.Test;
import static com.facebook.airlift.testing.Assertions.assertGreaterThan;
import static com.facebook.airlift.testing.Assertions.assertLessThanOrEqual;
import static com.facebook.presto.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING;
import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static com.facebook.presto.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.PUSHDOWN_SUBFIELDS_ENABLED;
import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG;
import static com.facebook.presto.hive.HiveSessionProperties.PUSHDOWN_FILTER_ENABLED;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.BROADCAST;
import static io.airlift.tpch.TpchTable.getTables;
import static org.testng.Assert.assertEquals;
public class TestHiveDistributedJoinQueriesWithDynamicFiltering
extends AbstractTestJoinQueries
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
return HiveQueryRunner.createQueryRunner(getTables());
}
@Override
protected Session getSession()
{
return Session.builder(super.getSession())
.setSystemProperty(ENABLE_DYNAMIC_FILTERING, "true")
.setSystemProperty(PUSHDOWN_SUBFIELDS_ENABLED, "true")
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name())
.setCatalogSessionProperty(HIVE_CATALOG, PUSHDOWN_FILTER_ENABLED, "true")
.build();
}
@Test
public void testJoinWithEmptyBuildSide()
{
Session session = Session.builder(getSession())
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name())
.setSystemProperty(PUSHDOWN_SUBFIELDS_ENABLED, "false")
.setCatalogSessionProperty(HIVE_CATALOG, PUSHDOWN_FILTER_ENABLED, "false")
.build();
DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
ResultWithQueryId<MaterializedResult> result = runner.executeWithQueryId(
session,
"SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice = 123.4567");
assertEquals(result.getResult().getRowCount(), 0);
OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "lineitem");
// Probe-side is not scanned at all, due to dynamic filtering:
assertEquals(probeStats.getInputPositions(), 0L);
}
@Test
public void testJoinWithSelectiveBuildSide()
{
Session session = Session.builder(getSession())
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name())
.setSystemProperty(PUSHDOWN_SUBFIELDS_ENABLED, "false")
.setCatalogSessionProperty(HIVE_CATALOG, PUSHDOWN_FILTER_ENABLED, "false")
.build();
DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
ResultWithQueryId<MaterializedResult> result = runner.executeWithQueryId(
session,
"SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.custkey = 1");
assertGreaterThan(result.getResult().getRowCount(), 0);
OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "lineitem");
// Probe side may be partially scanned, depending on the drivers' scheduling:
assertLessThanOrEqual(probeStats.getInputPositions(), countRows("lineitem"));
}
@Test
public void testJoinDynamicFilteringMultiJoin()
{
assertUpdate("CREATE TABLE t0 (k0 integer, v0 real)");
assertUpdate("CREATE TABLE t1 (k1 integer, v1 real)");
assertUpdate("CREATE TABLE t2 (k2 integer, v2 real)");
assertUpdate("INSERT INTO t0 VALUES (1, 1.0)", 1);
assertUpdate("INSERT INTO t1 VALUES (1, 2.0)", 1);
assertUpdate("INSERT INTO t2 VALUES (1, 3.0)", 1);
String query = "SELECT k0, k1, k2 FROM t0, t1, t2 WHERE (k0 = k1) AND (k0 = k2) AND (v0 + v1 = v2)";
Session session = Session.builder(getSession())
.setSystemProperty(ENABLE_DYNAMIC_FILTERING, "true")
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name())
.setSystemProperty(JOIN_REORDERING_STRATEGY, FeaturesConfig.JoinReorderingStrategy.NONE.name())
.build();
assertQuery(session, query, "SELECT 1, 1, 1");
}
@Test
public void testJoinOnNullPartitioning()
{
assertUpdate("CREATE TABLE t3(c2 bigint, c1 bigint)");
assertUpdate("INSERT INTO t3 VALUES(null, 2)", 1);
assertUpdate("CREATE TABLE t4(c2 bigint, c1 bigint) with(partitioned_by=array['c1'])");
assertUpdate("INSERT INTO t4 VALUES(null, null), (2,2)", 2);
String query = "select * from t3, t4 where t3.c1=t4.c2";
Session session = Session.builder(getSession())
.setSystemProperty(ENABLE_DYNAMIC_FILTERING, "true")
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.AUTOMATIC.name())
.setSystemProperty(JOIN_REORDERING_STRATEGY, FeaturesConfig.JoinReorderingStrategy.AUTOMATIC.name())
.build();
assertQuery(session, query, "SELECT null, 2, 2, 2");
}
@Test
public void testMixedJoin()
{
// Mixed join could produce conjunction dynamic filters, we should be able to extract them out when integrating with filter pushdown
assertQuery("SELECT * FROM\n" +
"lineitem l1 LEFT OUTER JOIN part p1\n" +
"ON l1.orderkey = p1.partkey AND p1.size = 47\n" +
"INNER JOIN orders o1 ON l1.orderkey = o1.orderkey\n" +
"AND o1.custkey = 397\n" +
"LEFT OUTER JOIN part p2\n" +
"ON p1.name = p2.name AND p1.partkey = p2.partkey\n" +
"WHERE o1.shippriority = 0");
}
private OperatorStats searchScanFilterAndProjectOperatorStats(QueryId queryId, String tableName)
{
DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
Plan plan = runner.getQueryPlan(queryId);
PlanNodeId nodeId = PlanNodeSearcher.searchFrom(plan.getRoot())
.where(node -> {
if (!(node instanceof ProjectNode)) {
return false;
}
ProjectNode projectNode = (ProjectNode) node;
if (!(projectNode.getSource() instanceof FilterNode)) {
return false;
}
FilterNode filterNode = (FilterNode) projectNode.getSource();
TableScanNode tableScanNode = (TableScanNode) filterNode.getSource();
return tableName.equals(((HiveTableHandle) (tableScanNode.getTable().getConnectorHandle())).getTableName());
})
.findOnlyElement()
.getId();
return runner.getCoordinator().getQueryManager().getFullQueryInfo(queryId).getQueryStats().getOperatorSummaries().stream()
.filter(summary -> nodeId.equals(summary.getPlanNodeId())).collect(MoreCollectors.onlyElement());
}
private Long countRows(String tableName)
{
MaterializedResult result = getQueryRunner().execute("SELECT COUNT() FROM " + tableName);
return (Long) result.getOnlyValue();
}
}