QueryResource.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server;
import com.facebook.presto.dispatcher.DispatchManager;
import com.facebook.presto.execution.QueryInfo;
import com.facebook.presto.execution.QueryManager;
import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.StageId;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.resourcemanager.ResourceManagerProxy;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.NoSuchElementException;
import java.util.Optional;
import static com.facebook.presto.connector.system.KillQueryProcedure.createKillQueryException;
import static com.facebook.presto.connector.system.KillQueryProcedure.createPreemptQueryException;
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.QUEUED;
import static com.facebook.presto.execution.QueryState.RUNNING;
import static com.facebook.presto.server.security.RoleType.ADMIN;
import static com.facebook.presto.server.security.RoleType.USER;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.net.HttpHeaders.X_FORWARDED_PROTO;
import static java.lang.String.format;
import static java.util.Comparator.comparing;
import static java.util.Comparator.comparingInt;
import static java.util.Objects.requireNonNull;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.NO_CONTENT;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
/**
 * REST resource exposing the queries known to this node under {@code /v1/query}.
 * <p>
 * When the resource manager is enabled, requests that cannot be answered from
 * the local {@link DispatchManager} (the full query listing, or a query that is
 * not present locally) are forwarded to a resource manager node through the
 * optional {@link ResourceManagerProxy}. When it is disabled, every request is
 * answered locally.
 */
@Path("/v1/query")
@RolesAllowed({USER, ADMIN})
public class QueryResource
{
    /**
     * Ordering used when a listing has to be truncated: queries are grouped by
     * state (RUNNING, then QUEUED, then any other non-terminal state, then
     * FAILED, then every other terminal state) and, within a group, sorted by
     * creation time with the most recently created query first.
     */
    public static final Comparator<BasicQueryInfo> QUERIES_ORDERING = Ordering
            .<BasicQueryInfo>from(comparingInt(
                    basicQueryInfo -> {
                        if (basicQueryInfo.getState() == RUNNING) {
                            return 0;
                        }
                        else if (basicQueryInfo.getState() == QUEUED) {
                            return 1;
                        }
                        else if (!basicQueryInfo.getState().isDone()) {
                            return 2;
                        }
                        else if (basicQueryInfo.getState() == FAILED) {
                            return 3;
                        }
                        else {
                            return 4;
                        }
                    }))
            .compound(Collections.reverseOrder(comparing(item -> item.getQueryStats().getCreateTime())));

    // TODO There should be a combined interface for this
    private final boolean resourceManagerEnabled;
    private final DispatchManager dispatchManager;
    private final QueryManager queryManager;
    private final InternalNodeManager internalNodeManager;
    private final Optional<ResourceManagerProxy> proxyHelper;

    /**
     * Creates the resource.
     *
     * @param serverConfig supplies the resource-manager-enabled flag
     * @param dispatchManager source of basic query information and target of cancel/fail requests
     * @param queryManager source of full query information and target of stage cancellation
     * @param internalNodeManager used to locate resource manager nodes when proxying
     * @param proxyHelper proxy used to forward requests; must be present whenever a request is proxied
     * @throws NullPointerException if any argument is null
     */
    @Inject
    public QueryResource(
            ServerConfig serverConfig,
            DispatchManager dispatchManager,
            QueryManager queryManager,
            InternalNodeManager internalNodeManager,
            Optional<ResourceManagerProxy> proxyHelper)
    {
        this.resourceManagerEnabled = requireNonNull(serverConfig, "serverConfig is null").isResourceManagerEnabled();
        this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
        this.queryManager = requireNonNull(queryManager, "queryManager is null");
        this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
        this.proxyHelper = requireNonNull(proxyHelper, "proxyHelper is null");
    }

    /**
     * Lists queries, optionally filtered by state and truncated to a maximum count.
     * <p>
     * When the resource manager is enabled the request is forwarded unchanged.
     * Otherwise the queries reported by the dispatch manager are filtered by
     * {@code state} (case-insensitive, if given). The list is sorted with
     * {@link #QUERIES_ORDERING} only when it has to be truncated to {@code limit};
     * an untruncated list keeps the order returned by the dispatch manager.
     *
     * @param stateFilter name of the {@link QueryState} to keep, or null for all states
     * @param limitFilter maximum number of queries to return, or null for no limit
     * @throws WebApplicationException with status 400 if {@code limit} is not positive
     *         or {@code state} does not name a {@link QueryState}
     */
    @GET
    public void getAllQueryInfo(
            @QueryParam("state") String stateFilter,
            @QueryParam("limit") Integer limitFilter,
            @HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
            @Context UriInfo uriInfo,
            @Context HttpServletRequest servletRequest,
            @Suspended AsyncResponse asyncResponse)
    {
        if (resourceManagerEnabled) {
            proxyResponse(servletRequest, asyncResponse, xForwardedProto, uriInfo);
            return;
        }

        int limit = firstNonNull(limitFilter, Integer.MAX_VALUE);
        if (limit <= 0) {
            throw new WebApplicationException(Response
                    .status(BAD_REQUEST)
                    .type(MediaType.TEXT_PLAIN)
                    .entity(format("Parameter 'limit' for getAllQueryInfo must be positive. Got %d.", limit))
                    .build());
        }

        List<BasicQueryInfo> queries = new ArrayList<>(dispatchManager.getQueries());

        // Filter list by the query state (if specified).
        if (stateFilter != null) {
            QueryState expectedState = parseQueryState(stateFilter);
            queries.removeIf(item -> item.getState() != expectedState);
        }

        // If limit is smaller than number of queries, then ensure that the more recent items are at the front.
        if (limit < queries.size()) {
            queries.sort(QUERIES_ORDERING);
        }
        else {
            limit = queries.size();
        }

        asyncResponse.resume(Response.ok(ImmutableList.copyOf(queries.subList(0, limit))).build());
    }

    /**
     * Converts the {@code state} query parameter to a {@link QueryState}, ignoring case.
     *
     * @throws WebApplicationException with status 400 if the value names no query state,
     *         so that a client mistake is not reported as a server failure
     */
    private static QueryState parseQueryState(String stateFilter)
    {
        try {
            return QueryState.valueOf(stateFilter.toUpperCase(Locale.ENGLISH));
        }
        catch (IllegalArgumentException e) {
            throw new WebApplicationException(Response
                    .status(BAD_REQUEST)
                    .type(MediaType.TEXT_PLAIN)
                    .entity(format("Parameter 'state' for getAllQueryInfo is not a valid query state. Got '%s'.", stateFilter))
                    .build());
        }
    }

    /**
     * Returns information about a single query.
     * <p>
     * The full {@link QueryInfo} from the query manager is preferred. If the
     * query manager does not know the query, the {@link BasicQueryInfo} from
     * the dispatch manager is returned instead; if that is also unavailable the
     * response is 410 GONE. With the resource manager enabled, a query that is
     * not present locally is proxied instead.
     *
     * @throws NullPointerException if {@code queryId} is null
     */
    @GET
    @Path("{queryId}")
    public void getQueryInfo(
            @PathParam("queryId") QueryId queryId,
            @HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
            @Context UriInfo uriInfo,
            @Context HttpServletRequest servletRequest,
            @Suspended AsyncResponse asyncResponse)
    {
        requireNonNull(queryId, "queryId is null");

        if (resourceManagerEnabled && !dispatchManager.isQueryPresent(queryId)) {
            proxyResponse(servletRequest, asyncResponse, xForwardedProto, uriInfo);
            return;
        }

        try {
            QueryInfo queryInfo = queryManager.getFullQueryInfo(queryId);
            asyncResponse.resume(Response.ok(queryInfo).build());
        }
        catch (NoSuchElementException e) {
            try {
                BasicQueryInfo basicQueryInfo = dispatchManager.getQueryInfo(queryId);
                asyncResponse.resume(Response.ok(basicQueryInfo).build());
            }
            catch (NoSuchElementException ex) {
                asyncResponse.resume(Response.status(Status.GONE).build());
            }
        }
    }

    /**
     * Cancels a query and responds with 204 NO_CONTENT.
     * With the resource manager enabled, a query that is not present locally is
     * proxied instead.
     *
     * @throws NullPointerException if {@code queryId} is null
     */
    @DELETE
    @Path("{queryId}")
    public void cancelQuery(
            @PathParam("queryId") QueryId queryId,
            @HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
            @Context UriInfo uriInfo,
            @Context HttpServletRequest servletRequest,
            @Suspended AsyncResponse asyncResponse)
    {
        requireNonNull(queryId, "queryId is null");

        if (resourceManagerEnabled && !dispatchManager.isQueryPresent(queryId)) {
            proxyResponse(servletRequest, asyncResponse, xForwardedProto, uriInfo);
            return;
        }

        dispatchManager.cancelQuery(queryId);
        asyncResponse.resume(Response.status(NO_CONTENT).build());
    }

    /**
     * Fails a query with a "killed" exception built from the request body.
     * With the resource manager enabled, a query that is not present locally is
     * proxied instead. See {@link #failQuery} for the possible response codes.
     *
     * @param message request body passed to the kill-query exception factory
     * @throws NullPointerException if {@code queryId} is null
     */
    @PUT
    @Path("{queryId}/killed")
    public void killQuery(
            @PathParam("queryId") QueryId queryId,
            String message,
            @HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
            @Context UriInfo uriInfo,
            @Context HttpServletRequest servletRequest,
            @Suspended AsyncResponse asyncResponse)
    {
        requireNonNull(queryId, "queryId is null");

        if (resourceManagerEnabled && !dispatchManager.isQueryPresent(queryId)) {
            proxyResponse(servletRequest, asyncResponse, xForwardedProto, uriInfo);
            return;
        }

        asyncResponse.resume(failQuery(queryId, createKillQueryException(message)));
    }

    /**
     * Fails a query with a "preempted" exception built from the request body.
     * With the resource manager enabled, a query that is not present locally is
     * proxied instead; with it disabled the request is always handled locally,
     * so an unknown query yields 410 GONE via {@link #failQuery}.
     *
     * @param message request body passed to the preempt-query exception factory
     * @throws NullPointerException if {@code queryId} is null
     */
    @PUT
    @Path("{queryId}/preempted")
    public void preemptQuery(
            @PathParam("queryId") QueryId queryId,
            String message,
            @HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
            @Context UriInfo uriInfo,
            @Context HttpServletRequest servletRequest,
            @Suspended AsyncResponse asyncResponse)
    {
        requireNonNull(queryId, "queryId is null");

        // Only proxy when a resource manager is configured; proxyResponse requires the proxy helper.
        if (resourceManagerEnabled && !dispatchManager.isQueryPresent(queryId)) {
            proxyResponse(servletRequest, asyncResponse, xForwardedProto, uriInfo);
            return;
        }

        asyncResponse.resume(failQuery(queryId, createPreemptQueryException(message)));
    }

    /**
     * Fails the given query with the supplied exception and reports the outcome.
     *
     * @return 200 OK if the query's error code afterwards equals that of
     *         {@code queryException}; 409 CONFLICT if the query was already done
     *         or ended up with a different error code; 410 GONE if the dispatch
     *         manager does not know the query
     * @throws NullPointerException if {@code queryId} is null
     */
    private Response failQuery(QueryId queryId, PrestoException queryException)
    {
        requireNonNull(queryId, "queryId is null");

        try {
            BasicQueryInfo state = dispatchManager.getQueryInfo(queryId);

            // check before killing to provide the proper error code (this is racy)
            if (state.getState().isDone()) {
                return Response.status(Status.CONFLICT).build();
            }

            dispatchManager.failQuery(queryId, queryException);

            // verify if the query was failed (if not, we lost the race)
            if (!queryException.getErrorCode().equals(dispatchManager.getQueryInfo(queryId).getErrorCode())) {
                return Response.status(Status.CONFLICT).build();
            }

            return Response.status(Status.OK).build();
        }
        catch (NoSuchElementException e) {
            return Response.status(Status.GONE).build();
        }
    }

    /**
     * Cancels a single stage of a query and responds with 200 OK.
     * With the resource manager enabled, a stage whose query is not present
     * locally is proxied instead; with it disabled the cancellation is always
     * handed to the local query manager.
     *
     * @throws NullPointerException if {@code stageId} is null
     */
    @DELETE
    @Path("stage/{stageId}")
    public void cancelStage(
            @PathParam("stageId") StageId stageId,
            @HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
            @Context UriInfo uriInfo,
            @Context HttpServletRequest servletRequest,
            @Suspended AsyncResponse asyncResponse)
    {
        requireNonNull(stageId, "stageId is null");

        // Only proxy when a resource manager is configured; proxyResponse requires the proxy helper.
        if (resourceManagerEnabled && !dispatchManager.isQueryPresent(stageId.getQueryId())) {
            proxyResponse(servletRequest, asyncResponse, xForwardedProto, uriInfo);
            return;
        }

        queryManager.cancelStage(stageId);
        asyncResponse.resume(Response.ok().build());
    }

    /**
     * Forwards the current request to a resource manager node.
     * <p>
     * The first node returned by the node manager's resource-manager listing is
     * used; the request URI is rebuilt with that node's scheme, host name and
     * port. If no resource manager is known the response is 503
     * SERVICE_UNAVAILABLE. Any exception raised here (including the state check
     * that the proxy helper is present) is delivered to the client by resuming
     * the response with it.
     *
     * @param xForwardedProto currently unused by this method
     */
    private void proxyResponse(HttpServletRequest servletRequest, AsyncResponse asyncResponse, String xForwardedProto, UriInfo uriInfo)
    {
        try {
            checkState(proxyHelper.isPresent());

            Iterator<InternalNode> resourceManagers = internalNodeManager.getResourceManagers().iterator();
            if (!resourceManagers.hasNext()) {
                asyncResponse.resume(Response.status(SERVICE_UNAVAILABLE).build());
                return;
            }

            InternalNode resourceManagerNode = resourceManagers.next();
            URI uri = uriInfo.getRequestUriBuilder()
                    .scheme(resourceManagerNode.getInternalUri().getScheme())
                    .host(resourceManagerNode.getHostAndPort().toInetAddress().getHostName())
                    .port(resourceManagerNode.getInternalUri().getPort())
                    .build();

            proxyHelper.get().performRequest(servletRequest, asyncResponse, uri);
        }
        catch (Exception e) {
            asyncResponse.resume(e);
        }
    }
}