TestIcebergDistributedQueries.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;
import com.facebook.presto.Session;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestDistributedQueries;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;
import java.util.Map;
import static com.facebook.presto.SystemSessionProperties.ITERATIVE_OPTIMIZER_TIMEOUT;
import static com.facebook.presto.SystemSessionProperties.OPTIMIZER_USE_HISTOGRAMS;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
import static com.facebook.presto.tests.QueryAssertions.assertEqualsIgnoreOrder;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static java.util.stream.IntStream.range;
import static org.testng.Assert.assertTrue;
@Test(singleThreaded = true)
public abstract class TestIcebergDistributedQueries
extends AbstractTestDistributedQueries
{
private final CatalogType catalogType;
private final Map<String, String> extraConnectorProperties;
protected TestIcebergDistributedQueries(CatalogType catalogType, Map<String, String> extraConnectorProperties)
{
this.catalogType = requireNonNull(catalogType, "catalogType is null");
this.extraConnectorProperties = requireNonNull(extraConnectorProperties, "extraConnectorProperties is null");
}
protected TestIcebergDistributedQueries(CatalogType catalogType)
{
this(catalogType, ImmutableMap.of());
}
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
return IcebergQueryRunner.builder()
.setCatalogType(catalogType)
.setExtraConnectorProperties(extraConnectorProperties)
.build().getQueryRunner();
}
@Override
protected boolean supportsNotNullColumns()
{
// Not null columns are not yet supported by the connector
return false;
}
@Override
public void testUpdate()
{
// Updates are not supported by the connector
}
@Override
public void testDescribeOutput()
{
Session session = Session.builder(getSession())
.addPreparedStatement("my_query", "SELECT * FROM nation")
.build();
// VarcharType on Iceberg cannot record its length parameter.
// So we will get types of `varchar` for column `name` and `comment` rather than `varchar(25)` and `varchar(152)`
// comparing with the overridden method in parent class.
MaterializedResult actual = computeActual(session, "DESCRIBE OUTPUT my_query");
MaterializedResult expected = resultBuilder(session, VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR, BIGINT, BOOLEAN)
.row("nationkey", session.getCatalog().get(), session.getSchema().get(), "nation", "bigint", 8, false)
.row("name", session.getCatalog().get(), session.getSchema().get(), "nation", "varchar", 0, false)
.row("regionkey", session.getCatalog().get(), session.getSchema().get(), "nation", "bigint", 8, false)
.row("comment", session.getCatalog().get(), session.getSchema().get(), "nation", "varchar", 0, false)
.build();
assertEqualsIgnoreOrder(actual, expected);
}
@Override
public void testDescribeOutputNamedAndUnnamed()
{
Session session = Session.builder(getSession())
.addPreparedStatement("my_query", "SELECT 1, name, regionkey AS my_alias FROM nation")
.build();
// VarcharType on Iceberg cannot record its length parameter.
// So we will get a type of `varchar` for column `name` rather than `varchar(25)`
// comparing with the overridden method in parent class.
MaterializedResult actual = computeActual(session, "DESCRIBE OUTPUT my_query");
MaterializedResult expected = resultBuilder(session, VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR, BIGINT, BOOLEAN)
.row("_col0", "", "", "", "integer", 4, false)
.row("name", session.getCatalog().get(), session.getSchema().get(), "nation", "varchar", 0, false)
.row("my_alias", session.getCatalog().get(), session.getSchema().get(), "nation", "bigint", 8, true)
.build();
assertEqualsIgnoreOrder(actual, expected);
}
/**
* Increased the optimizer timeout from 15000ms to 25000ms
*/
@Override
public void testLargeIn()
{
String longValues = range(0, 5000)
.mapToObj(Integer::toString)
.collect(joining(", "));
Session session = Session.builder(getSession())
.setSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, "25000ms")
.build();
assertQuery(session, "SELECT orderkey FROM orders WHERE orderkey IN (" + longValues + ")");
assertQuery(session, "SELECT orderkey FROM orders WHERE orderkey NOT IN (" + longValues + ")");
assertQuery(session, "SELECT orderkey FROM orders WHERE orderkey IN (mod(1000, orderkey), " + longValues + ")");
assertQuery(session, "SELECT orderkey FROM orders WHERE orderkey NOT IN (mod(1000, orderkey), " + longValues + ")");
String varcharValues = range(0, 5000)
.mapToObj(i -> "'" + i + "'")
.collect(joining(", "));
assertQuery(session, "SELECT orderkey FROM orders WHERE cast(orderkey AS VARCHAR) IN (" + varcharValues + ")");
assertQuery(session, "SELECT orderkey FROM orders WHERE cast(orderkey AS VARCHAR) NOT IN (" + varcharValues + ")");
String arrayValues = range(0, 5000)
.mapToObj(i -> format("ARRAY[%s, %s, %s]", i, i + 1, i + 2))
.collect(joining(", "));
assertQuery(session, "SELECT ARRAY[0, 0, 0] in (ARRAY[0, 0, 0], " + arrayValues + ")", "values true");
assertQuery(session, "SELECT ARRAY[0, 0, 0] in (" + arrayValues + ")", "values false");
}
/**
* Increased the optimizer timeouts from 30000ms and 20000ms to 40000ms and 30000ms respectively
*/
@Override
public void testLargeInWithHistograms()
{
String longValues = range(0, 10_000)
.mapToObj(Integer::toString)
.collect(joining(", "));
String query = "select orderpriority, sum(totalprice) from lineitem join orders on lineitem.orderkey = orders.orderkey where orders.orderkey in (" + longValues + ") group by 1";
Session session = Session.builder(getSession())
.setSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, "40000ms")
.setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "true")
.build();
assertQuerySucceeds(session, query);
session = Session.builder(getSession())
.setSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, "30000ms")
.setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "false")
.build();
assertQuerySucceeds(session, query);
}
@Override
public void testStringFilters()
{
// Type not supported for Iceberg: CHAR(10). Only test VARCHAR(10).
// Retained exactly the latter half of the overridden method in parent class.
assertUpdate("CREATE TABLE test_varcharn_filter (shipmode VARCHAR(10))");
assertTrue(getQueryRunner().tableExists(getSession(), "test_varcharn_filter"));
assertTableColumnNames("test_varcharn_filter", "shipmode");
assertUpdate("INSERT INTO test_varcharn_filter SELECT shipmode FROM lineitem", 60175);
assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'AIR'", "VALUES (8491)");
assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'AIR '", "VALUES (0)");
assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'AIR '", "VALUES (0)");
assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'AIR '", "VALUES (0)");
assertQuery("SELECT count(*) FROM test_varcharn_filter WHERE shipmode = 'NONEXIST'", "VALUES (0)");
}
@Test
public void testRenameView()
{
skipTestUnless(supportsViews());
assertQuerySucceeds("CREATE TABLE iceberg_test_table (_string VARCHAR, _integer INTEGER)");
assertUpdate("CREATE VIEW test_view_to_be_renamed AS SELECT * FROM iceberg_test_table");
assertUpdate("ALTER VIEW IF EXISTS test_view_to_be_renamed RENAME TO test_view_renamed");
assertUpdate("CREATE VIEW test_view2_to_be_renamed AS SELECT * FROM iceberg_test_table");
assertUpdate("ALTER VIEW test_view2_to_be_renamed RENAME TO test_view2_renamed");
assertQuerySucceeds("SELECT * FROM test_view_renamed");
assertQuerySucceeds("SELECT * FROM test_view2_renamed");
assertUpdate("DROP VIEW test_view_renamed");
assertUpdate("DROP VIEW test_view2_renamed");
assertUpdate("DROP TABLE iceberg_test_table");
}
@Test
public void testRenameViewIfNotExists()
{
String catalog = getSession().getCatalog().get();
String schema = getSession().getSchema().get();
skipTestUnless(supportsViews());
assertQueryFails("ALTER VIEW test_rename_view_not_exist RENAME TO test_renamed_view_not_exist",
format("line 1:1: View '%s.%s.test_rename_view_not_exist' does not exist", catalog, schema));
assertQuerySucceeds("ALTER VIEW IF EXISTS test_rename_view_not_exist RENAME TO test_renamed_view_not_exist");
}
}