IcebergDistributedTestBase.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;
import com.facebook.presto.Session;
import com.facebook.presto.Session.SessionBuilder;
import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.common.type.FixedWidthType;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeParameter;
import com.facebook.presto.hive.BaseHiveColumnHandle;
import com.facebook.presto.hive.HdfsConfiguration;
import com.facebook.presto.hive.HdfsConfigurationInitializer;
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveHdfsConfiguration;
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.s3.HiveS3Config;
import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater;
import com.facebook.presto.hive.s3.S3ConfigurationUpdater;
import com.facebook.presto.iceberg.delete.DeleteFile;
import com.facebook.presto.metadata.CatalogMetadata;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.MetadataUtil;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.analyzer.MetadataResolver;
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
import com.facebook.presto.spi.plan.JoinNode;
import com.facebook.presto.spi.security.AllowAllAccessControl;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.facebook.presto.spi.statistics.ConnectorHistogram;
import com.facebook.presto.spi.statistics.DoubleRange;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.cache.CacheStats;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.commons.math3.distribution.NormalDistribution;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static com.facebook.presto.SystemSessionProperties.LEGACY_TIMESTAMP;
import static com.facebook.presto.SystemSessionProperties.OPTIMIZER_USE_HISTOGRAMS;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.SYNTHESIZED;
import static com.facebook.presto.hive.HiveCommonSessionProperties.PARQUET_BATCH_READ_OPTIMIZATION_ENABLED;
import static com.facebook.presto.iceberg.FileContent.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath;
import static com.facebook.presto.iceberg.IcebergSessionProperties.DELETE_AS_JOIN_REWRITE_ENABLED;
import static com.facebook.presto.iceberg.IcebergSessionProperties.DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS;
import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED;
import static com.facebook.presto.iceberg.IcebergSessionProperties.STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyNot;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.filter;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.output;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan;
import static com.facebook.presto.testing.MaterializedResult.resultBuilder;
import static com.facebook.presto.testing.TestingAccessControlManager.TestingPrivilegeType.SELECT_COLUMN;
import static com.facebook.presto.testing.TestingAccessControlManager.privilege;
import static com.facebook.presto.testing.TestingConnectorSession.SESSION;
import static com.facebook.presto.testing.assertions.Assert.assertEquals;
import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix;
import static com.facebook.presto.type.DecimalParametricType.DECIMAL;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.lang.String.format;
import static java.nio.file.Files.createTempDirectory;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.function.Function.identity;
import static org.apache.iceberg.SnapshotSummary.TOTAL_DATA_FILES_PROP;
import static org.apache.iceberg.SnapshotSummary.TOTAL_DELETE_FILES_PROP;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@Test(singleThreaded = true)
public abstract class IcebergDistributedTestBase
extends AbstractTestQueryFramework
{
private static final String METADATA_FILE_EXTENSION = ".metadata.json";
protected final CatalogType catalogType;
protected final Map<String, String> extraConnectorProperties;
protected IcebergQueryRunner icebergQueryRunner;
protected IcebergDistributedTestBase(CatalogType catalogType, Map<String, String> extraConnectorProperties)
{
this.catalogType = requireNonNull(catalogType, "catalogType is null");
this.extraConnectorProperties = requireNonNull(extraConnectorProperties, "extraConnectorProperties is null");
}
protected IcebergDistributedTestBase(CatalogType catalogType)
{
this(catalogType, ImmutableMap.of());
}
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
this.icebergQueryRunner = IcebergQueryRunner.builder()
.setCatalogType(catalogType)
.setExtraConnectorProperties(extraConnectorProperties)
.build();
return icebergQueryRunner.getQueryRunner();
}
@Test
public void testDeleteOnV1Table()
{
// Test delete all rows
long totalCount = (long) getQueryRunner().execute("CREATE TABLE test_delete with (\"format-version\" = '1') as select * from lineitem")
.getOnlyValue();
assertUpdate("DELETE FROM test_delete", totalCount);
assertEquals(getQueryRunner().execute("SELECT count(*) FROM test_delete").getOnlyValue(), 0L);
assertQuerySucceeds("DROP TABLE test_delete");
// Test delete whole partitions identified by one partition column
totalCount = (long) getQueryRunner().execute("CREATE TABLE test_partitioned_drop WITH (\"format-version\" = '1', partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem")
.getOnlyValue();
long countPart1 = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where linenumber = 1").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE linenumber = 1", countPart1);
long countPart2 = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where linenumber > 4 and linenumber < 7").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE linenumber > 4 and linenumber < 7", countPart2);
long newTotalCount = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop")
.getOnlyValue();
assertEquals(totalCount - countPart1 - countPart2, newTotalCount);
assertQuerySucceeds("DROP TABLE test_partitioned_drop");
// Test delete whole partitions identified by two partition columns
totalCount = (long) getQueryRunner().execute("CREATE TABLE test_partitioned_drop WITH (\"format-version\" = '1', partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem")
.getOnlyValue();
long countPart1F = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where linenumber = 1 and linestatus = 'F'").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE linenumber = 1 and linestatus = 'F'", countPart1F);
long countPart2O = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where linenumber = 2 and linestatus = 'O'").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE linenumber = 2 and linestatus = 'O'", countPart2O);
long countPartOther = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop where linenumber not in (1, 3, 5, 7) and linestatus in ('O', 'F')").getOnlyValue();
assertUpdate("DELETE FROM test_partitioned_drop WHERE linenumber not in (1, 3, 5, 7) and linestatus in ('O', 'F')", countPartOther);
newTotalCount = (long) getQueryRunner().execute("SELECT count(*) FROM test_partitioned_drop")
.getOnlyValue();
assertEquals(totalCount - countPart1F - countPart2O - countPartOther, newTotalCount);
assertQuerySucceeds("DROP TABLE test_partitioned_drop");
String errorMessage1 = "This connector only supports delete where one or more partitions are deleted entirely for table versions older than 2";
// Do not support delete with filters on non-identity partition column on v1 table
assertUpdate("CREATE TABLE test_partitioned_drop WITH (\"format-version\" = '1', partitioning = ARRAY['bucket(orderkey, 2)', 'linenumber', 'linestatus']) as select * from lineitem", totalCount);
assertQueryFails("DELETE FROM test_partitioned_drop WHERE orderkey = 1", errorMessage1);
// Do not allow delete data at specified snapshot
String errorMessage2 = "This connector do not allow delete data at specified snapshot";
List<Long> snapshots = getQueryRunner().execute("SELECT snapshot_id FROM \"test_partitioned_drop$snapshots\"").getOnlyColumnAsSet()
.stream().map(Long.class::cast).collect(Collectors.toList());
for (long snapshot : snapshots) {
assertQueryFails("DELETE FROM \"test_partitioned_drop@" + snapshot + "\" WHERE linenumber = 1", errorMessage2);
}
assertQuerySucceeds("DROP TABLE test_partitioned_drop");
}
@Test
public void testDeleteWithPartitionSpecEvolution()
{
// Create a partitioned table and insert some value
assertQuerySucceeds("create table test_delete(a int, b varchar) with (partitioning = ARRAY['a'])");
assertQuerySucceeds("insert into test_delete values(1, '1001'), (1, '1002'), (2, '1003')");
assertEquals(getQueryRunner().execute("SELECT count(*) FROM test_delete").getOnlyValue(), 3L);
// Rename partition column would not affect metadata deletion filtering by it's value
assertQuerySucceeds("alter table test_delete rename column a to c");
assertQuerySucceeds("insert into test_delete(c, b) values(2, '1004')");
assertUpdate("DELETE FROM test_delete WHERE c = 2", 2L);
assertEquals(getQueryRunner().execute("SELECT count(*) FROM test_delete").getOnlyValue(), 2L);
// Add a new partition column, and insert some value
assertQuerySucceeds("alter table test_delete add column d bigint with(partitioning = 'identity')");
assertQuerySucceeds("insert into test_delete values(1, '1001', 10001), (2, '1003', 10002), (3, '1004', 10003)");
assertEquals(getQueryRunner().execute("SELECT count(*) FROM test_delete").getOnlyValue(), 5L);
// Deletion succeeds for merge-on-read mode even though column 'd' does not exist in older partition specs
assertUpdate("delete from test_delete where d = 10003", 1);
assertUpdate("delete from test_delete where d = 10002 and c = 2", 1);
// Deletion succeeds, because column 'c' exists in all partition specs
assertUpdate("DELETE FROM test_delete WHERE c = 1", 3);
assertEquals(getQueryRunner().execute("SELECT count(*) FROM test_delete").getOnlyValue(), 0L);
assertQuerySucceeds("DROP TABLE test_delete");
}
@Test
public void testRenamePartitionColumn()
{
assertQuerySucceeds("create table test_partitioned_table(a int, b varchar) with (partitioning = ARRAY['a'])");
assertQuerySucceeds("insert into test_partitioned_table values(1, '1001'), (2, '1002')");
assertEquals(getQueryRunner().execute("SELECT count(*) FROM \"test_partitioned_table$partitions\"").getOnlyValue(), 2L);
assertEquals(getQueryRunner().execute("SELECT row_count FROM \"test_partitioned_table$partitions\" where a = 1").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("SELECT row_count FROM \"test_partitioned_table$partitions\" where a = 2").getOnlyValue(), 1L);
assertQuerySucceeds("alter table test_partitioned_table rename column a to c");
assertQuerySucceeds("insert into test_partitioned_table values(1, '5001'), (2, '5002'), (3, '5003')");
assertEquals(getQueryRunner().execute("SELECT count(*) FROM \"test_partitioned_table$partitions\"").getOnlyValue(), 3L);
assertEquals(getQueryRunner().execute("SELECT row_count FROM \"test_partitioned_table$partitions\" where c = 1").getOnlyValue(), 2L);
assertEquals(getQueryRunner().execute("SELECT row_count FROM \"test_partitioned_table$partitions\" where c = 2").getOnlyValue(), 2L);
assertEquals(getQueryRunner().execute("SELECT row_count FROM \"test_partitioned_table$partitions\" where c = 3").getOnlyValue(), 1L);
assertQuerySucceeds("DROP TABLE test_partitioned_table");
}
@Test
public void testAddPartitionColumn()
{
// Create unPartitioned table and insert some data
assertQuerySucceeds("create table add_partition_column(a int)");
assertQuerySucceeds("insert into add_partition_column values 1, 2, 3");
// Add identity partition column
assertQuerySucceeds("alter table add_partition_column add column b timestamp with(partitioning = 'identity')");
assertQuerySucceeds("insert into add_partition_column values (4, timestamp '1984-12-08 00:10:00'), (5, timestamp '2001-01-08 12:10:01')");
assertEquals(getQueryRunner().execute("SELECT count(*) FROM \"add_partition_column$partitions\"").getOnlyValue(), 3L);
assertEquals(getQueryRunner().execute("select row_count from \"add_partition_column$partitions\" where b is null").getOnlyValue(), 3L);
assertEquals(getQueryRunner().execute("select row_count from \"add_partition_column$partitions\" where b = timestamp '1984-12-08 00:10:00'").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("select row_count from \"add_partition_column$partitions\" where b = timestamp '2001-01-08 12:10:01'").getOnlyValue(), 1L);
// Add truncate[2] partition column
assertQuerySucceeds("alter table add_partition_column add column c varchar with(partitioning = 'truncate(2)')");
assertQuerySucceeds("insert into add_partition_column values (6, timestamp '1984-12-08 00:10:00', 'hw1006'), (7, timestamp '1984-12-08 00:10:00', 'hw1007')");
assertEquals(getQueryRunner().execute("SELECT count(*) FROM \"add_partition_column$partitions\"").getOnlyValue(), 4L);
assertEquals(getQueryRunner().execute("select row_count from \"add_partition_column$partitions\" where b is null and c_trunc is null").getOnlyValue(), 3L);
assertEquals(getQueryRunner().execute("select row_count from \"add_partition_column$partitions\" where b = timestamp '1984-12-08 00:10:00' and c_trunc is null").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("select row_count from \"add_partition_column$partitions\" where b = timestamp '2001-01-08 12:10:01' and c_trunc is null").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("select row_count from \"add_partition_column$partitions\" where b = timestamp '1984-12-08 00:10:00' and c_trunc = 'hw'").getOnlyValue(), 2L);
assertQuerySucceeds("DROP TABLE add_partition_column");
// Create partitioned table and insert some data
assertQuerySucceeds("create table add_partition_column(a int, b timestamp, c varchar) with (partitioning = ARRAY['b', 'truncate(c, 2)'])");
assertQuerySucceeds("insert into add_partition_column values(1, timestamp '1984-12-08 00:10:00', 'hw1001'), (2, timestamp '2001-01-08 12:10:01', 'hw1002')");
assertEquals(getQueryRunner().execute("SELECT count(*) FROM \"add_partition_column$partitions\"").getOnlyValue(), 2L);
assertEquals(getQueryRunner().execute("select row_count from \"add_partition_column$partitions\" where b = timestamp '1984-12-08 00:10:00' and c_trunc = 'hw'").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("select row_count from \"add_partition_column$partitions\" where b = timestamp '2001-01-08 12:10:01' and c_trunc = 'hw'").getOnlyValue(), 1L);
// Add bucket partition column
assertQuerySucceeds("alter table add_partition_column add column d bigint with(partitioning = 'bucket(2)')");
assertQuerySucceeds("insert into add_partition_column values(1, timestamp '1984-12-08 00:10:00', 'hw1001', 1234), (1, timestamp '1984-12-08 00:10:00', 'hw1001', 1344)");
assertEquals(getQueryRunner().execute("SELECT count(*) FROM \"add_partition_column$partitions\"").getOnlyValue(), 4L);
assertEquals(getQueryRunner().execute("select row_count from \"add_partition_column$partitions\" where b = timestamp '1984-12-08 00:10:00' and c_trunc = 'hw' and d_bucket is null").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("select row_count from \"add_partition_column$partitions\" where b = timestamp '2001-01-08 12:10:01' and c_trunc = 'hw' and d_bucket is null").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("select row_count from \"add_partition_column$partitions\" where b = timestamp '1984-12-08 00:10:00' and c_trunc = 'hw' and d_bucket = 1").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("select row_count from \"add_partition_column$partitions\" where b = timestamp '1984-12-08 00:10:00' and c_trunc = 'hw' and d_bucket = 0").getOnlyValue(), 1L);
assertQuerySucceeds("DROP TABLE add_partition_column");
Session session = Session.builder(getSession())
.setTimeZoneKey(UTC_KEY)
.build();
// Create partitioned table and insert some data
assertQuerySucceeds(session, "create table add_partition_column(a int, b varchar) with (partitioning = ARRAY['truncate(b, 2)'])");
assertQuerySucceeds(session, "insert into add_partition_column values(1, 'hw1001'), (2, 'hw1002')");
assertEquals(getQueryRunner().execute(session, "SELECT count(*) FROM \"add_partition_column$partitions\"").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute(session, "select row_count from \"add_partition_column$partitions\" where b_trunc = 'hw'").getOnlyValue(), 2L);
// Add year partition column
assertQuerySucceeds(session, "alter table add_partition_column add column c date with(partitioning = 'year')");
assertQuerySucceeds(session, "insert into add_partition_column values(3, 'hw1003', date '1984-12-08'), (4, 'hw1004', date '2001-01-08')");
assertEquals(getQueryRunner().execute(session, "SELECT count(*) FROM \"add_partition_column$partitions\"").getOnlyValue(), 3L);
assertQuery(session, "SELECT b_trunc, c_year, row_count FROM \"add_partition_column$partitions\" ORDER BY c_year",
"VALUES ('hw', 14, 1), ('hw', 31, 1), ('hw', NULL, 2)");
assertQuery(session, "SELECT * FROM add_partition_column WHERE year(c) = 1984", "VALUES (3, 'hw1003', '1984-12-08')");
assertQuerySucceeds(session, "DROP TABLE add_partition_column");
// Create partitioned table and insert some data
assertQuerySucceeds(session, "create table add_partition_column(a int, b varchar) with (partitioning = ARRAY['truncate(b, 2)'])");
assertQuerySucceeds(session, "insert into add_partition_column values(1, 'hw1001'), (2, 'hw1002')");
assertEquals(getQueryRunner().execute(session, "SELECT count(*) FROM \"add_partition_column$partitions\"").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute(session, "select row_count from \"add_partition_column$partitions\" where b_trunc = 'hw'").getOnlyValue(), 2L);
// Add month partition column
assertQuerySucceeds(session, "alter table add_partition_column add column c timestamp with(partitioning = 'month')");
assertQuerySucceeds(session, "insert into add_partition_column values(3, 'hw1003', timestamp '1984-12-08 12:10:31.315'), (4, 'hw1004', timestamp '2001-01-08 10:01:01.123')");
assertEquals(getQueryRunner().execute(session, "SELECT count(*) FROM \"add_partition_column$partitions\"").getOnlyValue(), 3L);
assertQuery(session, "SELECT b_trunc, c_month, row_count FROM \"add_partition_column$partitions\" ORDER BY c_month",
"VALUES ('hw', 179, 1), ('hw', 372, 1), ('hw', NULL, 2)");
assertQuery(session, "SELECT * FROM add_partition_column WHERE month(c) = 12", "VALUES (3, 'hw1003', '1984-12-08 12:10:31.315')");
assertQuerySucceeds(session, "DROP TABLE add_partition_column");
// Create partitioned table and insert some data
assertQuerySucceeds(session, "create table add_partition_column(a int, b varchar) with (partitioning = ARRAY['truncate(b, 2)'])");
assertQuerySucceeds(session, "insert into add_partition_column values(1, 'hw1001'), (2, 'hw1002')");
assertEquals(getQueryRunner().execute(session, "SELECT count(*) FROM \"add_partition_column$partitions\"").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute(session, "select row_count from \"add_partition_column$partitions\" where b_trunc = 'hw'").getOnlyValue(), 2L);
// Add day partition column
assertQuerySucceeds(session, "alter table add_partition_column add column c date with(partitioning = 'day')");
assertQuerySucceeds(session, "insert into add_partition_column values(3, 'hw1003', date '1984-12-08'), (4, 'hw1004', date '2001-01-09')");
assertEquals(getQueryRunner().execute(session, "SELECT count(*) FROM \"add_partition_column$partitions\"").getOnlyValue(), 3L);
assertQuerySucceeds(session, "DROP TABLE add_partition_column");
// Create partitioned table and insert some data
assertQuerySucceeds(session, "create table add_partition_column(a int, b varchar) with (partitioning = ARRAY['truncate(b, 2)'])");
assertQuerySucceeds(session, "insert into add_partition_column values(1, 'hw1001'), (2, 'hw1002')");
assertEquals(getQueryRunner().execute(session, "SELECT count(*) FROM \"add_partition_column$partitions\"").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute(session, "select row_count from \"add_partition_column$partitions\" where b_trunc = 'hw'").getOnlyValue(), 2L);
// Add hour partition column
assertQuerySucceeds(session, "alter table add_partition_column add column c timestamp with(partitioning = 'hour')");
assertQuerySucceeds(session, "insert into add_partition_column values(3, 'hw1003', timestamp '1984-12-08 12:10:31.315'), (4, 'hw1004', timestamp '2001-01-08 10:01:01.123')");
assertEquals(getQueryRunner().execute(session, "SELECT count(*) FROM \"add_partition_column$partitions\"").getOnlyValue(), 3L);
assertQuery(session, "SELECT b_trunc, c_hour, row_count FROM \"add_partition_column$partitions\" ORDER BY c_hour",
"VALUES ('hw', 130932, 1), ('hw', 271930, 1), ('hw', NULL, 2)");
assertQuery(session, "SELECT * FROM add_partition_column WHERE hour(c) = 12", "VALUES (3, 'hw1003', '1984-12-08 12:10:31.315')");
// validate hour() unsupported for date type
assertQueryFails("alter table add_partition_column add column e date with(partitioning = 'hour')", ".*hour cannot transform date values from.*");
// validate unknown partition transform, take 'other' and 'null' for example.
assertQueryFails("alter table add_partition_column add column e varchar with(partitioning = 'other')", "Unknown partition transform: other");
assertQueryFails("alter table add_partition_column add column e varchar with(partitioning = 'null')", "Unknown partition transform: null");
// validate wrong parameter of transform function
String ignoreErrorMessage = ".*";
long maxValue = Integer.MAX_VALUE + 1;
assertQueryFails("alter table add_partition_column add column e bigint with (partitioning = 'bucket(-1)')", ignoreErrorMessage);
assertQueryFails("alter table add_partition_column add column e bigint with (partitioning = 'bucket(0)')", ignoreErrorMessage);
assertQueryFails("alter table add_partition_column add column e bigint with (partitioning = 'bucket(1d2f)')", ignoreErrorMessage);
assertQueryFails("alter table add_partition_column add column e bigint with (partitioning = 'bucket(" + maxValue + ")')", ignoreErrorMessage);
assertQueryFails("alter table add_partition_column add column f varchar with (partitioning = 'truncate(-1)')", ignoreErrorMessage);
assertQueryFails("alter table add_partition_column add column f varchar with (partitioning = 'truncate(0)')", ignoreErrorMessage);
assertQueryFails("alter table add_partition_column add column f varchar with (partitioning = 'truncate(1d2f)')", ignoreErrorMessage);
assertQueryFails("alter table add_partition_column add column f varchar with (partitioning = 'truncate(" + maxValue + ")')", ignoreErrorMessage);
assertQuerySucceeds("DROP TABLE add_partition_column");
// validate try to add duplicate name in partition spec
assertQueryFails("create table add_partition_column(a_bucket int, a varchar) with (partitioning = ARRAY['a', 'bucket(a, 2)'])", ignoreErrorMessage);
assertQuerySucceeds("create table add_partition_column(a int, b_trunc varchar) with (partitioning = ARRAY['bucket(a, 2)', 'b_trunc'])");
assertQueryFails("alter table add_partition_column add column a_bucket bigint with (partitioning = 'identity')", ignoreErrorMessage);
assertQueryFails("alter table add_partition_column add column b varchar with (partitioning = 'truncate(2)')", ignoreErrorMessage);
assertQuerySucceeds("DROP TABLE add_partition_column");
}
@Test
public void testDropPartitionColumn()
{
assertQuerySucceeds("create table test_drop_partition_column(a int, b varchar) with (partitioning = ARRAY['a'])");
assertQuerySucceeds("insert into test_drop_partition_column values(1, '1001'), (2, '1002'), (3, '1003')");
String errorMessage = "This connector does not support dropping columns which exist in any of the table's partition specs";
assertQueryFails("alter table test_drop_partition_column drop column a", errorMessage);
assertQuerySucceeds("DROP TABLE test_drop_partition_column");
assertQuerySucceeds("create table test_drop_partition_column(a int)");
assertQuerySucceeds("insert into test_drop_partition_column values 1, 2, 3");
assertQuerySucceeds("alter table test_drop_partition_column add column b varchar with (partitioning = 'identity')");
assertQuerySucceeds("insert into test_drop_partition_column values(4, '1004'), (5, '1005')");
assertQueryFails("alter table test_drop_partition_column drop column b", errorMessage);
assertQuerySucceeds("DROP TABLE test_drop_partition_column");
}
@Test
public void testTruncate()
{
// Test truncate empty table
assertUpdate("CREATE TABLE test_truncate_empty (i int)");
assertQuerySucceeds("TRUNCATE TABLE test_truncate_empty");
assertEquals(getQueryRunner().execute("SELECT count(*) FROM test_truncate_empty").getOnlyValue(), 0L);
assertQuerySucceeds("DROP TABLE test_truncate_empty");
// Test truncate table with rows
assertUpdate("CREATE TABLE test_truncate AS SELECT * FROM orders", "SELECT count(*) FROM orders");
assertQuerySucceeds("TRUNCATE TABLE test_truncate");
MaterializedResult result = getQueryRunner().execute("SELECT count(*) FROM test_truncate");
assertEquals(result.getOnlyValue(), 0L);
assertUpdate("DROP TABLE test_truncate");
// test truncate -> insert -> truncate
assertUpdate("CREATE TABLE test_truncate_empty (i int)");
assertQuerySucceeds("TRUNCATE TABLE test_truncate_empty");
assertEquals(getQueryRunner().execute("SELECT count(*) FROM test_truncate_empty").getOnlyValue(), 0L);
assertUpdate("INSERT INTO test_truncate_empty VALUES 1", 1);
assertEquals(getQueryRunner().execute("SELECT count(*) FROM test_truncate_empty").getOnlyValue(), 1L);
assertQuerySucceeds("TRUNCATE TABLE test_truncate_empty");
assertEquals(getQueryRunner().execute("SELECT count(*) FROM test_truncate_empty").getOnlyValue(), 0L);
assertQuerySucceeds("DROP TABLE test_truncate_empty");
// Test truncate access control
assertUpdate("CREATE TABLE test_truncate AS SELECT * FROM orders", "SELECT count(*) FROM orders");
assertAccessAllowed("TRUNCATE TABLE test_truncate", privilege("orders", SELECT_COLUMN));
assertUpdate("DROP TABLE test_truncate");
}
@Test
public void testTruncateTableWithDeleteFiles()
{
String tableName = "test_v2_row_delete_" + randomTableSuffix();
try {
assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar) WITH (\"format-version\" = '2', \"write.delete.mode\" = 'merge-on-read')");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002'), (3, '1003')", 3);
// execute row level deletion
assertUpdate("DELETE FROM " + tableName + " WHERE b in ('1002', '1003')", 2);
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001')");
Table icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 1);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 1);
// execute truncate table
assertUpdate("TRUNCATE TABLE " + tableName);
assertQuery("SELECT count(*) FROM " + tableName, "VALUES 0");
icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 0);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tableName);
}
}
@Test
public void testShowColumnsForPartitionedTable()
{
// Partitioned Table with only identity partitions
getQueryRunner().execute("CREATE TABLE show_columns_only_identity_partition " +
"(id int," +
" name varchar," +
" team varchar) WITH (partitioning = ARRAY['team'])");
MaterializedResult actual = computeActual("SHOW COLUMNS FROM show_columns_only_identity_partition");
MaterializedResult expectedParametrizedVarchar = resultBuilder(getSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR)
.row("id", "integer", "", "")
.row("name", "varchar", "", "")
.row("team", "varchar", "partition key", "")
.build();
assertEquals(actual, expectedParametrizedVarchar);
// Partitioned Table with non identity partition transforms
getQueryRunner().execute("CREATE TABLE show_columns_with_non_identity_partition " +
"(id int," +
" name varchar," +
" team varchar) WITH (partitioning = ARRAY['truncate(team, 1)', 'team'])");
actual = computeActual("SHOW COLUMNS FROM show_columns_with_non_identity_partition");
expectedParametrizedVarchar = resultBuilder(getSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR)
.row("id", "integer", "", "")
.row("name", "varchar", "", "")
.row("team", "varchar", "partition by truncate[1], identity", "")
.build();
assertEquals(actual, expectedParametrizedVarchar);
}
@DataProvider(name = "timezones")
public Object[][] timezones()
{
return new Object[][] {
{"UTC", true},
{"America/Los_Angeles", true},
{"Asia/Shanghai", true},
{"None", false}};
}
@Test(dataProvider = "timezones")
public void testPartitionedByTimestampType(String zoneId, boolean legacyTimestamp)
{
Session sessionForTimeZone = sessionForTimezone(zoneId, legacyTimestamp);
testWithAllFileFormats(sessionForTimeZone, (session, fileFormat) -> testPartitionedByTimestampTypeForFormat(session, fileFormat));
}
private void testPartitionedByTimestampTypeForFormat(Session session, FileFormat fileFormat)
{
try {
// create iceberg table partitioned by column of TimestampType, and insert some data
assertQuerySucceeds(session, format("create table test_partition_columns(a bigint, b timestamp) with (partitioning = ARRAY['b'], \"write.format.default\" = '%s')", fileFormat.name()));
assertQuerySucceeds(session, "insert into test_partition_columns values(1, timestamp '1984-12-08 00:10:00'), (2, timestamp '2001-01-08 12:01:01')");
// validate return data of TimestampType
List<Object> timestampColumnDatas = getQueryRunner().execute(session, "select b from test_partition_columns order by a asc").getOnlyColumn().collect(Collectors.toList());
assertEquals(timestampColumnDatas.size(), 2);
assertEquals(timestampColumnDatas.get(0), LocalDateTime.parse("1984-12-08 00:10:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
assertEquals(timestampColumnDatas.get(1), LocalDateTime.parse("2001-01-08 12:01:01", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
// validate column of TimestampType exists in query filter
assertEquals(getQueryRunner().execute(session, "select b from test_partition_columns where b = timestamp '1984-12-08 00:10:00'").getOnlyValue(),
LocalDateTime.parse("1984-12-08 00:10:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
assertEquals(getQueryRunner().execute(session, "select b from test_partition_columns where b = timestamp '2001-01-08 12:01:01'").getOnlyValue(),
LocalDateTime.parse("2001-01-08 12:01:01", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
// validate column of TimestampType in system table "partitions"
assertEquals(getQueryRunner().execute(session, "select count(*) FROM \"test_partition_columns$partitions\"").getOnlyValue(), 2L);
assertEquals(getQueryRunner().execute(session, "select row_count from \"test_partition_columns$partitions\" where b = timestamp '1984-12-08 00:10:00'").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute(session, "select row_count from \"test_partition_columns$partitions\" where b = timestamp '2001-01-08 12:01:01'").getOnlyValue(), 1L);
// validate column of TimestampType exists in delete filter
assertUpdate(session, "delete from test_partition_columns WHERE b = timestamp '2001-01-08 12:01:01'", 1);
timestampColumnDatas = getQueryRunner().execute(session, "select b from test_partition_columns order by a asc").getOnlyColumn().collect(Collectors.toList());
assertEquals(timestampColumnDatas.size(), 1);
assertEquals(timestampColumnDatas.get(0), LocalDateTime.parse("1984-12-08 00:10:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
assertEquals(getQueryRunner().execute(session, "select b FROM test_partition_columns where b = timestamp '1984-12-08 00:10:00'").getOnlyValue(),
LocalDateTime.parse("1984-12-08 00:10:00", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
assertEquals(getQueryRunner().execute(session, "select count(*) from \"test_partition_columns$partitions\"").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute(session, "select row_count from \"test_partition_columns$partitions\" where b = timestamp '1984-12-08 00:10:00'").getOnlyValue(), 1L);
}
finally {
assertQuerySucceeds(session, "drop table test_partition_columns");
}
}
@Test
public void testCreateTableWithCustomLocation()
throws IOException
{
String tableName = "test_table_with_custom_location";
URI tableTargetURI = createTempDirectory(tableName).toUri();
try {
assertQuerySucceeds(format("create table %s (a int, b varchar)" +
" with (location = '%s')", tableName, tableTargetURI.toString()));
assertUpdate(format("insert into %s values(1, '1001'), (2, '1002')", tableName), 2);
assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002')");
TableMetadata tableMetadata = ((BaseTable) loadTable(tableName)).operations().current();
assertEquals(URI.create(tableMetadata.location() + File.separator), tableTargetURI);
}
finally {
assertUpdate("drop table if exists " + tableName);
}
}
@Test
protected void testCreateTableAndValidateIcebergTableName()
{
String tableName = "test_create_table_for_validate_name";
Session session = getSession();
assertUpdate(session, format("CREATE TABLE %s (col1 INTEGER, aDate DATE)", tableName));
Table icebergTable = loadTable(tableName);
String catalog = session.getCatalog().get();
String schemaName = session.getSchema().get();
assertEquals(icebergTable.name(), catalog + "." + schemaName + "." + tableName);
assertUpdate("DROP TABLE IF EXISTS " + tableName);
}
@Test
public void testPartitionedByTimeType()
{
// create iceberg table partitioned by column of TimestampType, and insert some data
assertQuerySucceeds("drop table if exists test_partition_columns_time");
assertQuerySucceeds("create table test_partition_columns_time(a bigint, b time) with (partitioning = ARRAY['b'])");
assertQuerySucceeds("insert into test_partition_columns_time values(1, time '00:10:00'), (2, time '12:01:01')");
// validate return data of TimestampType
List<Object> timestampColumnDatas = getQueryRunner().execute("select b from test_partition_columns_time order by a asc").getOnlyColumn().collect(Collectors.toList());
assertEquals(timestampColumnDatas.size(), 2);
assertEquals(timestampColumnDatas.get(0), LocalTime.parse("00:10:00", DateTimeFormatter.ofPattern("HH:mm:ss")));
assertEquals(timestampColumnDatas.get(1), LocalTime.parse("12:01:01", DateTimeFormatter.ofPattern("HH:mm:ss")));
// validate column of TimestampType exists in query filter
assertEquals(getQueryRunner().execute("select b from test_partition_columns_time where b = time '00:10:00'").getOnlyValue(),
LocalTime.parse("00:10:00", DateTimeFormatter.ofPattern("HH:mm:ss")));
assertEquals(getQueryRunner().execute("select b from test_partition_columns_time where b = time '12:01:01'").getOnlyValue(),
LocalTime.parse("12:01:01", DateTimeFormatter.ofPattern("HH:mm:ss")));
// validate column of TimestampType in system table "partitions"
assertEquals(getQueryRunner().execute("select count(*) FROM \"test_partition_columns_time$partitions\"").getOnlyValue(), 2L);
assertEquals(getQueryRunner().execute("select row_count from \"test_partition_columns_time$partitions\" where b = time '00:10:00'").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("select row_count from \"test_partition_columns_time$partitions\" where b = time '12:01:01'").getOnlyValue(), 1L);
// validate column of TimestampType exists in delete filter
assertUpdate("delete from test_partition_columns_time WHERE b = time '12:01:01'", 1);
timestampColumnDatas = getQueryRunner().execute("select b from test_partition_columns_time order by a asc").getOnlyColumn().collect(Collectors.toList());
assertEquals(timestampColumnDatas.size(), 1);
assertEquals(timestampColumnDatas.get(0), LocalTime.parse("00:10:00", DateTimeFormatter.ofPattern("HH:mm:ss")));
assertEquals(getQueryRunner().execute("select b FROM test_partition_columns_time where b = time '00:10:00'").getOnlyValue(),
LocalTime.parse("00:10:00", DateTimeFormatter.ofPattern("HH:mm:ss")));
assertEquals(getQueryRunner().execute("select count(*) from \"test_partition_columns_time$partitions\"").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("select row_count from \"test_partition_columns_time$partitions\" where b = time '00:10:00'").getOnlyValue(), 1L);
assertQuerySucceeds("drop table test_partition_columns_time");
}
@Test
public void testPartitionedByVarbinaryType()
{
// create iceberg table partitioned by column of VarbinaryType, and insert some data
assertQuerySucceeds("drop table if exists test_partition_columns_varbinary");
assertQuerySucceeds("create table test_partition_columns_varbinary(a bigint, b varbinary) with (partitioning = ARRAY['b'])");
assertQuerySucceeds("insert into test_partition_columns_varbinary values(1, X'bcd1'), (2, X'e3bcd1')");
// validate return data of VarbinaryType
List<Object> varbinaryColumnDatas = getQueryRunner().execute("select b from test_partition_columns_varbinary order by a asc").getOnlyColumn().collect(Collectors.toList());
assertEquals(varbinaryColumnDatas.size(), 2);
assertEquals(varbinaryColumnDatas.get(0), new byte[] {(byte) 0xbc, (byte) 0xd1});
assertEquals(varbinaryColumnDatas.get(1), new byte[] {(byte) 0xe3, (byte) 0xbc, (byte) 0xd1});
// validate column of VarbinaryType exists in query filter
assertEquals(getQueryRunner().execute("select b from test_partition_columns_varbinary where b = X'bcd1'").getOnlyValue(),
new byte[] {(byte) 0xbc, (byte) 0xd1});
assertEquals(getQueryRunner().execute("select b from test_partition_columns_varbinary where b = X'e3bcd1'").getOnlyValue(),
new byte[] {(byte) 0xe3, (byte) 0xbc, (byte) 0xd1});
// validate column of VarbinaryType in system table "partitions"
assertEquals(getQueryRunner().execute("select count(*) FROM \"test_partition_columns_varbinary$partitions\"").getOnlyValue(), 2L);
assertEquals(getQueryRunner().execute("select row_count from \"test_partition_columns_varbinary$partitions\" where b = X'bcd1'").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("select row_count from \"test_partition_columns_varbinary$partitions\" where b = X'e3bcd1'").getOnlyValue(), 1L);
// validate column of VarbinaryType exists in delete filter
assertUpdate("delete from test_partition_columns_varbinary WHERE b = X'bcd1'", 1);
varbinaryColumnDatas = getQueryRunner().execute("select b from test_partition_columns_varbinary order by a asc").getOnlyColumn().collect(Collectors.toList());
assertEquals(varbinaryColumnDatas.size(), 1);
assertEquals(varbinaryColumnDatas.get(0), new byte[] {(byte) 0xe3, (byte) 0xbc, (byte) 0xd1});
assertEquals(getQueryRunner().execute("select b FROM test_partition_columns_varbinary where b = X'e3bcd1'").getOnlyValue(),
new byte[] {(byte) 0xe3, (byte) 0xbc, (byte) 0xd1});
assertEquals(getQueryRunner().execute("select count(*) from \"test_partition_columns_varbinary$partitions\"").getOnlyValue(), 1L);
assertEquals(getQueryRunner().execute("select row_count from \"test_partition_columns_varbinary$partitions\" where b = X'e3bcd1'").getOnlyValue(), 1L);
assertQuerySucceeds("drop table test_partition_columns_varbinary");
}
@DataProvider(name = "columnCount")
public Object[][] getColumnCount()
{
return new Object[][] {{2}, {16}, {100}};
}
@Test(dataProvider = "columnCount")
public void testReadWriteStatsWithColumnLimits(int columnCount)
{
try {
String columns = Joiner.on(", ")
.join(IntStream.iterate(2, i -> i + 1).limit(columnCount - 2)
.mapToObj(idx -> "column_" + idx + " int")
.iterator());
String comma = Strings.isNullOrEmpty(columns.trim()) ? "" : ", ";
// The columns number of `test_stats_with_column_limits` for which metrics are collected is set to `columnCount`
assertUpdate("CREATE TABLE test_stats_with_column_limits (column_0 int, column_1 varchar, " + columns + comma + "column_10001 varchar) with(\"write.metadata.metrics.max-inferred-column-defaults\" = " + columnCount + ")");
assertTrue(getQueryRunner().tableExists(getSession(), "test_stats_with_column_limits"));
List<String> columnNames = IntStream.iterate(0, i -> i + 1).limit(columnCount)
.mapToObj(idx -> "column_" + idx).collect(Collectors.toList());
columnNames.add("column_10001");
assertTableColumnNames("test_stats_with_column_limits", columnNames.toArray(new String[0]));
// test that stats don't exist before analyze
Function<Map<ColumnHandle, ColumnStatistics>, Map<String, ColumnStatistics>> remapper = (input) -> input.entrySet().stream().collect(Collectors.toMap(e -> ((IcebergColumnHandle) e.getKey()).getName(), Map.Entry::getValue));
Map<String, ColumnStatistics> columnStats;
TableStatistics stats = getTableStats("test_stats_with_column_limits");
columnStats = remapper.apply(stats.getColumnStatistics());
assertTrue(columnStats.isEmpty());
String values1 = Joiner.on(", ")
.join(IntStream.iterate(2, i -> i + 1).limit(columnCount - 2)
.mapToObj(idx -> "100" + idx)
.iterator());
String values2 = Joiner.on(", ")
.join(IntStream.iterate(2, i -> i + 1).limit(columnCount - 2)
.mapToObj(idx -> "200" + idx)
.iterator());
String values3 = Joiner.on(", ")
.join(IntStream.iterate(2, i -> i + 1).limit(columnCount - 2)
.mapToObj(idx -> "300" + idx)
.iterator());
// test after simple insert we get a good estimate
assertUpdate("INSERT INTO test_stats_with_column_limits VALUES " +
"(1, '1001', " + values1 + comma + "'abc'), " +
"(2, '2001', " + values2 + comma + "'xyz'), " +
"(3, '3001', " + values3 + comma + "'lmnopqrst')", 3);
getQueryRunner().execute("ANALYZE test_stats_with_column_limits");
stats = getTableStats("test_stats_with_column_limits");
columnStats = remapper.apply(stats.getColumnStatistics());
// `column_0` has columns statistics
ColumnStatistics columnStat = columnStats.get("column_0");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0));
assertEquals(columnStat.getNullsFraction(), Estimate.of(0.0));
assertEquals(columnStat.getDataSize(), Estimate.unknown());
// `column_1` has columns statistics
columnStat = columnStats.get("column_1");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0));
assertEquals(columnStat.getNullsFraction(), Estimate.of(0.0));
assertEquals(columnStat.getDataSize(), Estimate.of(12.0));
// `column_10001` do not have column statistics as its column index is
// larger than the max number for which metrics are collected
columnStat = columnStats.get("column_10001");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.unknown());
assertEquals(columnStat.getNullsFraction(), Estimate.unknown());
assertEquals(columnStat.getDataSize(), Estimate.unknown());
}
finally {
assertUpdate("DROP TABLE IF EXISTS test_stats_with_column_limits");
}
}
@Test
public void testReadWriteStats()
{
assertUpdate("CREATE TABLE test_stats (col0 int, col_1 varchar)");
assertTrue(getQueryRunner().tableExists(getSession(), "test_stats"));
assertTableColumnNames("test_stats", "col0", "col_1");
// test that stats don't exist before analyze
Function<Map<ColumnHandle, ColumnStatistics>, Map<String, ColumnStatistics>> remapper = (input) -> input.entrySet().stream().collect(Collectors.toMap(e -> ((IcebergColumnHandle) e.getKey()).getName(), Map.Entry::getValue));
Map<String, ColumnStatistics> columnStats;
TableStatistics stats = getTableStats("test_stats");
columnStats = remapper.apply(stats.getColumnStatistics());
assertTrue(columnStats.isEmpty());
// test after simple insert we get a good estimate
assertUpdate("INSERT INTO test_stats VALUES (1, 'abc'), (2, 'xyz'), (3, 'lmnopqrst')", 3);
getQueryRunner().execute("ANALYZE test_stats");
stats = getTableStats("test_stats");
columnStats = remapper.apply(stats.getColumnStatistics());
ColumnStatistics columnStat = columnStats.get("col0");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0));
assertEquals(columnStat.getDataSize(), Estimate.unknown());
columnStat = columnStats.get("col_1");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0));
double dataSize = (double) (long) getQueryRunner().execute("SELECT sum_data_size_for_stats(col_1) FROM test_stats").getOnlyValue();
assertEquals(columnStat.getDataSize().getValue(), dataSize);
// test after inserting the same values, we still get the same estimate
assertUpdate("INSERT INTO test_stats VALUES (1, 'abc'), (2, 'xyz'), (3, 'lmnopqrst')", 3);
stats = getTableStats("test_stats");
columnStats = remapper.apply(stats.getColumnStatistics());
columnStat = columnStats.get("col0");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0));
assertEquals(columnStat.getDataSize(), Estimate.unknown());
columnStat = columnStats.get("col_1");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0));
assertEquals(columnStat.getDataSize().getValue(), dataSize);
// test after ANALYZING with the new inserts that the NDV estimate is the same and the data size matches
getQueryRunner().execute("ANALYZE test_stats");
stats = getTableStats("test_stats");
columnStats = remapper.apply(stats.getColumnStatistics());
columnStat = columnStats.get("col0");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0));
assertEquals(columnStat.getDataSize(), Estimate.unknown());
columnStat = columnStats.get("col_1");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0));
dataSize = (double) (long) getQueryRunner().execute("SELECT sum_data_size_for_stats(col_1) FROM test_stats").getOnlyValue();
assertEquals(columnStat.getDataSize().getValue(), dataSize);
// test after inserting a new value, but not analyzing, the estimate is the same.
assertUpdate("INSERT INTO test_stats VALUES (4, 'def')", 1);
stats = getTableStats("test_stats");
columnStats = remapper.apply(stats.getColumnStatistics());
columnStat = columnStats.get("col0");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0));
assertEquals(columnStat.getDataSize(), Estimate.unknown());
columnStat = columnStats.get("col_1");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(3.0));
assertEquals(columnStat.getDataSize().getValue(), dataSize);
// test that after analyzing, the updates stats show up.
getQueryRunner().execute("ANALYZE test_stats");
stats = getTableStats("test_stats");
columnStats = remapper.apply(stats.getColumnStatistics());
columnStat = columnStats.get("col0");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(4.0));
assertEquals(columnStat.getDataSize(), Estimate.unknown());
columnStat = columnStats.get("col_1");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(4.0));
dataSize = (double) (long) getQueryRunner().execute("SELECT sum_data_size_for_stats(col_1) FROM test_stats").getOnlyValue();
assertEquals(columnStat.getDataSize().getValue(), dataSize);
// test adding a null value is successful, and analyze still runs successfully
assertUpdate("INSERT INTO test_stats VALUES (NULL, NULL)", 1);
assertQuerySucceeds("ANALYZE test_stats");
stats = getTableStats("test_stats");
columnStats = remapper.apply(stats.getColumnStatistics());
columnStat = columnStats.get("col0");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(4.0));
assertEquals(columnStat.getDataSize(), Estimate.unknown());
columnStat = columnStats.get("col_1");
assertEquals(columnStat.getDistinctValuesCount(), Estimate.of(4.0));
dataSize = (double) (long) getQueryRunner().execute("SELECT sum_data_size_for_stats(col_1) FROM test_stats").getOnlyValue();
assertEquals(columnStat.getDataSize().getValue(), dataSize);
assertUpdate("DROP TABLE test_stats");
}
@Test
public void testReadWriteNDVsComplexTypes()
{
assertUpdate("CREATE TABLE test_stat_ndv_complex (col0 int, col1 date, col2 varchar, col3 row(c0 int))");
assertTrue(getQueryRunner().tableExists(getSession(), "test_stat_ndv_complex"));
assertTableColumnNames("test_stat_ndv_complex", "col0", "col1", "col2", "col3");
// test that stats don't exist before analyze
TableStatistics stats = getTableStats("test_stat_ndv_complex");
assertTrue(stats.getColumnStatistics().isEmpty());
// test after simple insert we get a good estimate
assertUpdate("INSERT INTO test_stat_ndv_complex VALUES (0, current_date, 't1', row(0))", 1);
getQueryRunner().execute("ANALYZE test_stat_ndv_complex");
stats = getTableStats("test_stat_ndv_complex");
assertEquals(columnStatsFor(stats, "col0").getDistinctValuesCount(), Estimate.of(1.0));
assertEquals(columnStatsFor(stats, "col1").getDistinctValuesCount(), Estimate.of(1.0));
assertEquals(columnStatsFor(stats, "col2").getDistinctValuesCount(), Estimate.of(1.0));
assertEquals(columnStatsFor(stats, "col3").getDistinctValuesCount(), Estimate.unknown());
// test after inserting the same values, we still get the same estimate
assertUpdate("INSERT INTO test_stat_ndv_complex VALUES (0, current_date, 't1', row(0))", 1);
stats = getTableStats("test_stat_ndv_complex");
assertEquals(columnStatsFor(stats, "col0").getDistinctValuesCount(), Estimate.of(1.0));
assertEquals(columnStatsFor(stats, "col1").getDistinctValuesCount(), Estimate.of(1.0));
assertEquals(columnStatsFor(stats, "col2").getDistinctValuesCount(), Estimate.of(1.0));
assertEquals(columnStatsFor(stats, "col3").getDistinctValuesCount(), Estimate.unknown());
// test after ANALYZING with the new inserts that the NDV estimate is the same
getQueryRunner().execute("ANALYZE test_stat_ndv_complex");
stats = getTableStats("test_stat_ndv_complex");
assertEquals(columnStatsFor(stats, "col0").getDistinctValuesCount(), Estimate.of(1.0));
assertEquals(columnStatsFor(stats, "col1").getDistinctValuesCount(), Estimate.of(1.0));
assertEquals(columnStatsFor(stats, "col2").getDistinctValuesCount(), Estimate.of(1.0));
assertEquals(columnStatsFor(stats, "col3").getDistinctValuesCount(), Estimate.unknown());
// test after inserting a new value, but not analyzing, the estimate is the same.
assertUpdate("INSERT INTO test_stat_ndv_complex VALUES (1, current_date + interval '1' day, 't2', row(1))", 1);
stats = getTableStats("test_stat_ndv_complex");
assertEquals(columnStatsFor(stats, "col0").getDistinctValuesCount(), Estimate.of(1.0));
assertEquals(columnStatsFor(stats, "col1").getDistinctValuesCount(), Estimate.of(1.0));
assertEquals(columnStatsFor(stats, "col2").getDistinctValuesCount(), Estimate.of(1.0));
assertEquals(columnStatsFor(stats, "col3").getDistinctValuesCount(), Estimate.unknown());
// test that after analyzing, the updates stats show up.
getQueryRunner().execute("ANALYZE test_stat_ndv_complex");
stats = getTableStats("test_stat_ndv_complex");
assertEquals(columnStatsFor(stats, "col0").getDistinctValuesCount(), Estimate.of(2.0));
assertEquals(columnStatsFor(stats, "col1").getDistinctValuesCount(), Estimate.of(2.0));
assertEquals(columnStatsFor(stats, "col2").getDistinctValuesCount(), Estimate.of(2.0));
assertEquals(columnStatsFor(stats, "col3").getDistinctValuesCount(), Estimate.unknown());
// test adding a null value is successful, and analyze still runs successfully
assertUpdate("INSERT INTO test_stat_ndv_complex VALUES (NULL, NULL, NULL, NULL)", 1);
assertQuerySucceeds("ANALYZE test_stat_ndv_complex");
stats = getTableStats("test_stat_ndv_complex");
assertEquals(columnStatsFor(stats, "col0").getDistinctValuesCount(), Estimate.of(2.0));
assertEquals(columnStatsFor(stats, "col1").getDistinctValuesCount(), Estimate.of(2.0));
assertEquals(columnStatsFor(stats, "col2").getDistinctValuesCount(), Estimate.of(2.0));
assertEquals(columnStatsFor(stats, "col3").getDistinctValuesCount(), Estimate.unknown());
assertUpdate("DROP TABLE test_stat_ndv_complex");
}
@Test
public void testNDVsAtSnapshot()
{
assertUpdate("CREATE TABLE test_stat_snap (col0 int, col1 varchar)");
assertTrue(getQueryRunner().tableExists(getSession(), "test_stat_snap"));
assertTableColumnNames("test_stat_snap", "col0", "col1");
assertEquals(getTableSnapshots("test_stat_snap").size(), 0);
assertQuerySucceeds("ANALYZE test_stat_snap");
assertUpdate("INSERT INTO test_stat_snap VALUES (0, '0')", 1);
assertQuerySucceeds("ANALYZE test_stat_snap");
assertUpdate("INSERT INTO test_stat_snap VALUES (1, '1')", 1);
assertQuerySucceeds("ANALYZE test_stat_snap");
assertUpdate("INSERT INTO test_stat_snap VALUES (2, '2')", 1);
assertQuerySucceeds("ANALYZE test_stat_snap");
assertUpdate("INSERT INTO test_stat_snap VALUES (3, '3')", 1);
assertQuerySucceeds("ANALYZE test_stat_snap");
assertEquals(getTableSnapshots("test_stat_snap").size(), 4);
List<Long> snaps = getTableSnapshots("test_stat_snap");
for (int i = 0; i < snaps.size(); i++) {
TableStatistics statistics = getTableStats("test_stat_snap", Optional.of(snaps.get(i)));
// assert either case as we don't have good control over the timing of when statistics files are written
ColumnStatistics col0Stats = columnStatsFor(statistics, "col0");
ColumnStatistics col1Stats = columnStatsFor(statistics, "col1");
final int idx = i;
assertEither(
() -> assertEquals(col0Stats.getDistinctValuesCount(), Estimate.of(idx)),
() -> assertEquals(col0Stats.getDistinctValuesCount(), Estimate.of(idx + 1)));
assertEither(
() -> assertEquals(col1Stats.getDistinctValuesCount(), Estimate.of(idx)),
() -> assertEquals(col1Stats.getDistinctValuesCount(), Estimate.of(idx + 1)));
}
assertUpdate("DROP TABLE test_stat_snap");
}
@Test
public void testStatsByDistance()
{
assertUpdate("CREATE TABLE test_stat_dist (col0 int)");
assertTrue(getQueryRunner().tableExists(getSession(), "test_stat_dist"));
assertTableColumnNames("test_stat_dist", "col0");
assertEquals(getTableSnapshots("test_stat_dist").size(), 0);
assertUpdate("INSERT INTO test_stat_dist VALUES 0", 1);
assertQuerySucceeds("ANALYZE test_stat_dist");
assertUpdate("INSERT INTO test_stat_dist VALUES 1", 1);
assertUpdate("INSERT INTO test_stat_dist VALUES 2", 1);
assertUpdate("INSERT INTO test_stat_dist VALUES 3", 1);
assertUpdate("INSERT INTO test_stat_dist VALUES 4", 1);
assertUpdate("INSERT INTO test_stat_dist VALUES 5", 1);
assertQuerySucceeds("ANALYZE test_stat_dist");
assertEquals(getTableSnapshots("test_stat_dist").size(), 6);
List<Long> snapshots = getTableSnapshots("test_stat_dist");
// set a high weight so the weighting calculation is mostly done by record count
Session weightedSession = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", STATISTIC_SNAPSHOT_RECORD_DIFFERENCE_WEIGHT, "10000000")
.build();
Function<Integer, Estimate> ndvs = (x) -> columnStatsFor(getTableStats("test_stat_dist", Optional.of(snapshots.get(x)), weightedSession, Optional.empty()), "col0")
.getDistinctValuesCount();
assertEquals(ndvs.apply(0).getValue(), 1);
assertEquals(ndvs.apply(1).getValue(), 1);
assertEquals(ndvs.apply(2).getValue(), 1);
assertEquals(ndvs.apply(3).getValue(), 6);
assertEquals(ndvs.apply(4).getValue(), 6);
assertEquals(ndvs.apply(5).getValue(), 6);
assertUpdate("DROP TABLE test_stat_dist");
}
@Test
public void testStatsDataSizePrimitives()
{
assertUpdate("CREATE TABLE test_stat_data_size (c0 int, c1 bigint, c2 double, c3 decimal(4, 0), c4 varchar, c5 varchar(10), c6 date, c7 time, c8 timestamp, c10 boolean)");
assertUpdate("INSERT INTO test_stat_data_size VALUES (0, 1, 2.0, CAST(4.01 as decimal(4, 0)), 'testvc', 'testvc10', date '2024-03-14', localtime, localtimestamp, TRUE)", 1);
assertQuerySucceeds("ANALYZE test_stat_data_size");
TableStatistics stats = getTableStats("test_stat_data_size");
stats.getColumnStatistics().entrySet().stream()
.filter((e) -> ((IcebergColumnHandle) e.getKey()).getColumnType() != SYNTHESIZED)
.forEach((entry) -> {
IcebergColumnHandle handle = (IcebergColumnHandle) entry.getKey();
ColumnStatistics stat = entry.getValue();
if (handle.getType() instanceof FixedWidthType) {
assertEquals(stat.getDataSize(), Estimate.unknown());
}
else {
assertNotEquals(stat.getDataSize(), Estimate.unknown(), String.format("for column %s", handle));
assertTrue(stat.getDataSize().getValue() > 0);
}
});
getQueryRunner().execute("DROP TABLE test_stat_data_size");
}
private static void assertEither(Runnable first, Runnable second)
{
try {
first.run();
}
catch (AssertionError e) {
second.run();
}
}
private List<Long> getTableSnapshots(String tableName)
{
MaterializedResult result = getQueryRunner().execute(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at", tableName));
return result.getOnlyColumn().map(Long.class::cast).collect(Collectors.toList());
}
private TableStatistics getTableStats(String name)
{
return getTableStats(name, Optional.empty());
}
private TableStatistics getTableStats(String name, Optional<Long> snapshot)
{
return getTableStats(name, snapshot, getSession(), Optional.empty());
}
private TableStatistics getTableStats(String name, Optional<Long> snapshot, Session session)
{
return getTableStats(name, snapshot, session, Optional.empty());
}
private TableStatistics getTableStats(String name, Optional<Long> snapshot, Session session, Optional<List<String>> columns)
{
TransactionId transactionId = getQueryRunner().getTransactionManager().beginTransaction(false);
Session metadataSession = session.beginTransactionId(
transactionId,
getQueryRunner().getTransactionManager(),
new AllowAllAccessControl());
Metadata metadata = getDistributedQueryRunner().getMetadata();
MetadataResolver resolver = metadata.getMetadataResolver(metadataSession);
String tableName = snapshot.map(snap -> format("%s@%d", name, snap)).orElse(name);
String qualifiedName = format("%s.%s.%s", getSession().getCatalog().get(), getSession().getSchema().get(), tableName);
TableHandle handle = resolver.getTableHandle(QualifiedObjectName.valueOf(qualifiedName)).get();
TableStatistics tableStatistics = metadata.getTableStatistics(metadataSession,
handle,
new ArrayList<>(columns
.map(columnSet -> Maps.filterKeys(resolver.getColumnHandles(handle), columnSet::contains))
.orElseGet(() -> resolver.getColumnHandles(handle)).values()),
Constraint.alwaysTrue());
getQueryRunner().getTransactionManager().asyncAbort(transactionId);
return tableStatistics;
}
private static ColumnStatistics columnStatsFor(TableStatistics statistics, String name)
{
return statistics.getColumnStatistics().entrySet()
.stream().filter(entry -> ((IcebergColumnHandle) entry.getKey()).getName().equals(name))
.map(Map.Entry::getValue)
.findFirst()
.get();
}
@DataProvider(name = "fileFormat")
public Object[][] getFileFormat()
{
return new Object[][] {{"PARQUET"}, {"ORC"}};
}
@Test(dataProvider = "fileFormat")
public void testTableWithPositionDelete(String fileFormat)
throws Exception
{
String tableName = "test_v2_row_delete_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " with (\"write.format.default\" = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation order by nationkey", 25);
Table icebergTable = updateTable(tableName);
String dataFilePath = (String) computeActual("SELECT file_path FROM \"" + tableName + "$files\" LIMIT 1").getOnlyValue();
writePositionDeleteToNationTable(icebergTable, dataFilePath, 0);
testCheckDeleteFiles(icebergTable, 1, ImmutableList.of(POSITION_DELETES));
assertQuery("SELECT count(*) FROM " + tableName, "VALUES 24");
assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE nationkey != 0");
writePositionDeleteToNationTable(icebergTable, dataFilePath, 8);
testCheckDeleteFiles(icebergTable, 2, ImmutableList.of(POSITION_DELETES, POSITION_DELETES));
assertQuery("SELECT count(*) FROM " + tableName, "VALUES 23");
assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE nationkey not in (0 ,8)");
}
@DataProvider(name = "equalityDeleteOptions")
public Object[][] equalityDeleteDataProvider()
{
return new Object[][] {
{"PARQUET", false},
{"PARQUET", true},
{"ORC", false},
{"ORC", true}};
}
private Session deleteAsJoinEnabled(boolean joinRewriteEnabled)
{
return Session.builder(getQueryRunner().getDefaultSession())
.setCatalogSessionProperty(ICEBERG_CATALOG, DELETE_AS_JOIN_REWRITE_ENABLED, String.valueOf(joinRewriteEnabled))
.build();
}
@Test(dataProvider = "equalityDeleteOptions")
public void testEqualityDeleteWithPartitionColumnMissingInSelect(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_row_delete_" + randomTableSuffix();
try {
assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar) WITH (\"write.format.default\" = '" + fileFormat + "', partitioning=ARRAY['a'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002'), (2, '1010'), (3, '1003')", 4);
Table icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("a", 2, "b", "1002"), ImmutableMap.of("a", 2));
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("b", "1010"), ImmutableMap.of("a", 2));
assertQuery(session, "SELECT b FROM " + tableName, "VALUES ('1001'), ('1003')");
assertQuery(session, "SELECT b FROM " + tableName + " WHERE a > 1", "VALUES ('1003')");
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c int WITH (partitioning = 'identity')");
assertUpdate("INSERT INTO " + tableName + " VALUES (6, '1004', 1), (6, '1006', 2), (6, '1009', 2)", 3);
icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("a", 6, "c", 2, "b", "1006"),
ImmutableMap.of("a", 6, "c", 2));
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("b", "1009"),
ImmutableMap.of("a", 6, "c", 2));
assertQuery(session, "SELECT a, b FROM " + tableName, "VALUES (1, '1001'), (3, '1003'), (6, '1004')");
assertQuery(session, "SELECT a, b FROM " + tableName + " WHERE a in (1, 6) and c < 3", "VALUES (6, '1004')");
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN d varchar WITH (partitioning = 'truncate(2)')");
assertUpdate("INSERT INTO " + tableName + " VALUES (6, '1004', 1, 'th001'), (6, '1006', 2, 'th002'), (6, '1006', 3, 'ti003')", 3);
icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("a", 6, "c", 1, "d", "th001"),
ImmutableMap.of("a", 6, "c", 1, "d_trunc", "th"));
testCheckDeleteFiles(icebergTable, 5, ImmutableList.of(EQUALITY_DELETES, EQUALITY_DELETES, EQUALITY_DELETES, EQUALITY_DELETES, EQUALITY_DELETES));
assertQuery(session, "SELECT a, b, d FROM " + tableName, "VALUES (1, '1001', NULL), (3, '1003', NULL), (6, '1004', NULL), (6, '1006', 'th002'), (6, '1006', 'ti003')");
assertQuery(session, "SELECT a, b, d FROM " + tableName + " WHERE a in (1, 6) and d > 'th000'", "VALUES (6, '1006', 'th002'), (6, '1006', 'ti003')");
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tableName);
}
}
@Test(dataProvider = "equalityDeleteOptions")
public void testTableWithEqualityDelete(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_equality_delete" + randomTableSuffix();
assertUpdate(session, "CREATE TABLE " + tableName + " with (\"write.format.default\" = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation", 25);
Table icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L));
testCheckDeleteFiles(icebergTable, 1, ImmutableList.of(EQUALITY_DELETES));
assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1");
assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1");
}
@Test(dataProvider = "equalityDeleteOptions")
public void testTableWithEqualityDeleteDifferentColumnOrder(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
// Specify equality delete filter with different column order from table definition
String tableName = "test_v2_equality_delete_different_order" + randomTableSuffix();
assertUpdate(session, "CREATE TABLE " + tableName + " with (\"write.format.default\" = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation", 25);
Table icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L, "name", "ARGENTINA"));
assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE name != 'ARGENTINA'");
// natiokey is before the equality delete column in the table schema, comment is after
assertQuery(session, "SELECT nationkey, comment FROM " + tableName, "SELECT nationkey, comment FROM nation WHERE name != 'ARGENTINA'");
}
@Test(dataProvider = "equalityDeleteOptions")
public void testTableWithEqualityDeleteAndGroupByAndLimit(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
Session disable = deleteAsJoinEnabled(false);
// Specify equality delete filter with different column order from table definition
String tableName = "test_v2_equality_delete_different_order" + randomTableSuffix();
assertUpdate(session, "CREATE TABLE " + tableName + " with (\"write.format.default\" = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation", 25);
Table icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L, "name", "ARGENTINA"));
assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE name != 'ARGENTINA'");
// Test group by
assertQuery(session, "SELECT nationkey FROM " + tableName + " group by nationkey", "VALUES(0),(2),(3),(4),(5),(6),(7),(8),(9),(10),(11),(12),(13),(14),(15),(16),(17),(18),(19),(20),(21),(22),(23),(24)");
// Test group by with limit
assertQueryWithSameQueryRunner(session, "SELECT nationkey FROM " + tableName + " group by nationkey limit 100", disable);
}
@Test(dataProvider = "equalityDeleteOptions")
public void testTableWithPositionDeleteAndEqualityDelete(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_row_delete_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " with (\"write.format.default\" = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation order by nationkey", 25);
Table icebergTable = updateTable(tableName);
String dataFilePath = (String) computeActual("SELECT file_path FROM \"" + tableName + "$files\" LIMIT 1").getOnlyValue();
writePositionDeleteToNationTable(icebergTable, dataFilePath, 0);
testCheckDeleteFiles(icebergTable, 1, ImmutableList.of(POSITION_DELETES));
assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES 24");
assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE nationkey != 0");
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L));
testCheckDeleteFiles(icebergTable, 2, ImmutableList.of(POSITION_DELETES, EQUALITY_DELETES));
assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1 AND nationkey != 0");
assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1 AND nationkey != 0");
}
@Test(dataProvider = "equalityDeleteOptions")
public void testPartitionedTableWithEqualityDelete(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_equality_delete" + randomTableSuffix();
assertUpdate(session, "CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['nationkey'], \"write.format.default\" = '" + fileFormat + "') " + " AS SELECT * FROM tpch.tiny.nation", 25);
Table icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L, "nationkey", 1L), ImmutableMap.of("nationkey", 1L));
assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1 or nationkey != 1 ");
assertQuery(session, "SELECT nationkey, comment FROM " + tableName, "SELECT nationkey, comment FROM nation WHERE regionkey != 1 or nationkey != 1");
}
@Test(dataProvider = "equalityDeleteOptions")
public void testEqualityDeletesWithPartitions(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_row_delete_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " with (\"write.format.default\" = '" + fileFormat + "', partitioning = ARRAY['nationkey']) AS SELECT * FROM tpch.tiny.nation order by nationkey", 25);
Table icebergTable = updateTable(tableName);
List<Long> partitions = Arrays.asList(1L, 2L, 3L, 17L, 24L);
for (long nationKey : partitions) {
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L), ImmutableMap.of("nationkey", nationKey));
}
testCheckDeleteFiles(icebergTable, partitions.size(), partitions.stream().map(i -> EQUALITY_DELETES).collect(Collectors.toList()));
assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1");
assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1");
assertQuery(session, "SELECT name FROM " + tableName, "SELECT name FROM nation WHERE regionkey != 1");
}
@Test(dataProvider = "equalityDeleteOptions")
public void testEqualityDeletesWithHiddenPartitions(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_row_delete_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " with (\"write.format.default\" = '" + fileFormat + "', partitioning = ARRAY['bucket(nationkey,100)']) AS SELECT * FROM tpch.tiny.nation order by nationkey", 25);
Table icebergTable = updateTable(tableName);
PartitionTransforms.ColumnTransform columnTransform = PartitionTransforms.getColumnTransform(icebergTable.spec().fields().get(0), BIGINT);
BlockBuilder builder = BIGINT.createFixedSizeBlockBuilder(5);
List<Long> partitions = Arrays.asList(1L, 2L, 3L, 17L, 24L);
partitions.forEach(p -> BIGINT.writeLong(builder, p));
Block partitionsBlock = columnTransform.getTransform().apply(builder.build());
for (int i = 0; i < partitionsBlock.getPositionCount(); i++) {
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L), ImmutableMap.of("nationkey_bucket", partitionsBlock.getInt(i)));
String updatedPartitions = partitions.stream().limit(i + 1).map(Object::toString).collect(Collectors.joining(",", "(", ")"));
assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE NOT(regionkey = 1 AND nationkey IN " + updatedPartitions + ")");
}
testCheckDeleteFiles(icebergTable, partitions.size(), partitions.stream().map(i -> EQUALITY_DELETES).collect(Collectors.toList()));
assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1");
assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1");
assertQuery(session, "SELECT name FROM " + tableName, "SELECT name FROM nation WHERE regionkey != 1");
}
@Test(dataProvider = "equalityDeleteOptions")
public void testEqualityDeletesWithCompositeKey(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_row_delete_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " with (\"write.format.default\" = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation order by nationkey", 25);
Table icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 0L, "name", "ALGERIA"));
testCheckDeleteFiles(icebergTable, 1, Stream.generate(() -> EQUALITY_DELETES).limit(1).collect(Collectors.toList()));
assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE NOT(regionkey = 0 AND name = 'ALGERIA')");
assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE NOT(regionkey = 0 AND name = 'ALGERIA')");
assertQuery(session, "SELECT name FROM " + tableName, "SELECT name FROM nation WHERE NOT(regionkey = 0 AND name = 'ALGERIA')");
}
@Test(dataProvider = "equalityDeleteOptions")
public void testEqualityDeletesWithMultipleDeleteSchemas(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_row_delete_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " with (\"write.format.default\" = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation order by nationkey", 25);
Table icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L));
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("nationkey", 10L));
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 2L, "nationkey", 9L));
testCheckDeleteFiles(icebergTable, 3, Stream.generate(() -> EQUALITY_DELETES).limit(3).collect(Collectors.toList()));
assertQuery(session, "SELECT \"$data_sequence_number\", regionkey, nationkey FROM \"" + tableName + "$equality_deletes\"", "VALUES (2, 1, null), (3, null, 10), (4, 2, 9)");
assertQuery(session, "SELECT * FROM " + tableName, "SELECT * FROM nation WHERE NOT(regionkey = 2 AND nationkey = 9) AND nationkey <> 10 AND regionkey <> 1");
assertQuery(session, "SELECT regionkey FROM " + tableName, "SELECT regionkey FROM nation WHERE NOT(regionkey = 2 AND nationkey = 9) AND nationkey <> 10 AND regionkey <> 1");
assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE NOT(regionkey = 2 AND nationkey = 9) AND nationkey <> 10 AND regionkey <> 1");
}
@Test(dataProvider = "equalityDeleteOptions")
public void testTableWithPositionDeletesAndEqualityDeletes(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_row_delete_" + randomTableSuffix();
assertUpdate(session, "CREATE TABLE " + tableName + " with (\"write.format.default\" = '" + fileFormat + "') AS SELECT * FROM tpch.tiny.nation order by nationkey", 25);
Table icebergTable = updateTable(tableName);
String dataFilePath = (String) computeActual("SELECT file_path FROM \"" + tableName + "$files\" LIMIT 1").getOnlyValue();
writePositionDeleteToNationTable(icebergTable, dataFilePath, 0);
testCheckDeleteFiles(icebergTable, 1, ImmutableList.of(POSITION_DELETES));
assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES 24");
assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE nationkey != 0");
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 1L));
testCheckDeleteFiles(icebergTable, 2, ImmutableList.of(POSITION_DELETES, EQUALITY_DELETES));
assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES 19");
assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1 AND nationkey != 0");
writePositionDeleteToNationTable(icebergTable, dataFilePath, 7);
testCheckDeleteFiles(icebergTable, 3, ImmutableList.of(POSITION_DELETES, POSITION_DELETES, EQUALITY_DELETES));
assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES 18");
assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1 AND nationkey NOT IN (0, 7)");
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("regionkey", 2L));
testCheckDeleteFiles(icebergTable, 4, ImmutableList.of(POSITION_DELETES, POSITION_DELETES, EQUALITY_DELETES, EQUALITY_DELETES));
assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES 13");
assertQuery(session, "SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey NOT IN (1, 2) AND nationkey NOT IN (0, 7)");
}
@Test(dataProvider = "equalityDeleteOptions")
public void testEqualityDeletesWithHiddenPartitionsEvolution(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_row_delete_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar) WITH (\"write.format.default\" = '" + fileFormat + "')");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002'), (3, '1003')", 3);
Table icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("a", 2));
assertQuery(session, "SELECT * FROM " + tableName, "VALUES (1, '1001'), (3, '1003')");
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c int WITH (partitioning = 'bucket(2)')");
assertUpdate("INSERT INTO " + tableName + " VALUES (6, '1004', 1), (6, '1006', 2)", 2);
icebergTable = updateTable(tableName);
PartitionTransforms.ColumnTransform columnTransform = PartitionTransforms.getColumnTransform(icebergTable.spec().fields().get(0), INTEGER);
BlockBuilder builder = BIGINT.createFixedSizeBlockBuilder(1);
List<Integer> partitions = Arrays.asList(1, 2);
partitions.forEach(p -> BIGINT.writeLong(builder, p));
Block partitionsBlock = columnTransform.getTransform().apply(builder.build());
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("a", 6, "c", 2),
ImmutableMap.of("c_bucket", partitionsBlock.getInt(0)));
assertQuery(session, "SELECT * FROM " + tableName, "VALUES (1, '1001', NULL), (3, '1003', NULL), (6, '1004', 1)");
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN d varchar WITH (partitioning = 'truncate(2)')");
assertUpdate("INSERT INTO " + tableName + " VALUES (6, '1004', 1, 'th001'), (6, '1006', 2, 'th002')", 2);
icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("a", 6, "c", 1),
ImmutableMap.of("c_bucket", partitionsBlock.getInt(0), "d_trunc", "th"));
testCheckDeleteFiles(icebergTable, 3, ImmutableList.of(EQUALITY_DELETES, EQUALITY_DELETES, EQUALITY_DELETES));
assertQuery(session, "SELECT * FROM " + tableName, "VALUES (1, '1001', NULL, NULL), (3, '1003', NULL, NULL), (6, '1004', 1, NULL), (6, '1006', 2, 'th002')");
}
@Test(dataProvider = "equalityDeleteOptions")
public void testEqualityDeletesWithDataSequenceNumber(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_row_delete_" + randomTableSuffix();
String tableName2 = "test_v2_row_delete_2_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + "(id int, data varchar) WITH (\"write.format.default\" = '" + fileFormat + "')");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1);
assertUpdate("CREATE TABLE " + tableName2 + "(id int, data varchar) WITH (\"write.format.default\" = '" + fileFormat + "')");
assertUpdate("INSERT INTO " + tableName2 + " VALUES (1, 'a')", 1);
Table icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("id", 1));
Table icebergTable2 = updateTable(tableName2);
writeEqualityDeleteToNationTable(icebergTable2, ImmutableMap.of("id", 1));
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'b'), (2, 'a'), (3, 'a')", 3);
assertUpdate("INSERT INTO " + tableName2 + " VALUES (1, 'b'), (2, 'a'), (3, 'a')", 3);
assertQuery(session, "SELECT * FROM " + tableName, "VALUES (1, 'b'), (2, 'a'), (3, 'a')");
assertQuery(session, "SELECT \"$data_sequence_number\", * FROM " + tableName, "VALUES (3, 1, 'b'), (3, 2, 'a'), (3, 3, 'a')");
assertQuery(session, "SELECT a.\"$data_sequence_number\", b.\"$data_sequence_number\" from " + tableName + " as a, " + tableName2 + " as b where a.id = b.id",
"VALUES (3, 3), (3, 3), (3, 3)");
}
@Test
public void testPartShowStatsWithFilters()
{
assertQuerySucceeds("CREATE TABLE showstatsfilters (i int) WITH (partitioning = ARRAY['i'])");
assertQuerySucceeds("INSERT INTO showstatsfilters VALUES 1, 2, 3, 4, 5, 6, 7, 7, 7, 7");
assertQuerySucceeds("ANALYZE showstatsfilters");
MaterializedResult statsTable = getQueryRunner().execute("SHOW STATS for showstatsfilters");
MaterializedRow columnStats = statsTable.getMaterializedRows().get(0);
assertEquals(columnStats.getField(2), 7.0); // ndvs;
assertEquals(columnStats.getField(3), 0.0); // nulls
assertEquals(columnStats.getField(5), "1"); // min
assertEquals(columnStats.getField(6), "7"); // max
// EQ
statsTable = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i = 7)");
columnStats = statsTable.getMaterializedRows().get(0);
assertEquals(columnStats.getField(5), "7"); // min
assertEquals(columnStats.getField(6), "7"); // max
assertEquals(columnStats.getField(3), 0.0); // nulls
assertEquals((double) columnStats.getField(2), 7.0d * (4.0d / 10.0d), 1E-8); // ndvs;
// LT
statsTable = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i < 7)");
columnStats = statsTable.getMaterializedRows().get(0);
assertEquals(columnStats.getField(5), "1"); // min
assertEquals(columnStats.getField(6), "6"); // max
assertEquals(columnStats.getField(3), 0.0); // nulls
assertEquals((double) columnStats.getField(2), 7.0d * (6.0d / 10.0d), 1E-8); // ndvs;
// LTE
statsTable = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i <= 7)");
columnStats = statsTable.getMaterializedRows().get(0);
assertEquals(columnStats.getField(5), "1"); // min
assertEquals(columnStats.getField(6), "7"); // max
assertEquals(columnStats.getField(3), 0.0); // nulls
assertEquals(columnStats.getField(2), 7.0d); // ndvs;
// GT
statsTable = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i > 7)");
columnStats = statsTable.getMaterializedRows().get(0);
assertEquals(columnStats.getField(5), null); // min
assertEquals(columnStats.getField(6), null); // max
assertEquals(columnStats.getField(3), null); // nulls
assertEquals(columnStats.getField(2), null); // ndvs;
// GTE
statsTable = getQueryRunner().execute("SHOW STATS for (SELECT * FROM showstatsfilters WHERE i >= 7)");
columnStats = statsTable.getMaterializedRows().get(0);
assertEquals(columnStats.getField(5), "7"); // min
assertEquals(columnStats.getField(6), "7"); // max
assertEquals(columnStats.getField(3), 0.0); // nulls
assertEquals((double) columnStats.getField(2), 7.0d * (4.0d / 10.0d), 1E-8); // ndvs;
assertQuerySucceeds("DROP TABLE showstatsfilters");
}
@Test
public void testWithSortOrder()
throws IOException
{
String tableName = "test_create_sorted_table_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) WITH (sorted_by = ARRAY['id'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA'), (2, 'BBBB'), (4,'DDDD')", 5);
for (Object filePath : computeActual("SELECT file_path from \"" + tableName + "$files\"").getOnlyColumnAsSet()) {
assertTrue(isFileSorted(String.valueOf(filePath), "id", "ASC"));
}
}
@Test
public void testWithDescSortOrder()
throws IOException
{
String tableName = "test_create_sorted_table_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar) WITH (sorted_by = ARRAY['id DESC'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (5, 'EEEE'), (3, 'CCCC'), (1, 'AAAA'), (2, 'BBBB'), (4,'DDDD')", 5);
for (Object filePath : computeActual("SELECT file_path from \"" + tableName + "$files\"").getOnlyColumnAsSet()) {
assertTrue(isFileSorted(String.valueOf(filePath), "id", "DESC"));
}
}
@Test
public void testWithoutSortOrder()
throws IOException
{
String tableName = "test_create_unsorted_table_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + "(id int, emp_name varchar)");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'EEEE'), (3, 'CCCC'), (5, 'AAAA'), (2, 'BBBB'), (4,'DDDD')", 5);
for (Object filePath : computeActual("SELECT file_path from \"" + tableName + "$files\"").getOnlyColumnAsSet()) {
assertFalse(isFileSorted(String.valueOf(filePath), "id", ""));
}
}
public boolean isFileSorted(String path, String sortColumnName, String sortOrder)
throws IOException
{
Path filePath = new Path(path);
Configuration configuration = getHdfsEnvironment().getConfiguration(new HdfsContext(SESSION), filePath);
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(path))
.withConf(configuration)
.build()) {
Group record;
ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, filePath);
MessageType schema = readFooter.getFileMetaData().getSchema();
Double previousValue = null;
while ((record = reader.read()) != null) {
for (int i = 0; i < record.getType().getFieldCount(); i++) {
String columnName = schema.getFieldName(i);
if (columnName.equals(sortColumnName)) {
Double currentValue = Double.parseDouble(record.getValueToString(i, i));
if (previousValue != null) {
boolean valueNotSorted = ("ASC".equals(sortOrder) || "".equals(sortOrder))
? currentValue.compareTo(previousValue) < 0
: currentValue.compareTo(previousValue) > 0;
if (valueNotSorted) {
return false;
}
}
previousValue = currentValue;
}
}
}
}
return true;
}
@Test
public void testMetadataDeleteOnUnPartitionedTableWithDeleteFiles()
{
String tableName = "test_v2_row_delete_" + randomTableSuffix();
try {
assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar) WITH (\"format-version\" = '2', \"write.delete.mode\" = 'merge-on-read')");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002'), (3, '1003')", 3);
// execute row level deletion
assertUpdate("DELETE FROM " + tableName + " WHERE b in ('1002', '1003')", 2);
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001')");
Table icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 1);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 1);
// execute whole table metadata deletion
assertUpdate("DELETE FROM " + tableName, 1);
assertQuery("SELECT count(*) FROM " + tableName, "VALUES 0");
icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 0);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tableName);
}
}
@DataProvider(name = "validHistogramTypes")
public Object[][] validHistogramTypesDataProvider()
{
return new Object[][] {
// types not supported in Iceberg connector, but that histogram could support
// {TINYINT, new String[]{"1", "2", "10"}},
// {SMALLINT, new String[]{"1", "2", "10"}},
// {TIMESTAMP_WITH_TIME_ZONE, new String[]{"now() + interval '1' hour", "now() + interval '2' hour"}},
// iceberg stores microsecond precision but presto calculates on millisecond precision
// need a fix to properly convert for the optimizer.
// {TIMESTAMP, new String[] {"localtimestamp + interval '1' hour", "localtimestamp + interval '2' hour"}},
// {TIME, new String[] {"localtime", "localtime + interval '1' hour"}},
// supported types
{INTEGER, new String[] {"1", "5", "9"}},
{BIGINT, new String[] {"2", "4", "6"}},
{DOUBLE, new String[] {"1.0", "3.1", "4.6"}},
// short decimal
{DECIMAL.createType(ImmutableList.of(TypeParameter.of(2L), TypeParameter.of(1L))), new String[] {"0.0", "3.0", "4.0"}},
// long decimal
{DECIMAL.createType(ImmutableList.of(TypeParameter.of(38L), TypeParameter.of(1L))), new String[] {"0.0", "3.0", "4.0"}},
{DATE, new String[] {"date '2024-01-01'", "date '2024-03-30'", "date '2024-05-30'"}},
{REAL, new String[] {"1.0", "2.0", "3.0"}},
};
}
/**
* Verifies that the histogram is returned after ANALYZE for a variety of types
*/
@Test(dataProvider = "validHistogramTypes")
public void testHistogramStorage(Type type, Object[] values)
{
try {
Session session = Session.builder(getSession())
.setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "true")
.build();
assertQuerySucceeds("DROP TABLE IF EXISTS create_histograms");
assertQuerySucceeds(String.format("CREATE TABLE create_histograms (c %s)", type.getDisplayName()));
assertQuerySucceeds(String.format("INSERT INTO create_histograms VALUES %s", Joiner.on(", ").join(values)));
assertQuerySucceeds(session, "ANALYZE create_histograms");
TableStatistics tableStatistics = getTableStats("create_histograms");
Map<String, IcebergColumnHandle> nameToHandle = tableStatistics.getColumnStatistics().keySet()
.stream().map(IcebergColumnHandle.class::cast)
.collect(Collectors.toMap(BaseHiveColumnHandle::getName, identity()));
assertNotNull(nameToHandle.get("c"));
IcebergColumnHandle handle = nameToHandle.get("c");
ColumnStatistics statistics = tableStatistics.getColumnStatistics().get(handle);
assertTrue(statistics.getHistogram().isPresent());
}
finally {
assertQuerySucceeds("DROP TABLE IF EXISTS create_histograms");
}
}
@Test
public void testMetadataDeleteOnPartitionedTableWithDeleteFiles()
{
String tableName = "test_v2_row_delete_" + randomTableSuffix();
try {
assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar) WITH (\"format-version\" = '2', \"write.delete.mode\" = 'merge-on-read', partitioning = ARRAY['a'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002'), (3, '1003')", 3);
// execute row level deletion
assertUpdate("DELETE FROM " + tableName + " WHERE b in ('1002', '1003')", 2);
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001')");
Table icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 3);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 2);
// execute metadata deletion with filter
assertUpdate("DELETE FROM " + tableName + " WHERE a in (2, 3)", 0);
assertQuery("SELECT * FROM " + tableName, "VALUES (1, '1001')");
icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 1);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
// execute whole table metadata deletion
assertUpdate("DELETE FROM " + tableName, 1);
assertQuery("SELECT count(*) FROM " + tableName, "VALUES 0");
icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 0);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tableName);
}
}
@Test
public void testMetadataDeleteOnV2MorTableWithEmptyUnsupportedSpecs()
{
String tableName = "test_empty_partition_spec_table";
try {
// Create a table with no partition
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (\"format-version\" = '2', \"write.delete.mode\" = 'merge-on-read')");
// Do not insert data, and evaluate the partition spec by adding a partition column `c`
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
// Insert data under the new partition spec
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001', 1), (2, '1002', 2), (3, '1003', 3), (4, '1004', 4)", 4);
Table icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 4);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
// Do metadata delete on partition column `c`, because the initial partition spec contains no data
assertUpdate("DELETE FROM " + tableName + " WHERE c in (1, 3)", 2);
assertQuery("SELECT * FROM " + tableName, "VALUES (2, '1002', 2), (4, '1004', 4)");
icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 2);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tableName);
}
}
@Test
public void testMetadataDeleteOnV2MorTableWithUnsupportedSpecsWhoseDataAllDeleted()
{
String tableName = "test_data_deleted_partition_spec_table";
try {
// Create a table with partition column `a`, and insert some data under this partition spec
assertUpdate("CREATE TABLE " + tableName + " (a INTEGER, b VARCHAR) WITH (\"format-version\" = '2', \"write.delete.mode\" = 'merge-on-read', partitioning = ARRAY['a'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, '1001'), (2, '1002')", 2);
Table icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 2);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
// Evaluate the partition spec by adding a partition column `c`, and insert some data under the new partition spec
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN c INTEGER WITH (partitioning = 'identity')");
assertUpdate("INSERT INTO " + tableName + " VALUES (3, '1003', 3), (4, '1004', 4), (5, '1005', 5)", 3);
icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 5);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
// Execute row level delete with filter on column `c`, because we have data with old partition spec
assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 2);
icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 5);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 2);
// Do metadata delete on column `a`, because all partition specs contains partition column `a`
assertUpdate("DELETE FROM " + tableName + " WHERE a in (1, 2)", 2);
icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 3);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 2);
// Then do metadata delete on column `c`, because the old partition spec contains no data now
assertUpdate("DELETE FROM " + tableName + " WHERE c > 3", 0);
assertQuery("SELECT * FROM " + tableName, "VALUES (3, '1003', 3)");
icebergTable = loadTable(tableName);
assertHasDataFiles(icebergTable.currentSnapshot(), 1);
assertHasDeleteFiles(icebergTable.currentSnapshot(), 0);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tableName);
}
}
@Test
public void testMetadataVersionsMaintainingProperties()
throws Exception
{
String settingTableName = "test_table_with_setting_properties";
String defaultTableName = "test_table_with_default_setting_properties";
try {
// Create a table with setting properties that maintain only 1 previous metadata version in current metadata,
// and delete unuseful metadata files after each commit
assertUpdate("CREATE TABLE " + settingTableName + " (a INTEGER, b VARCHAR)" +
" WITH (\"write.metadata.previous-versions-max\" = 1, \"write.metadata.delete-after-commit.enabled\" = true)");
// Create a table with default table properties that maintain 100 previous metadata versions in current metadata,
// and do not automatically delete any metadata files
assertUpdate("CREATE TABLE " + defaultTableName + " (a INTEGER, b VARCHAR)");
assertUpdate("INSERT INTO " + settingTableName + " VALUES (1, '1001'), (2, '1002')", 2);
assertUpdate("INSERT INTO " + settingTableName + " VALUES (3, '1003'), (4, '1004')", 2);
assertUpdate("INSERT INTO " + settingTableName + " VALUES (5, '1005'), (6, '1006')", 2);
assertUpdate("INSERT INTO " + settingTableName + " VALUES (7, '1007'), (8, '1008')", 2);
assertUpdate("INSERT INTO " + settingTableName + " VALUES (9, '1009'), (10, '1010')", 2);
assertUpdate("INSERT INTO " + defaultTableName + " VALUES (1, '1001'), (2, '1002')", 2);
assertUpdate("INSERT INTO " + defaultTableName + " VALUES (3, '1003'), (4, '1004')", 2);
assertUpdate("INSERT INTO " + defaultTableName + " VALUES (5, '1005'), (6, '1006')", 2);
assertUpdate("INSERT INTO " + defaultTableName + " VALUES (7, '1007'), (8, '1008')", 2);
assertUpdate("INSERT INTO " + defaultTableName + " VALUES (9, '1009'), (10, '1010')", 2);
Table settingTable = loadTable(settingTableName);
TableMetadata settingTableMetadata = ((BaseTable) settingTable).operations().current();
// Table `test_table_with_setting_properties`'s current metadata only record 1 previous metadata file
assertEquals(settingTableMetadata.previousFiles().size(), 1);
Table defaultTable = loadTable(defaultTableName);
TableMetadata defaultTableMetadata = ((BaseTable) defaultTable).operations().current();
// Table `test_table_with_default_setting_properties`'s current metadata record all 5 previous metadata files
assertEquals(defaultTableMetadata.previousFiles().size(), 5);
FileSystem fileSystem = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), new Path(settingTable.location()));
// Table `test_table_with_setting_properties`'s all existing metadata files count is 2
FileStatus[] settingTableFiles = fileSystem.listStatus(new Path(settingTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION));
assertEquals(settingTableFiles.length, 2);
// Table `test_table_with_default_setting_properties`'s all existing metadata files count is 6
FileStatus[] defaultTableFiles = fileSystem.listStatus(new Path(defaultTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION));
assertEquals(defaultTableFiles.length, 6);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + settingTableName);
}
}
@DataProvider(name = "batchReadEnabled")
public Object[] batchReadEnabledReader()
{
return new Object[] {true, false};
}
private Session batchReadEnabledEnabledSession(boolean batchReadEnabled)
{
return Session.builder(getQueryRunner().getDefaultSession())
.setCatalogSessionProperty(ICEBERG_CATALOG, PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, String.valueOf(batchReadEnabled))
.build();
}
@Test(dataProvider = "batchReadEnabled")
public void testDecimal(boolean decimalVectorReaderEnabled)
{
String tableName = "test_decimal_vector_reader";
try {
// Create a table with decimal column
assertUpdate("CREATE TABLE " + tableName + " (short_decimal_column_int32 decimal(5,2), short_decimal_column_int64 decimal(16, 4), long_decimal_column decimal(19, 5))");
String values = " VALUES (cast(-1.00 as decimal(5,2)), null, cast(9999999999.123 as decimal(19, 5)))," +
"(cast(1.00 as decimal(5,2)), cast(121321 as decimal(16, 4)), null)," +
"(cast(-1.00 as decimal(5,2)), cast(-1215789.45 as decimal(16, 4)), cast(1234584.21 as decimal(19, 5)))," +
"(cast(1.00 as decimal(5,2)), cast(-67867878.12 as decimal(16, 4)), cast(-9999999999.123 as decimal(19, 5)))";
// Insert data to table
assertUpdate("INSERT INTO " + tableName + values, 4);
Session session = batchReadEnabledEnabledSession(decimalVectorReaderEnabled);
assertQuery(session, "SELECT * FROM " + tableName, values);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tableName);
}
}
@Test
public void testRefsTable()
{
assertUpdate("CREATE TABLE test_table_references (id1 BIGINT, id2 BIGINT)");
assertUpdate("INSERT INTO test_table_references VALUES (0, 00), (1, 10), (2, 20)", 3);
Table icebergTable = loadTable("test_table_references");
icebergTable.manageSnapshots().createBranch("testBranch").commit();
assertUpdate("INSERT INTO test_table_references VALUES (3, 30), (4, 40), (5, 50)", 3);
assertEquals(icebergTable.refs().size(), 2);
icebergTable.manageSnapshots().createTag("testTag", icebergTable.currentSnapshot().snapshotId()).commit();
assertEquals(icebergTable.refs().size(), 3);
assertUpdate("INSERT INTO test_table_references VALUES (6, 60), (7, 70), (8, 80)", 3);
assertQuery("SELECT count(*) FROM \"test_table_references$refs\"", "VALUES 3");
assertQuery("SELECT count(*) FROM test_table_references FOR SYSTEM_VERSION AS OF 'testBranch'", "VALUES 3");
assertQuery("SELECT count(*) FROM test_table_references FOR SYSTEM_VERSION AS OF 'testTag'", "VALUES 6");
assertQuery("SELECT count(*) FROM test_table_references FOR SYSTEM_VERSION AS OF 'main'", "VALUES 9");
assertQuery("SELECT * from \"test_table_references$refs\" where name = 'testBranch' and type = 'BRANCH'",
format("VALUES('%s', '%s', %s, %s, %s, %s)",
"testBranch",
"BRANCH",
icebergTable.refs().get("testBranch").snapshotId(),
icebergTable.refs().get("testBranch").maxRefAgeMs(),
icebergTable.refs().get("testBranch").minSnapshotsToKeep(),
icebergTable.refs().get("testBranch").maxSnapshotAgeMs()));
assertQuery("SELECT * from \"test_table_references$refs\" where type = 'TAG'",
format("VALUES('%s', '%s', %s, %s, %s, %s)",
"testTag",
"TAG",
icebergTable.refs().get("testTag").snapshotId(),
icebergTable.refs().get("testTag").maxRefAgeMs(),
icebergTable.refs().get("testTag").minSnapshotsToKeep(),
icebergTable.refs().get("testTag").maxSnapshotAgeMs()));
// test branch & tag access when schema is changed
assertUpdate("ALTER TABLE test_table_references DROP COLUMN id2");
assertUpdate("ALTER TABLE test_table_references ADD COLUMN id2_new BIGINT");
// since current table schema is changed from col id2 to id2_new
assertQuery("SELECT * FROM test_table_references where id1=1", "VALUES(1, NULL)");
assertQuery("SELECT * FROM test_table_references FOR SYSTEM_VERSION AS OF 'testBranch' where id1=1", "VALUES(1, NULL)");
// Currently Presto returns current table schema for any previous snapshot access https://github.com/prestodb/presto/issues/23553
// otherwise querying a tag uses the snapshot's schema https://iceberg.apache.org/docs/nightly/branching/#schema-selection-with-branches-and-tags
assertQuery("SELECT * FROM test_table_references FOR SYSTEM_VERSION AS OF 'testTag' where id1=1", "VALUES(1, NULL)");
}
@Test
public void testAllIcebergType()
{
String tmpTableName = "test_vector_reader_all_type";
try {
assertUpdate(format("" +
"CREATE TABLE %s ( " +
" c_boolean BOOLEAN, " +
" c_int INT," +
" c_bigint BIGINT, " +
" c_double DOUBLE, " +
" c_real REAL, " +
" c_date DATE, " +
" c_timestamp TIMESTAMP, " +
" c_varchar VARCHAR, " +
" c_varbinary VARBINARY, " +
" c_uuid UUID, " +
" c_array ARRAY(BIGINT), " +
" c_map MAP(VARCHAR, INT), " +
" c_row ROW(a INT, b VARCHAR) " +
") WITH (\"write.format.default\" = 'PARQUET')", tmpTableName));
assertUpdate(format("" +
"INSERT INTO %s " +
"SELECT c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_uuid, c_array, c_map, c_row " +
"FROM ( " +
" VALUES " +
" (null, null, null, null, null, null, null, null, null, null, null, null, null), " +
" (true, INT '1245', BIGINT '1', DOUBLE '2.2', REAL '-24.124', DATE '2024-07-29', TIMESTAMP '2012-08-08 01:00', CAST('abc1' AS VARCHAR), to_ieee754_64(1), CAST('4ae71336-e44b-39bf-b9d2-752e234818a5' as UUID), sequence(0, 10), MAP(ARRAY['aaa', 'bbbb'], ARRAY[1, 2]), CAST(ROW(1, 'AAA') AS ROW(a INT, b VARCHAR)))," +
" (false, INT '-1245', BIGINT '-1', DOUBLE '2.3', REAL '243215.435', DATE '2024-07-29', TIMESTAMP '2012-09-09 00:00', CAST('cba2' AS VARCHAR), to_ieee754_64(4), CAST('4ae71336-e44b-39bf-b9d2-752e234818a5' as UUID), sequence(30, 35), MAP(ARRAY['ccc', 'bbbb'], ARRAY[-1, -2]), CAST(ROW(-1, 'AAA') AS ROW(a INT, b VARCHAR))) " +
") AS x (c_boolean, c_int, c_bigint, c_double, c_real, c_date, c_timestamp, c_varchar, c_varbinary, c_uuid, c_array, c_map, c_row)", tmpTableName), 3);
Session decimalVectorReaderEnabled = batchReadEnabledEnabledSession(true);
Session decimalVectorReaderDisable = batchReadEnabledEnabledSession(false);
assertQueryWithSameQueryRunner(decimalVectorReaderEnabled, "SELECT * FROM " + tmpTableName, decimalVectorReaderDisable);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tmpTableName);
}
}
@Test
public void testExpireSnapshotWithDeletedEntries()
{
try {
assertUpdate("create table test_expire_snapshot_with_deleted_entry (a int, b varchar) with (partitioning = ARRAY['a'])");
assertUpdate("insert into test_expire_snapshot_with_deleted_entry values(1, '1001'), (1, '1002'), (2, '2001'), (2, '2002')", 4);
Table table = loadTable("test_expire_snapshot_with_deleted_entry");
long snapshotId1 = table.currentSnapshot().snapshotId();
// Execute metadata deletion which delete whole files from table metadata
assertUpdate("delete from test_expire_snapshot_with_deleted_entry where a = 1", 2);
table = loadTable("test_expire_snapshot_with_deleted_entry");
long snapshotId2 = table.currentSnapshot().snapshotId();
assertUpdate("insert into test_expire_snapshot_with_deleted_entry values(1, '1003'), (2, '2003'), (3, '3003')", 3);
table = loadTable("test_expire_snapshot_with_deleted_entry");
long snapshotId3 = table.currentSnapshot().snapshotId();
assertQuery("select snapshot_id from \"test_expire_snapshot_with_deleted_entry$snapshots\"", "values " + snapshotId1 + ", " + snapshotId2 + ", " + snapshotId3);
// Expire `snapshotId2` which contains a DELETED entry to delete a data file which is still referenced by `snapshotId1`
assertUpdate(format("call iceberg.system.expire_snapshots(schema => '%s', table_name => '%s', snapshot_ids => ARRAY[%d])", "tpch", "test_expire_snapshot_with_deleted_entry", snapshotId2));
assertQuery("select snapshot_id from \"test_expire_snapshot_with_deleted_entry$snapshots\"", "values " + snapshotId1 + ", " + snapshotId3);
// Execute time travel query successfully
assertQuery("select * from test_expire_snapshot_with_deleted_entry for version as of " + snapshotId1, "values(1, '1001'), (1, '1002'), (2, '2001'), (2, '2002')");
assertQuery("select * from test_expire_snapshot_with_deleted_entry for version as of " + snapshotId3, "values(1, '1003'), (2, '2001'), (2, '2002'), (2, '2003'), (3, '3003')");
}
finally {
assertUpdate("drop table if exists test_expire_snapshot_with_deleted_entry");
}
}
private void testPathHiddenColumn()
{
assertEquals(computeActual("SELECT \"$path\", * FROM test_hidden_columns").getRowCount(), 2);
// Fetch one of the file paths and use it in a filter
String filePath = (String) computeActual("SELECT \"$path\" from test_hidden_columns LIMIT 1").getOnlyValue();
assertEquals(
computeActual(format("SELECT * from test_hidden_columns WHERE \"$path\"='%s'", filePath)).getRowCount(),
1);
assertEquals(
(Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$path\"='%s'", filePath))
.getOnlyValue(),
1L);
// Filter for $path that doesn't exist.
assertEquals(
(Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$path\"='%s'", "non-existent-path"))
.getOnlyValue(),
0L);
}
private void testDataSequenceNumberHiddenColumn()
{
assertEquals(computeActual("SELECT \"$data_sequence_number\", * FROM test_hidden_columns").getRowCount(), 2);
// Fetch one of the data sequence numbers and use it in a filter
Long dataSequenceNumber = (Long) computeActual("SELECT \"$data_sequence_number\" from test_hidden_columns LIMIT 1").getOnlyValue();
assertEquals(
computeActual(format("SELECT * from test_hidden_columns WHERE \"$data_sequence_number\"=%d", dataSequenceNumber)).getRowCount(),
1);
assertEquals(
(Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$data_sequence_number\"=%d", dataSequenceNumber))
.getOnlyValue(),
1L);
// Filter for $data_sequence_number that doesn't exist.
assertEquals(
(Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$data_sequence_number\"=%d", 1000))
.getOnlyValue(),
0L);
}
@Test
public void testHiddenColumns()
{
assertUpdate("DROP TABLE IF EXISTS test_hidden_columns");
assertUpdate("CREATE TABLE test_hidden_columns AS SELECT * FROM tpch.tiny.region WHERE regionkey=0", 1);
assertUpdate("INSERT INTO test_hidden_columns SELECT * FROM tpch.tiny.region WHERE regionkey=1", 1);
testPathHiddenColumn();
testDataSequenceNumberHiddenColumn();
}
@Test
public void testDeletedHiddenColumn()
{
assertUpdate("DROP TABLE IF EXISTS test_deleted");
assertUpdate("CREATE TABLE test_deleted AS SELECT * FROM tpch.tiny.region WHERE regionkey=0", 1);
assertUpdate("INSERT INTO test_deleted SELECT * FROM tpch.tiny.region WHERE regionkey=1", 1);
assertQuery("SELECT \"$deleted\" FROM test_deleted", format("VALUES %s, %s", "false", "false"));
assertUpdate("DELETE FROM test_deleted WHERE regionkey=1", 1);
assertEquals(computeActual("SELECT * FROM test_deleted").getRowCount(), 1);
assertQuery("SELECT \"$deleted\" FROM test_deleted ORDER BY \"$deleted\"", format("VALUES %s, %s", "false", "true"));
}
@Test
public void testDeleteFilePathHiddenColumn()
{
assertUpdate("DROP TABLE IF EXISTS test_delete_file_path");
assertUpdate("CREATE TABLE test_delete_file_path AS SELECT * FROM tpch.tiny.region WHERE regionkey=0", 1);
assertUpdate("INSERT INTO test_delete_file_path SELECT * FROM tpch.tiny.region WHERE regionkey=1", 1);
assertQuery("SELECT \"$delete_file_path\" FROM test_delete_file_path", format("VALUES %s, %s", "NULL", "NULL"));
assertUpdate("DELETE FROM test_delete_file_path WHERE regionkey=1", 1);
assertEquals(computeActual("SELECT * FROM test_delete_file_path").getRowCount(), 1);
assertEquals(computeActual("SELECT \"$delete_file_path\" FROM test_delete_file_path").getRowCount(), 2);
assertUpdate("DELETE FROM test_delete_file_path WHERE regionkey=0", 1);
computeActual("SELECT \"$delete_file_path\" FROM test_delete_file_path").getMaterializedRows().forEach(row -> {
assertEquals(row.getFieldCount(), 1);
assertNotNull(row.getField(0));
});
}
@Test(dataProvider = "equalityDeleteOptions")
public void testEqualityDeletesWithDeletedHiddenColumn(String fileFormat, boolean joinRewriteEnabled)
throws Exception
{
Session session = deleteAsJoinEnabled(joinRewriteEnabled);
String tableName = "test_v2_row_delete_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + "(id int, data varchar) WITH (\"write.format.default\" = '" + fileFormat + "')");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a')", 1);
Table icebergTable = updateTable(tableName);
writeEqualityDeleteToNationTable(icebergTable, ImmutableMap.of("id", 1));
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'b'), (2, 'a'), (3, 'a')", 3);
assertQuery(session, "SELECT * FROM " + tableName, "VALUES (1, 'b'), (2, 'a'), (3, 'a')");
assertQuery(session, "SELECT \"$deleted\", * FROM " + tableName,
"VALUES (true, 1, 'a'), (false, 1, 'b'), (false, 2, 'a'), (false, 3, 'a')");
}
@DataProvider(name = "pushdownFilterEnabled")
public Object[][] pushdownFilterEnabledProvider()
{
return new Object[][] {
{true},
{false}
};
}
@Test(dataProvider = "pushdownFilterEnabled")
public void testFilterWithRemainingPredicate(boolean pushdownFilterEnabled)
{
Session session = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", PUSHDOWN_FILTER_ENABLED, Boolean.toString(pushdownFilterEnabled))
.build();
int rowCount = 100;
String pairs = Joiner.on(", ")
.join(IntStream.range(0, rowCount)
.map(idx -> idx * 2)
.mapToObj(idx -> "(" + idx + ", " + (idx + 1) + ")")
.iterator());
assertQuerySucceeds("CREATE TABLE test_filterstats_remaining_predicate(i int, j int)");
assertUpdate(format("INSERT INTO test_filterstats_remaining_predicate VALUES %s", pairs), rowCount);
assertQuerySucceeds("ANALYZE test_filterstats_remaining_predicate");
@Language("SQL") String query = "SELECT * FROM test_filterstats_remaining_predicate WHERE (i = 10 AND j = 11) OR (i = 20 AND j = 21)";
if (pushdownFilterEnabled) {
assertPlan(session, query,
output(
exchange(
tableScan("test_filterstats_remaining_predicate")
.withOutputRowCount(1))));
}
else {
assertPlan(session, query,
anyTree(
filter(tableScan("test_filterstats_remaining_predicate")
.withOutputRowCount(100))
.withOutputRowCount(1)));
}
assertQuerySucceeds("DROP TABLE test_filterstats_remaining_predicate");
}
public void testStatisticsFileCache()
throws Exception
{
assertQuerySucceeds("CREATE TABLE test_statistics_file_cache(i int)");
assertUpdate("INSERT INTO test_statistics_file_cache VALUES 1, 2, 3, 4, 5", 5);
assertQuerySucceeds("ANALYZE test_statistics_file_cache");
TransactionId transactionId = getQueryRunner().getTransactionManager().beginTransaction(false);
Session session = Session.builder(getSession())
.setTransactionId(transactionId)
.build();
Optional<TableHandle> handle = MetadataUtil.getOptionalTableHandle(session,
getQueryRunner().getTransactionManager(),
QualifiedObjectName.valueOf(session.getCatalog().get(), session.getSchema().get(), "test_statistics_file_cache"),
Optional.empty());
CatalogMetadata catalogMetadata = getQueryRunner().getTransactionManager()
.getCatalogMetadata(session.getTransactionId().get(), handle.get().getConnectorId());
// There isn't an easy way to access the cache internally, so use some reflection to grab it
Field delegate = ClassLoaderSafeConnectorMetadata.class.getDeclaredField("delegate");
delegate.setAccessible(true);
IcebergAbstractMetadata metadata = (IcebergAbstractMetadata) delegate.get(catalogMetadata.getMetadataFor(handle.get().getConnectorId()));
CacheStats initial = metadata.statisticsFileCache.stats();
assertEquals(metadata.statisticsFileCache.stats().minus(initial).hitCount(), 0);
TableStatistics stats = getTableStats("test_statistics_file_cache", Optional.empty(), getSession(), Optional.of(ImmutableList.of("i")));
assertEquals(stats.getRowCount().getValue(), 5);
assertEquals(metadata.statisticsFileCache.stats().minus(initial).missCount(), 1);
getTableStats("test_statistics_file_cache", Optional.empty(), getSession(), Optional.of(ImmutableList.of("i")));
assertEquals(metadata.statisticsFileCache.stats().minus(initial).missCount(), 1);
assertEquals(metadata.statisticsFileCache.stats().minus(initial).hitCount(), 1);
getQueryRunner().getTransactionManager().asyncAbort(transactionId);
getQueryRunner().execute("DROP TABLE test_statistics_file_cache");
}
@Test(dataProvider = "batchReadEnabled")
public void testUuidRoundTrip(boolean batchReadEnabled)
{
Session session = batchReadEnabledEnabledSession(batchReadEnabled);
try {
assertQuerySucceeds("CREATE TABLE uuid_roundtrip(u uuid)");
UUID uuid = UUID.fromString("11111111-2222-3333-4444-555555555555");
assertUpdate(format("INSERT INTO uuid_roundtrip VALUES CAST('%s' as uuid)", uuid), 1);
assertQuery(session, "SELECT CAST(u as varchar) FROM uuid_roundtrip", format("VALUES '%s'", uuid));
}
finally {
assertQuerySucceeds("DROP TABLE uuid_roundtrip");
}
}
@Test(dataProvider = "batchReadEnabled")
public void testUuidFilters(boolean batchReadEnabled)
{
Session session = batchReadEnabledEnabledSession(batchReadEnabled);
try {
int uuidCount = 100;
assertQuerySucceeds("CREATE TABLE uuid_filters(u uuid)");
List<UUID> uuids = IntStream.range(0, uuidCount)
.mapToObj(idx -> {
ByteBuffer buf = ByteBuffer.allocate(16);
if (idx % 2 == 0) {
buf.putLong(0L);
buf.putLong(idx);
}
else {
buf.putLong(idx);
buf.putLong(0L);
}
buf.flip();
return new UUID(buf.getLong(), buf.getLong());
})
.collect(Collectors.toList());
// shuffle to make sure parquet metadata stats are updated properly even with
// out-of-order values
Collections.shuffle(uuids);
assertUpdate(format("INSERT INTO uuid_filters VALUES %s",
Joiner.on(", ").join(uuids.stream().map(uuid -> format("CAST('%s' as uuid)", uuid)).iterator())), 100);
assertQuery(session, format("SELECT CAST(u as varchar) FROM uuid_filters WHERE u = CAST('%s' as uuid)", uuids.get(0)), format("VALUES '%s'", uuids.get(0)));
// sort so we can easily get lowest and highest
uuids.sort(Comparator.naturalOrder());
assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u >= CAST('%s' as uuid)", uuids.get(0)), format("VALUES %d", uuidCount));
assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u > CAST('%s' as uuid)", uuids.get(0)), format("VALUES %d", uuidCount - 1));
assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u <= CAST('%s' as uuid)", uuids.get(uuidCount - 1)), format("VALUES %d", uuidCount));
assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u < CAST('%s' as uuid)", uuids.get(uuidCount - 1)), format("VALUES %d", uuidCount - 1));
assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u < CAST('%s' as uuid)", uuids.get(50)), format("VALUES %d", 50));
assertQuery(session, format("SELECT COUNT(*) FROM uuid_filters WHERE u <= CAST('%s' as uuid)", uuids.get(50)), format("VALUES %d", 51));
}
finally {
assertQuerySucceeds("DROP TABLE uuid_filters");
}
}
@DataProvider(name = "parquetVersions")
public Object[][] parquetVersionsDataProvider()
{
return new Object[][] {
{PARQUET_1_0},
{PARQUET_2_0},
};
}
@Test(dataProvider = "parquetVersions")
public void testBatchReadOnTimeType(WriterVersion writerVersion)
{
Session parquetVersionSession = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", "parquet_writer_version", writerVersion.toString())
.build();
assertQuerySucceeds(parquetVersionSession, "CREATE TABLE time_batch_read(i int, t time)");
assertUpdate(parquetVersionSession, "INSERT INTO time_batch_read VALUES (0, time '1:00:00.000'), (1, time '1:00:00.000'), (2, time '1:00:00.000')", 3);
Session disabledBatchRead = Session.builder(parquetVersionSession)
.setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "false")
.build();
Session enabledBatchRead = Session.builder(parquetVersionSession)
.setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "true")
.build();
@Language("SQL") String query = "SELECT t FROM time_batch_read ORDER BY i LIMIT 1";
MaterializedResult disabledResult = getQueryRunner().execute(disabledBatchRead, query);
MaterializedResult enabledResult = getQueryRunner().execute(enabledBatchRead, query);
assertEquals(disabledResult, enabledResult);
assertQuerySucceeds("DROP TABLE time_batch_read");
}
public void testAllNullHistogramColumn()
{
try {
Session session = Session.builder(getSession())
.setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "true")
.build();
assertQuerySucceeds("DROP TABLE IF EXISTS histogram_all_nulls");
assertQuerySucceeds("CREATE TABLE histogram_all_nulls (c bigint)");
TableStatistics stats = getTableStats("histogram_all_nulls", Optional.empty(), session);
assertFalse(stats.getColumnStatistics().values().stream().findFirst().isPresent());
assertUpdate("INSERT INTO histogram_all_nulls VALUES NULL, NULL, NULL, NULL, NULL", 5);
stats = getTableStats("histogram_all_nulls", Optional.empty(), session);
assertFalse(stats.getColumnStatistics().values().stream().findFirst()
.get().getHistogram().isPresent());
assertQuerySucceeds(session, "ANALYZE histogram_all_nulls");
stats = getTableStats("histogram_all_nulls", Optional.empty(), session);
assertFalse(stats.getColumnStatistics().values().stream().findFirst()
.get().getHistogram().isPresent());
}
finally {
assertQuerySucceeds("DROP TABLE IF EXISTS histogram_all_nulls");
}
}
@Test(dataProvider = "validHistogramTypes")
public void testHistogramShowStats(Type type, Object[] values)
{
try {
Session session = Session.builder(getSession())
.setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "true")
.build();
assertQuerySucceeds("DROP TABLE IF EXISTS create_histograms");
assertQuerySucceeds(String.format("CREATE TABLE show_histograms (c %s)", type.getDisplayName()));
assertQuerySucceeds(String.format("INSERT INTO show_histograms VALUES %s", Joiner.on(", ").join(values)));
assertQuerySucceeds(session, "ANALYZE show_histograms");
TableStatistics tableStatistics = getTableStats("show_histograms", Optional.empty(), session, Optional.empty());
Map<String, Optional<ConnectorHistogram>> histogramByColumnName = tableStatistics.getColumnStatistics()
.entrySet()
.stream()
.collect(toImmutableMap(
entry -> ((IcebergColumnHandle) entry.getKey()).getName(),
entry -> entry.getValue().getHistogram()));
MaterializedResult stats = getQueryRunner().execute("SHOW STATS for show_histograms");
stats.getMaterializedRows()
.forEach(row -> {
String name = (String) row.getField(0);
String histogram = (String) row.getField(7);
assertEquals(Optional.ofNullable(histogramByColumnName.get(name))
.flatMap(identity())
.map(Objects::toString).orElse(null),
histogram);
});
}
finally {
assertQuerySucceeds("DROP TABLE IF EXISTS show_histograms");
}
}
/**
* Verifies that when the users opts-in to using histograms that the
* optimizer estimates reflect the actual dataset for a variety of filter
* types (LTE, GT, EQ, NE) on a non-uniform data distribution
*/
@Test
public void testHistogramsUsedInOptimization()
{
Session histogramSession = Session.builder(getSession())
.setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "true")
.build();
// standard-normal distribution should have vastly different estimates than uniform at the tails (e.g. -3, +3)
NormalDistribution dist = new NormalDistribution(0, 1);
double[] values = dist.sample(1000);
Arrays.sort(values);
try {
assertQuerySucceeds("DROP TABLE IF EXISTS histogram_validation");
assertQuerySucceeds("CREATE TABLE histogram_validation (c double)");
assertQuerySucceeds(String.format("INSERT INTO histogram_validation VALUES %s", Joiner.on(", ").join(Arrays.stream(values).iterator())));
assertQuerySucceeds(histogramSession, "ANALYZE histogram_validation");
Consumer<Double> assertFilters = (value) -> {
// use Math.abs because if the value isn't found, the returned value of binary
// search is (- insert index). The absolute value index tells us roughly how
// many records would have been returned regardless of if the actual value is in the
// dataset
double estimatedRowCount = Math.abs(Arrays.binarySearch(values, value));
assertPlan(histogramSession, "SELECT * FROM histogram_validation WHERE c <= " + value,
output(anyTree(tableScan("histogram_validation"))).withApproximateOutputRowCount(estimatedRowCount, 25));
// check that inverse filter equals roughly the inverse number of rows
assertPlan(histogramSession, "SELECT * FROM histogram_validation WHERE c > " + value,
output(anyTree(tableScan("histogram_validation"))).withApproximateOutputRowCount(Math.max(0.0, values.length - estimatedRowCount), 25));
// having an exact random double value from the distribution exist more than once is exceedingly rare.
// the histogram calculation should return 1 (and the inverse) in both situations
assertPlan(histogramSession, "SELECT * FROM histogram_validation WHERE c = " + value,
output(anyTree(tableScan("histogram_validation"))).withApproximateOutputRowCount(1.0, 25));
assertPlan(histogramSession, "SELECT * FROM histogram_validation WHERE c != " + value,
output(anyTree(tableScan("histogram_validation"))).withApproximateOutputRowCount(values.length - 1, 25));
};
assertFilters.accept(values[1]); // choose 1 greater than the min value
assertFilters.accept(-2.0); // should be very unlikely to generate a distribution where all values > -2.0
assertFilters.accept(-1.0);
assertFilters.accept(0.0);
assertFilters.accept(1.0);
assertFilters.accept(2.0); // should be very unlikely to generate a distribution where all values < 2.0
assertFilters.accept(values[values.length - 2]); // choose 1 less than the max value
}
finally {
assertQuerySucceeds("DROP TABLE IF EXISTS histogram_validation");
}
}
/**
* Verifies that the data in the histogram matches the mins/maxs of the values
* in the table when created
*/
@Test(dataProvider = "validHistogramTypes")
public void testHistogramReconstruction(Type type, Object[] values)
{
try {
Session session = Session.builder(getSession())
.setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "true")
.build();
assertQuerySucceeds("DROP TABLE IF EXISTS verify_histograms");
assertQuerySucceeds(String.format("CREATE TABLE verify_histograms (c %s)", type.getDisplayName()));
assertQuerySucceeds(String.format("INSERT INTO verify_histograms VALUES %s", Joiner.on(", ").join(values)));
assertQuerySucceeds(session, "ANALYZE verify_histograms");
TableStatistics tableStatistics = getTableStats("verify_histograms", Optional.empty(), session, Optional.empty());
Map<String, IcebergColumnHandle> nameToHandle = tableStatistics.getColumnStatistics().keySet()
.stream().map(IcebergColumnHandle.class::cast)
.collect(Collectors.toMap(BaseHiveColumnHandle::getName, identity()));
assertNotNull(nameToHandle.get("c"));
IcebergColumnHandle handle = nameToHandle.get("c");
ColumnStatistics statistics = tableStatistics.getColumnStatistics().get(handle);
ConnectorHistogram histogram = statistics.getHistogram().get();
DoubleRange range = statistics.getRange().get();
double min = range.getMin();
double max = range.getMax();
assertEquals(histogram.inverseCumulativeProbability(0.0).getValue(), min);
assertEquals(histogram.inverseCumulativeProbability(1.0).getValue(), max);
}
finally {
assertQuerySucceeds("DROP TABLE IF EXISTS verify_histograms");
}
}
@Test
public void testInformationSchemaQueries()
{
assertQuerySucceeds("CREATE SCHEMA ICEBERG.TEST_SCHEMA1");
assertQuerySucceeds("CREATE SCHEMA ICEBERG.TEST_SCHEMA2");
assertQuerySucceeds("CREATE TABLE ICEBERG.TEST_SCHEMA1.ICEBERG_T1(i int)");
assertQuerySucceeds("CREATE TABLE ICEBERG.TEST_SCHEMA1.ICEBERG_T2(i int)");
assertQuerySucceeds("CREATE TABLE ICEBERG.TEST_SCHEMA2.ICEBERG_T3(i int)");
assertQuerySucceeds("CREATE TABLE ICEBERG.TEST_SCHEMA2.ICEBERG_T4(i int)");
assertQuery("SELECT table_name FROM iceberg.information_schema.tables WHERE table_schema ='test_schema1'", "VALUES 'iceberg_t1', 'iceberg_t2'");
assertQuery("SELECT table_name FROM iceberg.information_schema.tables WHERE table_schema ='test_schema2'", "VALUES 'iceberg_t3', 'iceberg_t4'");
//query on non-existing schema
assertQueryReturnsEmptyResult("SELECT table_name FROM iceberg.information_schema.tables WHERE table_schema = 'NON_EXISTING_SCHEMA'");
assertQueryReturnsEmptyResult("SELECT table_name FROM iceberg.information_schema.tables WHERE table_schema = 'non_existing_schema'");
assertQuerySucceeds("DROP TABLE ICEBERG.TEST_SCHEMA1.ICEBERG_T1");
assertQuerySucceeds("DROP TABLE ICEBERG.TEST_SCHEMA1.ICEBERG_T2");
assertQuerySucceeds("DROP TABLE ICEBERG.TEST_SCHEMA2.ICEBERG_T3");
assertQuerySucceeds("DROP TABLE ICEBERG.TEST_SCHEMA2.ICEBERG_T4");
assertQuerySucceeds("DROP SCHEMA ICEBERG.TEST_SCHEMA1");
assertQuerySucceeds("DROP SCHEMA ICEBERG.TEST_SCHEMA2");
}
@Test
public void testUpdateWithPredicates()
{
String tableName = "test_update_predicates_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + "(id int, full_name varchar(20))");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'aaaa'), (2, 'bbbb'), (3, 'cccc')", 3);
// update single row on id
assertUpdate("UPDATE " + tableName + " SET full_name = 'aaaa AAAA' WHERE id = 1", 1);
assertQuery("SELECT id, full_name FROM " + tableName, "VALUES (1, 'aaaa AAAA'), (2, 'bbbb'), (3, 'cccc')");
// update single row with compound predicate
assertUpdate("UPDATE " + tableName + " SET full_name = 'aaaa' WHERE id = 1 and full_name='aaaa AAAA'", 1);
assertQuery("SELECT id, full_name FROM " + tableName, "VALUES (1, 'aaaa'), (2, 'bbbb'), (3, 'cccc')");
// update multiple rows at once
assertUpdate("UPDATE " + tableName + " SET full_name = 'ssss' WHERE id != 1 ", 2);
assertQuery("SELECT id, full_name FROM " + tableName, "VALUES (1, 'aaaa'), (2, 'ssss'), (3, 'ssss')");
// update with filter matching no rows
assertUpdate("UPDATE " + tableName + " SET full_name = 'ssss' WHERE id > 4 ", 0);
assertQuery("SELECT id, full_name FROM " + tableName, "VALUES (1, 'aaaa'), (2, 'ssss'), (3, 'ssss')");
// add column and update null values
assertUpdate("ALTER TABLE " + tableName + " ADD column email varchar");
assertQuery("SELECT id, full_name, email FROM " + tableName, "VALUES (1, 'aaaa', NULL), (2, 'ssss', NULL), (3, 'ssss', NULL)");
assertUpdate("INSERT INTO " + tableName + " VALUES (4, 'dddd', 'ddd@gmail.com')", 1);
assertUpdate("UPDATE " + tableName + " SET email = 'abc@gmail.com' WHERE id in(1, 2, 3)", 3);
assertQuery("SELECT id, full_name, email FROM " + tableName, "VALUES (1, 'aaaa', 'abc@gmail.com'), (2, 'ssss', 'abc@gmail.com'), (3, 'ssss', 'abc@gmail.com'), (4, 'dddd', 'ddd@gmail.com') ");
// set all values to null
assertUpdate("UPDATE " + tableName + " SET email = NULL", 4);
assertQuery("SELECT email FROM " + tableName + " WHERE email is NULL", "VALUES NULL, NULL, NULL, NULL");
// update nulls to non-null
assertUpdate("UPDATE " + tableName + " SET email = 'test@gmail.com' WHERE email is NULL", 4);
assertQuery("SELECT count(*) FROM " + tableName + " WHERE email is not NULL", "VALUES 4");
}
@Test
public void testUpdateAllValues()
{
String tableName = "test_update_all_values_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + "(a int, b int, c int)");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 2, 3), (11, 12, 13), (21, 22, 23)", 3);
assertUpdate("UPDATE " + tableName + " SET a = a + 1, b = b - 1, c = c * 2", 3);
assertQuery("SELECT * FROM " + tableName, "VALUES (2, 1, 6), (12, 11, 26), (22, 21, 46)");
// update multiple columns with predicate
assertUpdate("UPDATE " + tableName + " SET a = a + 1, b = b - 1, c = c * 2 WHERE a = 2 AND b = 1", 1);
assertQuery("SELECT * FROM " + tableName, "VALUES (3, 0, 12), (12, 11, 26), (22, 21, 46)");
}
@Test
public void testUpdateRowType()
{
String tableName = "test_update_row_type" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + "(int_t INT, row_t ROW(f1 INT, f2 INT))");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, ROW(2, 3)), (11, ROW(12, 13)), (21, ROW(22, 23))", 3);
assertUpdate("UPDATE " + tableName + " SET int_t = int_t - 1 WHERE row_t.f2 = 3", 1);
assertQuery("SELECT int_t, row_t.f1, row_t.f2 FROM " + tableName, "VALUES (0, 2, 3), (11, 12, 13), (21, 22, 23)");
assertUpdate("UPDATE " + tableName + " SET row_t = ROW(row_t.f1, row_t.f2 + 1) WHERE int_t = 11", 1);
assertQuery("SELECT int_t, row_t.f1, row_t.f2 FROM " + tableName, "VALUES (0, 2, 3), (11, 12, 14), (21, 22, 23)");
assertUpdate("UPDATE " + tableName + " SET row_t = ROW(row_t.f1 * 2, row_t.f2) WHERE row_t.f1 = 22", 1);
assertQuery("SELECT int_t, row_t.f1, row_t.f2 FROM " + tableName, "VALUES (0, 2, 3), (11, 12, 14), (21, 44, 23)");
}
public void testUpdateOnPartitionTable()
{
String tableName = "test_update_partition_column_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + "(a int, b varchar(10))" + "with(partitioning=ARRAY['a'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'first'), (1, 'second'), (2, 'third')", 3);
// update all rows on partition column
assertUpdate("UPDATE " + tableName + " SET a = a + 1", 3);
assertQuery("SELECT a, b FROM " + tableName, "VALUES (2,'first'), (2,'second'), (3,'third')");
assertUpdate("UPDATE " + tableName + " SET a = a + (CASE b WHEN 'first' THEN 1 ELSE 0 END)", 3);
assertQuery("SELECT a, b FROM " + tableName, "VALUES (3,'first'), (2,'second'), (3,'third')");
// update on partition column with predicate
assertUpdate("UPDATE " + tableName + " SET a = a + 1 WHERE b = 'second'", 1);
assertQuery("SELECT a, b FROM " + tableName, "VALUES (3,'first'), (3,'second'), (3,'third')");
// update non-partition column on a partitioned table without a predicate
assertUpdate("UPDATE " + tableName + " SET a = a + 1 WHERE b = 'second'", 1);
assertQuery("SELECT a, b FROM " + tableName, "VALUES (3,'first'), (4,'second'), (3,'third')");
// update non-partition column on a partitioned table with a predicate
assertUpdate("UPDATE " + tableName + " SET b = CONCAT(CAST(a as varchar), CASE a WHEN 1 THEN 'st' WHEN 2 THEN 'nd' WHEN 3 THEN 'rd' ELSE 'th' END) WHERE b = 'second'", 1);
assertQuery("SELECT a, b FROM " + tableName, "VALUES (3,'first'), (4,'4th'), (3,'third')");
}
private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<FileContent> expectedFileContent)
{
// check delete file list
TableScan tableScan = icebergTable.newScan().useSnapshot(icebergTable.currentSnapshot().snapshotId());
Iterator<FileScanTask> iterator = TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()).iterator();
List<DeleteFile> deleteFiles = new ArrayList<>();
while (iterator.hasNext()) {
FileScanTask task = iterator.next();
List<org.apache.iceberg.DeleteFile> deletes = task.deletes();
deletes.forEach(delete -> deleteFiles.add(DeleteFile.fromIceberg(delete)));
}
assertEquals(deleteFiles.size(), expectedSize);
List<FileContent> fileContents = deleteFiles.stream().map(DeleteFile::content).sorted().collect(Collectors.toList());
assertEquals(fileContents, expectedFileContent);
}
private void writePositionDeleteToNationTable(Table icebergTable, String dataFilePath, long deletePos)
throws IOException
{
java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
File metastoreDir = getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile();
Path metadataDir = new Path(metastoreDir.toURI());
String deleteFileName = "delete_file_" + randomUUID();
FileSystem fs = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), metadataDir);
Path path = new Path(metadataDir, deleteFileName);
PositionDeleteWriter<Record> writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(path, fs))
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(icebergTable)
.overwrite()
.rowSchema(icebergTable.schema())
.withSpec(PartitionSpec.unpartitioned())
.buildPositionWriter();
PositionDelete<Record> positionDelete = PositionDelete.create();
PositionDelete<Record> record = positionDelete.set(dataFilePath, deletePos, GenericRecord.create(icebergTable.schema()));
try (Closeable ignored = writer) {
writer.write(record);
}
icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
}
private void writeEqualityDeleteToNationTable(Table icebergTable, Map<String, Object> overwriteValues)
throws Exception
{
writeEqualityDeleteToNationTable(icebergTable, overwriteValues, Collections.emptyMap());
}
private void writeEqualityDeleteToNationTable(Table icebergTable, Map<String, Object> overwriteValues, Map<String, Object> partitionValues)
throws Exception
{
java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
File metastoreDir = getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile();
Path metadataDir = new Path(metastoreDir.toURI());
String deleteFileName = "delete_file_" + randomUUID();
FileSystem fs = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), metadataDir);
Schema deleteRowSchema = icebergTable.schema().select(overwriteValues.keySet());
Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(HadoopOutputFile.fromPath(new Path(metadataDir, deleteFileName), fs))
.forTable(icebergTable)
.rowSchema(deleteRowSchema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.equalityFieldIds(deleteRowSchema.columns().stream().map(Types.NestedField::fieldId).collect(Collectors.toList()))
.overwrite();
if (!partitionValues.isEmpty()) {
GenericRecord partitionData = GenericRecord.create(icebergTable.spec().partitionType());
writerBuilder.withPartition(partitionData.copy(partitionValues));
}
EqualityDeleteWriter<Object> writer = writerBuilder.buildEqualityWriter();
Record dataDelete = GenericRecord.create(deleteRowSchema);
try (Closeable ignored = writer) {
writer.write(dataDelete.copy(overwriteValues));
}
icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
}
protected HdfsEnvironment getHdfsEnvironment()
{
HiveClientConfig hiveClientConfig = new HiveClientConfig();
MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
HiveS3Config hiveS3Config = new HiveS3Config();
return getHdfsEnvironment(hiveClientConfig, metastoreClientConfig, hiveS3Config);
}
public static HdfsEnvironment getHdfsEnvironment(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig, HiveS3Config hiveS3Config)
{
S3ConfigurationUpdater s3ConfigurationUpdater = new PrestoS3ConfigurationUpdater(hiveS3Config);
HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig, s3ConfigurationUpdater, ignored -> {}),
ImmutableSet.of(), hiveClientConfig);
return new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
}
private Table updateTable(String tableName)
{
BaseTable table = (BaseTable) loadTable(tableName);
TableOperations operations = table.operations();
TableMetadata currentMetadata = operations.current();
operations.commit(currentMetadata, currentMetadata.upgradeToFormatVersion(2));
return table;
}
protected Table loadTable(String tableName)
{
tableName = normalizeIdentifier(tableName, ICEBERG_CATALOG);
Catalog catalog = CatalogUtil.loadCatalog(catalogType.getCatalogImpl(), ICEBERG_CATALOG, getProperties(), new Configuration());
return catalog.loadTable(TableIdentifier.of("tpch", tableName));
}
protected Map<String, String> getProperties()
{
Path metastoreDir = getCatalogDirectory();
return ImmutableMap.of("warehouse", metastoreDir.toString());
}
protected Path getCatalogDirectory()
{
java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
switch (catalogType) {
case HIVE:
case HADOOP:
case NESSIE:
return new Path(getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile().toURI());
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + catalogType);
}
private Session sessionForTimezone(String zoneId, boolean legacyTimestamp)
{
SessionBuilder sessionBuilder = Session.builder(getSession())
.setSystemProperty(LEGACY_TIMESTAMP, String.valueOf(legacyTimestamp));
if (legacyTimestamp) {
sessionBuilder.setTimeZoneKey(TimeZoneKey.getTimeZoneKey(zoneId));
}
return sessionBuilder.build();
}
private void testWithAllFileFormats(Session session, BiConsumer<Session, FileFormat> test)
{
test.accept(session, FileFormat.PARQUET);
test.accept(session, FileFormat.ORC);
}
private void assertHasDataFiles(Snapshot snapshot, int dataFilesCount)
{
Map<String, String> map = snapshot.summary();
int totalDataFiles = Integer.valueOf(map.get(TOTAL_DATA_FILES_PROP));
assertEquals(totalDataFiles, dataFilesCount);
}
private void assertHasDeleteFiles(Snapshot snapshot, int deleteFilesCount)
{
Map<String, String> map = snapshot.summary();
int totalDeleteFiles = Integer.valueOf(map.get(TOTAL_DELETE_FILES_PROP));
assertEquals(totalDeleteFiles, deleteFilesCount);
}
@Test
public void testSortByAllTypes()
{
String tableName = "test_sort_by_all_types_" + randomTableSuffix();
assertUpdate("" +
"CREATE TABLE " + tableName + " (" +
" a_boolean boolean, " +
" an_integer integer, " +
" a_bigint bigint, " +
" a_real real, " +
" a_double double, " +
" a_short_decimal decimal(5,2), " +
" a_long_decimal decimal(38,20), " +
" a_varchar varchar, " +
" a_varbinary varbinary, " +
" a_date date, " +
" a_timestamp timestamp, " +
" an_array array(varchar), " +
" a_map map(integer, varchar) " +
") " +
"WITH (" +
"sorted_by = ARRAY[" +
" 'a_boolean', " +
" 'an_integer', " +
" 'a_bigint', " +
" 'a_real', " +
" 'a_double', " +
" 'a_short_decimal', " +
" 'a_long_decimal', " +
" 'a_varchar', " +
" 'a_varbinary', " +
" 'a_date', " +
" 'a_timestamp' " +
" ]" +
")");
String values = "(" +
"true, " +
"1, " +
"BIGINT '2', " +
"REAL '3.0', " +
"DOUBLE '4.0', " +
"DECIMAL '5.00', " +
"CAST(DECIMAL '6.00' AS decimal(38,20)), " +
"VARCHAR 'seven', " +
"X'88888888', " +
"DATE '2022-09-09', " +
"TIMESTAMP '2022-11-11 11:11:11.000000', " +
"ARRAY[VARCHAR 'four', 'teen'], " +
"MAP(ARRAY[15], ARRAY[VARCHAR 'fifteen']))";
String highValues = "(" +
"true, " +
"999999999, " +
"BIGINT '999999999', " +
"REAL '999.999', " +
"DOUBLE '999.999', " +
"DECIMAL '999.99', " +
"DECIMAL '6.00', " +
"'zzzzzzzzzzzzzz', " +
"X'FFFFFFFF', " +
"DATE '2099-12-31', " +
"TIMESTAMP '2099-12-31 23:59:59.000000', " +
"ARRAY['zzzz', 'zzzz'], " +
"MAP(ARRAY[999], ARRAY['zzzz']))";
String lowValues = "(" +
"false, " +
"0, " +
"BIGINT '0', " +
"REAL '0', " +
"DOUBLE '0', " +
"DECIMAL '0', " +
"DECIMAL '0', " +
"'', " +
"X'00000000', " +
"DATE '2000-01-01', " +
"TIMESTAMP '2000-01-01 00:00:00.000000', " +
"ARRAY['', ''], " +
"MAP(ARRAY[0], ARRAY['']))";
assertUpdate("INSERT INTO " + tableName + " VALUES " + values + ", " + highValues + ", " + lowValues, 3);
dropTable(getSession(), tableName);
}
@Test
public void testEmptySortedByList()
{
String tableName = "test_empty_sorted_by_list_" + randomTableSuffix();
assertUpdate("" +
"CREATE TABLE " + tableName + " (a_boolean boolean, an_integer integer) " +
" WITH (partitioning = ARRAY['an_integer'], sorted_by = ARRAY[])");
assertUpdate("INSERT INTO " + tableName + " VALUES (false, 3), (true, 1), (true, 5), (false, 6), (false, 2)", 5);
dropTable(getSession(), tableName);
}
@Test(dataProvider = "sortedTableWithQuotedIdentifierCasing")
public void testCreateSortedTableWithQuotedIdentifierCasing(String columnName, String sortField)
{
String tableName = "test_create_sorted_table_with_quotes_" + randomTableSuffix();
assertUpdate(format("CREATE TABLE %s (%s bigint) WITH (sorted_by = ARRAY['%s'])", tableName, columnName, sortField));
assertUpdate(format("INSERT INTO " + tableName + " VALUES (1),(3),(2),(5),(4)"), 5);
dropTable(getSession(), tableName);
}
@Test
public void testSortedByAscNullFirstOrder()
{
String tableName = "test_empty_sorted_by_asc_null_first_" + randomTableSuffix();
assertUpdate("" +
"CREATE TABLE " + tableName + " (id int, name varchar(20))" +
" WITH ( sorted_by = ARRAY['id'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (11, 'ddd'), (20, 'ccc'), (null, 'vvv'), (3, 'aaa'), (5, 'eeee')", 5);
dropTable(getSession(), tableName);
}
@Test
public void testSortedByAscNullLastOrder()
{
String tableName = "test_empty_sorted_by_asc_null_last_" + randomTableSuffix();
assertUpdate("" +
"CREATE TABLE " + tableName + " (id int, name varchar(20))" +
" WITH ( sorted_by = ARRAY['id ASC NULLS LAST'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (11, 'ddd'), (20, 'ccc'), (null, 'vvv'), (3, 'aaa'), (5, 'eeee')", 5);
dropTable(getSession(), tableName);
}
@Test
public void testSortedByDescNullFirstOrder()
{
String tableName = "test_empty_sorted_by_desc_null_first_" + randomTableSuffix();
assertUpdate("" +
"CREATE TABLE " + tableName + " (id int, name varchar(20))" +
" WITH ( sorted_by = ARRAY['id DESC NULLS FIRST'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (11, 'ddd'), (20, 'ccc'), (null, 'vvv'), (3, 'aaa'), (5, 'eeee')", 5);
dropTable(getSession(), tableName);
}
@Test
public void testSortedByDescNullLastOrder()
{
String tableName = "test_empty_sorted_by_desc_null_last" + randomTableSuffix();
assertUpdate("" +
"CREATE TABLE " + tableName + " (id int, name varchar(20))" +
" WITH ( sorted_by = ARRAY['id DESC NULLS LAST'])");
assertUpdate("INSERT INTO " + tableName + " VALUES (11, 'ddd'), (20, 'ccc'), (null, 'vvv'), (3, 'aaa'), (5, 'eeee')", 5);
dropTable(getSession(), tableName);
}
@DataProvider(name = "sortedTableWithQuotedIdentifierCasing")
public static Object[][] sortedTableWithQuotedIdentifierCasing()
{
return new Object[][] {
{"col", "col"},
{"\"col\"", "col"},
{"col", "\"col\""},
{"\"col\"", "\"col\""},
};
}
@Test(dataProvider = "sortedTableWithSortTransform")
public void testCreateSortedTableWithSortTransform(String columnName, String sortField)
{
String tableName = "test_sort_with_transform_" + randomTableSuffix();
String query = format("CREATE TABLE %s (%s TIMESTAMP) WITH (sorted_by = ARRAY['%s'])", tableName, columnName, sortField);
assertQueryFails(query, Pattern.quote(format("Unable to parse sort field: [%s]", sortField)));
}
public void testStatisticsFileCacheInvalidationProcedure()
{
assertQuerySucceeds("CREATE TABLE test_statistics_file_cache_procedure(i int)");
assertUpdate("INSERT INTO test_statistics_file_cache_procedure VALUES 1, 2, 3, 4, 5", 5);
assertQuerySucceeds("ANALYZE test_statistics_file_cache_procedure");
for (int i = 0; i < 3; i++) {
assertQuerySucceeds("SHOW STATS FOR test_statistics_file_cache_procedure");
}
String jmxMetricsQuery = format("SELECT sum(\"cachestats.hitcount\"), sum(\"cachestats.size\"), sum(\"cachestats.misscount\") " +
"from jmx.current.\"com.facebook.presto.iceberg.statistics:name=%s,type=statisticsfilecache\"", getSession().getCatalog().get());
MaterializedResult result = computeActual(jmxMetricsQuery);
long afterHitCount = (long) result.getMaterializedRows().get(0).getField(0);
long afterCacheSize = (long) result.getMaterializedRows().get(0).getField(1);
long afterMissCount = (long) result.getMaterializedRows().get(0).getField(2);
assertTrue(afterHitCount > 0);
assertTrue(afterCacheSize > 0);
//test invalidate_statistics_file_cache procedure
assertQuerySucceeds(format("CALL %s.system.invalidate_statistics_file_cache()", getSession().getCatalog().get()));
MaterializedResult resultAfterProcedure = computeActual(jmxMetricsQuery);
long afterProcedureCacheSize = (long) resultAfterProcedure.getMaterializedRows().get(0).getField(1);
assertTrue(afterProcedureCacheSize == 0);
assertQuerySucceeds("SHOW STATS FOR test_statistics_file_cache_procedure");
MaterializedResult resultAfter = computeActual(jmxMetricsQuery);
long newCacheSize = (long) resultAfter.getMaterializedRows().get(0).getField(1);
long newMissCount = (long) resultAfter.getMaterializedRows().get(0).getField(2);
assertTrue(newCacheSize > 0);
assertTrue(afterMissCount < newMissCount);
getQueryRunner().execute("DROP TABLE test_statistics_file_cache_procedure");
}
@DataProvider(name = "sortedTableWithSortTransform")
public static Object[][] sortedTableWithSortTransform()
{
return new Object[][] {
{"col", "bucket(col, 3)"},
{"col", "bucket(\"col\", 3)"},
{"col", "truncate(col, 3)"},
{"col", "year(col)"},
{"col", "month(col)"},
{"col", "date(col)"},
{"col", "hour(col)"},
};
}
protected void dropTable(Session session, String table)
{
assertUpdate(session, "DROP TABLE " + table);
assertFalse(getQueryRunner().tableExists(session, table));
}
@Test
public void testEqualityDeleteAsJoinWithMaximumFieldsLimitUnderLimit()
throws Exception
{
int maxColumns = 10;
String tableName = "test_eq_delete_under_max_cols_" + randomTableSuffix();
Session session = Session.builder(getQueryRunner().getDefaultSession())
.setCatalogSessionProperty(ICEBERG_CATALOG, DELETE_AS_JOIN_REWRITE_ENABLED, "true")
// Make sure the max columns is set to one more than the number of columns in the table
.setCatalogSessionProperty(ICEBERG_CATALOG, DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS, "" + (maxColumns + 1))
.build();
try {
// Test with exactly max columns - should work fine
// Create table with specified number of columns
List<String> columnDefinitions = IntStream.range(0, maxColumns)
.mapToObj(i -> "col_" + i + " varchar")
.collect(Collectors.toList());
columnDefinitions.add(0, "id bigint");
String createTableSql = "CREATE TABLE " + tableName + " (" +
String.join(", ", columnDefinitions) + ")";
assertUpdate(session, createTableSql);
// Insert test rows
for (int row = 1; row <= 3; row++) {
final int currentRow = row;
List<String> values = IntStream.range(0, maxColumns)
.mapToObj(i -> "'val_" + currentRow + "_" + i + "'")
.collect(Collectors.toList());
values.add(0, String.valueOf(currentRow));
String insertSql = "INSERT INTO " + tableName + " VALUES (" +
String.join(", ", values) + ")";
assertUpdate(session, insertSql, 1);
}
// Verify all rows exist
assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES (3)");
// Update table to format version 2 and create equality delete files
Table icebergTable = updateTable(tableName);
// Create equality delete using ALL columns
Map<String, Object> deleteRow = new HashMap<>();
deleteRow.put("id", 2L);
for (int i = 0; i < maxColumns; i++) {
deleteRow.put("col_" + i, "val_2_" + i);
}
// Write equality delete with ALL columns
writeEqualityDeleteToNationTable(icebergTable, deleteRow);
// Query should work correctly regardless of optimization
assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES (2)");
assertQuery(session, "SELECT id FROM " + tableName + " ORDER BY id", "VALUES (1), (3)");
// With <= max columns, query plan should use JOIN (optimization enabled)
assertPlan(session, "SELECT * FROM " + tableName,
anyTree(
node(JoinNode.class,
anyTree(tableScan(tableName)),
anyTree(tableScan(tableName)))));
}
finally {
dropTable(session, tableName);
}
}
@Test
public void testEqualityDeleteAsJoinWithMaximumFieldsLimitOverLimit()
throws Exception
{
int maxColumns = 10;
String tableName = "test_eq_delete_max_cols_" + randomTableSuffix();
Session session = Session.builder(getQueryRunner().getDefaultSession())
.setCatalogSessionProperty(ICEBERG_CATALOG, DELETE_AS_JOIN_REWRITE_ENABLED, "true")
.setCatalogSessionProperty(ICEBERG_CATALOG, DELETE_AS_JOIN_REWRITE_MAX_DELETE_COLUMNS, "" + maxColumns)
.build();
try {
// Test with max columns - optimization should be disabled to prevent stack overflow
// Create table with specified number of columns
List<String> columnDefinitions = IntStream.range(0, maxColumns)
.mapToObj(i -> "col_" + i + " varchar")
.collect(Collectors.toList());
columnDefinitions.add(0, "id bigint");
String createTableSql = "CREATE TABLE " + tableName + " (" +
String.join(", ", columnDefinitions) + ")";
assertUpdate(session, createTableSql);
// Insert test rows
for (int row = 1; row <= 3; row++) {
final int currentRow = row;
List<String> values = IntStream.range(0, maxColumns)
.mapToObj(i -> "'val_" + currentRow + "_" + i + "'")
.collect(Collectors.toList());
values.add(0, String.valueOf(currentRow));
String insertSql = "INSERT INTO " + tableName + " VALUES (" +
String.join(", ", values) + ")";
assertUpdate(session, insertSql, 1);
}
// Verify all rows exist
assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES (3)");
// Update table to format version 2 and create equality delete files
Table icebergTable = updateTable(tableName);
// Create equality delete using ALL columns
Map<String, Object> deleteRow = new HashMap<>();
deleteRow.put("id", 2L);
for (int i = 0; i < maxColumns; i++) {
deleteRow.put("col_" + i, "val_2_" + i);
}
// Write equality delete with ALL columns
writeEqualityDeleteToNationTable(icebergTable, deleteRow);
// Query should work correctly regardless of optimization
assertQuery(session, "SELECT count(*) FROM " + tableName, "VALUES (2)");
assertQuery(session, "SELECT id FROM " + tableName + " ORDER BY id", "VALUES (1), (3)");
// With > max columns, optimization is disabled - no JOIN in plan
// Verify the query works but doesn't contain a join node
assertQuery(session, "SELECT * FROM " + tableName + " WHERE id = 1",
"VALUES (" + Stream.concat(Stream.of("1"),
IntStream.range(0, maxColumns).mapToObj(i -> "'val_1_" + i + "'"))
.collect(Collectors.joining(", ")) + ")");
// To verify no join is present, we can check that the plan only contains table scan
assertPlan(session, "SELECT * FROM " + tableName,
anyTree(
anyNot(JoinNode.class,
tableScan(tableName))));
}
finally {
dropTable(session, tableName);
}
}
}