LocalDispatchQuery.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.dispatcher;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.common.ErrorCode;
import com.facebook.presto.event.QueryMonitor;
import com.facebook.presto.eventlistener.EventListenerManager;
import com.facebook.presto.execution.ClusterSizeMonitor;
import com.facebook.presto.execution.ExecutionFailureInfo;
import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.QueryStateMachine;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.prerequisites.QueryPrerequisites;
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesContext;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static com.facebook.airlift.concurrent.MoreFutures.addExceptionCallback;
import static com.facebook.airlift.concurrent.MoreFutures.addSuccessCallback;
import static com.facebook.airlift.concurrent.MoreFutures.tryGetFutureValue;
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.QUEUED;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.USER_CANCELED;
import static com.facebook.presto.util.Failures.toFailure;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
 * {@link DispatchQuery} implementation that drives a query through its pre-execution lifecycle on the local
 * coordinator: waiting for prerequisites, queueing, waiting for minimum cluster resources, and finally handing
 * the constructed {@link QueryExecution} to the query submitter.
 * <p>
 * All lifecycle state is kept in the supplied {@link QueryStateMachine}; this class only reacts to state
 * transitions and reports immediate failures to the {@link QueryMonitor}.
 */
public class LocalDispatchQuery
        implements DispatchQuery
{
    private static final Logger log = Logger.get(LocalDispatchQuery.class);

    private final QueryStateMachine stateMachine;
    private final QueryMonitor queryMonitor;
    private final ListenableFuture<QueryExecution> queryExecutionFuture;

    private final ClusterSizeMonitor clusterSizeMonitor;

    private final Executor queryExecutor;

    private final Consumer<DispatchQuery> queryQueuer;
    private final Consumer<QueryExecution> querySubmitter;

    // Completed once the query reaches a done state, or once the submit attempt in startExecution has finished
    private final SettableFuture<?> submitted = SettableFuture.create();
    // Starts empty and may be populated exactly once through setResourceGroupQueryLimits
    private final AtomicReference<Optional<ResourceGroupQueryLimits>> resourceGroupQueryLimits = new AtomicReference<>(Optional.empty());

    private final boolean retry;

    private final QueryPrerequisites queryPrerequisites;
    private final WarningCollector warningCollector;

    /**
     * Creates a local dispatch query that encapsulates a {@link QueryExecution} and submits it to the
     * ResourceGroupManager, where it waits for resources before being executed.
     * <p>
     * The constructor also registers two callbacks: one that fails the query when {@code queryExecutionFuture}
     * completes exceptionally, and one that completes the dispatched future as soon as the query reaches a done state.
     *
     * @param stateMachine the state machine that keeps track of the state of the query; its warning collector must not be null
     * @param queryMonitor the query monitor that records information to the {@link EventListenerManager}
     * @param queryExecutionFuture the future that yields the query execution once it has been constructed
     * @param clusterSizeMonitor the cluster size monitor, which provides futures that complete when the minimum number of
     * coordinator sidecars and workers for the cluster has been met
     * @param queryExecutor the executor used to run the dispatch steps of the query, including the call to {@code querySubmitter}
     * @param queryQueuer the consumer used to register the query as queued once its prerequisites have been satisfied
     * @param querySubmitter the consumer that takes the query execution object and triggers the query to start executing
     * with {@link com.facebook.presto.execution.SqlQueryManager}
     * @param retry whether this is a retry query
     * @param queryPrerequisites the prerequisites that must be satisfied before the query is ready to be queued for execution
     */
    public LocalDispatchQuery(
            QueryStateMachine stateMachine,
            QueryMonitor queryMonitor,
            ListenableFuture<QueryExecution> queryExecutionFuture,
            ClusterSizeMonitor clusterSizeMonitor,
            Executor queryExecutor,
            Consumer<DispatchQuery> queryQueuer,
            Consumer<QueryExecution> querySubmitter,
            boolean retry,
            QueryPrerequisites queryPrerequisites)
    {
        this.stateMachine = requireNonNull(stateMachine, "stateMachine is null");
        this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
        this.queryExecutionFuture = requireNonNull(queryExecutionFuture, "queryExecutionFuture is null");
        this.clusterSizeMonitor = requireNonNull(clusterSizeMonitor, "clusterSizeMonitor is null");
        this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
        this.queryQueuer = requireNonNull(queryQueuer, "queryQueuer is null");
        this.querySubmitter = requireNonNull(querySubmitter, "querySubmitter is null");
        this.retry = retry;
        this.queryPrerequisites = requireNonNull(queryPrerequisites, "queryPrerequisites is null");
        this.warningCollector = requireNonNull(stateMachine.getWarningCollector(), "warningCollector is null");

        // If the query execution cannot be constructed, fail the query; the failure event is emitted only
        // when this callback is the one that actually performed the transition.
        addExceptionCallback(queryExecutionFuture, failure -> {
            boolean transitioned = stateMachine.transitionToFailed(failure);
            if (transitioned) {
                BasicQueryInfo queryInfo = stateMachine.getBasicQueryInfo(Optional.empty());
                queryMonitor.queryImmediateFailureEvent(queryInfo, toFailure(failure));
            }
        });

        // A query that is done will never be submitted, so release anyone waiting on the dispatched future.
        stateMachine.addStateChangeListener(newState -> {
            if (newState.isDone()) {
                submitted.set(null);
            }
        });
    }

    /**
     * Starts waiting for the query prerequisites and, once they complete successfully, queues the query.
     * <p>
     * Does nothing when the query is already done. Any throwable raised while setting up the wait fails the
     * query and is then rethrown to the caller.
     */
    @Override
    public void startWaitingForPrerequisites()
    {
        // The query execution may have failed before we get here; in that case there is nothing to wait for.
        if (isDone()) {
            return;
        }

        try {
            Session querySession = stateMachine.getSession();
            QueryId queryId = stateMachine.getQueryId();
            QueryPrerequisitesContext context = new QueryPrerequisitesContext(
                    querySession.getCatalog(),
                    querySession.getSchema(),
                    stateMachine.getBasicQueryInfo(Optional.empty()).getQuery(),
                    querySession.getSystemProperties(),
                    querySession.getConnectorProperties());
            CompletableFuture<?> prerequisitesFuture = queryPrerequisites.waitForPrerequisites(queryId, context, warningCollector);

            // Once the query is done, notify the prerequisites implementation and stop any wait still in flight.
            addStateChangeListener(newState -> {
                if (!newState.isDone()) {
                    return;
                }
                queryPrerequisites.queryFinished(stateMachine.getQueryId());
                if (!prerequisitesFuture.isDone()) {
                    prerequisitesFuture.cancel(true);
                }
            });

            prerequisitesFuture.whenCompleteAsync((ignoredResult, error) -> {
                if (error == null) {
                    queueQuery();
                }
                else {
                    fail(error);
                }
            }, queryExecutor);
        }
        catch (Throwable unexpected) {
            fail(unexpected);
            throw unexpected;
        }
    }

    /**
     * Moves the query to the queued state and registers it with the query queuer.
     * A throwable from the queuer fails the query instead of propagating.
     */
    private void queueQuery()
    {
        if (!stateMachine.transitionToQueued()) {
            return;
        }

        try {
            queryQueuer.accept(this);
        }
        catch (Throwable queueFailure) {
            fail(queueFailure);
        }
    }

    /**
     * Moves the query to the waiting-for-resources state and, if that transition happened, starts waiting
     * for the minimum number of coordinator sidecars and workers.
     */
    @Override
    public void startWaitingForResources()
    {
        if (!stateMachine.transitionToWaitingForResources()) {
            return;
        }
        waitForMinimumCoordinatorSidecarsAndWorkers();
    }

    /**
     * Waits for both minimum-cluster-size futures. On success the query transitions to dispatching and the
     * execution is started once it has been constructed; on failure the query is failed on the query executor.
     */
    private void waitForMinimumCoordinatorSidecarsAndWorkers()
    {
        ListenableFuture<?> sidecarsReady = clusterSizeMonitor.waitForMinimumCoordinatorSidecars();
        ListenableFuture<?> workersReady = clusterSizeMonitor.waitForMinimumWorkers();
        ListenableFuture<?> clusterReady = Futures.allAsList(sidecarsReady, workersReady);

        // when worker and sidecar requirement is met, wait for query execution to finish construction and then start the execution
        addSuccessCallback(clusterReady, () -> {
            // Waiting for resources ends here; remember whether this call performed the transition
            boolean dispatching = stateMachine.transitionToDispatching();
            addSuccessCallback(queryExecutionFuture, execution -> startExecution(execution, dispatching));
        });
        addExceptionCallback(clusterReady, failure -> queryExecutor.execute(() -> fail(failure)));
    }

    /**
     * Schedules the submission of the query execution on the query executor.
     * The scheduled task is a no-op unless {@code isDispatching} is true.
     *
     * @param queryExecution the constructed query execution to submit
     * @param isDispatching whether the transition to the dispatching state succeeded
     */
    private void startExecution(QueryExecution queryExecution, boolean isDispatching)
    {
        queryExecutor.execute(() -> {
            if (!isDispatching) {
                return;
            }

            try {
                Optional<ResourceGroupQueryLimits> limits = resourceGroupQueryLimits.get();
                limits.ifPresent(queryExecution::setResourceGroupQueryLimits);
                querySubmitter.accept(queryExecution);
            }
            catch (Throwable submitFailure) {
                // this should never happen but be safe
                fail(submitFailure);
                log.error(submitFailure, "query submitter threw exception");
                throw submitFailure;
            }
            finally {
                // The submit attempt is over, whether or not it succeeded
                submitted.set(null);
            }
        });
    }

    /**
     * Records a heartbeat on the state machine.
     */
    @Override
    public void recordHeartbeat()
    {
        stateMachine.recordHeartbeat();
    }

    /**
     * @return the last heartbeat time in milliseconds, as reported by the state machine
     */
    @Override
    public long getLastHeartbeatInMillis()
    {
        return stateMachine.getLastHeartbeatInMillis();
    }

    /**
     * @return a view of the dispatched future that does not propagate cancellation to the internal future
     */
    @Override
    public ListenableFuture<?> getDispatchedFuture()
    {
        return nonCancellationPropagating(submitted);
    }

    /**
     * Builds a snapshot describing how far dispatching has progressed.
     * <p>
     * The checks are ordered: a failed query is reported as failed, then a submitted one as dispatched,
     * then a queued one as queued; anything else is reported as waiting for prerequisites.
     *
     * @return the dispatch information for the current state of the query
     */
    @Override
    public DispatchInfo getDispatchInfo()
    {
        // observe submitted before getting the state, to ensure a failed query state is visible
        boolean dispatched = submitted.isDone();
        BasicQueryInfo queryInfo = stateMachine.getBasicQueryInfo(Optional.empty());
        QueryState state = queryInfo.getState();

        if (state == FAILED) {
            ExecutionFailureInfo failureInfo = stateMachine.getFailureInfo()
                    .orElseGet(() -> toFailure(new PrestoException(GENERIC_INTERNAL_ERROR, "Query failed for an unknown reason")));
            return DispatchInfo.failed(
                    failureInfo,
                    queryInfo.getQueryStats().getElapsedTime(),
                    queryInfo.getQueryStats().getWaitingForPrerequisitesTime(),
                    queryInfo.getQueryStats().getQueuedTime());
        }

        if (dispatched) {
            return DispatchInfo.dispatched(
                    new LocalCoordinatorLocation(),
                    queryInfo.getQueryStats().getElapsedTime(),
                    queryInfo.getQueryStats().getWaitingForPrerequisitesTime(),
                    queryInfo.getQueryStats().getQueuedTime());
        }

        if (state == QUEUED) {
            return DispatchInfo.queued(
                    queryInfo.getQueryStats().getElapsedTime(),
                    queryInfo.getQueryStats().getWaitingForPrerequisitesTime(),
                    queryInfo.getQueryStats().getQueuedTime());
        }

        return DispatchInfo.waitingForPrerequisites(
                queryInfo.getQueryStats().getElapsedTime(),
                queryInfo.getQueryStats().getWaitingForPrerequisitesTime());
    }

    /**
     * @return the id of the query, as held by the state machine
     */
    @Override
    public QueryId getQueryId()
    {
        return stateMachine.getQueryId();
    }

    /**
     * @return true when the current query state is a done state
     */
    @Override
    public boolean isDone()
    {
        QueryState currentState = stateMachine.getQueryState();
        return currentState.isDone();
    }

    /**
     * @return the creation time in milliseconds, as reported by the state machine
     */
    @Override
    public long getCreateTimeInMillis()
    {
        return stateMachine.getCreateTimeInMillis();
    }

    /**
     * @return the execution start time in milliseconds, as reported by the state machine
     */
    @Override
    public long getExecutionStartTimeInMillis()
    {
        return stateMachine.getExecutionStartTimeInMillis();
    }

    /**
     * @return the end time in milliseconds, as reported by the state machine
     */
    @Override
    public long getEndTimeInMillis()
    {
        return stateMachine.getEndTimeInMillis();
    }

    /**
     * @return the total CPU time of the query execution, or a zero duration when the execution is not available
     */
    @Override
    public Duration getTotalCpuTime()
    {
        Optional<Duration> cpuTime = tryGetQueryExecution().map(QueryExecution::getTotalCpuTime);
        return cpuTime.orElseGet(() -> new Duration(0, MILLISECONDS));
    }

    /**
     * @return the total memory reservation of the query execution in bytes, or 0 when the execution is not available
     */
    @Override
    public long getTotalMemoryReservationInBytes()
    {
        Optional<Long> reservedBytes = tryGetQueryExecution().map(QueryExecution::getTotalMemoryReservationInBytes);
        return reservedBytes.orElse(0L);
    }

    /**
     * @return the user memory reservation of the query execution in bytes, or 0 when the execution is not available
     */
    @Override
    public long getUserMemoryReservationInBytes()
    {
        Optional<Long> reservedBytes = tryGetQueryExecution().map(QueryExecution::getUserMemoryReservationInBytes);
        return reservedBytes.orElse(0L);
    }

    /**
     * @return the current running task count, as reported by the state machine
     */
    public int getRunningTaskCount()
    {
        return stateMachine.getCurrentRunningTaskCount();
    }

    /**
     * @return the basic query info from the query execution when it is available, otherwise from the state machine
     */
    @Override
    public BasicQueryInfo getBasicQueryInfo()
    {
        Optional<BasicQueryInfo> executionInfo = tryGetQueryExecution().map(QueryExecution::getBasicQueryInfo);
        return executionInfo.orElseGet(() -> stateMachine.getBasicQueryInfo(Optional.empty()));
    }

    /**
     * @return the session of the query, as held by the state machine
     */
    @Override
    public Session getSession()
    {
        return stateMachine.getSession();
    }

    /**
     * Fails the query with the given cause. The immediate-failure event is emitted only when this call
     * performed the transition to the failed state.
     *
     * @param throwable the cause of the failure
     */
    @Override
    public void fail(Throwable throwable)
    {
        if (!stateMachine.transitionToFailed(throwable)) {
            return;
        }
        BasicQueryInfo queryInfo = stateMachine.getBasicQueryInfo(Optional.empty());
        queryMonitor.queryImmediateFailureEvent(queryInfo, toFailure(throwable));
    }

    /**
     * Cancels the query. When this call performed the transition to the canceled state, an immediate-failure
     * event is emitted, using a {@code USER_CANCELED} failure if the query info carries no failure of its own.
     */
    @Override
    public void cancel()
    {
        if (!stateMachine.transitionToCanceled()) {
            return;
        }

        BasicQueryInfo queryInfo = stateMachine.getBasicQueryInfo(Optional.empty());
        ExecutionFailureInfo failureInfo = queryInfo.getFailureInfo();
        if (failureInfo == null) {
            failureInfo = toFailure(new PrestoException(USER_CANCELED, "Query was canceled"));
        }
        queryMonitor.queryImmediateFailureEvent(queryInfo, failureInfo);
    }

    /**
     * Delegates pruning of expired query info to the state machine.
     */
    @Override
    public void pruneExpiredQueryInfo()
    {
        stateMachine.pruneQueryInfoExpired();
    }

    /**
     * Delegates pruning of finished query info to the state machine.
     */
    @Override
    public void pruneFinishedQueryInfo()
    {
        stateMachine.pruneQueryInfoFinished();
    }

    /**
     * @return the error code of the failure recorded in the state machine, or empty when no failure is recorded
     */
    @Override
    public Optional<ErrorCode> getErrorCode()
    {
        Optional<ExecutionFailureInfo> failureInfo = stateMachine.getFailureInfo();
        return failureInfo.map(ExecutionFailureInfo::getErrorCode);
    }

    /**
     * @return whether this query was created as a retry query
     */
    @Override
    public boolean isRetry()
    {
        return retry;
    }

    /**
     * Registers a listener for query state changes on the state machine.
     *
     * @param stateChangeListener the listener to register
     */
    @Override
    public void addStateChangeListener(StateChangeListener<QueryState> stateChangeListener)
    {
        stateMachine.addStateChangeListener(stateChangeListener);
    }

    /**
     * @return the resource group query limits, or empty when they have not been set
     */
    @Override
    public Optional<ResourceGroupQueryLimits> getResourceGroupQueryLimits()
    {
        return resourceGroupQueryLimits.get();
    }

    /**
     * Sets the resource group query limits. They may be set only once.
     *
     * @param resourceGroupQueryLimits the limits to apply; must not be null
     * @throws NullPointerException if {@code resourceGroupQueryLimits} is null
     * @throws IllegalStateException if the limits have already been set
     */
    @Override
    public void setResourceGroupQueryLimits(ResourceGroupQueryLimits resourceGroupQueryLimits)
    {
        requireNonNull(resourceGroupQueryLimits, "resourceGroupQueryLimits is null");

        // Only the first caller wins: the reference is swapped from empty to the given limits
        boolean updated = this.resourceGroupQueryLimits.compareAndSet(Optional.empty(), Optional.of(resourceGroupQueryLimits));
        if (!updated) {
            throw new IllegalStateException("Cannot set resourceGroupQueryLimits more than once");
        }
    }

    /**
     * Best-effort lookup of the query execution.
     *
     * @return whatever {@code tryGetFutureValue} yields for the execution future, or empty if the lookup throws
     */
    private Optional<QueryExecution> tryGetQueryExecution()
    {
        try {
            return tryGetFutureValue(queryExecutionFuture);
        }
        catch (Throwable lookupFailure) {
            // Deliberately swallowed: callers fall back to defaults when the execution is unavailable
            return Optional.empty();
        }
    }
}