HistoryBasedPlanStatisticsCalculator.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.cost;

import com.facebook.presto.Session;
import com.facebook.presto.common.plan.PlanCanonicalizationStrategy;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeWithHash;
import com.facebook.presto.spi.statistics.HistoricalPlanStatistics;
import com.facebook.presto.spi.statistics.HistoricalPlanStatisticsEntry;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.spi.statistics.HistoryBasedSourceInfo;
import com.facebook.presto.spi.statistics.PlanStatistics;
import com.facebook.presto.sql.planner.PlanCanonicalInfoProvider;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.iterative.Lookup;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

import static com.facebook.presto.SystemSessionProperties.enableVerboseHistoryBasedOptimizerRuntimeStats;
import static com.facebook.presto.SystemSessionProperties.estimateSizeUsingVariablesForHBO;
import static com.facebook.presto.SystemSessionProperties.getHistoryBasedOptimizerTimeoutLimit;
import static com.facebook.presto.SystemSessionProperties.getHistoryInputTableStatisticsMatchingThreshold;
import static com.facebook.presto.SystemSessionProperties.isVerboseRuntimeStatsEnabled;
import static com.facebook.presto.SystemSessionProperties.useHistoryBasedPlanStatisticsEnabled;
import static com.facebook.presto.common.RuntimeMetricName.HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_PLAN_NODE_HASHES;
import static com.facebook.presto.common.RuntimeMetricName.HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_STATISTICS;
import static com.facebook.presto.common.RuntimeUnit.NANO;
import static com.facebook.presto.cost.HistoricalPlanStatisticsUtil.getSelectedHistoricalPlanStatisticsEntry;
import static com.facebook.presto.cost.HistoryBasedPlanStatisticsManager.historyBasedPlanCanonicalizationStrategyList;
import static com.facebook.presto.spi.statistics.PlanStatistics.toConfidenceLevel;
import static com.facebook.presto.sql.planner.iterative.Plans.resolveGroupReferences;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.graph.Traverser.forTree;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

public class HistoryBasedPlanStatisticsCalculator
        implements StatsCalculator
{
    private final Supplier<HistoryBasedPlanStatisticsProvider> historyBasedPlanStatisticsProvider;
    private final HistoryBasedStatisticsCacheManager historyBasedStatisticsCacheManager;
    private final StatsCalculator delegate;
    private final PlanCanonicalInfoProvider planCanonicalInfoProvider;

    public HistoryBasedPlanStatisticsCalculator(
            Supplier<HistoryBasedPlanStatisticsProvider> historyBasedPlanStatisticsProvider,
            HistoryBasedStatisticsCacheManager historyBasedStatisticsCacheManager,
            StatsCalculator delegate,
            PlanCanonicalInfoProvider planCanonicalInfoProvider)
    {
        this.historyBasedPlanStatisticsProvider = requireNonNull(historyBasedPlanStatisticsProvider, "historyBasedPlanStatisticsProvider is null");
        this.historyBasedStatisticsCacheManager = requireNonNull(historyBasedStatisticsCacheManager, "historyBasedStatisticsCacheManager is null");
        this.delegate = requireNonNull(delegate, "delegate is null");
        this.planCanonicalInfoProvider = requireNonNull(planCanonicalInfoProvider, "planHasher is null");
    }

    @Override
    public PlanNodeStatsEstimate calculateStats(PlanNode node, StatsProvider sourceStats, Lookup lookup, Session session, TypeProvider types)
    {
        PlanNodeStatsEstimate delegateStats = delegate.calculateStats(node, sourceStats, lookup, session, types);
        return getStatistics(node, session, lookup, delegateStats);
    }

    @Override
    public boolean registerPlan(PlanNode root, Session session, long startTimeInNano, long timeoutInMilliseconds)
    {
        // If previous registration timeout for this query, this run is likely to timeout too, return false.
        if (historyBasedStatisticsCacheManager.historyBasedQueryRegistrationTimeout(session.getQueryId())) {
            return false;
        }
        // record the statsEquivalentPlanNode of root node, and do serialization if enabled when query completes to avoid introduce additional latency for HBO optimizer
        if (root.getStatsEquivalentPlanNode().isPresent()) {
            historyBasedStatisticsCacheManager.setStatsEquivalentPlanRootNode(session.getQueryId(), root.getStatsEquivalentPlanNode().get());
        }
        ImmutableList.Builder<PlanNodeWithHash> planNodesWithHash = ImmutableList.builder();
        Iterable<PlanNode> planNodeIterable = forTree(PlanNode::getSources).depthFirstPreOrder(root);
        boolean enableVerboseRuntimeStats = isVerboseRuntimeStatsEnabled(session) || enableVerboseHistoryBasedOptimizerRuntimeStats(session);
        long profileStartTime = 0;
        for (PlanNode plan : planNodeIterable) {
            if (checkTimeOut(startTimeInNano, timeoutInMilliseconds)) {
                historyBasedStatisticsCacheManager.setHistoryBasedQueryRegistrationTimeout(session.getQueryId());
                return false;
            }
            if (plan.getStatsEquivalentPlanNode().isPresent()) {
                if (enableVerboseRuntimeStats) {
                    profileStartTime = System.nanoTime();
                }
                planNodesWithHash.addAll(getPlanNodeHashes(plan, session, false).values());
                if (enableVerboseRuntimeStats) {
                    session.getRuntimeStats().addMetricValue(HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_PLAN_NODE_HASHES, NANO, System.nanoTime() - profileStartTime);
                }
            }
        }
        try {
            if (enableVerboseRuntimeStats) {
                profileStartTime = System.nanoTime();
            }
            historyBasedStatisticsCacheManager.getStatisticsCache(session.getQueryId(), historyBasedPlanStatisticsProvider, getHistoryBasedOptimizerTimeoutLimit(session).toMillis()).getAll(planNodesWithHash.build());
            if (enableVerboseRuntimeStats) {
                session.getRuntimeStats().addMetricValue(HISTORY_OPTIMIZER_QUERY_REGISTRATION_GET_STATISTICS, NANO, System.nanoTime() - profileStartTime);
            }
        }
        catch (ExecutionException e) {
            throw new RuntimeException("Unable to register plan: ", e.getCause());
        }

        if (checkTimeOut(startTimeInNano, timeoutInMilliseconds)) {
            historyBasedStatisticsCacheManager.setHistoryBasedQueryRegistrationTimeout(session.getQueryId());
        }
        // Return true even if get empty history statistics, so that HistoricalStatisticsEquivalentPlanMarkingOptimizer still return the plan with StatsEquivalentPlanNode which
        // will be used in populating history statistics
        return true;
    }

    private boolean checkTimeOut(long startTimeInNano, long timeoutInMilliseconds)
    {
        return NANOSECONDS.toMillis(System.nanoTime() - startTimeInNano) > timeoutInMilliseconds;
    }

    @VisibleForTesting
    public PlanCanonicalInfoProvider getPlanCanonicalInfoProvider()
    {
        return planCanonicalInfoProvider;
    }

    @VisibleForTesting
    public StatsCalculator getDelegate()
    {
        return delegate;
    }

    @VisibleForTesting
    public Supplier<HistoryBasedPlanStatisticsProvider> getHistoryBasedPlanStatisticsProvider()
    {
        return historyBasedPlanStatisticsProvider;
    }

    private Map<PlanCanonicalizationStrategy, PlanNodeWithHash> getPlanNodeHashes(PlanNode plan, Session session, boolean cacheOnly)
    {
        if (!useHistoryBasedPlanStatisticsEnabled(session) || !plan.getStatsEquivalentPlanNode().isPresent()) {
            return ImmutableMap.of();
        }

        PlanNode statsEquivalentPlanNode = plan.getStatsEquivalentPlanNode().get();
        ImmutableMap.Builder<PlanCanonicalizationStrategy, PlanNodeWithHash> allHashesBuilder = ImmutableMap.builder();
        for (PlanCanonicalizationStrategy strategy : historyBasedPlanCanonicalizationStrategyList(session)) {
            Optional<String> hash = planCanonicalInfoProvider.hash(session, statsEquivalentPlanNode, strategy, cacheOnly);
            if (hash.isPresent()) {
                allHashesBuilder.put(strategy, new PlanNodeWithHash(statsEquivalentPlanNode, hash));
            }
        }

        return allHashesBuilder.build();
    }

    private PlanNodeStatsEstimate getStatistics(PlanNode planNode, Session session, Lookup lookup, PlanNodeStatsEstimate delegateStats)
    {
        if (!useHistoryBasedPlanStatisticsEnabled(session)) {
            return delegateStats;
        }

        PlanNode plan = resolveGroupReferences(planNode, lookup);
        Map<PlanCanonicalizationStrategy, PlanNodeWithHash> allHashes = getPlanNodeHashes(plan, session, true);

        Map<PlanNodeWithHash, HistoricalPlanStatistics> statistics = ImmutableMap.of();
        try {
            statistics = historyBasedStatisticsCacheManager
                    .getStatisticsCache(session.getQueryId(), historyBasedPlanStatisticsProvider, getHistoryBasedOptimizerTimeoutLimit(session).toMillis())
                    .getAll(allHashes.values().stream().distinct().collect(toImmutableList()));
        }
        catch (ExecutionException e) {
            throw new RuntimeException(format("Unable to get plan statistics for %s", planNode), e.getCause());
        }
        double historyMatchingThreshold = getHistoryInputTableStatisticsMatchingThreshold(session);
        // Return statistics corresponding to first strategy that we find, in order specified by `historyBasedPlanCanonicalizationStrategyList`
        for (PlanCanonicalizationStrategy strategy : historyBasedPlanCanonicalizationStrategyList(session)) {
            for (Map.Entry<PlanNodeWithHash, HistoricalPlanStatistics> entry : statistics.entrySet()) {
                if (allHashes.containsKey(strategy) && entry.getKey().getHash().isPresent() && allHashes.get(strategy).equals(entry.getKey())) {
                    Optional<List<PlanStatistics>> inputTableStatistics = getPlanNodeInputTableStatistics(plan, session, strategy, true);
                    if (inputTableStatistics.isPresent()) {
                        Optional<HistoricalPlanStatisticsEntry> historicalPlanStatisticsEntry = getSelectedHistoricalPlanStatisticsEntry(entry.getValue(), inputTableStatistics.get(), historyMatchingThreshold);
                        if (historicalPlanStatisticsEntry.isPresent()) {
                            PlanStatistics predictedPlanStatistics = historicalPlanStatisticsEntry.get().getPlanStatistics();
                            if ((toConfidenceLevel(predictedPlanStatistics.getConfidence()).getConfidenceOrdinal() >= delegateStats.confidenceLevel().getConfidenceOrdinal())) {
                                return delegateStats.combineStats(
                                        predictedPlanStatistics,
                                        new HistoryBasedSourceInfo(entry.getKey().getHash(), inputTableStatistics, Optional.ofNullable(historicalPlanStatisticsEntry.get().getHistoricalPlanStatisticsEntryInfo()), estimateSizeUsingVariablesForHBO(session)));
                            }
                        }
                    }
                }
            }
        }

        return delegateStats;
    }

    private Optional<List<PlanStatistics>> getPlanNodeInputTableStatistics(PlanNode plan, Session session, PlanCanonicalizationStrategy strategy, boolean cacheOnly)
    {
        if (!useHistoryBasedPlanStatisticsEnabled(session) || !plan.getStatsEquivalentPlanNode().isPresent()) {
            return Optional.empty();
        }

        PlanNode statsEquivalentPlanNode = plan.getStatsEquivalentPlanNode().get();
        return planCanonicalInfoProvider.getInputTableStatistics(session, statsEquivalentPlanNode, strategy, cacheOnly);
    }
}