TestHiveDistributedQueriesWithExchangeMaterialization.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;
import com.facebook.presto.Session;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestDistributedQueries;
import org.testng.annotations.Test;
import static com.facebook.presto.SystemSessionProperties.ENABLE_STATS_COLLECTION_FOR_TEMPORARY_TABLE;
import static com.facebook.presto.hive.BucketFunctionType.HIVE_COMPATIBLE;
import static com.facebook.presto.hive.BucketFunctionType.PRESTO_NATIVE;
import static com.facebook.presto.hive.HiveQueryRunner.createMaterializingQueryRunner;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.HiveStorageFormat.PAGEFILE;
import static com.facebook.presto.hive.TestHiveIntegrationSmokeTest.assertRemoteMaterializedExchangesCount;
import static com.facebook.presto.sql.tree.ExplainType.Type.LOGICAL;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.tpch.TpchTable.getTables;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;
@Test(singleThreaded = true)
public class TestHiveDistributedQueriesWithExchangeMaterialization
extends AbstractTestDistributedQueries
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
return createMaterializingQueryRunner(getTables());
}
@Test
public void testMaterializedExchangesEnabled()
{
assertQuery(getSession(), "SELECT orderkey, COUNT(*) lines FROM lineitem GROUP BY orderkey", assertRemoteMaterializedExchangesCount(1));
}
@Test
public void testMaterializeHiveUnsupportedTypeForTemporaryTable()
{
testMaterializeHiveUnsupportedTypeForTemporaryTable(ORC, true);
testMaterializeHiveUnsupportedTypeForTemporaryTable(PAGEFILE, false);
assertThrows(RuntimeException.class, () -> testMaterializeHiveUnsupportedTypeForTemporaryTable(ORC, false));
}
private void testMaterializeHiveUnsupportedTypeForTemporaryTable(
HiveStorageFormat storageFormat,
boolean usePageFileForHiveUnsupportedType)
{
Session session = sessionBuilderNoConstantGrouping()
.setCatalogSessionProperty("hive", "temporary_table_storage_format", storageFormat.name())
.setCatalogSessionProperty("hive", "use_pagefile_for_hive_unsupported_type", String.valueOf(usePageFileForHiveUnsupportedType))
.setSystemProperty(ENABLE_STATS_COLLECTION_FOR_TEMPORARY_TABLE, "true")
.build();
assertUpdate(session, "CREATE TABLE test_materialize_non_hive_types AS\n" +
"WITH t1 AS (\n" +
" SELECT\n" +
" CAST('192.168.0.0' AS IPADDRESS) address,\n" +
" nationkey\n" +
" FROM nation\n" +
"),\n" +
"t2 AS (\n" +
" SELECT\n" +
" FROM_ISO8601_TIMESTAMP('2020-02-25') time,\n" +
" nationkey\n" +
" FROM nation\n" +
")\n" +
"SELECT\n" +
" t1.nationkey,\n" +
" CAST(t1.address AS VARCHAR) address,\n" +
" CAST(t2.time AS VARCHAR) time\n" +
"FROM t1\n" +
"JOIN t2\n" +
" ON t1.nationkey = t2.nationkey",
25,
assertRemoteMaterializedExchangesCount(2));
assertUpdate("DROP TABLE IF EXISTS test_materialize_non_hive_types");
assertUpdate(session, "CREATE TABLE test_materialize_non_hive_types AS\n" +
"WITH t1 AS (\n" +
" SELECT\n" +
" CAST('2000-01-01' AS DATE) date,\n" +
" nationkey\n" +
" FROM nation\n" +
"),\n" +
"t2 AS (\n" +
" SELECT\n" +
" FROM_ISO8601_TIMESTAMP('2020-02-25') time,\n" +
" nationkey\n" +
" FROM nation\n" +
")\n" +
"SELECT\n" +
" t1.nationkey,\n" +
" t1.date,\n" +
" CAST(t2.time AS VARCHAR) time\n" +
"FROM t1\n" +
"JOIN t2\n" +
" ON t1.nationkey = t2.nationkey",
25);
assertUpdate("DROP TABLE IF EXISTS test_materialize_non_hive_types");
}
@Test
public void testBucketedByHiveUnsupportedTypeForTemporaryTable()
{
testBucketedByHiveUnsupportedTypeForTemporaryTable(ORC, HIVE_COMPATIBLE, true);
testBucketedByHiveUnsupportedTypeForTemporaryTable(ORC, PRESTO_NATIVE, true);
testBucketedByHiveUnsupportedTypeForTemporaryTable(PAGEFILE, HIVE_COMPATIBLE, true);
testBucketedByHiveUnsupportedTypeForTemporaryTable(PAGEFILE, PRESTO_NATIVE, false);
assertThrows(RuntimeException.class, () -> testBucketedByHiveUnsupportedTypeForTemporaryTable(ORC, HIVE_COMPATIBLE, false));
assertThrows(RuntimeException.class, () -> testBucketedByHiveUnsupportedTypeForTemporaryTable(ORC, PRESTO_NATIVE, false));
assertThrows(RuntimeException.class, () -> testBucketedByHiveUnsupportedTypeForTemporaryTable(PAGEFILE, HIVE_COMPATIBLE, false));
}
private void testBucketedByHiveUnsupportedTypeForTemporaryTable(
HiveStorageFormat storageFormat,
BucketFunctionType bucketFunctionType,
boolean usePageFileForHiveUnsupportedType)
{
Session session = sessionBuilderNoConstantGrouping()
.setCatalogSessionProperty("hive", "temporary_table_storage_format", storageFormat.name())
.setCatalogSessionProperty("hive", "bucket_function_type_for_exchange", bucketFunctionType.name())
.setCatalogSessionProperty("hive", "use_pagefile_for_hive_unsupported_type", String.valueOf(usePageFileForHiveUnsupportedType))
.build();
assertUpdate(session, "CREATE TABLE test_materialize_bucket_by_non_hive_types AS\n" +
"WITH t1 AS (\n" +
" SELECT\n" +
" CAST('192.168.0.0' AS IPADDRESS) address,\n" +
" nationkey\n" +
" FROM nation\n" +
" GROUP BY\n" +
" nationkey,\n" +
" CAST('192.168.0.0' AS IPADDRESS)\n" +
"),\n" +
"t2 AS (\n" +
" SELECT\n" +
" FROM_ISO8601_TIMESTAMP('2020-02-25') time,\n" +
" nationkey\n" +
" FROM nation\n" +
" GROUP BY\n" +
" nationkey,\n" +
" FROM_ISO8601_TIMESTAMP('2020-02-25')\n" +
")\n" +
"SELECT\n" +
" t1.nationkey,\n" +
" CAST(t1.address AS VARCHAR) address,\n" +
" CAST(t2.time AS VARCHAR) time\n" +
"FROM t1\n" +
"JOIN t2\n" +
" ON t1.nationkey = t2.nationkey",
25,
assertRemoteMaterializedExchangesCount(3));
assertUpdate("DROP TABLE IF EXISTS test_materialize_bucket_by_non_hive_types");
}
// We need to disable optimize_constant_grouping_keys so Optimizer does NOT optimize
// [Aggregate() -> Project(add constant) -> TableScan()] ==> [Project(add constant) -> Aggregate() -> TableScan()]
// which will lead to type changes in materialized tables
private Session.SessionBuilder sessionBuilderNoConstantGrouping()
{
return Session.builder(getSession())
// We need to disable optimize_constant_grouping_keys so Optimizer does NOT optimize
// [Aggregate() -> Project(add constant) -> TableScan()] ==> [Project(add constant) -> Aggregate() -> TableScan()]
// which will lead to type changes in materialized tables
.setSystemProperty("optimize_constant_grouping_keys", "false")
.setSystemProperty("rewrite_expression_with_constant_expression", "false");
}
@Override
public void testDelete()
{
// Hive connector currently does not support row-by-row delete
}
@Override
public void testUpdate()
{
// Updates are not supported by the connector
}
@Override
public void testExcept()
{
// decimal type is not supported by the Hive hash code function
}
@Override
public void testIntersect()
{
// decimal type is not supported by the Hive hash code function
}
@Override
public void testQuantifiedComparison()
{
// decimal type is not supported by the Hive hash code function
}
public void testSemiJoin()
{
// decimal type is not supported by the Hive hash code function
}
@Override
public void testUnionRequiringCoercion()
{
// decimal type is not supported by the Hive hash code function
}
@Override
public void testValues()
{
// decimal type is not supported by the Hive hash code function
}
@Test
public void testExchangeMaterializationWithConstantFolding()
{
try {
assertUpdate(
// bucket count has to be different from materialized bucket number
"CREATE TABLE test_constant_folding_lineitem_bucketed\n" +
"WITH (bucket_count = 17, bucketed_by = ARRAY['partkey_mod_9', 'partkey', 'suppkey', 'suppkey_varchar']) AS\n" +
"SELECT partkey % 9 partkey_mod_9, partkey, suppkey, CAST(suppkey AS VARCHAR) suppkey_varchar, comment FROM lineitem",
"SELECT count(*) from lineitem");
assertUpdate(
"CREATE TABLE test_constant_folding_partsupp_unbucketed AS\n" +
"SELECT partkey % 9 partkey_mod_9, partkey, suppkey, CAST(suppkey AS VARCHAR) suppkey_varchar, comment FROM partsupp",
"SELECT count(*) from partsupp");
// one constant, third position (suppkey BIGINT)
assertQuery(
getSession(),
"SELECT lineitem.partkey, lineitem.suppkey, lineitem.comment lineitem_comment, partsupp.comment partsupp_comment\n" +
"FROM test_constant_folding_lineitem_bucketed lineitem JOIN test_constant_folding_partsupp_unbucketed partsupp\n" +
"ON\n" +
" lineitem.partkey = partsupp.partkey AND\n" +
" lineitem.partkey_mod_9 = partsupp.partkey_mod_9 AND\n" +
" lineitem.suppkey = partsupp.suppkey AND\n" +
" lineitem.suppkey_varchar = partsupp.suppkey_varchar\n" +
"WHERE lineitem.suppkey = 42",
"SELECT lineitem.partkey, lineitem.suppkey, lineitem.comment lineitem_comment, partsupp.comment partsupp_comment\n" +
"FROM lineitem JOIN partsupp\n" +
"ON lineitem.partkey = partsupp.partkey AND\n" +
"lineitem.suppkey = partsupp.suppkey\n" +
"WHERE lineitem.suppkey = 42",
assertRemoteMaterializedExchangesCount(1));
// one constant, fourth position (suppkey_varchar VARCHAR)
assertQuery(
getSession(),
"SELECT lineitem.partkey, lineitem.suppkey, lineitem.comment lineitem_comment, partsupp.comment partsupp_comment\n" +
"FROM test_constant_folding_lineitem_bucketed lineitem JOIN test_constant_folding_partsupp_unbucketed partsupp\n" +
"ON\n" +
" lineitem.partkey = partsupp.partkey AND\n" +
" lineitem.partkey_mod_9 = partsupp.partkey_mod_9 AND\n" +
" lineitem.suppkey = partsupp.suppkey AND\n" +
" lineitem.suppkey_varchar = partsupp.suppkey_varchar\n" +
"WHERE lineitem.suppkey_varchar = '42'",
"SELECT lineitem.partkey, lineitem.suppkey, lineitem.comment lineitem_comment, partsupp.comment partsupp_comment\n" +
"FROM lineitem JOIN partsupp\n" +
"ON lineitem.partkey = partsupp.partkey AND\n" +
"lineitem.suppkey = partsupp.suppkey\n" +
"WHERE lineitem.suppkey = 42",
assertRemoteMaterializedExchangesCount(1));
// two constants, first and third position (partkey_mod_9 BIGINT, suppkey BIGINT)
assertQuery(
getSession(),
"SELECT lineitem.partkey, lineitem.suppkey, lineitem.comment lineitem_comment, partsupp.comment partsupp_comment\n" +
"FROM test_constant_folding_lineitem_bucketed lineitem JOIN test_constant_folding_partsupp_unbucketed partsupp\n" +
"ON\n" +
" lineitem.partkey = partsupp.partkey AND\n" +
" lineitem.partkey_mod_9 = partsupp.partkey_mod_9 AND\n" +
" lineitem.suppkey = partsupp.suppkey AND\n" +
" lineitem.suppkey_varchar = partsupp.suppkey_varchar\n" +
"WHERE lineitem.partkey_mod_9 = 7 AND lineitem.suppkey = 42",
"SELECT lineitem.partkey, lineitem.suppkey, lineitem.comment lineitem_comment, partsupp.comment partsupp_comment\n" +
"FROM lineitem JOIN partsupp\n" +
"ON lineitem.partkey = partsupp.partkey AND\n" +
"lineitem.suppkey = partsupp.suppkey\n" +
"WHERE lineitem.partkey % 9 = 7 AND lineitem.suppkey = 42",
assertRemoteMaterializedExchangesCount(1));
// two constants, first and forth position (partkey_mod_9 BIGINT, suppkey_varchar VARCHAR)
assertQuery(
getSession(),
"SELECT lineitem.partkey, lineitem.suppkey, lineitem.comment lineitem_comment, partsupp.comment partsupp_comment\n" +
"FROM test_constant_folding_lineitem_bucketed lineitem JOIN test_constant_folding_partsupp_unbucketed partsupp\n" +
"ON\n" +
" lineitem.partkey = partsupp.partkey AND\n" +
" lineitem.partkey_mod_9 = partsupp.partkey_mod_9 AND\n" +
" lineitem.suppkey = partsupp.suppkey AND\n" +
" lineitem.suppkey_varchar = partsupp.suppkey_varchar\n" +
"WHERE lineitem.partkey_mod_9 = 7 AND lineitem.suppkey_varchar = '42'",
"SELECT lineitem.partkey, lineitem.suppkey, lineitem.comment lineitem_comment, partsupp.comment partsupp_comment\n" +
"FROM lineitem JOIN partsupp\n" +
"ON lineitem.partkey = partsupp.partkey AND\n" +
"lineitem.suppkey = partsupp.suppkey\n" +
"WHERE lineitem.partkey % 9 = 7 AND lineitem.suppkey = 42",
assertRemoteMaterializedExchangesCount(1));
}
finally {
assertUpdate("DROP TABLE IF EXISTS test_constant_folding_lineitem_bucketed");
assertUpdate("DROP TABLE IF EXISTS test_constant_folding_partsupp_unbucketed");
}
}
@Test
public void testIgnoreTableBucketingWhenTableBucketCountIsSmall()
{
try {
assertUpdate(
"CREATE TABLE partitioned_nation\n" +
"WITH (\n" +
" bucket_count = 17,\n" +
" bucketed_by = ARRAY['nationkey']\n" +
") AS\n" +
"SELECT\n" +
" *\n" +
"FROM nation",
25);
// test default : not ignore table bucketing
assertQuery(
getSession(),
"SELECT\n" +
" *\n" +
"FROM partitioned_nation t1\n" +
"JOIN nation t2\n" +
" ON t1.nationkey = t2.nationkey",
"SELECT\n" +
" *\n" +
"FROM nation t1\n" +
"JOIN nation t2\n" +
" ON t1.nationkey = t2.nationkey",
assertRemoteMaterializedExchangesCount(1));
// test ignore table bucketing
Session testSession = Session.builder(getSession())
.setCatalogSessionProperty("hive", "min_bucket_count_to_not_ignore_table_bucketing", "20")
.build();
assertQuery(
testSession,
"SELECT\n" +
" *\n" +
"FROM partitioned_nation t1\n" +
"JOIN nation t2\n" +
" ON t1.nationkey = t2.nationkey",
"SELECT\n" +
" *\n" +
"FROM nation t1\n" +
"JOIN nation t2\n" +
" ON t1.nationkey = t2.nationkey",
assertRemoteMaterializedExchangesCount(2));
// do not ignore table bucketing if $bucket column is referenced
assertQuery(
testSession,
"SELECT\n" +
" *\n" +
"FROM partitioned_nation t1\n" +
"JOIN nation t2\n" +
" ON t1.nationkey = t2.nationkey\n" +
"WHERE\n" +
" \"$bucket\" < 20",
"SELECT\n" +
" *\n" +
"FROM nation t1\n" +
"JOIN nation t2\n" +
" ON t1.nationkey = t2.nationkey",
assertRemoteMaterializedExchangesCount(1));
}
finally {
assertUpdate("DROP TABLE IF EXISTS partitioned_nation");
}
}
@Test
public void testExplainOfCreateTableAs()
{
String query = "CREATE TABLE copy_orders AS SELECT * FROM orders";
MaterializedResult result = computeActual("EXPLAIN " + query);
assertEquals(getOnlyElement(result.getOnlyColumnAsSet()), getExplainPlan("EXPLAIN ", query, LOGICAL));
}
@Override
protected boolean supportsNotNullColumns()
{
return false;
}
@Test
public void testEmptyBucketedTemporaryTable()
{
assertQuery("SELECT COUNT(DISTINCT linenumber), COUNT(*) from lineitem where linenumber < 0");
}
@Test
public void testBucketedTemporaryTableWithMissingFiles()
{
testBucketedTemporaryTableWithMissingFiles(true);
testBucketedTemporaryTableWithMissingFiles(false);
}
private void testBucketedTemporaryTableWithMissingFiles(boolean isFileRenameEnabled)
{
Session session = Session.builder(getSession())
.setCatalogSessionProperty("hive", "file_renaming_enabled", String.valueOf(isFileRenameEnabled))
.build();
assertQuery(session, "SELECT COUNT(DISTINCT linenumber), COUNT(*) from (SELECT * from lineitem LIMIT 1)");
}
// Hive specific tests should normally go in TestHiveIntegrationSmokeTest
}