DistributedClusterStatsResource.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.resourcemanager;

import com.facebook.presto.execution.QueryState;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.ClusterStatsResource.ClusterStats;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.spi.NodeState;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.memory.MemoryPoolInfo;
import io.airlift.units.Duration;

import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import java.util.Map;
import java.util.function.Supplier;

import static com.facebook.presto.server.security.RoleType.ADMIN;
import static com.facebook.presto.server.security.RoleType.USER;
import static com.google.common.base.Suppliers.memoizeWithExpiration;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

@Path("/v1/cluster")
@RolesAllowed({USER, ADMIN})
public class DistributedClusterStatsResource
{
    private final boolean isIncludeCoordinator;
    private final ResourceManagerClusterStateProvider clusterStateProvider;
    private final InternalNodeManager internalNodeManager;
    private final Supplier<ClusterStats> clusterStatsSupplier;

    @Inject
    public DistributedClusterStatsResource(
            NodeSchedulerConfig nodeSchedulerConfig,
            ServerConfig serverConfig,
            ResourceManagerClusterStateProvider clusterStateProvider,
            InternalNodeManager internalNodeManager)
    {
        this.isIncludeCoordinator = requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null").isIncludeCoordinator();
        this.clusterStateProvider = requireNonNull(clusterStateProvider, "nodeStateManager is null");
        this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
        Duration expirationDuration = requireNonNull(serverConfig, "serverConfig is null").getClusterStatsExpirationDuration();
        this.clusterStatsSupplier = expirationDuration.getValue() > 0 ? memoizeWithExpiration(this::calculateClusterStats, expirationDuration.toMillis(), MILLISECONDS) : this::calculateClusterStats;
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Response getClusterStats()
    {
        return Response.ok(clusterStatsSupplier.get()).build();
    }

    private ClusterStats calculateClusterStats()
    {
        long runningQueries = 0;
        long blockedQueries = 0;
        long queuedQueries = 0;

        long activeNodes = internalNodeManager.getNodes(NodeState.ACTIVE).size();
        if (!isIncludeCoordinator) {
            activeNodes -= internalNodeManager.getCoordinators().size();
        }
        activeNodes -= internalNodeManager.getResourceManagers().size();

        long runningDrivers = 0;
        long runningTasks = 0;
        double memoryReservation = 0;

        long totalInputRows = 0;
        long totalInputBytes = 0;
        long totalCpuTimeSecs = 0;

        for (BasicQueryInfo query : clusterStateProvider.getClusterQueries()) {
            if (query.getState() == QueryState.QUEUED) {
                queuedQueries++;
            }
            else if (query.getState() == QueryState.RUNNING) {
                if (query.getQueryStats().isFullyBlocked()) {
                    blockedQueries++;
                }
                else {
                    runningQueries++;
                }
            }

            if (!query.getState().isDone()) {
                totalInputBytes += query.getQueryStats().getRawInputDataSize().toBytes();
                totalInputRows += query.getQueryStats().getRawInputPositions();
                totalCpuTimeSecs += query.getQueryStats().getTotalCpuTime().getValue(SECONDS);

                memoryReservation += query.getQueryStats().getUserMemoryReservation().toBytes();
                runningDrivers += query.getQueryStats().getRunningDrivers();
                runningTasks += query.getQueryStats().getRunningTasks();
            }
        }
        return new ClusterStats(
                runningQueries,
                blockedQueries,
                queuedQueries,
                activeNodes,
                runningDrivers,
                runningTasks,
                memoryReservation,
                totalInputRows,
                totalInputBytes,
                totalCpuTimeSecs,
                clusterStateProvider.getAdjustedQueueSize());
    }

    @GET
    @Path("memory")
    public Response getClusterMemoryPoolInfo()
    {
        Map<MemoryPoolId, MemoryPoolInfo> memoryPoolInfos = clusterStateProvider.getClusterMemoryPoolInfo().entrySet().stream()
                .collect(toImmutableMap(Map.Entry::getKey, e -> e.getValue().getMemoryPoolInfo()));
        return Response.ok()
                .entity(memoryPoolInfos)
                .build();
    }

    @GET
    @Path("workerMemory")
    public Response getWorkerMemoryInfo()
    {
        return Response.ok()
                .entity(clusterStateProvider.getWorkerMemoryInfo())
                .build();
    }
}