TestIcebergHiveStatistics.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.hive;
import com.facebook.presto.Session;
import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.common.RuntimeMetric;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.hive.HdfsConfiguration;
import com.facebook.presto.hive.HdfsConfigurationInitializer;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveHdfsConfiguration;
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.iceberg.CatalogType;
import com.facebook.presto.iceberg.IcebergCatalogName;
import com.facebook.presto.iceberg.IcebergColumnHandle;
import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
import com.facebook.presto.iceberg.IcebergMetadataColumn;
import com.facebook.presto.iceberg.IcebergQueryRunner;
import com.facebook.presto.iceberg.IcebergUtil;
import com.facebook.presto.iceberg.ManifestFileCache;
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.analyzer.MetadataResolver;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.security.AllowAllAccessControl;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateStatistics;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.facebook.presto.SystemSessionProperties.OPTIMIZER_USE_HISTOGRAMS;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.metastore.InMemoryCachingHiveMetastore.memoizeMetastore;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.iceberg.IcebergQueryRunner.TEST_DATA_DIRECTORY;
import static com.facebook.presto.iceberg.IcebergSessionProperties.HIVE_METASTORE_STATISTICS_MERGE_STRATEGY;
import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED;
import static com.facebook.presto.iceberg.IcebergSessionProperties.STATISTICS_KLL_SKETCH_K_PARAMETER;
import static com.facebook.presto.iceberg.statistics.KllHistogram.isKllHistogramSupportedType;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
import static com.facebook.presto.testing.assertions.Assert.assertEquals;
import static com.facebook.presto.transaction.TransactionBuilder.transaction;
import static java.lang.String.format;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
/**
 * Tests the ability of the Hive implementation of {@code IcebergMetadata} to store and read
 * table statistics
 */
public class TestIcebergHiveStatistics
extends AbstractTestQueryFramework
{
private IcebergQueryRunner icebergQueryRunner;
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
icebergQueryRunner = IcebergQueryRunner.builder()
.setExtraConnectorProperties(ImmutableMap.of("iceberg.hive-statistics-merge-strategy", NUMBER_OF_DISTINCT_VALUES.name()))
.build();
return icebergQueryRunner.getQueryRunner();
}
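// The connector property above merges NDV statistics stored in the Hive metastore into the
// statistics Iceberg computes from its own metadata. The same behavior can be toggled
// per-session via the HIVE_METASTORE_STATISTICS_MERGE_STRATEGY session property, as
// testHiveStatisticsMergeFlags demonstrates below.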
private static final Set<String> NUMERIC_ORDERS_COLUMNS = ImmutableSet.<String>builder()
.add("orderkey")
.add("custkey")
.add("totalprice")
.add("orderdate")
.add("shippriority")
.build();
private static final Set<String> NON_NUMERIC_ORDERS_COLUMNS = ImmutableSet.<String>builder()
.add("orderstatus")
.add("orderpriority")
.add("clerk")
.add("comment")
.build();
private static final Set<String> ALL_ORDERS_COLUMNS = ImmutableSet.<String>builder().addAll(NUMERIC_ORDERS_COLUMNS).addAll(NON_NUMERIC_ORDERS_COLUMNS).build();
enum StatsSchema
{
COLUMN_NAME("column_name"),
DATA_SIZE("data_size"),
DISTINCT_VALUES_COUNT("distinct_values_count"),
NULLS_FRACTION("nulls_fraction"),
ROW_COUNT("row_count"),
LOW_VALUE("low_value"),
HIGH_VALUE("high_value");
private final String name;
StatsSchema(String name)
{
this.name = name;
}
public String getColumnName()
{
return name;
}
}
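// StatsSchema mirrors the column order of SHOW STATS output, and assertStatValue below indexes
// each result row by StatsSchema.ordinal(), so the declaration order must match. An illustrative
// row (the values here are hypothetical):
//   column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value
//   'orderkey'  | NULL      | 15000.0               | 0.0            | NULL      | 1         | 60000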
/**
 * Ensures that the stats not provided by {@link com.facebook.presto.iceberg.TableStatisticsMaker} are
 * populated and served from the metadata after an ANALYZE query has been run
 */
@Test
public void testSimpleAnalyze()
{
assertQuerySucceeds("CREATE TABLE simpleAnalyze as SELECT * FROM orders");
MaterializedResult stats = getQueryRunner().execute("SHOW STATS FOR simpleAnalyze");
assertStatValueAbsent(StatsSchema.DISTINCT_VALUES_COUNT, stats, NUMERIC_ORDERS_COLUMNS);
assertQuerySucceeds("ANALYZE simpleAnalyze");
stats = getQueryRunner().execute("SHOW STATS FOR simpleAnalyze");
assertStatValuePresent(StatsSchema.DISTINCT_VALUES_COUNT, stats, NUMERIC_ORDERS_COLUMNS);
assertStatValuePresent(StatsSchema.NULLS_FRACTION, stats, NUMERIC_ORDERS_COLUMNS);
assertStatValuePresent(StatsSchema.NULLS_FRACTION, stats, NON_NUMERIC_ORDERS_COLUMNS);
}
/**
 * Tests that the {@link com.facebook.presto.iceberg.TableStatisticsMaker} is used even when
 * ANALYZE has not been run yet
 */
@Test
public void testStatsBeforeAnalyze()
{
assertQuerySucceeds("CREATE TABLE statsBeforeAnalyze as SELECT * FROM orders");
MaterializedResult stats = getQueryRunner().execute("SHOW STATS FOR statsBeforeAnalyze");
assertStatValueAbsent(StatsSchema.DISTINCT_VALUES_COUNT, stats, NUMERIC_ORDERS_COLUMNS);
assertStatValuePresent(StatsSchema.DATA_SIZE, stats, ALL_ORDERS_COLUMNS);
assertStatValuePresent(StatsSchema.LOW_VALUE, stats, NUMERIC_ORDERS_COLUMNS);
assertStatValuePresent(StatsSchema.HIGH_VALUE, stats, NUMERIC_ORDERS_COLUMNS);
}
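/**
 * Verifies statistics behavior across partitioned and unpartitioned layouts after ANALYZE. The
 * Puffin statistics files of the partitioned table are deleted up front, so its NDVs could only
 * come from the Hive metastore merge; the test asserts that they do not appear, while the
 * unpartitioned table retains the NDVs written by ANALYZE.
 */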
@Test
public void testStatsWithPartitionedTableAnalyzed()
{
assertQuerySucceeds("CREATE TABLE statsNoPartitionAnalyze as SELECT * FROM orders LIMIT 100");
assertQuerySucceeds("CREATE TABLE statsWithPartitionAnalyze WITH (partitioning = ARRAY['orderdate']) as SELECT * FROM statsNoPartitionAnalyze");
assertQuerySucceeds("ANALYZE statsNoPartitionAnalyze");
assertQuerySucceeds("ANALYZE statsWithPartitionAnalyze");
deleteTableStatistics("statsWithPartitionAnalyze");
Metadata meta = getQueryRunner().getMetadata();
TransactionId txid = getQueryRunner().getTransactionManager().beginTransaction(false);
Session session = getSession().beginTransactionId(txid, getQueryRunner().getTransactionManager(), new AllowAllAccessControl());
Map<String, ColumnHandle> noPartColumns = getColumnHandles("statsnopartitionanalyze", session);
Map<String, ColumnHandle> partColumns = getColumnHandles("statswithpartitionanalyze", session);
List<ColumnHandle> noPartColumnHandles = new ArrayList<>(noPartColumns.values());
List<ColumnHandle> partColumnHandles = new ArrayList<>(partColumns.values());
// With all columns and no constraints: row counts should match, but NDVs remain only for the unpartitioned table, whose Puffin statistics were not deleted
TableStatistics statsNoPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsNoPartitionAnalyze", session), noPartColumnHandles, Constraint.alwaysTrue());
TableStatistics statsWithPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsWithPartitionAnalyze", session), partColumnHandles, Constraint.alwaysTrue());
assertNDVsPresent(statsNoPartition);
assertNDVsNotPresent(statsWithPartition);
assertEquals(statsNoPartition.getRowCount(), statsWithPartition.getRowCount());
// With a single column and no constraints, row counts should still match
statsNoPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsNoPartitionAnalyze", session), Collections.singletonList(noPartColumns.get("totalprice")), Constraint.alwaysTrue());
statsWithPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsWithPartitionAnalyze", session), Collections.singletonList(partColumns.get("totalprice")), Constraint.alwaysTrue());
assertEquals(statsNoPartition.getRowCount(), statsWithPartition.getRowCount());
assertEquals(statsNoPartition.getColumnStatistics().size(), 1);
assertEquals(statsWithPartition.getColumnStatistics().size(), 1);
assertNotNull(statsWithPartition.getColumnStatistics().get(partColumns.get("totalprice")));
assertNotNull(statsNoPartition.getColumnStatistics().get(noPartColumns.get("totalprice")));
assertNDVsPresent(statsNoPartition);
assertNDVsNotPresent(statsWithPartition);
// With all columns and an always-true constraint, stats should match the unconstrained case
statsNoPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsNoPartitionAnalyze", session), noPartColumnHandles, Constraint.alwaysTrue());
statsWithPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsWithPartitionAnalyze", session), partColumnHandles, Constraint.alwaysTrue());
assertEquals(statsNoPartition.getRowCount(), statsWithPartition.getRowCount());
assertEquals(statsNoPartition.getColumnStatistics().size(), noPartColumnHandles.size());
assertEquals(statsWithPartition.getColumnStatistics().size(), partColumnHandles.size());
assertNDVsPresent(statsNoPartition);
assertNDVsNotPresent(statsWithPartition);
// Test that with one column and a constraint on that column, the partitioned stats reflect fewer rows
Constraint<ColumnHandle> noPartConstraint = constraintWithMinValue(noPartColumns.get("totalprice"), 100000.0);
Constraint<ColumnHandle> partConstraint = constraintWithMinValue(partColumns.get("totalprice"), 100000.0);
statsNoPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsNoPartitionAnalyze", session), Collections.singletonList(noPartColumns.get("totalprice")), noPartConstraint);
statsWithPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsWithPartitionAnalyze", session), Collections.singletonList(partColumns.get("totalprice")), partConstraint);
// ensure the partitioned table actually returns fewer rows
assertEquals(statsNoPartition.getColumnStatistics().size(), 1);
assertEquals(statsWithPartition.getColumnStatistics().size(), 1);
assertNotNull(statsWithPartition.getColumnStatistics().get(partColumns.get("totalprice")));
assertNotNull(statsNoPartition.getColumnStatistics().get(noPartColumns.get("totalprice")));
assertNDVsPresent(statsNoPartition);
assertNDVsNotPresent(statsWithPartition);
// partitioned table should have stats partially filtered since data should span > 1 file
assertTrue(statsWithPartition.getRowCount().getValue() < statsNoPartition.getRowCount().getValue());
// Test that with one column and a constraint on a different column, the constraint still filters the partitioned stats
noPartConstraint = constraintWithMinValue(noPartColumns.get("totalprice"), 100000.0);
partConstraint = constraintWithMinValue(partColumns.get("totalprice"), 100000.0);
statsNoPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsNoPartitionAnalyze", session), Collections.singletonList(noPartColumns.get("orderkey")), noPartConstraint);
statsWithPartition = meta.getTableStatistics(session, getAnalyzeTableHandle("statsWithPartitionAnalyze", session), Collections.singletonList(partColumns.get("orderkey")), partConstraint);
assertEquals(statsNoPartition.getColumnStatistics().size(), 1);
assertEquals(statsWithPartition.getColumnStatistics().size(), 1);
assertNotNull(statsWithPartition.getColumnStatistics().get(partColumns.get("orderkey")));
assertNotNull(statsNoPartition.getColumnStatistics().get(noPartColumns.get("orderkey")));
assertNDVsPresent(statsNoPartition);
assertNDVsNotPresent(statsWithPartition);
// partitioned table should have stats partially filtered since data should span > 1 file
assertTrue(statsWithPartition.getRowCount().getValue() < statsNoPartition.getRowCount().getValue());
getQueryRunner().getTransactionManager().asyncAbort(txid);
assertQuerySucceeds("DROP TABLE statsNoPartitionAnalyze");
assertQuerySucceeds("DROP TABLE statsWithPartitionAnalyze");
}
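/**
 * Same comparisons as {@link #testStatsWithPartitionedTableAnalyzed()} but without running
 * ANALYZE, so all statistics come from the file-based {@code TableStatisticsMaker} and should be
 * equivalent between the partitioned and unpartitioned tables.
 */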
@Test
public void testStatsWithPartitionedTablesNoAnalyze()
{
assertQuerySucceeds("CREATE TABLE statsNoPartition as SELECT * FROM orders LIMIT 100");
assertQuerySucceeds("CREATE TABLE statsWithPartition WITH (partitioning = ARRAY['orderdate']) as SELECT * FROM statsNoPartition");
Metadata meta = getQueryRunner().getMetadata();
TransactionId txid = getQueryRunner().getTransactionManager().beginTransaction(false);
Session s = getSession().beginTransactionId(txid, getQueryRunner().getTransactionManager(), new AllowAllAccessControl());
Map<String, ColumnHandle> noPartColumns = getColumnHandles("statsnopartition", s);
Map<String, ColumnHandle> partColumns = getColumnHandles("statswithpartition", s);
List<ColumnHandle> noPartColumnHandles = new ArrayList<>(noPartColumns.values());
List<ColumnHandle> partColumnHandles = new ArrayList<>(partColumns.values());
// Test that stats are equivalent with all columns and no constraints
TableStatistics statsNoPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsNoPartition", s), noPartColumnHandles, Constraint.alwaysTrue());
TableStatistics statsWithPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsWithPartition", s), partColumnHandles, Constraint.alwaysTrue());
assertEquals(statsNoPartition.getRowCount(), statsWithPartition.getRowCount());
columnStatsEqual(statsNoPartition.getColumnStatistics(), statsWithPartition.getColumnStatistics());
// Test that stats are equivalent with one column and no constraints
statsNoPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsNoPartition", s), Collections.singletonList(noPartColumns.get("totalprice")), Constraint.alwaysTrue());
statsWithPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsWithPartition", s), Collections.singletonList(partColumns.get("totalprice")), Constraint.alwaysTrue());
assertEquals(statsNoPartition.getRowCount(), statsWithPartition.getRowCount());
assertEquals(statsNoPartition.getColumnStatistics().size(), 1);
assertEquals(statsWithPartition.getColumnStatistics().size(), 1);
assertNotNull(statsWithPartition.getColumnStatistics().get(partColumns.get("totalprice")));
assertNotNull(statsNoPartition.getColumnStatistics().get(noPartColumns.get("totalprice")));
columnStatsEqual(statsNoPartition.getColumnStatistics(), statsWithPartition.getColumnStatistics());
// Test that stats are equivalent with all columns and an always-true constraint
statsNoPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsNoPartition", s), noPartColumnHandles, Constraint.alwaysTrue());
statsWithPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsWithPartition", s), partColumnHandles, Constraint.alwaysTrue());
assertEquals(statsNoPartition.getRowCount(), statsWithPartition.getRowCount());
assertEquals(statsNoPartition.getColumnStatistics().size(), noPartColumnHandles.size());
assertEquals(statsWithPartition.getColumnStatistics().size(), partColumnHandles.size());
columnStatsEqual(statsNoPartition.getColumnStatistics(), statsWithPartition.getColumnStatistics());
// Test that with one column and a constraint on that column, the partitioned stats reflect fewer rows
Constraint<ColumnHandle> noPartConstraint = constraintWithMinValue(noPartColumns.get("totalprice"), 100000.0);
Constraint<ColumnHandle> partConstraint = constraintWithMinValue(partColumns.get("totalprice"), 100000.0);
statsNoPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsNoPartition", s), Collections.singletonList(noPartColumns.get("totalprice")), noPartConstraint);
statsWithPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsWithPartition", s), Collections.singletonList(partColumns.get("totalprice")), partConstraint);
// ensure the partitioned table actually returns fewer rows
assertEquals(statsNoPartition.getColumnStatistics().size(), 1);
assertEquals(statsWithPartition.getColumnStatistics().size(), 1);
assertNotNull(statsWithPartition.getColumnStatistics().get(partColumns.get("totalprice")));
assertNotNull(statsNoPartition.getColumnStatistics().get(noPartColumns.get("totalprice")));
// partitioned table should have stats partially filtered since data should span > 1 file
assertTrue(statsWithPartition.getRowCount().getValue() < statsNoPartition.getRowCount().getValue());
// Test that with one column and a constraint on a different column, the constraint still filters the partitioned stats
noPartConstraint = constraintWithMinValue(noPartColumns.get("totalprice"), 100000.0);
partConstraint = constraintWithMinValue(partColumns.get("totalprice"), 100000.0);
statsNoPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsNoPartition", s), Collections.singletonList(noPartColumns.get("orderkey")), noPartConstraint);
statsWithPartition = meta.getTableStatistics(s, getAnalyzeTableHandle("statsWithPartition", s), Collections.singletonList(partColumns.get("orderkey")), partConstraint);
assertEquals(statsNoPartition.getColumnStatistics().size(), 1);
assertEquals(statsWithPartition.getColumnStatistics().size(), 1);
assertNotNull(statsWithPartition.getColumnStatistics().get(partColumns.get("orderkey")));
assertNotNull(statsNoPartition.getColumnStatistics().get(noPartColumns.get("orderkey")));
// partitioned table should have stats partially filtered since data should span > 1 file
assertTrue(statsWithPartition.getRowCount().getValue() < statsNoPartition.getRowCount().getValue());
getQueryRunner().getTransactionManager().asyncAbort(txid);
assertQuerySucceeds("DROP TABLE statsNoPartition");
assertQuerySucceeds("DROP TABLE statsWithPartition");
}
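/**
 * Exercises each value of the Hive statistics merge strategy. With rows (0, '1'), (1, '22'),
 * (2, '333'), (NULL, 'aaaaa'), (4, NULL), both columns have 4 distinct non-null values, and the
 * varchar data totals 1 + 2 + 3 + 5 = 11 bytes, which is what the assertions below expect once
 * the corresponding merge flag is enabled.
 */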
@Test
public void testHiveStatisticsMergeFlags()
{
assertQuerySucceeds("CREATE TABLE mergeFlagsStats (i int, v varchar)");
assertQuerySucceeds("INSERT INTO mergeFlagsStats VALUES (0, '1'), (1, '22'), (2, '333'), (NULL, 'aaaaa'), (4, NULL)");
assertQuerySucceeds("ANALYZE mergeFlagsStats");
// invalidate Puffin files so that only Hive metastore stats can be returned
deleteTableStatistics("mergeFlagsStats");
// Test that stats without merging don't include NDVs or data size
Session session = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, "")
.build();
TableStatistics stats = getTableStatistics(session, "mergeFlagsStats");
Map<String, ColumnStatistics> columnStatistics = getColumnNameMap(stats);
assertEquals(columnStatistics.get("i").getDistinctValuesCount(), Estimate.unknown());
assertEquals(columnStatistics.get("i").getDataSize(), Estimate.unknown());
assertEquals(columnStatistics.get("v").getDistinctValuesCount(), Estimate.unknown());
assertEquals(columnStatistics.get("v").getDataSize(), Estimate.unknown());
// Test stats merging for NDVs
session = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, NUMBER_OF_DISTINCT_VALUES.name())
.build();
stats = getTableStatistics(session, "mergeFlagsStats");
columnStatistics = getColumnNameMap(stats);
assertEquals(columnStatistics.get("i").getDistinctValuesCount(), Estimate.of(4.0));
assertEquals(columnStatistics.get("i").getDataSize(), Estimate.unknown());
assertEquals(columnStatistics.get("v").getDistinctValuesCount(), Estimate.of(4.0));
assertEquals(columnStatistics.get("v").getDataSize(), Estimate.unknown());
// Test stats for data size
session = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, TOTAL_SIZE_IN_BYTES.name())
.build();
stats = getTableStatistics(session, "mergeFlagsStats");
columnStatistics = getColumnNameMap(stats);
assertEquals(columnStatistics.get("i").getDistinctValuesCount(), Estimate.unknown());
assertEquals(columnStatistics.get("i").getDataSize(), Estimate.unknown()); // fixed-width isn't collected
assertEquals(columnStatistics.get("v").getDistinctValuesCount(), Estimate.unknown());
assertEquals(columnStatistics.get("v").getDataSize(), Estimate.of(11));
// Test stats for both
session = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, NUMBER_OF_DISTINCT_VALUES.name() + "," + TOTAL_SIZE_IN_BYTES)
.build();
stats = getTableStatistics(session, "mergeFlagsStats");
columnStatistics = getColumnNameMap(stats);
assertEquals(columnStatistics.get("i").getDistinctValuesCount(), Estimate.of(4.0));
assertEquals(columnStatistics.get("i").getDataSize(), Estimate.unknown());
assertEquals(columnStatistics.get("v").getDistinctValuesCount(), Estimate.of(4.0));
assertEquals(columnStatistics.get("v").getDataSize(), Estimate.of(11));
}
@DataProvider(name = "pushdownFilterEnabled")
public Object[][] pushdownFilterPropertyProvider()
{
return new Object[][] {
{true, true},
{true, false},
{false, true},
{false, false},
};
}
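/**
 * Ensures that columns referenced only in the WHERE clause (i and j) still appear in the scan's
 * statistics estimate alongside the projected column k, for every combination of filter pushdown
 * and partitioning.
 */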
@Test(dataProvider = "pushdownFilterEnabled")
public void testPredicateOnlyColumnInStatisticsOutput(boolean pushdownFilterEnabled, boolean partitioned)
{
assertQuerySucceeds(format("CREATE TABLE scanFilterStats (i int, j int, k int)%s", partitioned ? " WITH (partitioning = ARRAY['j'])" : ""));
assertUpdate("INSERT INTO scanFilterStats VALUES (1, 2, 3), (3, 4, 5), (5, 6, 7)", 3);
Session session = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", PUSHDOWN_FILTER_ENABLED, Boolean.toString(pushdownFilterEnabled))
.build();
try {
TableStatistics est = getScanStatsEstimate(session, "SELECT k from scanFilterStats WHERE j > 2 AND i = 3");
assertEquals(est.getColumnStatistics().size(), 3);
}
finally {
getQueryRunner().execute("DROP TABLE scanFilterStats");
}
}
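/**
 * Caps the statistics file cache at 1024 bytes so that the large KLL histograms written by
 * ANALYZE cannot all fit; a second statistics retrieval should then record a PartialMiss
 * runtime metric while still returning complete NDVs and histograms.
 */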
@Test
public void testStatisticsCachePartialEviction()
{
String catalogName = "ice_stat_file_cache";
String tableName = "lineitem_statisticsFileCache";
try {
Map<String, String> catalogProperties = ImmutableMap.<String, String>builder().putAll(icebergQueryRunner.getIcebergCatalogs().get("iceberg"))
.put("iceberg.max-statistics-file-cache-size", "1024B")
.build();
getQueryRunner().createCatalog(catalogName, "iceberg", catalogProperties);
Session session = Session.builder(getSession())
// runtime stats must be reset manually when using the builder
.setRuntimeStats(new RuntimeStats())
.setCatalog(catalogName)
// set histograms enabled to increase statistics cache size
.setSystemProperty(OPTIMIZER_USE_HISTOGRAMS, "true")
.setCatalogSessionProperty(catalogName, STATISTICS_KLL_SKETCH_K_PARAMETER, "32768")
.build();
assertQuerySucceeds(format("CREATE TABLE %s as SELECT * FROM lineitem", tableName));
assertQuerySucceeds(session, "ANALYZE " + tableName);
// get table statistics to populate some of the cache
TableStatistics statistics = getTableStatistics(getQueryRunner(), session, tableName);
assertTrue(statistics.getColumnStatistics().values().stream().map(ColumnStatistics::getHistogram).anyMatch(Optional::isPresent));
RuntimeStats runtimeStats = session.getRuntimeStats();
runtimeStats.getMetrics().keySet().stream().filter(name -> name.contains("ColumnCount")).findFirst()
.ifPresent(stat -> assertEquals(runtimeStats.getMetric(stat).getSum(), 32, "column count must be 32 on metric: " + stat));
runtimeStats.getMetrics().keySet().stream().filter(name -> name.contains("PuffinFileSize")).findFirst()
.ifPresent(stat -> assertTrue(runtimeStats.getMetric(stat).getSum() > 1024));
// get them again to trigger retrieval of _some_ cached statistics
statistics = getTableStatistics(getQueryRunner(), session, tableName);
RuntimeMetric partialMiss = runtimeStats.getMetrics().keySet().stream().filter(name -> name.contains("PartialMiss")).findFirst()
.map(runtimeStats::getMetric)
.orElseThrow(() -> new RuntimeException("partial miss on statistics cache should have occurred"));
assertTrue(partialMiss.getCount() > 0);
statistics.getColumnStatistics().forEach((handle, stats) -> {
assertFalse(stats.getDistinctValuesCount().isUnknown());
if (isKllHistogramSupportedType(((IcebergColumnHandle) handle).getType())) {
assertTrue(stats.getHistogram().isPresent());
}
});
}
finally {
assertQuerySucceeds("DROP TABLE " + tableName);
}
}
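// Plans the given query, locates its TableScanNode, and fetches the statistics estimate for that
// scan inside a short-lived transaction.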
private TableStatistics getScanStatsEstimate(Session session, @Language("SQL") String sql)
{
Plan plan = plan(sql, session);
TableScanNode node = plan.getPlanIdNodeMap().values().stream()
.filter(planNode -> planNode instanceof TableScanNode)
.map(TableScanNode.class::cast)
.findFirst().get();
return transaction(getQueryRunner().getTransactionManager(), new AllowAllAccessControl())
.singleStatement()
.execute(session, (Function<Session, TableStatistics>) txnSession -> getQueryRunner().getMetadata()
.getTableStatistics(txnSession,
node.getTable(),
ImmutableList.copyOf(node.getAssignments().values()),
new Constraint<>(node.getCurrentConstraint())));
}
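// Reads full table statistics inside a transaction that is aborted afterwards, since statistics
// retrieval does not need to commit anything.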
private static TableStatistics getTableStatistics(QueryRunner queryRunner, Session session, String table)
{
Metadata meta = queryRunner.getMetadata();
TransactionId txid = queryRunner.getTransactionManager().beginTransaction(false);
Session txnSession = session.beginTransactionId(txid, queryRunner.getTransactionManager(), new AllowAllAccessControl());
Map<String, ColumnHandle> columnHandles = getColumnHandles(queryRunner, table, txnSession);
List<ColumnHandle> columnHandleList = new ArrayList<>(columnHandles.values());
TableStatistics tableStatistics = meta.getTableStatistics(txnSession, getAnalyzeTableHandle(queryRunner, table, txnSession), columnHandleList, Constraint.alwaysTrue());
queryRunner.getTransactionManager().asyncAbort(txid);
return tableStatistics;
}
private TableStatistics getTableStatistics(Session session, String table)
{
return getTableStatistics(getQueryRunner(), session, table);
}
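// Compares per-column statistics between two tables. Handles from the partitioned table are
// normalized from PARTITION_KEY to REGULAR so they can be matched against the handles of the
// unpartitioned table, which differ only in column type.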
private void columnStatsEqual(Map<ColumnHandle, ColumnStatistics> actualStats, Map<ColumnHandle, ColumnStatistics> expectedStats)
{
for (ColumnHandle handle : expectedStats.keySet()) {
ColumnStatistics expected = expectedStats.get(handle);
if (((IcebergColumnHandle) handle).getColumnType() == PARTITION_KEY) {
handle = new IcebergColumnHandle(
((IcebergColumnHandle) handle).getColumnIdentity(),
((IcebergColumnHandle) handle).getType(),
((IcebergColumnHandle) handle).getComment(),
REGULAR,
handle.getRequiredSubfields());
}
ColumnStatistics actual = actualStats.get(handle);
assertEquals(actual.getRange(), expected.getRange(), "range for col: " + handle);
assertEquals(actual.getNullsFraction(), expected.getNullsFraction(), "nulls fraction for col: " + handle);
assertEquals(actual.getDistinctValuesCount(), expected.getDistinctValuesCount(), "NDVs for col: " + handle);
}
}
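// Builds a constraint equivalent to the predicate "col > min OR col IS NULL"; the Domain is
// created with nullAllowed = true, so rows with NULL in this column are not filtered out.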
private static Constraint<ColumnHandle> constraintWithMinValue(ColumnHandle col, Double min)
{
return new Constraint<>(
TupleDomain.withColumnDomains(
ImmutableMap.of(col, Domain.create(ValueSet.ofRanges(Range.greaterThan(DOUBLE, min)), true))));
}
private static TableHandle getAnalyzeTableHandle(QueryRunner queryRunner, String tableName, Session session)
{
Metadata meta = queryRunner.getMetadata();
return meta.getTableHandleForStatisticsCollection(
session,
new QualifiedObjectName(session.getCatalog().get(), session.getSchema().get(), tableName.toLowerCase(Locale.US)),
Collections.emptyMap()).get();
}
private TableHandle getAnalyzeTableHandle(String tableName, Session session)
{
return getAnalyzeTableHandle(getQueryRunner(), tableName, session);
}
private static TableHandle getTableHandle(QueryRunner queryRunner, String tableName, Session session)
{
MetadataResolver resolver = queryRunner.getMetadata().getMetadataResolver(session);
return resolver.getTableHandle(new QualifiedObjectName(session.getCatalog().get(), session.getSchema().get(), tableName.toLowerCase(Locale.US))).get();
}
private static Map<String, ColumnHandle> getColumnHandles(QueryRunner queryRunner, String tableName, Session session)
{
return queryRunner.getMetadata().getColumnHandles(session, getTableHandle(queryRunner, tableName, session)).entrySet().stream()
.filter(entry -> !IcebergMetadataColumn.isMetadataColumnId(((IcebergColumnHandle) (entry.getValue())).getId()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private Map<String, ColumnHandle> getColumnHandles(String tableName, Session session)
{
return getColumnHandles(getQueryRunner(), tableName, session);
}
static void assertStatValuePresent(StatsSchema column, MaterializedResult result, Set<String> columnNames)
{
assertStatValue(column, result, columnNames, null, true);
}
static void assertStatValueAbsent(StatsSchema column, MaterializedResult result, Set<String> columnNames)
{
assertStatValue(column, result, columnNames, null, false);
}
static void assertStatValue(StatsSchema column, MaterializedResult result, Set<String> columnNames, Object value, boolean assertNot)
{
result.getMaterializedRows().forEach(row -> {
if (columnNames.contains((String) row.getField(StatsSchema.COLUMN_NAME.ordinal()))) {
Object resultValue = row.getField(column.ordinal());
if (assertNot) {
assertNotEquals(resultValue, value);
}
else {
assertEquals(resultValue, value);
}
}
});
}
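// Removes every Puffin statistics file from the table's metadata, so subsequent statistics reads
// cannot use Iceberg's stored NDV sketches and must fall back to the Hive metastore (when a
// merge strategy is enabled).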
private void deleteTableStatistics(String tableName)
{
Table icebergTable = loadTable(tableName);
UpdateStatistics statsUpdate = icebergTable.updateStatistics();
icebergTable.statisticsFiles().stream().map(StatisticsFile::snapshotId).forEach(statsUpdate::removeStatistics);
statsUpdate.commit();
}
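// Loads the Iceberg table directly through the Hive catalog, bypassing the query engine, so the
// test can manipulate table metadata such as statistics files.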
private Table loadTable(String tableName)
{
tableName = normalizeIdentifier(tableName, ICEBERG_CATALOG);
CatalogManager catalogManager = getDistributedQueryRunner().getCoordinator().getCatalogManager();
ConnectorId connectorId = catalogManager.getCatalog(ICEBERG_CATALOG).get().getConnectorId();
return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(),
getHdfsEnvironment(),
new IcebergHiveTableOperationsConfig(),
new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024),
getQueryRunner().getDefaultSession().toConnectorSession(connectorId),
new IcebergCatalogName(ICEBERG_CATALOG),
SchemaTableName.valueOf("tpch." + tableName));
}
protected ExtendedHiveMetastore getFileHiveMetastore()
{
IcebergFileHiveMetastore fileHiveMetastore = new IcebergFileHiveMetastore(getHdfsEnvironment(),
Optional.of(getCatalogDirectory(HIVE))
.filter(File::exists)
.map(File::getPath)
.orElseThrow(() -> new RuntimeException("Catalog directory does not exist: " + getCatalogDirectory(HIVE))),
"test");
return memoizeMetastore(fileHiveMetastore, false, 1000, 0);
}
protected static HdfsEnvironment getHdfsEnvironment()
{
HiveClientConfig hiveClientConfig = new HiveClientConfig();
MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig),
ImmutableSet.of(),
hiveClientConfig);
return new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
}
protected File getCatalogDirectory(CatalogType catalogType)
{
Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
switch (catalogType) {
case HIVE:
return dataDirectory
.resolve(TEST_DATA_DIRECTORY)
.resolve(HIVE.name())
.toFile();
case HADOOP:
case NESSIE:
return dataDirectory.toFile();
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + catalogType);
}
private static Map<String, ColumnStatistics> getColumnNameMap(TableStatistics statistics)
{
return statistics.getColumnStatistics().entrySet().stream().collect(Collectors.toMap(e ->
((IcebergColumnHandle) e.getKey()).getName(),
Map.Entry::getValue));
}
static void assertNDVsPresent(TableStatistics stats)
{
for (Map.Entry<ColumnHandle, ColumnStatistics> entry : stats.getColumnStatistics().entrySet()) {
assertFalse(entry.getValue().getDistinctValuesCount().isUnknown(), entry.getKey() + " NDVs are unknown");
}
}
static void assertNDVsNotPresent(TableStatistics stats)
{
for (Map.Entry<ColumnHandle, ColumnStatistics> entry : stats.getColumnStatistics().entrySet()) {
assertTrue(entry.getValue().getDistinctValuesCount().isUnknown(), entry.getKey() + " NDVs are not unknown");
}
}
}