QueuedStatementResource.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server.protocol;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.TimeStat;
import com.facebook.presto.client.QueryError;
import com.facebook.presto.client.QueryResults;
import com.facebook.presto.common.ErrorCode;
import com.facebook.presto.dispatcher.DispatchExecutor;
import com.facebook.presto.dispatcher.DispatchInfo;
import com.facebook.presto.dispatcher.DispatchManager;
import com.facebook.presto.execution.ExecutionFailureInfo;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.server.HttpRequestSessionContext;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.server.SessionContext;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.sql.parser.SqlParserOptions;
import com.facebook.presto.tracing.TracerProviderManager;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
import java.net.URI;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import static com.facebook.airlift.concurrent.MoreFutures.addTimeout;
import static com.facebook.airlift.concurrent.Threads.threadsNamed;
import static com.facebook.airlift.http.server.AsyncResponseHandler.bindAsyncResponse;
import static com.facebook.presto.client.PrestoHeaders.PRESTO_PREFIX_URL;
import static com.facebook.presto.server.protocol.QueryResourceUtil.NO_DURATION;
import static com.facebook.presto.server.protocol.QueryResourceUtil.abortIfPrefixUrlInvalid;
import static com.facebook.presto.server.protocol.QueryResourceUtil.createQueuedQueryResults;
import static com.facebook.presto.server.protocol.QueryResourceUtil.getQueuedUri;
import static com.facebook.presto.server.protocol.QueryResourceUtil.getScheme;
import static com.facebook.presto.server.security.RoleType.USER;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.RETRY_QUERY_NOT_FOUND;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
import static com.google.common.net.HttpHeaders.X_FORWARDED_PROTO;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.transformAsync;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static javax.ws.rs.core.MediaType.TEXT_PLAIN_TYPE;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.CONFLICT;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
@Path("/")
@RolesAllowed(USER)
public class QueuedStatementResource
{
private static final Logger log = Logger.get(QueuedStatementResource.class);
private static final Duration MAX_WAIT_TIME = new Duration(1, SECONDS);
private static final DataSize TARGET_RESULT_SIZE = new DataSize(1, MEGABYTE);
private static final Ordering<Comparable<Duration>> WAIT_ORDERING = Ordering.natural().nullsLast();
private final DispatchManager dispatchManager;
private final ExecutingQueryResponseProvider executingQueryResponseProvider;
private final Executor responseExecutor;
private final ScheduledExecutorService timeoutExecutor;
private final Map<QueryId, Query> queries = new ConcurrentHashMap<>(); // a mapping from current query id to current query
private final Map<QueryId, Query> retriedQueries = new ConcurrentHashMap<>(); // a mapping from old to-be-retried query id to the current retry query
private final ScheduledExecutorService queryPurger = newSingleThreadScheduledExecutor(threadsNamed("dispatch-query-purger"));
private final boolean compressionEnabled;
private final boolean nestedDataSerializationEnabled;
private final SqlParserOptions sqlParserOptions;
private final TracerProviderManager tracerProviderManager;
private final SessionPropertyManager sessionPropertyManager; // We may need some system default session property values at early query stage even before session is created.
private final QueryBlockingRateLimiter queryRateLimiter;
@Inject
public QueuedStatementResource(
DispatchManager dispatchManager,
DispatchExecutor executor,
ExecutingQueryResponseProvider executingQueryResponseProvider,
SqlParserOptions sqlParserOptions,
ServerConfig serverConfig,
TracerProviderManager tracerProviderManager,
SessionPropertyManager sessionPropertyManager,
QueryBlockingRateLimiter queryRateLimiter)
{
this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
this.executingQueryResponseProvider = requireNonNull(executingQueryResponseProvider, "executingQueryResponseProvider is null");
this.sqlParserOptions = requireNonNull(sqlParserOptions, "sqlParserOptions is null");
this.compressionEnabled = requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled();
this.nestedDataSerializationEnabled = requireNonNull(serverConfig, "serverConfig is null").isNestedDataSerializationEnabled();
this.responseExecutor = requireNonNull(executor, "responseExecutor is null").getExecutor();
this.timeoutExecutor = requireNonNull(executor, "timeoutExecutor is null").getScheduledExecutor();
this.tracerProviderManager = requireNonNull(tracerProviderManager, "tracerProviderManager is null");
this.sessionPropertyManager = sessionPropertyManager;
this.queryRateLimiter = requireNonNull(queryRateLimiter, "queryRateLimiter is null");
queryPurger.scheduleWithFixedDelay(
() -> {
try {
// snapshot the queries before checking states to avoid registration race
purgeQueries(queries);
purgeQueries(retriedQueries);
}
catch (Throwable e) {
log.error(e, "Error removing old queries");
}
},
200,
200,
MILLISECONDS);
}
@Managed
@Nested
public TimeStat getRateLimiterBlockTime()
{
return queryRateLimiter.getRateLimiterBlockTime();
}
@PreDestroy
public void stop()
{
queryPurger.shutdownNow();
}
/**
* HTTP endpoint for submitting queries to the Presto Coordinator
* Presto performs lazy execution. The submission of a query returns
* a placeholder for the result set, but the query gets
* scheduled/dispatched only when the client polls for results
* @param statement The statement or sql query string submitted
* @param xForwardedProto Forwarded protocol (http or https)
* @param servletRequest The http request
* @param uriInfo {@link javax.ws.rs.core.UriInfo}
* @return {@link javax.ws.rs.core.Response} HTTP response code
*/
@POST
@Path("/v1/statement")
@Produces(APPLICATION_JSON)
public Response postStatement(
String statement,
@DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults,
@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
@HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl,
@Context HttpServletRequest servletRequest,
@Context UriInfo uriInfo)
{
if (isNullOrEmpty(statement)) {
throw badRequest(BAD_REQUEST, "SQL statement is empty");
}
abortIfPrefixUrlInvalid(xPrestoPrefixUrl);
// TODO: For future cases we may want to start tracing from client. Then continuation of tracing
// will be needed instead of creating a new trace here.
SessionContext sessionContext = new HttpRequestSessionContext(
servletRequest,
sqlParserOptions,
tracerProviderManager.getTracerProvider(),
Optional.of(sessionPropertyManager));
Query query = new Query(statement, sessionContext, dispatchManager, executingQueryResponseProvider, 0);
queries.put(query.getQueryId(), query);
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
}
/**
* HTTP endpoint for submitting queries to the Presto Coordinator.
* Presto performs lazy execution. The submission of a query returns
* a placeholder for the result set, but the query gets
* scheduled/dispatched only when the client polls for results.
* This endpoint accepts a pre-minted queryId and slug, instead of
* generating it.
*
* @param statement The statement or sql query string submitted
* @param queryId Pre-minted query ID to associate with this query
* @param slug Pre-minted slug to protect this query
* @param xForwardedProto Forwarded protocol (http or https)
* @param servletRequest The http request
* @param uriInfo {@link javax.ws.rs.core.UriInfo}
* @return {@link javax.ws.rs.core.Response} HTTP response code
*/
@PUT
@Path("/v1/statement/{queryId}")
@Produces(APPLICATION_JSON)
public Response putStatement(
String statement,
@PathParam("queryId") QueryId queryId,
@QueryParam("slug") String slug,
@DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults,
@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
@HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl,
@Context HttpServletRequest servletRequest,
@Context UriInfo uriInfo)
{
if (isNullOrEmpty(statement)) {
throw badRequest(BAD_REQUEST, "SQL statement is empty");
}
abortIfPrefixUrlInvalid(xPrestoPrefixUrl);
// TODO: For future cases we may want to start tracing from client. Then continuation of tracing
// will be needed instead of creating a new trace here.
SessionContext sessionContext = new HttpRequestSessionContext(
servletRequest,
sqlParserOptions,
tracerProviderManager.getTracerProvider(),
Optional.of(sessionPropertyManager));
Query attemptedQuery = new Query(statement, sessionContext, dispatchManager, executingQueryResponseProvider, 0, queryId, slug);
Query query = queries.computeIfAbsent(queryId, unused -> attemptedQuery);
if (attemptedQuery != query && !attemptedQuery.getSlug().equals(query.getSlug()) || query.getLastToken() != 0) {
throw badRequest(CONFLICT, "Query already exists");
}
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
}
/**
* HTTP endpoint for re-processing a failed query
* @param queryId Query Identifier of the query to be retried
* @param xForwardedProto Forwarded protocol (http or https)
* @param uriInfo {@link javax.ws.rs.core.UriInfo}
* @return {@link javax.ws.rs.core.Response} HTTP response code
*/
@GET
@Path("/v1/statement/queued/retry/{queryId}")
@Produces(APPLICATION_JSON)
public Response retryFailedQuery(
@PathParam("queryId") QueryId queryId,
@DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults,
@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
@HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl,
@Context UriInfo uriInfo)
{
abortIfPrefixUrlInvalid(xPrestoPrefixUrl);
Query failedQuery = queries.get(queryId);
if (failedQuery == null) {
// TODO: purge retryable queries slower than normal ones
throw new PrestoException(RETRY_QUERY_NOT_FOUND, "failed to find the query to retry with ID " + queryId);
}
int retryCount = failedQuery.getRetryCount() + 1;
Query query = new Query(
"-- retry query " + queryId + "; attempt: " + retryCount + "\n" + failedQuery.getQuery(),
failedQuery.getSessionContext(),
dispatchManager,
executingQueryResponseProvider,
retryCount);
retriedQueries.putIfAbsent(queryId, query);
synchronized (retriedQueries.get(queryId)) {
if (retriedQueries.get(queryId).getQueryId().equals(query.getQueryId())) {
queries.put(query.getQueryId(), query);
}
else {
// other thread has already created the new retry query
// use the existing one
query = retriedQueries.get(queryId);
}
}
return withCompressionConfiguration(Response.ok(query.getInitialQueryResults(uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults)), compressionEnabled).build();
}
/**
* HTTP endpoint for retrieving the status of a submitted query
* @param queryId Query Identifier of query whose status is polled
* @param token Monotonically increasing token that identifies the next batch of query results
* @param slug Unique security token generated for each query that controls access to that query's results
* @param maxWait Time to wait for the query to be dispatched
* @param xForwardedProto Forwarded protocol (http or https)
* @param uriInfo {@link javax.ws.rs.core.UriInfo}
* @param asyncResponse
*/
@GET
@Path("/v1/statement/queued/{queryId}/{token}")
@Produces(APPLICATION_JSON)
public void getStatus(
@PathParam("queryId") QueryId queryId,
@PathParam("token") long token,
@QueryParam("slug") String slug,
@QueryParam("maxWait") Duration maxWait,
@DefaultValue("false") @QueryParam("binaryResults") boolean binaryResults,
@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
@HeaderParam(PRESTO_PREFIX_URL) String xPrestoPrefixUrl,
@Context UriInfo uriInfo,
@Suspended AsyncResponse asyncResponse)
{
abortIfPrefixUrlInvalid(xPrestoPrefixUrl);
Query query = getQuery(queryId, slug);
ListenableFuture<Double> acquirePermitAsync = queryRateLimiter.acquire(queryId);
ListenableFuture<?> waitForDispatchedAsync = transformAsync(
acquirePermitAsync,
acquirePermitTimeSeconds -> {
queryRateLimiter.addRateLimiterBlockTime(new Duration(acquirePermitTimeSeconds, SECONDS));
return query.waitForDispatched();
},
responseExecutor);
// wait for query to be dispatched, up to the wait timeout
ListenableFuture<?> futureStateChange = addTimeout(
waitForDispatchedAsync,
() -> null,
WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait),
timeoutExecutor);
// when state changes, fetch the next result
ListenableFuture<Response> queryResultsFuture = transformAsync(
futureStateChange,
ignored -> query.toResponse(token, uriInfo, xForwardedProto, xPrestoPrefixUrl, WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait), compressionEnabled, nestedDataSerializationEnabled, binaryResults),
responseExecutor);
bindAsyncResponse(asyncResponse, queryResultsFuture, responseExecutor);
}
/**
* HTTP endpoint to cancel execution of a query in flight
* @param queryId Query Identifier of query to be canceled
* @param token Monotonically increasing token that identifies the next batch of query results
* @param slug Unique security token generated for each query that controls access to that query's results
* @return {@link javax.ws.rs.core.Response} HTTP response code
*/
@DELETE
@Path("/v1/statement/queued/{queryId}/{token}")
@Produces(APPLICATION_JSON)
public Response cancelQuery(
@PathParam("queryId") QueryId queryId,
@PathParam("token") long token,
@QueryParam("slug") String slug)
{
getQuery(queryId, slug).cancel();
return Response.noContent().build();
}
private Query getQuery(QueryId queryId, String slug)
{
Query query = queries.get(queryId);
if (query == null || !query.getSlug().equals(slug)) {
throw badRequest(NOT_FOUND, "Query not found");
}
return query;
}
private void purgeQueries(Map<QueryId, Query> queries)
{
for (Entry<QueryId, Query> entry : ImmutableSet.copyOf(queries.entrySet())) {
if (!entry.getValue().isSubmissionFinished()) {
continue;
}
// forget about this query if the query manager is no longer tracking it
if (!dispatchManager.isQueryPresent(entry.getKey())) {
queries.remove(entry.getKey());
}
}
}
private static WebApplicationException badRequest(Status status, String message)
{
throw new WebApplicationException(
Response.status(status)
.type(TEXT_PLAIN_TYPE)
.entity(message)
.build());
}
private static Response.ResponseBuilder withCompressionConfiguration(Response.ResponseBuilder builder, boolean compressionEnabled)
{
if (!compressionEnabled) {
builder.encoding("identity");
}
return builder;
}
private static final class Query
{
private final String query;
private final SessionContext sessionContext;
private final DispatchManager dispatchManager;
private final ExecutingQueryResponseProvider executingQueryResponseProvider;
private final QueryId queryId;
private final String slug;
private final AtomicLong lastToken = new AtomicLong();
private final int retryCount;
@GuardedBy("this")
private ListenableFuture<?> querySubmissionFuture;
public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, ExecutingQueryResponseProvider executingQueryResponseProvider, int retryCount)
{
this(query, sessionContext, dispatchManager, executingQueryResponseProvider, retryCount, dispatchManager.createQueryId(), createSlug());
}
public Query(
String query,
SessionContext sessionContext,
DispatchManager dispatchManager,
ExecutingQueryResponseProvider executingQueryResponseProvider,
int retryCount,
QueryId queryId,
String slug)
{
this.query = requireNonNull(query, "query is null");
this.sessionContext = requireNonNull(sessionContext, "sessionContext is null");
this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
this.executingQueryResponseProvider = requireNonNull(executingQueryResponseProvider, "executingQueryResponseProvider is null");
this.retryCount = retryCount;
this.queryId = requireNonNull(queryId, "queryId is null");
this.slug = requireNonNull(slug, "slug is null");
}
/**
* Returns the unique identifier of a query
*/
public QueryId getQueryId()
{
return queryId;
}
/**
* Returns the query string
*/
public String getQuery()
{
return query;
}
/**
* Returns the session context of the query
*/
public SessionContext getSessionContext()
{
return sessionContext;
}
/**
* Returns the secure slug associated with the query
*/
public String getSlug()
{
return slug;
}
/**
* Returns the last token of the result set
*/
public long getLastToken()
{
return lastToken.get();
}
/**
* Returns the retry attempt of the query
*/
public int getRetryCount()
{
return retryCount;
}
/**
* Checks whether the query has been processed by the dispatchManager
*/
public synchronized boolean isSubmissionFinished()
{
return querySubmissionFuture != null && querySubmissionFuture.isDone();
}
/**
* Submit query to dispatchManager, if required, and for the dispatchManager to process the query
*/
private ListenableFuture<?> waitForDispatched()
{
// if query submission has not finished, wait for it to finish
synchronized (this) {
if (querySubmissionFuture == null) {
querySubmissionFuture = dispatchManager.createQuery(queryId, slug, retryCount, sessionContext, query);
}
if (!querySubmissionFuture.isDone()) {
return querySubmissionFuture;
}
}
// otherwise, wait for the query to finish
return dispatchManager.waitForDispatched(queryId);
}
/**
* Returns a placeholder for query results for the client to poll
* @param uriInfo {@link javax.ws.rs.core.UriInfo}
* @param xForwardedProto Forwarded protocol (http or https)
* @return {@link com.facebook.presto.client.QueryResults}
*/
public synchronized QueryResults getInitialQueryResults(UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, boolean binaryResults)
{
verify(lastToken.get() == 0);
verify(querySubmissionFuture == null);
return createQueryResults(
1,
uriInfo,
xForwardedProto,
xPrestoPrefixUrl,
DispatchInfo.waitingForPrerequisites(NO_DURATION, NO_DURATION),
binaryResults);
}
public ListenableFuture<Response> toResponse(
long token,
UriInfo uriInfo,
String xForwardedProto,
String xPrestoPrefixUrl,
Duration maxWait,
boolean compressionEnabled,
boolean nestedDataSerializationEnabled,
boolean binaryResults)
{
long lastToken = this.lastToken.get();
// token should be the last token or the next token
if (token != lastToken && token != lastToken + 1) {
throw new WebApplicationException(Response.Status.GONE);
}
// advance (or stay at) the token
this.lastToken.compareAndSet(lastToken, token);
synchronized (this) {
// if query submission has not finished, return simple empty result
if (querySubmissionFuture == null || !querySubmissionFuture.isDone()) {
QueryResults queryResults = createQueryResults(
token + 1,
uriInfo,
xForwardedProto,
xPrestoPrefixUrl,
DispatchInfo.waitingForPrerequisites(NO_DURATION, NO_DURATION),
binaryResults);
return immediateFuture(withCompressionConfiguration(Response.ok(queryResults), compressionEnabled).build());
}
}
Optional<DispatchInfo> dispatchInfo = dispatchManager.getDispatchInfo(queryId);
if (!dispatchInfo.isPresent()) {
// query should always be found, but it may have just been determined to be abandoned
return immediateFailedFuture(new WebApplicationException(Response
.status(NOT_FOUND)
.build()));
}
if (waitForDispatched().isDone()) {
Optional<ListenableFuture<Response>> executingQueryResponse = executingQueryResponseProvider.waitForExecutingResponse(
queryId,
slug,
dispatchInfo.get(),
uriInfo,
xPrestoPrefixUrl,
getScheme(xForwardedProto, uriInfo),
maxWait,
TARGET_RESULT_SIZE,
compressionEnabled,
nestedDataSerializationEnabled,
binaryResults);
if (executingQueryResponse.isPresent()) {
return executingQueryResponse.get();
}
}
return immediateFuture(withCompressionConfiguration(Response.ok(
createQueryResults(token + 1, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo.get(), binaryResults)), compressionEnabled)
.build());
}
public synchronized void cancel()
{
querySubmissionFuture.addListener(() -> dispatchManager.cancelQuery(queryId), directExecutor());
}
private QueryResults createQueryResults(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, DispatchInfo dispatchInfo, boolean binaryResults)
{
URI nextUri = getNextUri(token, uriInfo, xForwardedProto, xPrestoPrefixUrl, dispatchInfo, binaryResults);
Optional<QueryError> queryError = dispatchInfo.getFailureInfo()
.map(this::toQueryError);
return createQueuedQueryResults(
queryId,
nextUri,
queryError,
uriInfo,
xForwardedProto,
xPrestoPrefixUrl,
dispatchInfo.getElapsedTime(),
dispatchInfo.getQueuedTime(),
dispatchInfo.getWaitingForPrerequisitesTime());
}
private static String createSlug()
{
return "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", "");
}
private URI getNextUri(long token, UriInfo uriInfo, String xForwardedProto, String xPrestoPrefixUrl, DispatchInfo dispatchInfo, boolean binaryResults)
{
// if failed, query is complete
if (dispatchInfo.getFailureInfo().isPresent()) {
return null;
}
return getQueuedUri(queryId, slug, token, uriInfo, xForwardedProto, xPrestoPrefixUrl, binaryResults);
}
private QueryError toQueryError(ExecutionFailureInfo executionFailureInfo)
{
ErrorCode errorCode;
if (executionFailureInfo.getErrorCode() != null) {
errorCode = executionFailureInfo.getErrorCode();
}
else {
errorCode = GENERIC_INTERNAL_ERROR.toErrorCode();
log.warn("Failed query %s has no error code", queryId);
}
return new QueryError(
firstNonNull(executionFailureInfo.getMessage(), "Internal error"),
null,
errorCode.getCode(),
errorCode.getName(),
errorCode.getType().toString(),
errorCode.isRetriable(),
executionFailureInfo.getErrorLocation(),
executionFailureInfo.toFailureInfo());
}
}
}