DispatchManager.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.dispatcher;

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.presto.Session;
import com.facebook.presto.common.analyzer.PreparedQuery;
import com.facebook.presto.common.resourceGroups.QueryType;
import com.facebook.presto.execution.QueryIdGenerator;
import com.facebook.presto.execution.QueryInfo;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.execution.QueryManagerStats;
import com.facebook.presto.execution.QueryTracker;
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
import com.facebook.presto.execution.warnings.WarningCollectorFactory;
import com.facebook.presto.resourcemanager.ClusterQueryTrackerService;
import com.facebook.presto.resourcemanager.ClusterStatusSender;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.SessionContext;
import com.facebook.presto.server.SessionPropertyDefaults;
import com.facebook.presto.server.SessionSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.analyzer.AnalyzerOptions;
import com.facebook.presto.spi.analyzer.QueryPreparerProvider;
import com.facebook.presto.spi.resourceGroups.SelectionContext;
import com.facebook.presto.spi.resourceGroups.SelectionCriteria;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import org.weakref.jmx.Flatten;
import org.weakref.jmx.Managed;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;

import java.security.Principal;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;

import static com.facebook.presto.Session.SessionBuilder;
import static com.facebook.presto.SystemSessionProperties.getAnalyzerType;
import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_TEXT_TOO_LARGE;
import static com.facebook.presto.util.AnalyzerUtil.createAnalyzerOptions;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

/**
 * This class defines the query dispatch process handled by the Dispatch Manager.
 */
public class DispatchManager
{
    private final QueryIdGenerator queryIdGenerator;
    private final ResourceGroupManager<?> resourceGroupManager;
    private final WarningCollectorFactory warningCollectorFactory;
    private final DispatchQueryFactory dispatchQueryFactory;
    private final FailedDispatchQueryFactory failedDispatchQueryFactory;
    private final TransactionManager transactionManager;
    private final AccessControl accessControl;
    private final SessionSupplier sessionSupplier;
    private final SessionPropertyDefaults sessionPropertyDefaults;

    private final int maxQueryLength;

    private final Executor queryExecutor;
    private final BoundedExecutor boundedQueryExecutor;

    private final ClusterStatusSender clusterStatusSender;

    private final QueryTracker<DispatchQuery> queryTracker;

    private final QueryManagerStats stats = new QueryManagerStats();

    private final QueryPreparerProviderManager queryPreparerProviderManager;

    /**
     * Dispatch Manager is used for the pre-queuing part of queries prior to the query execution phase.
     *
     * Dispatch Manager object is instantiated when the presto server is launched by server bootstrap time. It is a critical component in resource management section of the query.
     *
     * @param queryIdGenerator query ID generator for generating a new query ID when a query is created
     * @param queryPreparerProviderManager provides access to registered query preparer providers
     * @param resourceGroupManager the resource group manager to select corresponding resource group for query to retrieve basic information from session context for selection context
     * @param warningCollectorFactory the warning collector factory to collect presto warning in a query session
     * @param dispatchQueryFactory the dispatch query factory is used to create a {@link DispatchQuery} object.  The dispatch query is submitted to the {@link ResourceGroupManager} which enqueues the query.
     * @param failedDispatchQueryFactory the failed dispatch query factory is used to register a failed query
     * @param transactionManager the transaction manager is used to activate an existing transaction if this is a transaction control statement
     * @param accessControl the access control is used as part of the activate transaction operation
     * @param sessionSupplier the session supplier to create a query session
     * @param sessionPropertyDefaults allow dispatch manager to apply system default session properties
     * @param queryManagerConfig contains all query manager config properties (max query length and query tracker settings)
     * @param dispatchExecutor the dispatch executor contains both pre-queued query executor {@link BoundedExecutor} and post-queued query executor {@link Executor}
     * @param clusterStatusSender used to register each successfully created query (via {@code registerQuery}) with the resource manager
     * @param clusterQueryTrackerService optional cluster-wide query tracker service handed to the {@link QueryTracker}
     */
    @Inject
    public DispatchManager(
            QueryIdGenerator queryIdGenerator,
            QueryPreparerProviderManager queryPreparerProviderManager,
            @SuppressWarnings("rawtypes") ResourceGroupManager resourceGroupManager,
            WarningCollectorFactory warningCollectorFactory,
            DispatchQueryFactory dispatchQueryFactory,
            FailedDispatchQueryFactory failedDispatchQueryFactory,
            TransactionManager transactionManager,
            AccessControl accessControl,
            SessionSupplier sessionSupplier,
            SessionPropertyDefaults sessionPropertyDefaults,
            QueryManagerConfig queryManagerConfig,
            DispatchExecutor dispatchExecutor,
            ClusterStatusSender clusterStatusSender,
            Optional<ClusterQueryTrackerService> clusterQueryTrackerService)
    {
        this.queryIdGenerator = requireNonNull(queryIdGenerator, "queryIdGenerator is null");
        this.queryPreparerProviderManager = requireNonNull(queryPreparerProviderManager, "queryPreparerProviderManager is null");
        this.resourceGroupManager = requireNonNull(resourceGroupManager, "resourceGroupManager is null");
        this.warningCollectorFactory = requireNonNull(warningCollectorFactory, "warningCollectorFactory is null");
        this.dispatchQueryFactory = requireNonNull(dispatchQueryFactory, "dispatchQueryFactory is null");
        this.failedDispatchQueryFactory = requireNonNull(failedDispatchQueryFactory, "failedDispatchQueryFactory is null");
        this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
        this.accessControl = requireNonNull(accessControl, "accessControl is null");
        this.sessionSupplier = requireNonNull(sessionSupplier, "sessionSupplier is null");
        this.sessionPropertyDefaults = requireNonNull(sessionPropertyDefaults, "sessionPropertyDefaults is null");

        requireNonNull(queryManagerConfig, "queryManagerConfig is null");
        this.maxQueryLength = queryManagerConfig.getMaxQueryLength();

        // validate once, then pull out the executors this manager uses
        requireNonNull(dispatchExecutor, "dispatchExecutor is null");
        this.queryExecutor = dispatchExecutor.getExecutor();
        this.boundedQueryExecutor = dispatchExecutor.getBoundedExecutor();

        this.clusterStatusSender = requireNonNull(clusterStatusSender, "clusterStatusSender is null");

        this.queryTracker = new QueryTracker<>(
                queryManagerConfig,
                dispatchExecutor.getScheduledExecutor(),
                requireNonNull(clusterQueryTrackerService, "clusterQueryTrackerService is null"));
    }

    /**
     * Start query tracker as a background task.
     */
    @PostConstruct
    public void start()
    {
        queryTracker.start();
    }

    /**
     * Stop the query tracker that was started by {@link #start()}.
     */
    @PreDestroy
    public void stop()
    {
        queryTracker.stop();
    }

    /**
     * This method returns the statistics from query manager
     *
     * @return {@link QueryManagerStats}
     */
    @Managed
    @Flatten
    public QueryManagerStats getStats()
    {
        return stats;
    }

    /**
     * Create a new query id using the configured {@link QueryIdGenerator}.
     *
     * @return {@link QueryId}
     */
    public QueryId createQueryId()
    {
        return queryIdGenerator.createNextQueryId();
    }

    /**
     * Asynchronously create and register a dispatch query for a given queryID and slug.
     * <br>
     * This method instantiates a dispatch query with the query tracker. The logic flow is as follows:
     * <ol>
     *     <li> Check to see if the query is too big. This is to protect the coordinator from being overwhelmed </li>
     *     <li> Take the raw session information from {@code sessionContext} into a genuine session object of the query that can be used to check for access control,
     *     privacy/security guarded by session properties, check for user query id, etc </li>
     *     <li> {@code prepareQuery} is responsible for calling SQL parsing and generating the abstract syntax tree (AST). This will return a query object with placeholders
     *     for prepared statement to fill in the actual query execution</li>
     *     <li> Select corresponding resource group {@link ResourceGroupManager} for the query which is done in two steps
     *     <ul>
     *         <li> Retrieve basic information from the session context and use this to prepare for selection context.</li>
     *         <li> The selection context will then be used by the {@link ResourceGroupManager} to figure out what resource group to go to
     *         and which resource group the query should belong to. </li>
     *     </ul>
     *     </li>
     *     <li> Enhance the session with session property defaults. User may use the plugin feature to provide default session property overrides
     *     for dynamically configurable feature-toggle type of use cases. </li>
     *     <li> Create a {@link DispatchQuery} object. The dispatch query is submitted to the {@link ResourceGroupManager} which enqueues the query. </li>
     *     <li> The dispatch query is registered with the query tracker, which is used to keep track of the state of the query.
     *     A state change listener is added to the query so that it is expired from the tracker once it is done. </li>
     *     <li> Once the dispatch query object is created and registered with the query tracker, register it with the {@link ClusterStatusSender}
     *     so that heartbeats are sent to the resource manager. This is a no-op for a non-disaggregated coordinator setup</li>
     *     <li> Invoke the query prerequisites by {@code startWaitingForPrerequisites} to start the pre-resource management stage.
     *     <ul>
     *         <li>This is to allow user to add a plugin and custom functionality to the query prior to it getting queued so that user may for instance prepare for something
     *         prior to that query getting queued. By default this is a no-op</li>
     *         <li> proceed to queue the query {@code queueQuery()} which internally changes the state machine of the local dispatch query to {@code QUEUED},
     *         and then calls the query queuer to submit the query to the {@link ResourceGroupManager} </li>
     *     </ul>
     *     </li>
     * </ol>
     * Any failure during creation results in a failed dispatch query being registered instead.
     *
     * @param queryId the query id
     * @param slug the query slug
     * @param retryCount per-query retry limit due to communication failures
     * @param sessionContext the raw session context
     * @param query the query in String
     * @return a future that completes with {@code null} once creation has finished (successfully or as a failed query); it can not be cancelled
     * @throws IllegalArgumentException if {@code query} is empty or a query with {@code queryId} is already tracked
     * @see ResourceGroupManager
     * @see <a href="https://prestodb.io/docs/current/admin/resource-groups.html">Resource Groups</a>
     */
    public ListenableFuture<?> createQuery(QueryId queryId, String slug, int retryCount, SessionContext sessionContext, String query)
    {
        requireNonNull(queryId, "queryId is null");
        requireNonNull(sessionContext, "sessionContext is null");
        requireNonNull(query, "query is null");
        checkArgument(!query.isEmpty(), "query must not be empty string");
        checkArgument(!queryTracker.tryGetQuery(queryId).isPresent(), "query %s already exists", queryId);

        DispatchQueryCreationFuture queryCreationFuture = new DispatchQueryCreationFuture();
        boundedQueryExecutor.execute(() -> {
            try {
                createQueryInternal(queryId, slug, retryCount, sessionContext, query, resourceGroupManager);
            }
            finally {
                // always complete the future so callers waiting on creation are released
                queryCreationFuture.set(null);
            }
        });
        return queryCreationFuture;
    }

    /**
     * Creates and registers a dispatch query with the query tracker.  This method will never fail to register a query with the query
     * tracker. If an error occurs while creating a dispatch query, a failed dispatch will be created and registered.
     *
     * @param <C> the resource group manager's selection context type, captured so the selection context and submit call agree
     */
    private <C> void createQueryInternal(QueryId queryId, String slug, int retryCount, SessionContext sessionContext, String query, ResourceGroupManager<C> resourceGroupManager)
    {
        Session session = null;
        SessionBuilder sessionBuilder = null;
        try {
            if (query.length() > maxQueryLength) {
                int queryLength = query.length();
                // keep only the first maxQueryLength characters for the failed query registered in the catch block
                query = query.substring(0, maxQueryLength);
                throw new PrestoException(QUERY_TEXT_TOO_LARGE, format("Query text length (%s) exceeds the maximum length (%s)", queryLength, maxQueryLength));
            }

            // decode session
            sessionBuilder = sessionSupplier.createSessionBuilder(queryId, sessionContext, warningCollectorFactory);
            session = sessionBuilder.build();

            // prepare query
            AnalyzerOptions analyzerOptions = createAnalyzerOptions(session, sessionBuilder.getWarningCollector());
            QueryPreparerProvider queryPreparerProvider = queryPreparerProviderManager.getQueryPreparerProvider(getAnalyzerType(session));
            PreparedQuery preparedQuery = queryPreparerProvider.getQueryPreparer().prepareQuery(analyzerOptions, query, sessionBuilder.getPreparedStatements(), sessionBuilder.getWarningCollector());
            query = preparedQuery.getFormattedQuery().orElse(query);

            // select resource group
            Optional<QueryType> queryType = preparedQuery.getQueryType();
            sessionBuilder.setQueryType(queryType);
            SelectionContext<C> selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
                    sessionContext.getIdentity().getPrincipal().isPresent(),
                    sessionContext.getIdentity().getUser(),
                    Optional.ofNullable(sessionContext.getSource()),
                    sessionContext.getClientTags(),
                    sessionContext.getResourceEstimates(),
                    queryType.map(Enum::name),
                    Optional.ofNullable(sessionContext.getClientInfo()),
                    Optional.ofNullable(sessionContext.getSchema()),
                    sessionContext.getIdentity().getPrincipal().map(Principal::getName)));

            // apply system default session properties (does not override user set properties)
            sessionPropertyDefaults.applyDefaultProperties(sessionBuilder, queryType.map(Enum::name), Optional.of(selectionContext.getResourceGroupId()));

            // rebuild the session so it includes the query type and the applied defaults
            session = sessionBuilder.build();
            if (sessionContext.getTransactionId().isPresent()) {
                session = session.beginTransactionId(sessionContext.getTransactionId().get(), preparedQuery.isRollbackStatement(), transactionManager, accessControl);
            }

            // mark existing transaction as active
            transactionManager.activateTransaction(session, preparedQuery.isTransactionControlStatement(), accessControl);

            DispatchQuery dispatchQuery = dispatchQueryFactory.createDispatchQuery(
                    session,
                    query,
                    preparedQuery,
                    slug,
                    retryCount,
                    selectionContext.getResourceGroupId(),
                    queryType,
                    session.getWarningCollector(),
                    (dq) -> resourceGroupManager.submit(dq, selectionContext, queryExecutor));

            boolean queryAdded = queryCreated(dispatchQuery);
            if (queryAdded && !dispatchQuery.isDone()) {
                try {
                    clusterStatusSender.registerQuery(dispatchQuery);
                    dispatchQuery.startWaitingForPrerequisites();
                }
                catch (Throwable e) {
                    // dispatch query has already been registered, so just fail it directly
                    dispatchQuery.fail(e);
                }
            }
        }
        catch (Throwable throwable) {
            // creation must never fail, so register a failed query in this case
            if (session == null) {
                // the session could not be decoded; build a minimal one carrying the identity and source
                session = Session.builder(createTestingSessionPropertyManager())
                        .setQueryId(queryId)
                        .setIdentity(sessionContext.getIdentity())
                        .setSource(sessionContext.getSource())
                        .build();
            }
            DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(session, query, Optional.empty(), throwable);
            queryCreated(failedDispatchQuery);
        }
    }

    /**
     * Adds the query to the query tracker and, if it was newly added, hooks up expiration and stats tracking.
     *
     * @return {@code true} if the query tracker accepted the query
     */
    private boolean queryCreated(DispatchQuery dispatchQuery)
    {
        boolean queryAdded = queryTracker.addQuery(dispatchQuery);

        // only add state tracking if this query instance will actually be used for the execution
        if (queryAdded) {
            dispatchQuery.addStateChangeListener(newState -> {
                if (newState.isDone()) {
                    // execution MUST be added to the expiration queue or there will be a leak
                    queryTracker.expireQuery(dispatchQuery.getQueryId());
                }
            });
            stats.trackQueryStats(dispatchQuery);
        }

        return queryAdded;
    }

    /**
     * Wait for dispatched listenable future.
     * Records a heartbeat on the query if it is tracked.
     *
     * @param queryId the query id
     * @return the query's dispatched future, or an immediately completed future if the query is not tracked
     */
    public ListenableFuture<?> waitForDispatched(QueryId queryId)
    {
        return queryTracker.tryGetQuery(queryId)
                .map(dispatchQuery -> {
                    dispatchQuery.recordHeartbeat();
                    return dispatchQuery.getDispatchedFuture();
                })
                .orElseGet(() -> immediateFuture(null));
    }

    /**
     * Return a list of {@link BasicQueryInfo} for every query in the query tracker.
     *
     * @return an immutable list of {@link BasicQueryInfo}
     */
    public List<BasicQueryInfo> getQueries()
    {
        return queryTracker.getAllQueries().stream()
                .map(DispatchQuery::getBasicQueryInfo)
                .collect(toImmutableList());
    }

    /**
     * Return a lightweight query info.
     *
     * @param queryId the query id
     * @return {@link BasicQueryInfo}
     */
    public BasicQueryInfo getQueryInfo(QueryId queryId)
    {
        return queryTracker.getQuery(queryId).getBasicQueryInfo();
    }

    /**
     * Return dispatch info, recording a heartbeat on the query if it is tracked.
     *
     * @param queryId the query id
     * @return an optional of {@link DispatchInfo}; empty if the query is not tracked
     */
    public Optional<DispatchInfo> getDispatchInfo(QueryId queryId)
    {
        return queryTracker.tryGetQuery(queryId)
                .map(dispatchQuery -> {
                    dispatchQuery.recordHeartbeat();
                    return dispatchQuery.getDispatchInfo();
                });
    }

    /**
     * Check if a given queryId exists in query tracker
     *
     * @param queryId the query id
     * @return {@code true} if the query tracker currently holds the query
     */
    public boolean isQueryPresent(QueryId queryId)
    {
        return queryTracker.tryGetQuery(queryId).isPresent();
    }

    /**
     * For a given queryId, trigger immediate query failure if exists in query tracker along with a given reason
     *
     * @param queryId the query id
     * @param cause the cause
     */
    public void failQuery(QueryId queryId, Throwable cause)
    {
        requireNonNull(cause, "cause is null");

        queryTracker.tryGetQuery(queryId)
                .ifPresent(query -> query.fail(cause));
    }

    /**
     * For a given queryId, cancel the query if it exists in the query tracker.
     *
     * @param queryId the query id
     */
    public void cancelQuery(QueryId queryId)
    {
        queryTracker.tryGetQuery(queryId)
                .ifPresent(DispatchQuery::cancel);
    }

    /**
     * Future returned by {@link #createQuery}; completed by the dispatch manager and not cancellable by callers.
     */
    private static class DispatchQueryCreationFuture
            extends AbstractFuture<QueryInfo>
    {
        @Override
        protected boolean set(QueryInfo value)
        {
            return super.set(value);
        }

        @Override
        protected boolean setException(Throwable throwable)
        {
            return super.setException(throwable);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning)
        {
            // query submission can not be canceled
            return false;
        }
    }
}