LocalDispatchQueryFactory.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.dispatcher;
import com.facebook.presto.Session;
import com.facebook.presto.common.analyzer.PreparedQuery;
import com.facebook.presto.common.resourceGroups.QueryType;
import com.facebook.presto.event.QueryMonitor;
import com.facebook.presto.execution.ClusterSizeMonitor;
import com.facebook.presto.execution.ExecutionFactoriesManager;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.execution.QueryExecution.QueryExecutionFactory;
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.execution.QueryStateMachine;
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.security.AccessControl;
import com.facebook.presto.sql.analyzer.AnalyzerProviderManager;
import com.facebook.presto.tracing.NoopTracerProvider;
import com.facebook.presto.tracing.QueryStateTracingListener;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import javax.inject.Inject;
import java.util.Optional;
import java.util.function.Consumer;
import static com.facebook.presto.SystemSessionProperties.getAnalyzerType;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;
/**
 * A {@link DispatchQueryFactory} that builds {@link LocalDispatchQuery} instances, i.e. dispatch queries
 * that are handed to the local {@link QueryManager} (via {@code queryManager::createQuery}) for execution.
 */
public class LocalDispatchQueryFactory
        implements DispatchQueryFactory
{
    private final QueryManager queryManager;
    private final TransactionManager transactionManager;
    private final AccessControl accessControl;
    private final Metadata metadata;
    private final QueryMonitor queryMonitor;
    private final LocationFactory locationFactory;
    private final ClusterSizeMonitor clusterSizeMonitor;
    private final ExecutionFactoriesManager executionFactoriesManager;
    private final ListeningExecutorService executor;
    private final QueryPrerequisitesManager queryPrerequisitesManager;
    private final AnalyzerProviderManager analyzerProviderManager;

    /**
     * Instantiates a new local dispatch query factory. Every argument must be non-null.
     *
     * @param queryManager the query manager whose {@code createQuery} method is passed to each created dispatch query
     * @param transactionManager the transaction manager passed to the query state machine
     * @param accessControl the access control passed to the query state machine and to the query execution factory
     * @param metadata the metadata passed to the query state machine
     * @param queryMonitor the query monitor notified when a query is created
     * @param locationFactory the location factory used to build the query location from the query id
     * @param executionFactoriesManager the manager used to look up the execution factory for a prepared query
     * @param clusterSizeMonitor the cluster size monitor passed to each created dispatch query
     * @param dispatchExecutor the dispatch executor; its underlying executor service is used to create query executions asynchronously
     * @param queryPrerequisitesManager the query prerequisites manager passed to each created dispatch query
     * @param analyzerProviderManager the manager used to resolve the analyzer provider for the session's analyzer type
     * @throws NullPointerException if any argument is null
     */
    @Inject
    public LocalDispatchQueryFactory(
            QueryManager queryManager,
            TransactionManager transactionManager,
            AccessControl accessControl,
            Metadata metadata,
            QueryMonitor queryMonitor,
            LocationFactory locationFactory,
            ExecutionFactoriesManager executionFactoriesManager,
            ClusterSizeMonitor clusterSizeMonitor,
            DispatchExecutor dispatchExecutor,
            QueryPrerequisitesManager queryPrerequisitesManager,
            AnalyzerProviderManager analyzerProviderManager)
    {
        this.queryManager = requireNonNull(queryManager, "queryManager is null");
        this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
        this.accessControl = requireNonNull(accessControl, "accessControl is null");
        this.metadata = requireNonNull(metadata, "metadata is null");
        this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
        this.locationFactory = requireNonNull(locationFactory, "locationFactory is null");
        this.executionFactoriesManager = requireNonNull(executionFactoriesManager, "executionFactoriesManager is null");
        this.clusterSizeMonitor = requireNonNull(clusterSizeMonitor, "clusterSizeMonitor is null");
        // The message names the actual constructor parameter so a failure points at the right binding.
        this.executor = requireNonNull(dispatchExecutor, "dispatchExecutor is null").getExecutor();
        this.queryPrerequisitesManager = requireNonNull(queryPrerequisitesManager, "queryPrerequisitesManager is null");
        this.analyzerProviderManager = requireNonNull(analyzerProviderManager, "analyzerProviderManager is null");
    }

    /**
     * Instantiates a new dispatch query as part of the preparation phase that precedes query execution.
     *
     * The method performs the following steps, in order:
     * <ol>
     * <li>begins a {@link QueryStateMachine} for the query;</li>
     * <li>registers a {@link QueryStateTracingListener} as a state change listener, using the session's tracer
     * or the no-op tracer when the session has none;</li>
     * <li>reports the query-created event to the {@link QueryMonitor};</li>
     * <li>submits a task to the dispatch executor that creates the {@link QueryExecution}; a failure in that task
     * (for example an unsupported statement type) is reported through the resulting future rather than thrown here;</li>
     * <li>wraps everything in a {@link LocalDispatchQuery}.</li>
     * </ol>
     *
     * @param session the session
     * @param query the query
     * @param preparedQuery the prepared query
     * @param slug the unique query slug for each {@code Query} object
     * @param retryCount the query retry count; the created dispatch query is flagged with {@code retryCount > 0}
     * @param resourceGroup the resource group to be used
     * @param queryType the query type derived from the {@code PreparedQuery statement}
     * @param warningCollector the warning collector
     * @param queryQueuer the callback handed to the created dispatch query; it is expected to submit the query
     * to the {@link ResourceGroupManager} (its invocation is not performed by this method)
     * @return the newly created {@link LocalDispatchQuery}
     */
    @Override
    public DispatchQuery createDispatchQuery(
            Session session,
            String query,
            PreparedQuery preparedQuery,
            String slug,
            int retryCount,
            ResourceGroupId resourceGroup,
            Optional<QueryType> queryType,
            WarningCollector warningCollector,
            Consumer<DispatchQuery> queryQueuer)
    {
        QueryStateMachine stateMachine = QueryStateMachine.begin(
                query,
                preparedQuery.getPrepareSql(),
                session,
                locationFactory.createQueryLocation(session.getQueryId()),
                resourceGroup,
                queryType,
                preparedQuery.isTransactionControlStatement(),
                transactionManager,
                accessControl,
                executor,
                metadata,
                warningCollector);

        // The tracer is read from the state machine's session, which is not necessarily the same instance as the argument.
        stateMachine.addStateChangeListener(new QueryStateTracingListener(stateMachine.getSession().getTracer().orElse(NoopTracerProvider.NOOP_TRACER)));

        queryMonitor.queryCreatedEvent(stateMachine.getBasicQueryInfo(Optional.empty()));

        // Building the execution is deferred to the dispatch executor so this method does not block on it.
        ListenableFuture<QueryExecution> queryExecutionFuture = executor.submit(
                () -> createQueryExecution(session, query, preparedQuery, stateMachine, slug, retryCount, warningCollector, queryType));

        return new LocalDispatchQuery(
                stateMachine,
                queryMonitor,
                queryExecutionFuture,
                clusterSizeMonitor,
                executor,
                queryQueuer,
                queryManager::createQuery,
                retryCount > 0,
                queryPrerequisitesManager);
    }

    /**
     * Looks up the execution factory registered for the prepared query and uses it to create the query execution.
     *
     * @throws PrestoException with {@code NOT_SUPPORTED} if no execution factory is available for the statement
     */
    private QueryExecution createQueryExecution(
            Session session,
            String query,
            PreparedQuery preparedQuery,
            QueryStateMachine stateMachine,
            String slug,
            int retryCount,
            WarningCollector warningCollector,
            Optional<QueryType> queryType)
    {
        QueryExecutionFactory<?> queryExecutionFactory = executionFactoriesManager.getExecutionFactory(preparedQuery);
        if (queryExecutionFactory == null) {
            throw new PrestoException(NOT_SUPPORTED, "Unsupported statement type: " + preparedQuery.getStatementClass().getSimpleName());
        }

        return queryExecutionFactory.createQueryExecution(
                analyzerProviderManager.getAnalyzerProvider(getAnalyzerType(session)),
                preparedQuery,
                stateMachine,
                slug,
                retryCount,
                warningCollector,
                queryType,
                accessControl,
                query);
    }
}