// QuickStatsProvider.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.facebook.presto.hive.statistics;

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.airlift.concurrent.ThreadPoolExecutorMBean;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.TimeStat;
import com.facebook.presto.common.RuntimeUnit;
import com.facebook.presto.hive.DirectoryLister;
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveDirectoryContext;
import com.facebook.presto.hive.HiveFileInfo;
import com.facebook.presto.hive.NamenodeStats;
import com.facebook.presto.hive.PartitionNameWithVersion;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.base.Stopwatch;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import java.io.IOException;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
import static com.facebook.presto.hive.HivePartition.UNPARTITIONED_ID;
import static com.facebook.presto.hive.HiveSessionProperties.getQuickStatsBackgroundBuildTimeout;
import static com.facebook.presto.hive.HiveSessionProperties.getQuickStatsInlineBuildTimeout;
import static com.facebook.presto.hive.HiveSessionProperties.isQuickStatsEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isSkipEmptyFilesEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache;
import static com.facebook.presto.hive.HiveUtil.buildDirectoryContextProperties;
import static com.facebook.presto.hive.HiveUtil.getInputFormat;
import static com.facebook.presto.hive.HiveUtil.getTargetPathsHiveFileInfos;
import static com.facebook.presto.hive.HiveUtil.readSymlinkPaths;
import static com.facebook.presto.hive.NestedDirectoryPolicy.IGNORED;
import static com.facebook.presto.hive.NestedDirectoryPolicy.RECURSE;
import static com.facebook.presto.hive.metastore.PartitionStatistics.empty;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toMap;

/**
 * Builds and caches lightweight ("quick") {@link PartitionStatistics} for Hive partitions by
 * listing the partition's files and delegating to a list of {@link QuickStatsBuilder} strategies.
 * <p>
 * Builds are de-duplicated across concurrent queries: at most one background build runs per
 * partition key (tracked in {@code inProgressBuilds}); finished results are cached in
 * {@code partitionToStatsCache}. Queries that cannot get stats within their configured timeout
 * receive {@link PartitionStatistics#empty()} instead of blocking indefinitely.
 * <p>
 * Thread-safe.
 */
public class QuickStatsProvider
{
    public static final Logger log = Logger.get(QuickStatsProvider.class);
    public static final long MAX_CACHE_ENTRIES = 1_000_000L;
    private final Executor backgroundFetchExecutor;
    private final ThreadPoolExecutorMBean backgroundFetchExecutorMBean;
    // Single daemon thread used both to drop finished builds from inProgressBuilds and to
    // cancel builds that overrun reaperExpiryMillis
    private final ScheduledExecutorService inProgressReaperExecutor = new ScheduledThreadPoolExecutor(1, daemonThreadsNamed("in-progress-reaper"));
    private final HdfsEnvironment hdfsEnvironment;
    private final DirectoryLister directoryLister;
    private final List<QuickStatsBuilder> statsBuilderStrategies;
    private final boolean recursiveDirWalkerEnabled;
    // Quick stats builds currently running, keyed by "{schema.table}/{partitionId}"
    private final ConcurrentHashMap<String, InProgressBuildInfo> inProgressBuilds = new ConcurrentHashMap<>();
    private final AtomicLong requestCount = new AtomicLong(0L);
    // Note: the misspelled public getter names below are part of the JMX interface and must be kept
    private final AtomicLong successfulResolveFromCacheCount = new AtomicLong(0L);
    private final AtomicLong successfulResolveFromProviderCount = new AtomicLong(0L);
    private final long reaperExpiryMillis;
    private final Cache<String, PartitionStatistics> partitionToStatsCache;
    private final NamenodeStats nameNodeStats;
    private final TimeStat buildDuration = new TimeStat(MILLISECONDS);
    private final ExtendedHiveMetastore metastore;

    public QuickStatsProvider(ExtendedHiveMetastore metastore,
            HdfsEnvironment hdfsEnvironment,
            DirectoryLister directoryLister,
            HiveClientConfig hiveClientConfig,
            NamenodeStats nameNodeStats,
            List<QuickStatsBuilder> statsBuilderStrategies)
    {
        this.metastore = metastore;
        this.hdfsEnvironment = hdfsEnvironment;
        this.directoryLister = directoryLister;
        this.recursiveDirWalkerEnabled = hiveClientConfig.getRecursiveDirWalkerEnabled();
        this.partitionToStatsCache = CacheBuilder.newBuilder()
                .maximumSize(MAX_CACHE_ENTRIES)
                .expireAfterWrite(hiveClientConfig.getQuickStatsCacheExpiry().roundTo(SECONDS), SECONDS)
                .build();
        this.reaperExpiryMillis = hiveClientConfig.getQuickStatsReaperExpiry().toMillis();
        this.nameNodeStats = nameNodeStats;
        this.statsBuilderStrategies = statsBuilderStrategies;
        // Unbounded cached pool; actual concurrency is capped by the BoundedExecutor wrapper
        ExecutorService coreExecutor = newCachedThreadPool(daemonThreadsNamed("quick-stats-bg-fetch-%s"));
        this.backgroundFetchExecutor = new BoundedExecutor(coreExecutor, hiveClientConfig.getMaxConcurrentQuickStatsCalls());
        this.backgroundFetchExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) coreExecutor);
    }

    @Managed
    public long getRequestCount()
    {
        return requestCount.get();
    }

    @Managed
    public long getSuccesfulResolveFromCacheCount()
    {
        return successfulResolveFromCacheCount.get();
    }

    @Managed
    public long getSuccesfulResolveFromProviderCount()
    {
        return successfulResolveFromProviderCount.get();
    }

    @Managed
    @Nested
    public TimeStat getBuildDuration()
    {
        return buildDuration;
    }

    /**
     * Snapshot of partition keys with an in-flight quick stats build and when each build started.
     */
    @Managed
    public Map<String, Instant> getInProgressBuildsSnapshot()
    {
        return inProgressBuilds.entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, v -> v.getValue().getBuildStart()));
    }

    /**
     * Resolves quick stats for all {@code partitionIds} in parallel.
     * <p>
     * Partitions whose stats cannot be resolved before the inline-build timeout (or whose build
     * fails) map to {@link PartitionStatistics#empty()} in the result; a later query for the same
     * partition may trigger a successful build.
     *
     * @return a map from every id in {@code partitionIds} to its (possibly empty) stats
     */
    public Map<String, PartitionStatistics> getQuickStats(ConnectorSession session, SchemaTableName table,
            MetastoreContext metastoreContext, List<String> partitionIds)
    {
        if (!isQuickStatsEnabled(session)) {
            return partitionIds.stream().collect(toMap(k -> k, v -> empty()));
        }

        CompletableFuture<PartitionStatistics>[] partitionQuickStatCompletableFutures = new CompletableFuture[partitionIds.size()];
        for (int counter = 0; counter < partitionIds.size(); counter++) {
            String partitionId = partitionIds.get(counter);
            partitionQuickStatCompletableFutures[counter] = supplyAsync(() -> getQuickStats(session, table, metastoreContext, partitionId), backgroundFetchExecutor);
        }

        try {
            // Wait for all the partitions to get their quick stats
            // If this query is reading a partition for which we do not already have cached quick stats,
            // we will block the execution of the query until the stats are fetched for all such partitions,
            // or we time out waiting for the fetch
            allOf(partitionQuickStatCompletableFutures).get(getQuickStatsInlineBuildTimeoutMillis(session), MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Restore the interrupt status before propagating so callers can observe it
            Thread.currentThread().interrupt();
            log.error(e);
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            log.error(e);
            throw new RuntimeException(e);
        }
        catch (TimeoutException e) {
            log.warn(e, "Timeout while building quick stats");
        }

        ImmutableMap.Builder<String, PartitionStatistics> result = ImmutableMap.builder();
        for (int counter = 0; counter < partitionQuickStatCompletableFutures.length; counter++) {
            String partitionId = partitionIds.get(counter);
            CompletableFuture<PartitionStatistics> future = partitionQuickStatCompletableFutures[counter];
            if (future.isDone() && !future.isCancelled() && !future.isCompletedExceptionally()) {
                try {
                    result.put(partitionId, future.get());
                }
                catch (InterruptedException e) {
                    // This should not happen because we checked that the future was completed successfully
                    Thread.currentThread().interrupt();
                    log.error(e, "Failed to get value for a quick stats future which was completed successfully");
                    throw new RuntimeException(e);
                }
                catch (ExecutionException e) {
                    // This should not happen because we checked that the future was completed successfully
                    log.error(e, "Failed to get value for a quick stats future which was completed successfully");
                    throw new RuntimeException(e);
                }
            }
            else {
                // If a future did not finish, or finished exceptionally, we do not add it to the results
                // A new query for the same partition could trigger a successful quick stats fetch for this partition
                result.put(partitionId, empty());
            }
        }

        return result.build();
    }

    /**
     * Resolves quick stats for a single partition: from the cache if possible, by waiting on
     * another query's in-progress build if one exists, or by initiating a new background build
     * and (optionally) waiting up to the session's inline-build timeout for it.
     *
     * @return the partition's stats, or {@link PartitionStatistics#empty()} if they could not be
     *         resolved within the configured timeouts
     */
    public PartitionStatistics getQuickStats(ConnectorSession session, SchemaTableName table,
            MetastoreContext metastoreContext, String partitionId)
    {
        if (!isQuickStatsEnabled(session)) {
            return empty();
        }
        requestCount.incrementAndGet(); // New request was made to resolve quick stats for a partition

        // Check if we already have stats cached in partitionIdToQuickStatsCache. If so return from cache
        String partitionKey = String.join("/", table.toSchemaTablePrefix().toString(), partitionId);

        PartitionStatistics cachedValue = partitionToStatsCache.getIfPresent(partitionKey);
        if (cachedValue != null) {
            successfulResolveFromCacheCount.incrementAndGet();
            return cachedValue;
        }

        // Check if we already have a quick stats build in progress for this partition key
        if (inProgressBuilds.containsKey(partitionKey)) {
            // Quick stats build is in progress for another query for the same partition
            long backgroundBuildTimeoutMs = getQuickStatsBackgroundBuildTimeout(session).toMillis();
            if (backgroundBuildTimeoutMs > 0) {
                return waitForInProgressBuild(backgroundBuildTimeoutMs, partitionKey);
            }
            else {
                // We don't want to wait for quick stats for this query
                return empty();
            }
        }

        // If not, atomically initiate a call to build quick stats in a background thread
        // partitionStatisticsCompletableFuture is only set for the thread whose computeIfAbsent
        // actually created the entry; every other thread sees null and retries below
        AtomicReference<CompletableFuture<PartitionStatistics>> partitionStatisticsCompletableFuture = new AtomicReference<>();
        inProgressBuilds.computeIfAbsent(partitionKey, (key) -> {
            CompletableFuture<PartitionStatistics> fetchFuture = supplyAsync(() -> buildQuickStats(partitionKey, partitionId, session, table, metastoreContext), backgroundFetchExecutor);
            partitionStatisticsCompletableFuture.set(fetchFuture);

            return new InProgressBuildInfo(fetchFuture, Instant.now());
        });

        CompletableFuture<PartitionStatistics> future = partitionStatisticsCompletableFuture.get();
        if (future != null) {
            // Add a hook to stop tracking the in-progress build for this partition once the future finishes (successfully or exceptionally)
            future.whenCompleteAsync((r, e) -> inProgressBuilds.remove(partitionKey), inProgressReaperExecutor);

            // Also add a hook to reap this in-progress thread if it doesn't finish in reaperExpiry seconds
            inProgressReaperExecutor.schedule(() -> {
                inProgressBuilds.remove(partitionKey);
                future.cancel(true);
            }, reaperExpiryMillis, MILLISECONDS);

            long inlineBuildTimeoutMillis = getQuickStatsInlineBuildTimeoutMillis(session);
            if (inlineBuildTimeoutMillis > 0) {
                // A background call to build quick stats was started, and we want to wait for quick stats to be built
                // Note : Only the first query that initiated the quick stats call for this partition will have to wait for the stats to be built
                try {
                    PartitionStatistics partitionStatistics = future.get(inlineBuildTimeoutMillis, MILLISECONDS);
                    successfulResolveFromProviderCount.incrementAndGet(); // successfully resolved quick stats for the partition
                    return partitionStatistics;
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error(e, "Error while building quick stats for partition : %s", partitionId);
                    // Return empty PartitionStats for this partition
                    return empty();
                }
                catch (ExecutionException e) {
                    log.error(e, "Error while building quick stats for partition : %s", partitionId);
                    // Return empty PartitionStats for this partition
                    return empty();
                }
                catch (TimeoutException e) {
                    log.warn(e, "Timeout while building quick stats for partition : %s", partitionId);
                    session.getRuntimeStats().addMetricValue("QuickStatsProvider/QuickStatsBuildTimeout", RuntimeUnit.NONE, 1L);
                    // Return empty PartitionStats for this partition
                    return empty();
                }
            }
            else {
                // We don't wish to wait for the background call to complete
                return empty();
            }
        }
        else {
            // The quick stats inline fetch was pre-empted by another thread
            // We get the up-to-date value by calling getQuickStats again
            return getQuickStats(session, table, metastoreContext, partitionId);
        }
    }

    /**
     * Waits up to {@code waitTimeMs} for another query's in-progress build of {@code partitionKey}.
     * Returns empty stats on failure or timeout; the initiating query logs those errors.
     */
    private PartitionStatistics waitForInProgressBuild(long waitTimeMs, String partitionKey)
    {
        // The build may have completed (and been removed from inProgressBuilds by the completion
        // hook or the reaper) between the caller's containsKey check and this lookup; fall back to
        // the cache instead of dereferencing a null entry
        InProgressBuildInfo buildInfo = inProgressBuilds.get(partitionKey);
        if (buildInfo == null) {
            PartitionStatistics cachedValue = partitionToStatsCache.getIfPresent(partitionKey);
            return cachedValue == null ? empty() : cachedValue;
        }
        try {
            // Fetch and wait for the future from the already in-progress build triggered by another query
            return buildInfo.getQuickStatsBuildFuture().get(waitTimeMs, MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return empty();
        }
        catch (ExecutionException | TimeoutException e) {
            // The failure or timeout of the future will be logged by the query that initiated the quick stats build
            // We simply return empty stats here
            return empty();
        }
    }

    @Managed
    @Nested
    public ThreadPoolExecutorMBean getExecutor()
    {
        return backgroundFetchExecutorMBean;
    }

    private long getQuickStatsInlineBuildTimeoutMillis(ConnectorSession session)
    {
        return getQuickStatsInlineBuildTimeout(session).toMillis();
    }

    /**
     * Performs the actual quick stats build for one partition: resolves the partition's storage
     * location, lists its files (following symlink manifests for {@link SymlinkTextInputFormat}
     * tables), runs the builder strategies until one returns non-empty stats, and populates the
     * cache with the result.
     */
    private PartitionStatistics buildQuickStats(String partitionKey, String partitionId, ConnectorSession session, SchemaTableName table,
            MetastoreContext metastoreContext)
    {
        Table resolvedTable = metastore.getTable(metastoreContext, table.getSchemaName(), table.getTableName())
                .orElseThrow(() -> new IllegalStateException(String.format("Table %s not found in metastore", table)));
        Optional<Partition> partition;
        Path path;
        StorageFormat storageFormat;
        if (UNPARTITIONED_ID.getPartitionName().equals(partitionId)) {
            // Unpartitioned table: stats are built over the table's own storage location
            partition = Optional.empty();
            path = new Path(resolvedTable.getStorage().getLocation());
            storageFormat = resolvedTable.getStorage().getStorageFormat();
        }
        else {
            partition = metastore.getPartitionsByNames(metastoreContext, table.getSchemaName(), table.getTableName(),
                    ImmutableList.of(new PartitionNameWithVersion(partitionId, Optional.empty()))).get(partitionId);
            checkState(partition.isPresent(), "getPartitionsByNames returned no partitions for partition with name [%s]", partitionId);
            path = new Path(partition.get().getStorage().getLocation());
            storageFormat = partition.get().getStorage().getStorageFormat();
        }

        HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getTableName(), partitionId, false);
        HiveDirectoryContext hiveDirectoryContext = new HiveDirectoryContext(recursiveDirWalkerEnabled ? RECURSE : IGNORED, isUseListDirectoryCache(session),
                isSkipEmptyFilesEnabled(session), hdfsContext.getIdentity(), buildDirectoryContextProperties(session), session.getRuntimeStats());
        ExtendedFileSystem fs;
        try {
            fs = hdfsEnvironment.getFileSystem(hdfsContext, path);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }

        Iterator<HiveFileInfo> fileList = directoryLister.list(fs, resolvedTable, path, partition, nameNodeStats, hiveDirectoryContext);

        InputFormat<?, ?> inputFormat = getInputFormat(hdfsEnvironment.getConfiguration(hdfsContext, path), storageFormat.getInputFormat(), storageFormat.getSerDe(), false);
        if (inputFormat instanceof SymlinkTextInputFormat) {
            // For symlinks, follow the paths in the manifest file and create a new iterator of the target files
            try {
                List<Path> targetPaths = readSymlinkPaths(fs, fileList);

                Map<Path, List<Path>> parentToTargets = targetPaths.stream().collect(Collectors.groupingBy(Path::getParent));

                ImmutableList.Builder<HiveFileInfo> targetFileInfoList = ImmutableList.builder();

                for (Map.Entry<Path, List<Path>> entry : parentToTargets.entrySet()) {
                    targetFileInfoList.addAll(getTargetPathsHiveFileInfos(
                            path,
                            partition,
                            entry.getKey(),
                            entry.getValue(),
                            hiveDirectoryContext,
                            fs,
                            directoryLister,
                            resolvedTable,
                            nameNodeStats,
                            session));
                }

                fileList = targetFileInfoList.build().iterator();
            }
            catch (IOException e) {
                throw new PrestoException(HIVE_BAD_DATA, "Error parsing symlinks", e);
            }
        }

        PartitionQuickStats partitionQuickStats = PartitionQuickStats.EMPTY;
        Stopwatch buildStopwatch = Stopwatch.createStarted();
        // Build quick stats one by one from statsBuilderStrategies. Do this until we get a non-empty PartitionQuickStats
        // NOTE(review): fileList is a one-shot iterator; if a strategy consumes it but still returns
        // EMPTY, later strategies will observe no files — confirm whether strategies are expected to
        // either fully succeed or leave the iterator untouched
        for (QuickStatsBuilder strategy : statsBuilderStrategies) {
            partitionQuickStats = strategy.buildQuickStats(session, metastore, table, metastoreContext, partitionId, fileList);

            if (partitionQuickStats != PartitionQuickStats.EMPTY) {
                // Strategy successfully resolved stats, don't explore other strategies
                // TODO : We can order the strategies based on table metadata, e.g Iceberg tables could use the IcebergQuickStatsBuilder first
                break;
            }
        }

        long buildMillis = buildStopwatch.elapsed(MILLISECONDS);
        session.getRuntimeStats().addMetricValue("QuickStatsProvider/BuildTimeMS/" + partitionKey, RuntimeUnit.NONE, buildMillis);
        buildDuration.add(buildMillis, MILLISECONDS);

        PartitionStatistics partitionStatistics = PartitionQuickStats.convertToPartitionStatistics(partitionQuickStats);

        // Update the cache with the computed partition stats
        partitionToStatsCache.put(partitionKey, partitionStatistics);

        return partitionStatistics;
    }

    /**
     * Immutable pairing of an in-flight quick stats build future with its start time,
     * used for tracking and for the JMX in-progress snapshot.
     */
    private static class InProgressBuildInfo
    {
        private final CompletableFuture<PartitionStatistics> quickStatsBuildFuture;
        private final Instant buildStart;

        public InProgressBuildInfo(CompletableFuture<PartitionStatistics> quickStatsBuildFuture, Instant buildStart)
        {
            this.quickStatsBuildFuture = quickStatsBuildFuture;
            this.buildStart = buildStart;
        }

        public CompletableFuture<PartitionStatistics> getQuickStatsBuildFuture()
        {
            return quickStatsBuildFuture;
        }

        public Instant getBuildStart()
        {
            return buildStart;
        }
    }
}