/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.hive.statistics;

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.DistributionStat;
import com.facebook.airlift.stats.TimeStat;
import com.facebook.presto.common.RuntimeUnit;
import com.facebook.presto.hive.FileFormatDataSourceStats;
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveFileContext;
import com.facebook.presto.hive.HiveFileInfo;
import com.facebook.presto.hive.PartitionNameWithVersion;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.parquet.ParquetDataSource;
import com.facebook.presto.parquet.cache.ParquetFileMetadata;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.statistics.DoubleStatistics;
import org.apache.parquet.column.statistics.FloatStatistics;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.LongStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import java.time.LocalDate;
import java.time.chrono.ChronoLocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.hive.CacheQuota.NO_CACHE_CONSTRAINTS;
import static com.facebook.presto.hive.HiveCommonSessionProperties.getReadNullMaskedParquetEncryptedValue;
import static com.facebook.presto.hive.HivePartition.UNPARTITIONED_ID;
import static com.facebook.presto.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
import static com.facebook.presto.hive.parquet.ParquetPageSourceFactory.PARQUET_SERDE_CLASS_NAMES;
import static com.facebook.presto.hive.parquet.ParquetPageSourceFactory.createDecryptor;
import static com.facebook.presto.parquet.cache.MetadataReader.readFooter;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
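
/**
 * Builds quick stats for Parquet tables and partitions by fetching Parquet file footers in
 * parallel (on a bounded background executor) and rolling up the column-level statistics
 * (min/max, null count, row count) that the footers already contain.
 */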
public class ParquetQuickStatsBuilder
implements QuickStatsBuilder
{
    private static final Logger log = Logger.get(ParquetQuickStatsBuilder.class);
private final Executor footerFetchExecutor;
private final ThreadPoolExecutorMBean footerFetchExecutorMBean;
private final HdfsEnvironment hdfsEnvironment;
private final FileFormatDataSourceStats stats;
private final long footerFetchTimeoutMillis;
private final TimeStat footerFetchDuration = new TimeStat(MILLISECONDS);
private final DistributionStat fileCountPerPartition = new DistributionStat();
private final DistributionStat footerByteSizeDistribution = new DistributionStat();

    public ParquetQuickStatsBuilder(FileFormatDataSourceStats stats, HdfsEnvironment hdfsEnvironment, HiveClientConfig hiveClientConfig)
    {
        this.stats = requireNonNull(stats, "stats is null");
        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        requireNonNull(hiveClientConfig, "hiveClientConfig is null");
        this.footerFetchTimeoutMillis = hiveClientConfig.getParquetQuickStatsFileMetadataFetchTimeout().roundTo(MILLISECONDS);
        ExecutorService coreExecutor = newCachedThreadPool(daemonThreadsNamed("parquet-quick-stats-bg-fetch-%s"));
        this.footerFetchExecutor = new BoundedExecutor(coreExecutor, hiveClientConfig.getMaxConcurrentParquetQuickStatsCalls());
        this.footerFetchExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) coreExecutor);
}
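
    /**
     * Merges the column statistics from every row group of a single file's footer into the
     * running roll-up map, keyed by {@link ColumnPath}. Nested columns are skipped.
     */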
private static void processColumnMetadata(ParquetMetadata parquetMetadata, Map<ColumnPath, ColumnQuickStats<?>> rolledUpColStats)
{
List<BlockMetaData> rowGroups = parquetMetadata.getBlocks();
for (BlockMetaData rowGroup : rowGroups) {
long rowCount = rowGroup.getRowCount();
for (ColumnChunkMetaData columnChunkMetaData : rowGroup.getColumns()) {
ColumnPath columnKey = columnChunkMetaData.getPath();
if (columnKey.size() > 1) {
// We do not support reading/using stats for nested columns at the moment. These columns have a HiveColumnHandle#ColumnType == SYNTHESIZED
// TODO : When we do add this support, map the column handles to the parquet path to build stats for these nested columns
continue;
}
String columnName = columnKey.toArray()[0];
PrimitiveType columnPrimitiveType = columnChunkMetaData.getPrimitiveType();
                Statistics<?> colStats = columnChunkMetaData.getStatistics();
long nullsCount = colStats.getNumNulls();
                // Default the mapped column type to SLICE, which treats the column as a collection of bytes
                // with no min/max stats; the only relevant stats are the NULL count and row count
                ColumnType mappedType = ColumnType.SLICE;
switch (columnPrimitiveType.getPrimitiveTypeName()) {
case INT64:
mappedType = ColumnType.LONG;
break;
case INT32:
mappedType = ColumnType.INTEGER;
break;
case BOOLEAN:
mappedType = ColumnType.BOOLEAN;
break;
case BINARY:
// BINARY primitive type should be mapped to SLICE since it won't have a min/max
mappedType = ColumnType.SLICE;
break;
case FLOAT:
mappedType = ColumnType.FLOAT;
break;
case DOUBLE:
mappedType = ColumnType.DOUBLE;
break;
                    case INT96:
                    case FIXED_LEN_BYTE_ARRAY:
                    default:
                        break;
}
if (columnPrimitiveType.getLogicalTypeAnnotation() != null) {
// Use logical information to decipher stats info for specific logical types
Optional<ColumnType> transformed = columnPrimitiveType.getLogicalTypeAnnotation().accept(new LogicalTypeAnnotationVisitor<ColumnType>()
{
@Override
public Optional<ColumnType> visit(DateLogicalTypeAnnotation dateLogicalType)
{
return Optional.of(ColumnType.DATE);
}
@Override
public Optional<ColumnType> visit(TimeLogicalTypeAnnotation timeLogicalType)
{
return Optional.of(ColumnType.TIME);
}
});
if (transformed.isPresent()) {
mappedType = transformed.get();
}
}
switch (mappedType) {
case INTEGER: {
ColumnQuickStats<Integer> toMerge = (ColumnQuickStats<Integer>) rolledUpColStats.getOrDefault(columnKey, new ColumnQuickStats<>(columnName, Integer.class));
IntStatistics asIntegerStats = ((IntStatistics) colStats);
toMerge.setMinValue(asIntegerStats.getMin());
toMerge.setMaxValue(asIntegerStats.getMax());
toMerge.addToNullsCount(nullsCount);
toMerge.addToRowCount(rowCount);
rolledUpColStats.put(columnKey, toMerge);
break;
}
case LONG: {
ColumnQuickStats<Long> toMerge = (ColumnQuickStats<Long>) rolledUpColStats.getOrDefault(columnKey, new ColumnQuickStats<>(columnName, Long.class));
LongStatistics asLongStats = ((LongStatistics) colStats);
toMerge.setMinValue(asLongStats.getMin());
toMerge.setMaxValue(asLongStats.getMax());
toMerge.addToNullsCount(nullsCount);
toMerge.addToRowCount(rowCount);
rolledUpColStats.put(columnKey, toMerge);
break;
}
case DOUBLE: {
ColumnQuickStats<Double> toMerge = (ColumnQuickStats<Double>) rolledUpColStats.getOrDefault(columnKey, new ColumnQuickStats<>(columnName, Double.class));
DoubleStatistics asDoubleStats = ((DoubleStatistics) colStats);
toMerge.setMinValue(asDoubleStats.getMin());
toMerge.setMaxValue(asDoubleStats.getMax());
toMerge.addToNullsCount(nullsCount);
toMerge.addToRowCount(rowCount);
rolledUpColStats.put(columnKey, toMerge);
break;
}
case FLOAT: {
ColumnQuickStats<Float> toMerge = (ColumnQuickStats<Float>) rolledUpColStats.getOrDefault(columnKey, new ColumnQuickStats<>(columnName, Float.class));
FloatStatistics asFloatStats = ((FloatStatistics) colStats);
toMerge.setMinValue(asFloatStats.getMin());
toMerge.setMaxValue(asFloatStats.getMax());
toMerge.addToNullsCount(nullsCount);
toMerge.addToRowCount(rowCount);
rolledUpColStats.put(columnKey, toMerge);
break;
}
case BOOLEAN: {
ColumnQuickStats<Boolean> toMerge = (ColumnQuickStats<Boolean>) rolledUpColStats.getOrDefault(columnKey, new ColumnQuickStats<>(columnName, Boolean.class));
toMerge.addToNullsCount(nullsCount);
toMerge.addToRowCount(rowCount);
// TODO : Boolean stats store trueCount and falseCount
rolledUpColStats.put(columnKey, toMerge);
break;
}
case DATE: {
ColumnQuickStats<ChronoLocalDate> toMerge = (ColumnQuickStats<ChronoLocalDate>) rolledUpColStats.getOrDefault(columnKey,
new ColumnQuickStats<>(columnName, ChronoLocalDate.class));
IntStatistics asIntStats = ((IntStatistics) colStats);
toMerge.setMinValue(LocalDate.ofEpochDay(asIntStats.getMin()));
toMerge.setMaxValue(LocalDate.ofEpochDay(asIntStats.getMax()));
toMerge.addToNullsCount(nullsCount);
toMerge.addToRowCount(rowCount);
rolledUpColStats.put(columnKey, toMerge);
break;
}
                    case SLICE:
                    default: {
ColumnQuickStats<Slice> toMerge = (ColumnQuickStats<Slice>) rolledUpColStats.getOrDefault(columnKey, new ColumnQuickStats<>(columnName, Slice.class));
toMerge.addToNullsCount(nullsCount);
toMerge.addToRowCount(rowCount);
rolledUpColStats.put(columnKey, toMerge);
break;
}
}
}
}
}

    @Managed
@Nested
public TimeStat getFooterFetchDuration()
{
return footerFetchDuration;
}

    @Managed
@Nested
public DistributionStat getFooterByteSizeDistribution()
{
return footerByteSizeDistribution;
}

    @Managed
@Nested
public DistributionStat getFileCountPerPartitionDistribution()
{
return fileCountPerPartition;
}

    @Managed
@Nested
public ThreadPoolExecutorMBean getExecutor()
{
return footerFetchExecutorMBean;
}
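
    /**
     * Builds {@link PartitionQuickStats} for a partition by reading the footers of its Parquet
     * files in parallel and rolling up per-column min/max, null count, and row count. Returns
     * {@link PartitionQuickStats#EMPTY} for non-Parquet storage formats or when no stats are found.
     */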
@Override
public PartitionQuickStats buildQuickStats(ConnectorSession session, ExtendedHiveMetastore metastore,
SchemaTableName table, MetastoreContext metastoreContext, String partitionId, Iterator<HiveFileInfo> files)
{
        requireNonNull(session, "session is null");
        requireNonNull(metastore, "metastore is null");
        requireNonNull(table, "table is null");
        requireNonNull(metastoreContext, "metastoreContext is null");
        requireNonNull(partitionId, "partitionId is null");
        requireNonNull(files, "files is null");

if (!files.hasNext()) {
return PartitionQuickStats.EMPTY;
}
// TODO: Consider refactoring storage and/or table format to the interface when we implement an ORC/Iceberg quick stats builder
StorageFormat storageFormat;
if (UNPARTITIONED_ID.getPartitionName().equals(partitionId)) {
Table resolvedTable = metastore.getTable(metastoreContext, table.getSchemaName(), table.getTableName()).get();
storageFormat = resolvedTable.getStorage().getStorageFormat();
}
else {
Partition partition = metastore.getPartitionsByNames(metastoreContext, table.getSchemaName(), table.getTableName(),
ImmutableList.of(new PartitionNameWithVersion(partitionId, Optional.empty()))).get(partitionId).get();
storageFormat = partition.getStorage().getStorageFormat();
}
if (!PARQUET_SERDE_CLASS_NAMES.contains(storageFormat.getSerDe())) {
// Not a parquet table/partition
return PartitionQuickStats.EMPTY;
}

        // We want to keep the number of files we use to build quick stats bounded, so that:
        // 1. We can control the total file IO overhead in a measurable way
        // 2. Planning time remains bounded
        // TODO: Sample the file list, read stats for the sample only, and extrapolate the overall stats
List<CompletableFuture<ParquetMetadata>> footerFetchCompletableFutures = new ArrayList<>();
int filesCount = 0;
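
        // Schedule an asynchronous footer fetch for every file; the resulting futures are
        // collected below under a single timeout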
while (files.hasNext()) {
HiveFileInfo file = files.next();
filesCount++;
Path path = new Path(file.getPath());
long fileSize = file.getLength();
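            // Describe the file being opened; its size and modification time are already
            // known from the file listing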
HiveFileContext hiveFileContext = new HiveFileContext(
true,
NO_CACHE_CONSTRAINTS,
Optional.empty(),
OptionalLong.of(fileSize),
OptionalLong.empty(),
OptionalLong.empty(),
file.getFileModifiedTime(),
false);
HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getTableName());
Configuration configuration = hdfsEnvironment.getConfiguration(hdfsContext, path);
footerFetchCompletableFutures.add(supplyAsync(() -> {
                Stopwatch footerFetchStopwatch = Stopwatch.createStarted();
try (FSDataInputStream inputStream = hdfsEnvironment.getFileSystem(hdfsContext, path).openFile(path, hiveFileContext);
ParquetDataSource parquetDataSource = buildHdfsParquetDataSource(inputStream, path, stats)) {
ParquetFileMetadata parquetFileMetadata = readFooter(parquetDataSource,
fileSize,
createDecryptor(configuration, path),
getReadNullMaskedParquetEncryptedValue(session));
footerByteSizeDistribution.add(parquetFileMetadata.getMetadataSize());
return parquetFileMetadata.getParquetMetadata();
}
                catch (Exception e) {
                    log.error(e, "Failed to read parquet footer for file %s", path);
                    throw new RuntimeException(e);
                }
finally {
                    footerFetchDuration.add(footerFetchStopwatch.elapsed(MILLISECONDS), MILLISECONDS);
}
}, footerFetchExecutor));
}

        // Record a metric about how many files were seen
session.getRuntimeStats().addMetricValue(String.format("ParquetQuickStatsBuilder/FileCount/%s/%s", table.getTableName(), partitionId), RuntimeUnit.NONE, filesCount);
fileCountPerPartition.add(filesCount);

        Map<ColumnPath, ColumnQuickStats<?>> rolledUpColStats = new HashMap<>();
try {
// Wait for footer reads to finish
CompletableFuture<Void> overallCompletableFuture = CompletableFuture.allOf(footerFetchCompletableFutures.toArray(new CompletableFuture[0]));
overallCompletableFuture.get(footerFetchTimeoutMillis, MILLISECONDS);
for (CompletableFuture<ParquetMetadata> future : footerFetchCompletableFutures) {
ParquetMetadata parquetMetadata = future.get();
processColumnMetadata(parquetMetadata, rolledUpColStats);
}
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error(e, "Failed to read/build stats from parquet footer");
throw new RuntimeException(e);
}
if (rolledUpColStats.isEmpty()) {
return PartitionQuickStats.EMPTY;
}
return new PartitionQuickStats(partitionId, rolledUpColStats.values(), filesCount);
}
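
    /**
     * Internal mapping of Parquet physical and logical types to the stat types rolled up above.
     * TIME has no dedicated case in the roll-up switch yet and is treated like SLICE (null count
     * and row count only).
     */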
enum ColumnType
{
INTEGER,
LONG,
FLOAT,
DOUBLE,
SLICE,
DATE,
TIME,
BOOLEAN
}
}