// ResourceManagerClusterStatusSender.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.resourcemanager;

import com.facebook.drift.client.DriftClient;
import com.facebook.presto.execution.ManagedQueryExecution;
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.NodeStatus;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.server.StatusResource;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.util.PeriodicTaskExecutor;
import io.airlift.units.Duration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

/**
 * Sends periodic heartbeats from this node to every resource manager reported by the
 * {@link InternalNodeManager}.
 *
 * <p>Three kinds of heartbeat are sent, each driven by its own {@link PeriodicTaskExecutor}:
 * <ul>
 * <li>a node heartbeat carrying the current {@link NodeStatus};</li>
 * <li>a resource group runtime heartbeat, created only when {@link ServerConfig#isCoordinator()} is true;</li>
 * <li>one query heartbeat per query registered through {@link #registerQuery(ManagedQueryExecution)}.</li>
 * </ul>
 */
public class ResourceManagerClusterStatusSender
        implements ClusterStatusSender
{
    private final DriftClient<ResourceManagerClient> resourceManagerClient;
    private final InternalNodeManager internalNodeManager;
    private final ResourceGroupManager<?> resourceGroupManager;
    private final Supplier<NodeStatus> statusSupplier;
    private final ScheduledExecutorService executor;
    private final Duration queryHeartbeatInterval;

    // One heartbeat sender per registered query; entries are removed once the query is done.
    private final Map<QueryId, PeriodicTaskExecutor> queries = new ConcurrentHashMap<>();

    private final PeriodicTaskExecutor nodeHeartbeatSender;
    // Present only when serverConfig.isCoordinator() was true at construction time.
    private final Optional<PeriodicTaskExecutor> resourceRuntimeHeartbeatSender;

    /**
     * Injection constructor; uses {@link StatusResource#getStatus()} as the node status supplier.
     *
     * @throws NullPointerException if {@code statusResource} (or any argument rejected by the
     *         delegate constructor) is null
     */
    @Inject
    public ResourceManagerClusterStatusSender(
            @ForResourceManager DriftClient<ResourceManagerClient> resourceManagerClient,
            InternalNodeManager internalNodeManager,
            StatusResource statusResource,
            @ForResourceManager ScheduledExecutorService executor,
            ResourceManagerConfig resourceManagerConfig,
            ServerConfig serverConfig,
            ResourceGroupManager<?> resourceGroupManager)
    {
        this(
                resourceManagerClient,
                internalNodeManager,
                requireNonNull(statusResource, "statusResource is null")::getStatus,
                executor,
                resourceManagerConfig,
                serverConfig,
                resourceGroupManager);
    }

    /**
     * Creates the sender. No heartbeat is scheduled until {@link #init()} is called.
     *
     * @param resourceManagerClient client used to reach each resource manager
     * @param internalNodeManager source of the resource manager list and of the current node
     * @param statusResource supplier of the status sent with each node heartbeat
     * @param executor executor shared by all heartbeat senders
     * @param resourceManagerConfig provides the three heartbeat intervals
     * @param serverConfig decides whether resource group runtime heartbeats are sent
     * @param resourceGroupManager source of the resource group runtime infos
     * @throws NullPointerException if any argument is null
     */
    public ResourceManagerClusterStatusSender(
            DriftClient<ResourceManagerClient> resourceManagerClient,
            InternalNodeManager internalNodeManager,
            Supplier<NodeStatus> statusResource,
            ScheduledExecutorService executor,
            ResourceManagerConfig resourceManagerConfig,
            ServerConfig serverConfig,
            ResourceGroupManager<?> resourceGroupManager)
    {
        // Validate every argument before building anything, so a bad argument fails with a
        // descriptive message rather than part-way through construction.
        this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerClient is null");
        this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
        this.statusSupplier = requireNonNull(statusResource, "statusResource is null");
        this.executor = requireNonNull(executor, "executor is null");
        this.resourceGroupManager = requireNonNull(resourceGroupManager, "resourceGroupManager is null");
        requireNonNull(resourceManagerConfig, "resourceManagerConfig is null");
        requireNonNull(serverConfig, "serverConfig is null");

        this.queryHeartbeatInterval = resourceManagerConfig.getQueryHeartbeatInterval();
        this.nodeHeartbeatSender = new PeriodicTaskExecutor(
                resourceManagerConfig.getNodeHeartbeatInterval().toMillis(),
                executor,
                this::sendNodeHeartbeat);
        this.resourceRuntimeHeartbeatSender = serverConfig.isCoordinator()
                ? Optional.of(new PeriodicTaskExecutor(
                        resourceManagerConfig.getResourceGroupRuntimeHeartbeatInterval().toMillis(),
                        executor,
                        this::sendResourceGroupRuntimeHeartbeat))
                : Optional.empty();
    }

    /**
     * Starts the node heartbeat sender and, when present, the resource group runtime heartbeat sender.
     */
    @PostConstruct
    public void init()
    {
        nodeHeartbeatSender.start();
        resourceRuntimeHeartbeatSender.ifPresent(PeriodicTaskExecutor::start);
    }

    /**
     * Stops every per-query heartbeat sender, the node heartbeat sender and, when present,
     * the resource group runtime heartbeat sender.
     */
    @PreDestroy
    public void stop()
    {
        queries.values().forEach(PeriodicTaskExecutor::stop);
        nodeHeartbeatSender.stop();
        resourceRuntimeHeartbeatSender.ifPresent(PeriodicTaskExecutor::stop);
    }

    /**
     * Starts a periodic heartbeat for the given query unless one is already registered under
     * the same query id, and arranges for it to end when the query reaches a done state.
     *
     * @param queryExecution the query to report on
     */
    @Override
    public void registerQuery(ManagedQueryExecution queryExecution)
    {
        QueryId queryId = queryExecution.getBasicQueryInfo().getQueryId();
        queries.computeIfAbsent(queryId, unused -> {
            // Each heartbeat carries an incrementing sequence id, starting at 1.
            AtomicLong sequenceId = new AtomicLong();
            PeriodicTaskExecutor taskExecutor = new PeriodicTaskExecutor(
                    queryHeartbeatInterval.toMillis(),
                    executor,
                    () -> sendQueryHeartbeat(queryExecution, sequenceId.incrementAndGet()));
            taskExecutor.start();
            return taskExecutor;
        });
        queryExecution.addStateChangeListener(newState -> {
            if (!newState.isDone()) {
                return;
            }
            // remove() is atomic, so only one caller ever obtains the sender. Doing the final
            // run outside of a ConcurrentHashMap remapping function keeps the remote calls
            // out of the map's internal critical section.
            PeriodicTaskExecutor queryHeartbeatSender = queries.remove(queryId);
            if (queryHeartbeatSender == null) {
                return;
            }
            try {
                // Run the heartbeat task once more before shutting the sender down
                // (presumably so resource managers observe the terminal state - confirm
                // against PeriodicTaskExecutor.forceRun).
                queryHeartbeatSender.forceRun();
            }
            finally {
                // Always stop the sender, even if the final run throws.
                queryHeartbeatSender.stop();
            }
        });
    }

    /**
     * Sends one query heartbeat, built from the query's current {@link BasicQueryInfo},
     * to every resource manager.
     */
    private void sendQueryHeartbeat(ManagedQueryExecution queryExecution, long sequenceId)
    {
        BasicQueryInfo basicQueryInfo = queryExecution.getBasicQueryInfo();
        String nodeIdentifier = internalNodeManager.getCurrentNode().getNodeIdentifier();
        getResourceManagers().forEach(hostAndPort ->
                resourceManagerClient.get(Optional.of(hostAndPort.toString())).queryHeartbeat(nodeIdentifier, basicQueryInfo, sequenceId));
    }

    /**
     * Sends the node status to every resource manager; the status supplier is queried
     * once per resource manager.
     */
    private void sendNodeHeartbeat()
    {
        getResourceManagers().forEach(hostAndPort ->
                resourceManagerClient.get(Optional.of(hostAndPort.toString())).nodeHeartbeat(statusSupplier.get()));
    }

    /**
     * Returns the addresses of the resource managers that expose a Thrift port, each built
     * from the node's host text and its Thrift port. Nodes without a Thrift port are skipped.
     */
    private List<HostAddress> getResourceManagers()
    {
        return internalNodeManager.getResourceManagers().stream()
                .filter(node -> node.getThriftPort().isPresent())
                .map(resourceManagerNode -> {
                    HostAddress hostAndPort = resourceManagerNode.getHostAndPort();
                    return HostAddress.fromParts(hostAndPort.getHostText(), resourceManagerNode.getThriftPort().getAsInt());
                })
                .collect(toImmutableList());
    }

    /**
     * Sends the current resource group runtime infos, tagged with this node's identifier,
     * to every resource manager. The infos are fetched once and the same list is sent to all.
     */
    public void sendResourceGroupRuntimeHeartbeat()
    {
        // NOTE(review): raw List - the element type is declared outside this file; parameterize
        // it once the return type of getResourceGroupRuntimeInfos() is confirmed.
        List resourceGroupRuntimeInfos = resourceGroupManager.getResourceGroupRuntimeInfos();
        String nodeIdentifier = internalNodeManager.getCurrentNode().getNodeIdentifier();
        getResourceManagers().forEach(hostAndPort ->
                resourceManagerClient.get(Optional.of(hostAndPort.toString())).resourceGroupRuntimeHeartbeat(nodeIdentifier, resourceGroupRuntimeInfos));
    }
}