/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.Session;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.spi.security.AllowAllAccessControl;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.nio.file.Path;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;

public class TestIcebergSystemTables
        extends AbstractTestQueryFramework
{
    private static final int DEFAULT_PRECISION = 5;

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        Session session = testSessionBuilder()
                .setCatalog(ICEBERG_CATALOG)
                .build();
        DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).build();

        // Place the Iceberg catalog inside the query runner's data directory
        Path dataDirectory = queryRunner.getCoordinator().getDataDirectory();
        Path catalogDirectory = getIcebergDataDirectoryPath(dataDirectory, HIVE.name(), new IcebergConfig().getFileFormat(), false);

        // Register the Iceberg connector, backed by a file-based Hive metastore
        queryRunner.installPlugin(new IcebergPlugin());
        Map<String, String> icebergProperties = ImmutableMap.<String, String>builder()
                .put("hive.metastore", "file")
                .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
                .build();
        queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties);
        return queryRunner;
    }
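
    // Creates the schema and the tables that the system-table tests below inspect:
    // v2 and v1 partitioned tables, a multilevel-partitioned table, a table with a
    // dropped column, an ORC-format table, and tables with metadata-maintenance
    // properties overridden.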
    @BeforeClass
    @Override
    public void init()
            throws Exception
    {
        super.init();
        assertUpdate("CREATE SCHEMA test_schema");

        assertUpdate("CREATE TABLE test_schema.test_table (_bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_date'])");
        assertUpdate("INSERT INTO test_schema.test_table VALUES (0, CAST('2019-09-08' AS DATE)), (1, CAST('2019-09-09' AS DATE)), (2, CAST('2019-09-09' AS DATE))", 3);
        assertUpdate("INSERT INTO test_schema.test_table VALUES (3, CAST('2019-09-09' AS DATE)), (4, CAST('2019-09-10' AS DATE)), (5, CAST('2019-09-10' AS DATE))", 3);
        assertQuery("SELECT count(*) FROM test_schema.test_table", "VALUES 6");

        assertUpdate("CREATE TABLE test_schema.test_table_v1 (_bigint BIGINT, _date DATE) WITH (\"format-version\" = '1', partitioning = ARRAY['_date'])");
        assertUpdate("INSERT INTO test_schema.test_table_v1 VALUES (0, CAST('2019-09-08' AS DATE)), (1, CAST('2019-09-09' AS DATE)), (2, CAST('2019-09-09' AS DATE))", 3);
        assertUpdate("INSERT INTO test_schema.test_table_v1 VALUES (3, CAST('2019-09-09' AS DATE)), (4, CAST('2019-09-10' AS DATE)), (5, CAST('2019-09-10' AS DATE))", 3);
        assertQuery("SELECT count(*) FROM test_schema.test_table_v1", "VALUES 6");

        assertUpdate("CREATE TABLE test_schema.test_table_multilevel_partitions (_varchar VARCHAR, _bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_bigint', '_date'])");
        assertUpdate("INSERT INTO test_schema.test_table_multilevel_partitions VALUES ('a', 0, CAST('2019-09-08' AS DATE)), ('a', 1, CAST('2019-09-08' AS DATE)), ('a', 0, CAST('2019-09-09' AS DATE))", 3);
        assertQuery("SELECT count(*) FROM test_schema.test_table_multilevel_partitions", "VALUES 3");

        assertUpdate("CREATE TABLE test_schema.test_table_drop_column (_varchar VARCHAR, _bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_date'])");
        assertUpdate("INSERT INTO test_schema.test_table_drop_column VALUES ('a', 0, CAST('2019-09-08' AS DATE)), ('a', 1, CAST('2019-09-09' AS DATE)), ('b', 2, CAST('2019-09-09' AS DATE))", 3);
        assertUpdate("INSERT INTO test_schema.test_table_drop_column VALUES ('c', 3, CAST('2019-09-09' AS DATE)), ('a', 4, CAST('2019-09-10' AS DATE)), ('b', 5, CAST('2019-09-10' AS DATE))", 3);
        assertQuery("SELECT count(*) FROM test_schema.test_table_drop_column", "VALUES 6");
        assertUpdate("ALTER TABLE test_schema.test_table_drop_column DROP COLUMN _varchar");

        assertUpdate("CREATE TABLE test_schema.test_table_orc (_bigint BIGINT) WITH (\"format-version\" = '1', \"write.format.default\" = 'ORC')");
        assertUpdate("INSERT INTO test_schema.test_table_orc VALUES (0), (1), (2)", 3);

        assertUpdate("CREATE TABLE test_schema.test_metadata_versions_maintain (_bigint BIGINT)" +
                " WITH (\"write.metadata.previous-versions-max\" = 1, \"write.metadata.delete-after-commit.enabled\" = true)");
        assertUpdate("CREATE TABLE test_schema.test_metrics_max_inferred_column (_bigint BIGINT)" +
                " WITH (\"write.metadata.metrics.max-inferred-column-defaults\" = 16)");
    }
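
    // Verifies the $partitions system table: column layout plus per-partition row counts
    // and min/max/null statistics for the _bigint data column.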
    @Test
    public void testPartitionTable()
    {
        assertQuery("SELECT count(*) FROM test_schema.test_table", "VALUES 6");
        assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$partitions\"",
                "VALUES ('_date', 'date', '', '')," +
                        "('row_count', 'bigint', '', '')," +
                        "('file_count', 'bigint', '', '')," +
                        "('total_size', 'bigint', '', '')," +
                        "('_bigint', 'row(\"min\" bigint, \"max\" bigint, \"null_count\" bigint)', '', '')");

        MaterializedResult result = computeActual("SELECT * from test_schema.\"test_table$partitions\"");
        assertEquals(result.getRowCount(), 3);

        Map<LocalDate, MaterializedRow> rowsByPartition = result.getMaterializedRows().stream()
                .collect(toImmutableMap(row -> (LocalDate) row.getField(0), Function.identity()));

        // Test if row counts are computed correctly
        assertEquals(rowsByPartition.get(LocalDate.parse("2019-09-08")).getField(1), 1L);
        assertEquals(rowsByPartition.get(LocalDate.parse("2019-09-09")).getField(1), 3L);
        assertEquals(rowsByPartition.get(LocalDate.parse("2019-09-10")).getField(1), 2L);

        // Test if min/max values and null value count are computed correctly.
        assertEquals(
                rowsByPartition.get(LocalDate.parse("2019-09-08")).getField(4),
                new MaterializedRow(DEFAULT_PRECISION, 0L, 0L, 0L).getFields());
        assertEquals(
                rowsByPartition.get(LocalDate.parse("2019-09-09")).getField(4),
                new MaterializedRow(DEFAULT_PRECISION, 1L, 3L, 0L).getFields());
        assertEquals(
                rowsByPartition.get(LocalDate.parse("2019-09-10")).getField(4),
                new MaterializedRow(DEFAULT_PRECISION, 4L, 5L, 0L).getFields());
    }
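
    // Verifies the $history system table, which records when each snapshot became current.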
    @Test
    public void testHistoryTable()
    {
        assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$history\"",
                "VALUES ('made_current_at', 'timestamp with time zone', '', '')," +
                        "('snapshot_id', 'bigint', '', '')," +
                        "('parent_id', 'bigint', '', '')," +
                        "('is_current_ancestor', 'boolean', '', '')");

        // Test the number of history entries
        assertQuery("SELECT count(*) FROM test_schema.\"test_table$history\"", "VALUES 2");
    }
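
    // Verifies the $snapshots system table: one 'append' snapshot per INSERT, with
    // running record totals exposed through the summary map.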
    @Test
    public void testSnapshotsTable()
    {
        assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$snapshots\"",
                "VALUES ('committed_at', 'timestamp with time zone', '', '')," +
                        "('snapshot_id', 'bigint', '', '')," +
                        "('parent_id', 'bigint', '', '')," +
                        "('operation', 'varchar', '', '')," +
                        "('manifest_list', 'varchar', '', '')," +
                        "('summary', 'map(varchar, varchar)', '', '')");

        assertQuery("SELECT operation FROM test_schema.\"test_table$snapshots\"", "VALUES 'append', 'append'");
        assertQuery("SELECT summary['total-records'] FROM test_schema.\"test_table$snapshots\"", "VALUES '3', '6'");
    }
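
    // Verifies the $manifests system table schema, including the per-partition-field summaries.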
    @Test
    public void testManifestsTable()
    {
        assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$manifests\"",
                "VALUES ('path', 'varchar', '', '')," +
                        "('length', 'bigint', '', '')," +
                        "('partition_spec_id', 'integer', '', '')," +
                        "('added_snapshot_id', 'bigint', '', '')," +
                        "('added_data_files_count', 'integer', '', '')," +
                        "('existing_data_files_count', 'integer', '', '')," +
                        "('deleted_data_files_count', 'integer', '', '')," +
                        "('partitions', 'array(row(\"contains_null\" boolean, \"lower_bound\" varchar, \"upper_bound\" varchar))', '', '')");
        assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$manifests\"");
        assertQuerySucceeds("SELECT * FROM test_schema.\"test_table_multilevel_partitions$manifests\"");
    }
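
    // Verifies the $files system table, which exposes per-data-file metadata and statistics.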
    @Test
    public void testFilesTable()
    {
        assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$files\"",
                "VALUES ('content', 'integer', '', '')," +
                        "('file_path', 'varchar', '', '')," +
                        "('file_format', 'varchar', '', '')," +
                        "('record_count', 'bigint', '', '')," +
                        "('file_size_in_bytes', 'bigint', '', '')," +
                        "('column_sizes', 'map(integer, bigint)', '', '')," +
                        "('value_counts', 'map(integer, bigint)', '', '')," +
                        "('null_value_counts', 'map(integer, bigint)', '', '')," +
                        "('nan_value_counts', 'map(integer, bigint)', '', '')," +
                        "('lower_bounds', 'map(integer, varchar)', '', '')," +
                        "('upper_bounds', 'map(integer, varchar)', '', '')," +
                        "('key_metadata', 'varbinary', '', '')," +
                        "('split_offsets', 'array(bigint)', '', '')," +
                        "('equality_ids', 'array(integer)', '', '')");
        assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$files\"");
    }
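
    // Verifies the $refs system table; a fresh table should expose only the 'main' branch.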
    @Test
    public void testRefsTable()
    {
        assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$refs\"",
                "VALUES ('name', 'varchar', '', '')," +
                        "('type', 'varchar', '', '')," +
                        "('snapshot_id', 'bigint', '', '')," +
                        "('max_reference_age_in_ms', 'bigint', '', '')," +
                        "('min_snapshots_to_keep', 'bigint', '', '')," +
                        "('max_snapshot_age_in_ms', 'bigint', '', '')");
        assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$refs\"");

        // Check main branch entry
        assertQuery("SELECT count(*) FROM test_schema.\"test_table$refs\"", "VALUES 1");
        assertQuery("SELECT name FROM test_schema.\"test_table$refs\"", "VALUES 'main'");
        assertQuerySucceeds("SELECT * FROM test_schema.\"test_table_multilevel_partitions$refs\"");
    }
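
    // Verifies that a catalog session property set before a manually started transaction
    // takes effect inside it: disabling merge_on_read_enabled makes reads of a
    // merge-on-read table fail mode validation.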
    @Test
    public void testSessionPropertiesInManuallyStartedTransaction()
    {
        try {
            assertUpdate("create table test_schema.test_session_properties_table(a int, b varchar)");

            // The default value of table property `delete_mode` is `merge-on-read`
            MaterializedResult materializedRows = getQueryRunner().execute("select * from test_schema.\"test_session_properties_table$properties\"");
            assertThat(materializedRows)
                    .anySatisfy(row -> assertThat(row)
                            .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.delete.mode", "merge-on-read")));

            // Simulate `set session iceberg.merge_on_read_enabled=false` to disable merge-on-read mode for Iceberg tables at the session level
            Session session = Session.builder(getQueryRunner().getDefaultSession())
                    .setCatalogSessionProperty(ICEBERG_CATALOG, "merge_on_read_enabled", "false")
                    .build();

            // Simulate `start transaction` to begin a transaction
            TransactionManager transactionManager = getQueryRunner().getTransactionManager();
            TransactionId txnId = transactionManager.beginTransaction(false);
            Session txnSession = session.beginTransactionId(txnId, transactionManager, new AllowAllAccessControl());

            // The query should fail because the session property conflicts with the table property during table mode validation
            assertQueryFails(txnSession, "select * from test_schema.test_session_properties_table", "merge-on-read table mode not supported yet");
            transactionManager.asyncAbort(txnId);
        }
        finally {
            assertUpdate("drop table if exists test_schema.test_session_properties_table");
        }
    }
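
    // Shared assertions for the $properties system table. The expected property count
    // differs depending on whether a custom data write location adds a write.data.path entry.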
    protected void checkTableProperties(String schemaName, String tableName, String deleteMode, String dataWriteLocation)
    {
        checkTableProperties(schemaName, tableName, deleteMode, 10, ImmutableMap.of("write.data.path", dataWriteLocation));
    }

    protected void checkTableProperties(String tableName, String deleteMode)
    {
        checkTableProperties("test_schema", tableName, deleteMode, 9, ImmutableMap.of());
    }

    protected void checkTableProperties(String schemaName, String tableName, String deleteMode, int propertiesCount, Map<String, String> additionalValidateProperties)
    {
        assertQuery(String.format("SHOW COLUMNS FROM %s.\"%s$properties\"", schemaName, tableName),
                "VALUES ('key', 'varchar', '', '')," +
                        "('value', 'varchar', '', '')");
        assertQuery(String.format("SELECT COUNT(*) FROM %s.\"%s$properties\"", schemaName, tableName), "VALUES " + propertiesCount);
        List<MaterializedRow> materializedRows = computeActual(getSession(),
                String.format("SELECT * FROM %s.\"%s$properties\"", schemaName, tableName)).getMaterializedRows();
        assertThat(materializedRows).hasSize(propertiesCount);

        assertThat(materializedRows)
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.delete.mode", deleteMode)))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.update.mode", deleteMode)))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.format.default", "PARQUET")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.parquet.compression-codec", "GZIP")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "commit.retry.num-retries", "4")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.previous-versions-max", "100")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.delete-after-commit.enabled", "false")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.metrics.max-inferred-column-defaults", "100")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, IcebergTableProperties.TARGET_SPLIT_SIZE, Long.toString(DataSize.valueOf("128MB").toBytes()))));

        additionalValidateProperties.forEach((key, value) -> assertThat(materializedRows)
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, key, value))));
    }
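
    // ORC variant of the property checks: ORC tables carry both an ORC and a Parquet
    // compression-codec entry, for ten properties in total.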
    protected void checkORCFormatTableProperties(String tableName, String deleteMode)
    {
        assertQuery(String.format("SHOW COLUMNS FROM test_schema.\"%s$properties\"", tableName),
                "VALUES ('key', 'varchar', '', '')," +
                        "('value', 'varchar', '', '')");
        assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 10");
        List<MaterializedRow> materializedRows = computeActual(getSession(),
                String.format("SELECT * FROM test_schema.\"%s$properties\"", tableName)).getMaterializedRows();
        assertThat(materializedRows).hasSize(10);

        assertThat(materializedRows)
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.delete.mode", deleteMode)))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.update.mode", deleteMode)))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.format.default", "ORC")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.orc.compression-codec", "ZLIB")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.parquet.compression-codec", "zstd")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "commit.retry.num-retries", "4")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.previous-versions-max", "100")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.delete-after-commit.enabled", "false")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.metrics.max-inferred-column-defaults", "100")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, IcebergTableProperties.TARGET_SPLIT_SIZE, Long.toString(DataSize.valueOf("128MB").toBytes()))));
    }

    @Test
    public void testPropertiesTable()
    {
        // Test table properties for all supported format versions
        checkTableProperties("test_table_v1", "copy-on-write");
        checkTableProperties("test_table", "merge-on-read");
        checkORCFormatTableProperties("test_table_orc", "copy-on-write");
    }
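
    // Dropping a column must not affect the record counts reported by the $files table.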
    @Test
    public void testFilesTableOnDropColumn()
    {
        assertQuery("SELECT sum(record_count) FROM test_schema.\"test_table_drop_column$files\"", "VALUES 6");
    }
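
    // Adding a NOT NULL column is rejected whether the table is empty or already has data.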
    @Test
    public void testAlterTableColumnNotNull()
    {
        String tableName = "test_schema.test_table_add_column";
        assertUpdate("CREATE TABLE " + tableName + " (c1 INTEGER, c2 INTEGER)");
        assertQueryFails("ALTER TABLE " + tableName + " ADD COLUMN c3 INTEGER NOT NULL",
                "This connector does not support add column with non null");
        assertUpdate("INSERT INTO " + tableName + " VALUES (1,1)", 1);
        assertQueryFails("ALTER TABLE " + tableName + " ADD COLUMN c3 INTEGER NOT NULL",
                "This connector does not support add column with non null");
    }
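
    // Table-level overrides of the metadata-maintenance properties must surface in $properties.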
    @Test
    public void testMetadataVersionsMaintainingProperties()
    {
        MaterializedResult materializedRows = getQueryRunner().execute("select * from test_schema.\"test_metadata_versions_maintain$properties\"");
        assertThat(materializedRows)
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.previous-versions-max", "1")))
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.delete-after-commit.enabled", "true")));
    }
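
    // The overridden max-inferred-column-defaults metrics property must surface in $properties.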
    @Test
    public void testMetricsMaxInferredColumnProperties()
    {
        MaterializedResult materializedRows = getQueryRunner().execute("select * from test_schema.\"test_metrics_max_inferred_column$properties\"");
        assertThat(materializedRows)
                .anySatisfy(row -> assertThat(row)
                        .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.metrics.max-inferred-column-defaults", "16")));
    }
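
    // Drops everything created in init() (and by individual tests) so reruns start clean.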
    @AfterClass(alwaysRun = true)
    public void tearDown()
    {
        assertUpdate("DROP TABLE IF EXISTS test_schema.test_table");
        assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_v1");
        assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_orc");
        assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_multilevel_partitions");
        assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_drop_column");
        assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_add_column");
        assertUpdate("DROP TABLE IF EXISTS test_schema.test_metadata_versions_maintain");
        assertUpdate("DROP TABLE IF EXISTS test_schema.test_metrics_max_inferred_column");
        assertUpdate("DROP SCHEMA IF EXISTS test_schema");
    }
}