ResourceGroupStateInfoResource.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server;
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.resourcemanager.ResourceManagerProxy;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import io.airlift.units.Duration;
import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import static com.facebook.presto.server.security.RoleType.ADMIN;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Suppliers.memoizeWithExpiration;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.net.HttpHeaders.X_FORWARDED_PROTO;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
@Path("/v1/resourceGroupState")
@RolesAllowed(ADMIN)
public class ResourceGroupStateInfoResource
{
private static class ResourceGroupStateInfoKey
{
private final ResourceGroupId resourceGroupId;
private final boolean includeQueryInfo;
private final boolean summarizeSubGroups;
private final boolean includeStaticSubgroupsOnly;
public ResourceGroupStateInfoKey(ResourceGroupId resourceGroupId, boolean includeQueryInfo, boolean summarizeSubGroups, boolean includeStaticSubgroupsOnly)
{
this.resourceGroupId = requireNonNull(resourceGroupId, "resourceGroupId is null");
this.includeQueryInfo = includeQueryInfo;
this.summarizeSubGroups = summarizeSubGroups;
this.includeStaticSubgroupsOnly = includeStaticSubgroupsOnly;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ResourceGroupStateInfoKey that = (ResourceGroupStateInfoKey) o;
return Objects.equals(that.resourceGroupId, resourceGroupId) &&
that.includeQueryInfo == includeQueryInfo &&
that.summarizeSubGroups == summarizeSubGroups &&
that.includeStaticSubgroupsOnly == includeStaticSubgroupsOnly;
}
@Override
public int hashCode()
{
return Objects.hash(resourceGroupId, includeQueryInfo, summarizeSubGroups, includeStaticSubgroupsOnly);
}
@Override
public String toString()
{
return toStringHelper(this)
.add("resourceGroupId", resourceGroupId)
.add("includeQueryInfo", includeQueryInfo)
.add("summarizeSubGroups", summarizeSubGroups)
.add("includeStaticSubgroupsOnly", includeStaticSubgroupsOnly)
.toString();
}
}
private final ResourceGroupManager<?> resourceGroupManager;
private final boolean resourceManagerEnabled;
private final InternalNodeManager internalNodeManager;
private final Optional<ResourceManagerProxy> proxyHelper;
private final Map<ResourceGroupStateInfoKey, Supplier<ResourceGroupInfo>> resourceGroupStateInfoKeySupplierMap;
private final Supplier<List<ResourceGroupInfo>> rootResourceGroupInfoSupplier;
private final Duration expirationDuration;
@Inject
public ResourceGroupStateInfoResource(
ServerConfig serverConfig,
ResourceGroupManager<?> resourceGroupManager,
InternalNodeManager internalNodeManager,
Optional<ResourceManagerProxy> proxyHelper)
{
this.resourceManagerEnabled = requireNonNull(serverConfig, "serverConfig is null").isResourceManagerEnabled();
this.resourceGroupManager = requireNonNull(resourceGroupManager, "resourceGroupManager is null");
this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
this.proxyHelper = requireNonNull(proxyHelper, "proxyHelper is null");
this.resourceGroupStateInfoKeySupplierMap = new HashMap<>();
this.expirationDuration = requireNonNull(serverConfig, "serverConfig is null").getClusterResourceGroupStateInfoExpirationDuration();
this.rootResourceGroupInfoSupplier = expirationDuration.getValue() > 0 ?
memoizeWithExpiration(() -> resourceGroupManager.getRootResourceGroups(), expirationDuration.toMillis(), MILLISECONDS) :
() -> resourceGroupManager.getRootResourceGroups();
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Encoded
@Path("{resourceGroupId: .*}")
public void getResourceGroupInfos(
@PathParam("resourceGroupId") String resourceGroupIdString,
@QueryParam("includeQueryInfo") @DefaultValue("true") boolean includeQueryInfo,
@QueryParam("includeLocalInfoOnly") @DefaultValue("false") boolean includeLocalInfoOnly,
@QueryParam("summarizeSubgroups") @DefaultValue("true") boolean summarizeSubgroups,
@QueryParam("includeStaticSubgroupsOnly") @DefaultValue("false") boolean includeStaticSubgroupsOnly,
@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
@Context UriInfo uriInfo,
@Context HttpServletRequest servletRequest,
@Suspended AsyncResponse asyncResponse)
{
if (resourceManagerEnabled && !includeLocalInfoOnly) {
proxyResourceGroupInfoResponse(servletRequest, asyncResponse, xForwardedProto, uriInfo);
return;
}
try {
if (isNullOrEmpty(resourceGroupIdString)) {
// return root groups if no group id is specified
asyncResponse.resume(Response.ok().entity(rootResourceGroupInfoSupplier.get()).build());
}
else {
ResourceGroupId resourceGroupId = getResourceGroupId(resourceGroupIdString);
ResourceGroupStateInfoKey resourceGroupStateInfoKey = new ResourceGroupStateInfoKey(resourceGroupId, includeQueryInfo, summarizeSubgroups, includeStaticSubgroupsOnly);
Supplier<ResourceGroupInfo> resourceGroupInfoSupplier = resourceGroupStateInfoKeySupplierMap.getOrDefault(resourceGroupStateInfoKey, expirationDuration.getValue() > 0 ?
Suppliers.memoizeWithExpiration(() -> getResourceGroupInfo(resourceGroupId, includeQueryInfo, summarizeSubgroups, includeStaticSubgroupsOnly), expirationDuration.toMillis(), MILLISECONDS) :
() -> getResourceGroupInfo(resourceGroupId, includeQueryInfo, summarizeSubgroups, includeStaticSubgroupsOnly));
resourceGroupStateInfoKeySupplierMap.putIfAbsent(resourceGroupStateInfoKey, resourceGroupInfoSupplier);
asyncResponse.resume(Response.ok().entity(resourceGroupInfoSupplier.get()).build());
}
}
catch (NoSuchElementException | IllegalArgumentException e) {
asyncResponse.resume(Response.status(NOT_FOUND).build());
}
}
private ResourceGroupInfo getResourceGroupInfo(ResourceGroupId resourceGroupId, boolean includeQueryInfo, boolean summarizeSubgroups, boolean includeStaticSubgroupsOnly)
{
return resourceGroupManager.getResourceGroupInfo(
resourceGroupId,
includeQueryInfo,
summarizeSubgroups,
includeStaticSubgroupsOnly);
}
private ResourceGroupId getResourceGroupId(String resourceGroupIdString)
{
return new ResourceGroupId(
Arrays.stream(resourceGroupIdString.split("/"))
.map(ResourceGroupStateInfoResource::urlDecode)
.collect(toImmutableList()));
}
private static String urlDecode(String value)
{
try {
return URLDecoder.decode(value, "UTF-8");
}
catch (UnsupportedEncodingException e) {
throw new WebApplicationException(BAD_REQUEST);
}
}
//TODO move this to a common place and reuse in all resource
private void proxyResourceGroupInfoResponse(HttpServletRequest servletRequest, AsyncResponse asyncResponse, String xForwardedProto, UriInfo uriInfo)
{
try {
checkState(proxyHelper.isPresent());
Iterator<InternalNode> resourceManagers = internalNodeManager.getResourceManagers().iterator();
if (!resourceManagers.hasNext()) {
asyncResponse.resume(Response.status(SERVICE_UNAVAILABLE).build());
return;
}
InternalNode resourceManagerNode = resourceManagers.next();
URI uri = uriInfo.getRequestUriBuilder()
.scheme(resourceManagerNode.getInternalUri().getScheme())
.host(resourceManagerNode.getHostAndPort().toInetAddress().getHostName())
.port(resourceManagerNode.getInternalUri().getPort())
.build();
proxyHelper.get().performRequest(servletRequest, asyncResponse, uri);
}
catch (Exception e) {
asyncResponse.resume(e);
}
}
}