Query.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.server.protocol;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.client.Column;
import com.facebook.presto.client.FailureInfo;
import com.facebook.presto.client.QueryError;
import com.facebook.presto.client.QueryResults;
import com.facebook.presto.client.StatementStats;
import com.facebook.presto.common.ErrorCode;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.common.type.BooleanType;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.execution.QueryInfo;
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.buffer.PagesSerdeFactory;
import com.facebook.presto.operator.ExchangeClient;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.function.SqlFunctionId;
import com.facebook.presto.spi.function.SqlInvokedFunction;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.SerializedPage;
import com.facebook.presto.spi.security.SelectedRole;
import com.facebook.presto.spi.tracing.Tracer;
import com.facebook.presto.transaction.TransactionInfo;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;

import java.net.URI;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

import static com.facebook.airlift.concurrent.MoreFutures.addTimeout;
import static com.facebook.presto.SystemSessionProperties.getExchangeCompressionCodec;
import static com.facebook.presto.SystemSessionProperties.getQueryRetryLimit;
import static com.facebook.presto.SystemSessionProperties.getQueryRetryMaxExecutionTime;
import static com.facebook.presto.SystemSessionProperties.getTargetResultSize;
import static com.facebook.presto.SystemSessionProperties.isExchangeChecksumEnabled;
import static com.facebook.presto.SystemSessionProperties.retryQueryWithHistoryBasedOptimizationEnabled;
import static com.facebook.presto.SystemSessionProperties.trackHistoryBasedPlanStatisticsEnabled;
import static com.facebook.presto.SystemSessionProperties.useHistoryBasedPlanStatisticsEnabled;
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.WAITING_FOR_PREREQUISITES;
import static com.facebook.presto.server.protocol.QueryResourceUtil.toStatementStats;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.page.PagesSerdeUtil.writeSerializedPage;
import static com.facebook.presto.util.Failures.toFailure;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

@ThreadSafe
class Query
{
    private static final Logger log = Logger.get(Query.class);
    // Encoder used to turn serialized pages into strings for binary result responses.
    private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();

    // State carried from a failed query to its history-based-optimization retry
    // (written and reset in getNextResultWithRetry).
    // NOTE(review): these three fields are static, so they are shared by every Query instance
    // in the JVM, while the methods that write them synchronize only on the individual instance.
    // Concurrent retries of different queries can therefore overwrite each other's values —
    // confirm whether this is intended.
    private static Optional<QueryId> originalBeforeRetryQueryId = Optional.empty();
    private static Optional<Integer> previousQueryTopLevelPlanHash = Optional.empty();
    private static Optional<QueryError> previousQueryFailureError = Optional.empty();

    private final QueryManager queryManager;
    private final TransactionManager transactionManager;
    // Taken from the session at construction time.
    private final QueryId queryId;
    private final Session session;
    // Opaque value compared against the one supplied by the caller in isSlugValid
    // and echoed into the next-results URI.
    private final String slug;

    // Source of the serialized result pages handed back to the client.
    @GuardedBy("this")
    private final ExchangeClient exchangeClient;

    // Executor on which the next result is assembled once data or a state change is available.
    private final Executor resultsProcessorExecutor;
    // Executor used to bound the wait in waitForResults.
    private final ScheduledExecutorService timeoutExecutor;

    // Deserializes pages when results are returned as rows rather than as binary pages.
    private final PagesSerde serde;
    private final RetryCircuitBreaker retryCircuitBreaker;

    // Token the next request must carry; empty once the final result has been produced.
    @GuardedBy("this")
    private OptionalLong nextToken = OptionalLong.of(0);

    // Most recently produced response, kept so a repeated request for lastToken can be replayed.
    @GuardedBy("this")
    private QueryResults lastResult;

    // Token that produced lastResult; -1 until the first result exists.
    @GuardedBy("this")
    private long lastToken = -1;

    // Output columns; null until the first output-info callback
    // (or replaced by a synthetic column for queries without output).
    @GuardedBy("this")
    private List<Column> columns;

    // Output column types; assigned together with columns in the first output-info callback.
    @GuardedBy("this")
    private List<Type> types;

    // The fields below mirror the corresponding QueryInfo values and are refreshed
    // every time getNextResult builds a response.
    @GuardedBy("this")
    private Optional<String> setCatalog = Optional.empty();

    @GuardedBy("this")
    private Optional<String> setSchema = Optional.empty();

    @GuardedBy("this")
    private Map<String, String> setSessionProperties = ImmutableMap.of();

    @GuardedBy("this")
    private Set<String> resetSessionProperties = ImmutableSet.of();

    @GuardedBy("this")
    private Map<String, SelectedRole> setRoles = ImmutableMap.of();

    @GuardedBy("this")
    private Map<String, String> addedPreparedStatements = ImmutableMap.of();

    @GuardedBy("this")
    private Set<String> deallocatedPreparedStatements = ImmutableSet.of();

    @GuardedBy("this")
    private Optional<TransactionId> startedTransactionId = Optional.empty();

    @GuardedBy("this")
    private boolean clearTransactionId;

    // Update count extracted from the first data row of an update-style query; null until known.
    @GuardedBy("this")
    private Long updateCount;

    // Set once at least one row has been handed to the client; retryConditionsMet refuses a retry afterwards.
    @GuardedBy("this")
    private boolean hasProducedResult;

    @GuardedBy("this")
    private Map<SqlFunctionId, SqlInvokedFunction> addedSessionFunctions = ImmutableMap.of();

    @GuardedBy("this")
    private Set<SqlFunctionId> removedSessionFunctions = ImmutableSet.of();

    /**
     * Builds a {@code Query} and registers its listeners with the query manager.
     * <p>
     * Registration happens here rather than in the constructor, so the listeners only ever
     * see a fully constructed instance.
     *
     * @return the new query, with an output-info listener and a state-change listener attached
     */
    public static Query create(
            Session session,
            String slug,
            QueryManager queryManager,
            TransactionManager transactionManager,
            ExchangeClient exchangeClient,
            Executor dataProcessorExecutor,
            ScheduledExecutorService timeoutExecutor,
            BlockEncodingSerde blockEncodingSerde,
            RetryCircuitBreaker retryCircuitBreaker)
    {
        Query query = new Query(session, slug, queryManager, transactionManager, exchangeClient, dataProcessorExecutor, timeoutExecutor, blockEncodingSerde, retryCircuitBreaker);
        QueryId id = query.getQueryId();

        // output info delivers the column metadata and the exchange buffer locations
        queryManager.addOutputInfoListener(id, query::setQueryOutputInfo);

        // once the query reaches a done state, the exchange client may have to be closed
        queryManager.addStateChangeListener(id, state -> {
            if (!state.isDone()) {
                return;
            }
            query.closeExchangeClientIfNecessary(queryManager.getFullQueryInfo(id));
        });

        return query;
    }

    /**
     * Validates and stores the collaborators of this query.
     * <p>
     * The query id is taken from the session, and the page serde is built from the block
     * encoding serde plus the session's exchange compression and checksum settings.
     * Use {@link #create} to obtain an instance with its listeners registered.
     *
     * @throws NullPointerException if any argument is null
     */
    private Query(
            Session session,
            String slug,
            QueryManager queryManager,
            TransactionManager transactionManager,
            ExchangeClient exchangeClient,
            Executor resultsProcessorExecutor,
            ScheduledExecutorService timeoutExecutor,
            BlockEncodingSerde blockEncodingSerde,
            RetryCircuitBreaker retryCircuitBreaker)
    {
        this.session = requireNonNull(session, "session is null");
        this.slug = requireNonNull(slug, "slug is null");
        this.queryManager = requireNonNull(queryManager, "queryManager is null");
        this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
        this.exchangeClient = requireNonNull(exchangeClient, "exchangeClient is null");
        this.resultsProcessorExecutor = requireNonNull(resultsProcessorExecutor, "resultsProcessorExecutor is null");
        this.timeoutExecutor = requireNonNull(timeoutExecutor, "timeoutExecutor is null");
        // the message names the actual parameter so a failure points at the right argument
        requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
        this.retryCircuitBreaker = requireNonNull(retryCircuitBreaker, "retryCircuitBreaker is null");

        this.queryId = session.getQueryId();
        this.serde = new PagesSerdeFactory(blockEncodingSerde, getExchangeCompressionCodec(session), isExchangeChecksumEnabled(session)).createPagesSerde();
    }

    /**
     * Asks the query manager to cancel this query, then releases the exchange client
     * via {@link #dispose()}.
     */
    public void cancel()
    {
        queryManager.cancelQuery(queryId);
        dispose();
    }

    /**
     * Closes the exchange client while holding this query's lock.
     */
    public synchronized void dispose()
    {
        exchangeClient.close();
    }

    /**
     * Returns the id of this query, as obtained from the session at construction time.
     */
    public QueryId getQueryId()
    {
        return queryId;
    }

    /**
     * Checks whether the supplied slug equals the slug this query was created with.
     *
     * @param slug the slug presented by the caller; a null value yields {@code false}
     * @return {@code true} only if the two slugs are equal
     */
    public boolean isSlugValid(String slug)
    {
        return this.slug.equals(slug);
    }

    /**
     * Returns the tracer attached to this query's session.
     *
     * @throws IllegalArgumentException if the session carries no tracer
     */
    public Tracer getTracer()
    {
        return session.getTracer()
                .orElseThrow(() -> new IllegalArgumentException("tracer is not present"));
    }

    /**
     * Returns the set-catalog value copied from the query info the last time a result was built;
     * empty until then.
     */
    public synchronized Optional<String> getSetCatalog()
    {
        return setCatalog;
    }

    /**
     * Returns the set-schema value copied from the query info the last time a result was built;
     * empty until then.
     */
    public synchronized Optional<String> getSetSchema()
    {
        return setSchema;
    }

    /**
     * Returns the set-session-properties map copied from the query info the last time a result
     * was built; an empty map until then.
     */
    public synchronized Map<String, String> getSetSessionProperties()
    {
        return setSessionProperties;
    }

    /**
     * Returns the reset-session-properties set copied from the query info the last time a result
     * was built; an empty set until then.
     */
    public synchronized Set<String> getResetSessionProperties()
    {
        return resetSessionProperties;
    }

    /**
     * Returns the set-roles map copied from the query info the last time a result was built;
     * an empty map until then.
     */
    public synchronized Map<String, SelectedRole> getSetRoles()
    {
        return setRoles;
    }

    /**
     * Returns the added-prepared-statements map copied from the query info the last time a
     * result was built; an empty map until then.
     */
    public synchronized Map<String, String> getAddedPreparedStatements()
    {
        return addedPreparedStatements;
    }

    /**
     * Returns the deallocated-prepared-statements set copied from the query info the last time
     * a result was built; an empty set until then.
     */
    public synchronized Set<String> getDeallocatedPreparedStatements()
    {
        return deallocatedPreparedStatements;
    }

    /**
     * Returns the started-transaction id copied from the query info the last time a result was
     * built; empty until then.
     */
    public synchronized Optional<TransactionId> getStartedTransactionId()
    {
        return startedTransactionId;
    }

    /**
     * Returns the clear-transaction-id flag copied from the query info the last time a result
     * was built; {@code false} until then.
     */
    public synchronized boolean isClearTransactionId()
    {
        return clearTransactionId;
    }

    /**
     * Returns the added-session-functions map copied from the query info the last time a result
     * was built; an empty map until then.
     */
    public synchronized Map<SqlFunctionId, SqlInvokedFunction> getAddedSessionFunctions()
    {
        return addedSessionFunctions;
    }

    /**
     * Returns the removed-session-functions set copied from the query info the last time a
     * result was built; an empty set until then.
     */
    public synchronized Set<SqlFunctionId> getRemovedSessionFunctions()
    {
        return removedSessionFunctions;
    }

    /**
     * Returns a future for the results identified by {@code token}.
     * <p>
     * A request that can be answered from the cached last result completes immediately.
     * Otherwise the returned future completes after the exchange client has data, the query
     * is done, or the {@code wait} timeout handed to {@code addTimeout} elapses; the next
     * result is then assembled on the results processor executor.
     *
     * @throws WebApplicationException if the token is older than the cached result (GONE) or is
     * not the expected next token (NOT_FOUND)
     */
    public synchronized ListenableFuture<QueryResults> waitForResults(long token, UriInfo uriInfo, String scheme, Duration wait, DataSize targetResultSize, boolean binaryResults)
    {
        // a repeated request for the last token is served straight from the cache
        Optional<QueryResults> alreadyProduced = getCachedResult(token);
        if (alreadyProduced.isPresent()) {
            return immediateFuture(alreadyProduced.get());
        }

        // bound the wait for data / query completion by the requested duration
        ListenableFuture<?> dataOrCompletion = addTimeout(getFutureStateChange(), () -> null, wait, timeoutExecutor);

        // whatever woke us up, build the next response off the calling thread
        return Futures.transform(
                dataOrCompletion,
                ignored -> getNextResultWithRetry(token, uriInfo, scheme, targetResultSize, binaryResults),
                resultsProcessorExecutor);
    }

    /**
     * Returns the future a waiting request should block on.
     * <p>
     * While the exchange client is open this is its blocked future. Once it is closed, a
     * heartbeat is recorded and the future completes when the query reaches a done state.
     * If the query manager throws {@link NoSuchElementException}, an already-completed
     * future is returned instead.
     */
    private synchronized ListenableFuture<?> getFutureStateChange()
    {
        if (exchangeClient.isClosed()) {
            // no more data can arrive, so the only thing left to wait for is query completion
            queryManager.recordHeartbeat(queryId);

            ListenableFuture<?> queryDone;
            try {
                queryDone = queryDoneFuture(queryManager.getQueryState(queryId));
            }
            catch (NoSuchElementException e) {
                queryDone = immediateFuture(null);
            }
            return queryDone;
        }

        // exchange still open: wait for data
        return exchangeClient.isBlocked();
    }

    /**
     * Looks up a previously produced response for {@code token}.
     *
     * @return the cached result when {@code token} repeats the last served token; empty when
     * nothing has been produced yet or {@code token} is the expected next token
     * @throws WebApplicationException with status GONE when {@code token} precedes the last
     * served token, or NOT_FOUND when it is neither the last nor the expected next token
     */
    private synchronized Optional<QueryResults> getCachedResult(long token)
    {
        // nothing has been produced yet, so there is nothing to replay
        if (lastResult == null) {
            return Optional.empty();
        }

        // replay of the most recent response; the heartbeat keeps the query manager interested
        if (token == lastToken) {
            queryManager.recordHeartbeat(queryId);
            return Optional.of(lastResult);
        }

        // anything older than the last response has already been discarded
        if (token < lastToken) {
            throw new WebApplicationException(Response.Status.GONE);
        }

        // past the end of the stream, or a token that is not the expected next one
        boolean isExpectedNext = nextToken.isPresent() && token == nextToken.getAsLong();
        if (!isExpectedNext) {
            throw new WebApplicationException(Response.Status.NOT_FOUND);
        }

        return Optional.empty();
    }

    /**
     * Produces the next result and, when that result is a failure eligible for a retry,
     * swaps it for a response whose next URI points at the retry endpoint.
     * <p>
     * With history-based plan statistics (use and track) and retry-with-history-based
     * optimization all enabled:
     * <ul>
     * <li>on the first failure (retry count below 1) the query id, top-level plan hash and
     * error are remembered in static state and a retry response is returned;</li>
     * <li>on the failure of the first retry (retry count equal to 1) the current plan hash is
     * compared with the remembered one; if they are equal the query is failed with an
     * explanatory error. The remembered state is then cleared and the failed result is
     * returned as is.</li>
     * </ul>
     * In every other configuration a retry response is returned whenever
     * {@link #retryConditionsMet} holds.
     *
     * @return the original result when there is no error or no retry applies, otherwise a
     * synthetic result in the WAITING_FOR_PREREQUISITES state carrying the retry URI
     */
    private synchronized QueryResults getNextResultWithRetry(long token, UriInfo uriInfo, String scheme, DataSize targetResultSize, boolean binaryResults)
    {
        QueryResults queryResults = getNextResult(token, uriInfo, scheme, targetResultSize, binaryResults);

        if (queryResults.getError() == null) {
            return queryResults;
        }

        // Evaluate the retry conditions exactly once per failed result: retryConditionsMet may
        // record a failure in the retry circuit breaker, so evaluating it repeatedly would count
        // the same failure more than once.
        if (!retryConditionsMet(queryResults)) {
            return queryResults;
        }

        boolean historyBasedRetryEnabled = useHistoryBasedPlanStatisticsEnabled(session)
                && trackHistoryBasedPlanStatisticsEnabled(session)
                && retryQueryWithHistoryBasedOptimizationEnabled(session);

        if (historyBasedRetryEnabled) {
            int retryCount = queryManager.getQueryRetryCount(queryId);

            if (retryCount < 1) {
                // first failure: remember what failed so the retry can be compared against it
                originalBeforeRetryQueryId = Optional.of(queryId);
                previousQueryTopLevelPlanHash = getCurrentTopLevelPlanHash();
                previousQueryFailureError = Optional.of(queryResults.getError());
            }
            else if (retryCount == 1) {
                Optional<Integer> currentTopLevelPlanHash = getCurrentTopLevelPlanHash();

                // Optional.equals is true both when the two hashes are present and equal and when
                // both are absent, which are exactly the "plan did not change" cases
                if (currentTopLevelPlanHash.equals(previousQueryTopLevelPlanHash)) {
                    // the remembered state may be absent, so never call get() on it blindly
                    String originalError = previousQueryFailureError.map(Object::toString).orElse("unknown");
                    String originalQueryId = originalBeforeRetryQueryId.map(Object::toString).orElse("unknown");
                    queryManager.failQuery(queryId, new PrestoException(GENERIC_INTERNAL_ERROR, "Since the plan hashes did not change, your retry query will not execute. " +
                            "Your original error was " + originalError + ". Original QueryId: " + originalQueryId +
                            ". Retry QueryId: " + queryId));
                }

                originalBeforeRetryQueryId = Optional.empty();
                previousQueryTopLevelPlanHash = Optional.empty();
                previousQueryFailureError = Optional.empty();

                return queryResults;
            }
        }

        // build a new query with next uri
        // we expect failed nodes have been removed from discovery server upon query failure
        return new QueryResults(
                queryId.toString(),
                queryResults.getInfoUri(),
                queryResults.getPartialCancelUri(),
                createRetryUri(scheme, uriInfo),
                queryResults.getColumns(),
                null,
                null,
                StatementStats.builder()
                        .setState(WAITING_FOR_PREREQUISITES.toString())
                        .setWaitingForPrerequisites(true)
                        .build(),
                null,
                ImmutableList.of(),
                queryResults.getUpdateInfo(),
                queryResults.getUpdateCount());
    }

    /**
     * Builds the response for {@code token}, draining pages from the exchange client and
     * snapshotting the query info, then caches that response as the last result.
     * <p>
     * A repeated request for the last token is answered from the cache. Otherwise
     * {@code token} must equal the expected next token.
     *
     * @param token the token of the requested result
     * @param uriInfo request URI information used to build the info and next-results URIs
     * @param scheme URI scheme to use in the generated URIs
     * @param targetResultSize pages are polled until the accumulated size reaches this value
     * @param binaryResults when true, pages are returned base64-encoded instead of as rows
     * @return the response for {@code token}
     */
    private synchronized QueryResults getNextResult(long token, UriInfo uriInfo, String scheme, DataSize targetResultSize, boolean binaryResults)
    {
        // check if the result for the token have already been created
        Optional<QueryResults> cachedResult = getCachedResult(token);
        if (cachedResult.isPresent()) {
            return cachedResult.get();
        }

        verify(nextToken.isPresent(), "Can not generate next result when next token is not present");
        verify(token == nextToken.getAsLong(), "Expected token to equal next token");
        URI queryHtmlUri = uriInfo.getRequestUriBuilder()
                .scheme(scheme)
                .replacePath("ui/query.html")
                .replaceQuery(queryId.toString())
                .build();

        // Remove as many pages as possible from the exchange until the accumulated size reaches
        // targetResultSize (the last page polled may push it over).
        // NOTE: it is critical that query results are created for the pages removed from the exchange
        // client while holding the lock because the query may transition to the finished state when the
        // last page is removed.  If another thread observes this state before the response is cached
        // the pages will be lost.
        Iterable<List<Object>> data = null;
        List<String> binaryData = null;
        try {
            long rows = 0;
            long bytes = 0;
            long targetResultBytes = targetResultSize.toBytes();
            if (binaryResults) {
                // binary mode: pass each serialized page through unchanged, base64-encoded
                ImmutableList.Builder<String> pages = ImmutableList.builder();
                while (bytes < targetResultBytes) {
                    SerializedPage serializedPage = exchangeClient.pollPage();
                    if (serializedPage == null) {
                        break;
                    }

                    rows += serializedPage.getPositionCount();
                    bytes += serializedPage.getSizeInBytes();

                    DynamicSliceOutput sliceOutput = new DynamicSliceOutput(1000);
                    writeSerializedPage(sliceOutput, serializedPage);

                    String encodedPage = BASE64_ENCODER.encodeToString(sliceOutput.slice().byteArray());
                    pages.add(encodedPage);
                }
                // leave binaryData null when no rows were polled
                if (rows > 0) {
                    binaryData = pages.build();
                }
            }
            else {
                // row mode: deserialize each page and expose it as an iterable of rows
                ImmutableList.Builder<RowIterable> pages = ImmutableList.builder();
                while (bytes < targetResultBytes) {
                    SerializedPage serializedPage = exchangeClient.pollPage();
                    if (serializedPage == null) {
                        break;
                    }

                    Page page = serde.deserialize(serializedPage);
                    bytes += page.getLogicalSizeInBytes();
                    rows += page.getPositionCount();
                    pages.add(new RowIterable(session.toConnectorSession(), types, page));
                }
                if (rows > 0) {
                    // client implementations do not properly handle empty list of data
                    data = Iterables.concat(pages.build());
                }
            }
            // remembered so that a retry is refused once any row has been handed out
            if (rows > 0) {
                hasProducedResult = true;
            }
        }
        catch (Exception e) {
            // a failure while polling or decoding pages fails the query instead of propagating;
            // the response is still built from the query info fetched below
            queryManager.failQuery(queryId, e);
        }

        // get the query info before returning
        // force update if query manager is closed
        QueryInfo queryInfo = queryManager.getFullQueryInfo(queryId);
        queryManager.recordHeartbeat(queryId);

        // TODO: figure out a better way to do this
        // grab the update count for non-queries: when the query has update info and a single BIGINT
        // column, the first value of the first row is taken as the update count.
        // data is only populated in row mode, so this never applies to binary results.
        if ((data != null) && (queryInfo.getUpdateInfo() != null) && (updateCount == null) &&
                (columns.size() == 1) && (columns.get(0).getType().equals(StandardTypes.BIGINT))) {
            Iterator<List<Object>> iterator = data.iterator();
            if (iterator.hasNext()) {
                Number number = (Number) iterator.next().get(0);
                if (number != null) {
                    updateCount = number.longValue();
                }
            }
        }

        closeExchangeClientIfNecessary(queryInfo);

        // for queries with no output, return a fake result for clients that require it
        if ((queryInfo.getState() == QueryState.FINISHED) && !queryInfo.getOutputStage().isPresent()) {
            columns = ImmutableList.of(new Column("result", BooleanType.BOOLEAN));
            data = ImmutableSet.of(ImmutableList.of(true));
        }

        // advance next token
        // only return a next if
        // (1) the query is not done AND the query state is not FAILED
        //   OR
        // (2)there is more data to send (due to buffering)
        if ((!queryInfo.isFinalQueryInfo() && queryInfo.getState() != FAILED) || !exchangeClient.isClosed()) {
            nextToken = OptionalLong.of(token + 1);
        }
        else {
            nextToken = OptionalLong.empty();
        }

        // a null next URI tells the client there is nothing more to fetch
        URI nextResultsUri = null;
        if (nextToken.isPresent()) {
            nextResultsUri = createNextResultsUri(scheme, uriInfo, nextToken.getAsLong(), binaryResults);
        }

        // update catalog and schema
        setCatalog = queryInfo.getSetCatalog();
        setSchema = queryInfo.getSetSchema();

        // update setSessionProperties
        setSessionProperties = queryInfo.getSetSessionProperties();
        resetSessionProperties = queryInfo.getResetSessionProperties();

        // update setRoles
        setRoles = queryInfo.getSetRoles();

        // update preparedStatements
        addedPreparedStatements = queryInfo.getAddedPreparedStatements();
        deallocatedPreparedStatements = queryInfo.getDeallocatedPreparedStatements();

        // update startedTransactionId
        startedTransactionId = queryInfo.getStartedTransactionId();
        clearTransactionId = queryInfo.isClearTransactionId();

        // update sessionFunctions
        addedSessionFunctions = queryInfo.getAddedSessionFunctions();
        removedSessionFunctions = queryInfo.getRemovedSessionFunctions();

        // assemble the response; the error is non-null only when the query state is FAILED
        QueryResults queryResults = new QueryResults(
                queryId.toString(),
                queryHtmlUri,
                findCancelableLeafStage(queryInfo),
                nextResultsUri,
                columns,
                data,
                binaryData,
                toStatementStats(queryInfo),
                toQueryError(queryInfo),
                queryInfo.getWarnings(),
                queryInfo.getUpdateInfo(),
                updateCount);

        // cache the new result
        lastToken = token;
        lastResult = queryResults;

        return queryResults;
    }

    /**
     * Closes the exchange client when no further output can be expected: either the query
     * has failed, or it is done and has no output stage (as is the case for data definition
     * executions, which produce no output).
     */
    private synchronized void closeExchangeClientIfNecessary(QueryInfo queryInfo)
    {
        QueryState state = queryInfo.getState();
        if (state == FAILED || (state.isDone() && !queryInfo.getOutputStage().isPresent())) {
            exchangeClient.close();
        }
    }

    /**
     * Output-info callback: records the column metadata on the first invocation and feeds
     * every reported buffer location to the exchange client.
     *
     * @throws IllegalArgumentException if the numbers of column names and column types differ
     */
    private synchronized void setQueryOutputInfo(QueryExecution.QueryOutputInfo outputInfo)
    {
        // column metadata is captured only once, on the first callback
        if (columns == null) {
            List<String> names = outputInfo.getColumnNames();
            List<Type> outputTypes = outputInfo.getColumnTypes();
            checkArgument(names.size() == outputTypes.size(), "Column names and types size mismatch");

            // walk names and types in lock step
            ImmutableList.Builder<Column> columnBuilder = ImmutableList.builder();
            Iterator<Type> typeIterator = outputTypes.iterator();
            for (String name : names) {
                columnBuilder.add(new Column(name, typeIterator.next()));
            }
            columns = columnBuilder.build();
            types = outputTypes;
        }

        outputInfo.getBufferLocations().forEach(exchangeClient::addLocation);
        if (outputInfo.isNoMoreBufferLocations()) {
            exchangeClient.noMoreLocations();
        }
    }

    /**
     * Returns a future that completes once the query is in a done state.
     * <p>
     * If {@code currentState} is not done, this waits for the next state change and then
     * re-evaluates itself with the new state.
     */
    private ListenableFuture<?> queryDoneFuture(QueryState currentState)
    {
        if (!currentState.isDone()) {
            // not finished yet: chain onto the next state change and check again
            return Futures.transformAsync(queryManager.getStateChange(queryId, currentState), this::queryDoneFuture, directExecutor());
        }
        return immediateFuture(null);
    }

    /**
     * Builds the URI the client should call for the next batch of results:
     * {@code /v1/statement/executing/{queryId}/{nextToken}?slug=...}, plus
     * {@code binaryResults=true} when binary results were requested and
     * {@code targetResultSize} when the session defines one.
     */
    private synchronized URI createNextResultsUri(String scheme, UriInfo uriInfo, long nextToken, boolean binaryResults)
    {
        UriBuilder builder = uriInfo.getBaseUriBuilder();
        builder.scheme(scheme);
        builder.replacePath("/v1/statement/executing");
        builder.path(queryId.toString());
        builder.path(Long.toString(nextToken));
        builder.replaceQuery("");
        builder.queryParam("slug", this.slug);

        if (binaryResults) {
            builder.queryParam("binaryResults", "true");
        }

        // propagate the session's target result size, if any, to the follow-up request
        getTargetResultSize(session).ifPresent(size -> builder.queryParam("targetResultSize", size));

        return builder.build();
    }

    /**
     * Builds the URI the client should call to retry this query:
     * {@code /v1/statement/queued/retry/{queryId}}, plus {@code targetResultSize} when the
     * session defines one.
     */
    private synchronized URI createRetryUri(String scheme, UriInfo uriInfo)
    {
        UriBuilder builder = uriInfo.getBaseUriBuilder();
        builder.scheme(scheme);
        builder.replacePath("/v1/statement/queued/retry");
        builder.path(queryId.toString());
        builder.replaceQuery("");

        // propagate the session's target result size, if any, to the retry request
        getTargetResultSize(session).ifPresent(size -> builder.queryParam("targetResultSize", size));

        return builder.build();
    }

    /**
     * Returns the URI of a stage that can still be canceled, searching from the query's
     * output stage, or {@code null} when the query has no output stage or no such stage exists.
     */
    private static URI findCancelableLeafStage(QueryInfo queryInfo)
    {
        Optional<StageInfo> outputStage = queryInfo.getOutputStage();
        if (!outputStage.isPresent()) {
            return null;
        }
        return findCancelableLeafStage(outputStage.get());
    }

    /**
     * Depth-first search for the leaf-most stage under {@code stage} that is not yet done.
     *
     * @return the URI of that stage, {@code stage}'s own URI when none of its sub stages
     * qualifies, or {@code null} when {@code stage} itself is already done
     */
    private static URI findCancelableLeafStage(StageInfo stage)
    {
        // a finished stage can no longer be canceled
        if (stage.getLatestAttemptExecutionInfo().getState().isDone()) {
            return null;
        }

        // walk the sub stages last-to-first since build side of a join will be later in the list
        List<StageInfo> subStages = stage.getSubStages();
        for (int index = subStages.size() - 1; index >= 0; index--) {
            URI cancelable = findCancelableLeafStage(subStages.get(index));
            if (cancelable != null) {
                return cancelable;
            }
        }

        // nothing deeper qualifies, so this stage is the answer
        return stage.getSelf();
    }

    /**
     * Decides whether the failed result may be answered with a retry.
     * <p>
     * When retry-with-history-based-optimization is disabled the error must be retriable, the
     * retry circuit breaker must allow a retry, and the per-query retry limit must not be
     * reached. Note that this path records a failure in the circuit breaker as a side effect.
     * In all cases no rows may have been returned yet, the execution time must not exceed
     * the session's retry limit, and the query must not run in an explicit
     * (non-auto-commit) transaction.
     *
     * @return {@code true} only if every applicable condition holds
     */
    private boolean retryConditionsMet(QueryResults queryResults)
    {
        QueryError error = queryResults.getError();
        if (error == null) {
            return false;
        }

        if (!retryQueryWithHistoryBasedOptimizationEnabled(session)) {
            if (!error.isRetriable()) {
                return false;
            }

            // record the failure first, then ask whether the global limit still permits a retry
            retryCircuitBreaker.incrementFailure();
            boolean globalLimitExceeded = !retryCircuitBreaker.isRetryAllowed();
            if (globalLimitExceeded || queryManager.getQueryRetryCount(queryId) >= getQueryRetryLimit(session)) {
                return false;
            }
        }

        // rows already handed to the client cannot be taken back
        if (hasProducedResult) {
            return false;
        }

        // check if we have exceeded the local limit
        long executionMillis = queryManager.getQueryInfo(queryId).getQueryStats().getExecutionTime().toMillis();
        if (executionMillis > getQueryRetryMaxExecutionTime(session).toMillis()) {
            return false;
        }

        // no support for transactions: only auto-commit contexts (or unknown transactions) may retry
        boolean explicitTransaction = session.getTransactionId().isPresent() &&
                !transactionManager.getOptionalTransactionInfo(session.getRequiredTransactionId()).map(TransactionInfo::isAutoCommitContext).orElse(true);
        return !explicitTransaction;
    }

    /**
     * Returns the hash code of the plan held by the first canonical plan entry of this query,
     * or empty when the query info has no canonical plan entries.
     * <p>
     * The query info is fetched once so that the emptiness check and the element access
     * operate on the same snapshot.
     */
    private Optional<Integer> getCurrentTopLevelPlanHash()
    {
        QueryInfo queryInfo = queryManager.getFullQueryInfo(queryId);
        if (queryInfo.getPlanCanonicalInfo().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(queryInfo.getPlanCanonicalInfo().get(0).getCanonicalPlan().getPlan().hashCode());
    }

    /**
     * Converts the failure recorded in {@code queryInfo} into a client-facing error.
     * <p>
     * Missing failure info or a missing error code is logged and replaced by a generic
     * placeholder, so a failed query always yields an error.
     *
     * @return the error, or {@code null} when the query is not in the FAILED state
     */
    private static QueryError toQueryError(QueryInfo queryInfo)
    {
        QueryState state = queryInfo.getState();
        if (state != FAILED) {
            return null;
        }

        FailureInfo failure;
        if (queryInfo.getFailureInfo() == null) {
            // failed without a recorded cause: synthesize one so the client still gets a message
            log.warn("Query %s in state %s has no failure info", queryInfo.getQueryId(), state);
            failure = toFailure(new RuntimeException(format("Query is %s (reason unknown)", state))).toFailureInfo();
        }
        else {
            failure = queryInfo.getFailureInfo().toFailureInfo();
        }

        ErrorCode errorCode = queryInfo.getErrorCode();
        if (errorCode == null) {
            errorCode = GENERIC_INTERNAL_ERROR.toErrorCode();
            log.warn("Failed query %s has no error code", queryInfo.getQueryId());
        }

        String message = firstNonNull(failure.getMessage(), "Internal error");
        return new QueryError(
                message,
                null,
                errorCode.getCode(),
                errorCode.getName(),
                errorCode.getType().toString(),
                errorCode.isRetriable(),
                failure.getErrorLocation(),
                failure);
    }
}