ClusterSizeMonitor.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.execution;

import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.metadata.AllNodes;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;

import static com.facebook.airlift.concurrent.Threads.threadsNamed;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
import static com.facebook.presto.spi.StandardErrorCode.NO_CPP_SIDECARS;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
 * Tracks how many workers, coordinators, resource managers and coordinator sidecars are
 * currently active according to the {@link InternalNodeManager}, and lets callers either
 * poll whether the configured "active" minimums are met or obtain a future that completes
 * once the configured minimum count is reached (or fails after a configured wait).
 * <p>
 * All mutable state is guarded by the monitor of this instance. Futures are completed on a
 * dedicated single-threaded scheduled executor rather than on the thread that delivers the
 * node change notification.
 */
public class ClusterSizeMonitor
{
    private final InternalNodeManager nodeManager;
    private final boolean includeCoordinator;
    private final int workerMinCount;
    private final int workerMinCountActive;
    private final Duration executionMaxWait;
    private final int coordinatorMinCount;
    private final int coordinatorMinCountActive;
    private final Duration coordinatorMaxWait;

    private final Duration coordinatorSidecarMaxWait;
    private final int resourceManagerMinCountActive;
    private final ScheduledExecutorService executor;

    private final Consumer<AllNodes> listener = this::updateAllNodes;
    private final boolean isCoordinatorSidecarEnabled;

    @GuardedBy("this")
    private int currentWorkerCount;

    @GuardedBy("this")
    private int currentCoordinatorCount;

    @GuardedBy("this")
    private int currentResourceManagerCount;

    @GuardedBy("this")
    private int currentCoordinatorSidecarCount;

    @GuardedBy("this")
    private final List<SettableFuture<?>> workerSizeFutures = new ArrayList<>();

    @GuardedBy("this")
    private final List<SettableFuture<?>> coordinatorSizeFutures = new ArrayList<>();

    @GuardedBy("this")
    private final List<SettableFuture<?>> coordinatorSidecarSizeFutures = new ArrayList<>();

    /**
     * Injection constructor: extracts the individual thresholds and wait durations from the
     * supplied configuration objects and delegates to the explicit constructor.
     *
     * @throws NullPointerException if any configuration object is null
     */
    @Inject
    public ClusterSizeMonitor(InternalNodeManager nodeManager, NodeSchedulerConfig nodeSchedulerConfig, QueryManagerConfig queryManagerConfig, NodeResourceStatusConfig nodeResourceStatusConfig, ServerConfig serverConfig)
    {
        this(
                nodeManager,
                requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null").isIncludeCoordinator(),
                requireNonNull(queryManagerConfig, "queryManagerConfig is null").getRequiredWorkers(),
                requireNonNull(nodeResourceStatusConfig, "nodeResourceStatusConfig is null").getRequiredWorkersActive(),
                queryManagerConfig.getRequiredWorkersMaxWait(),
                queryManagerConfig.getRequiredCoordinators(),
                nodeResourceStatusConfig.getRequiredCoordinatorsActive(),
                queryManagerConfig.getRequiredCoordinatorsMaxWait(),
                queryManagerConfig.getRequiredCoordinatorSidecarsMaxWait(),
                nodeResourceStatusConfig.getRequiredResourceManagersActive(),
                requireNonNull(serverConfig, "serverConfig is null").isCoordinatorSidecarEnabled());
    }

    /**
     * @param nodeManager source of node membership and change notifications
     * @param includeCoordinator when true, every active node counts as a worker; when false,
     * active coordinators and resource managers are excluded from the worker count
     * @param workerMinCount worker count at which {@link #waitForMinimumWorkers()} completes
     * @param workerMinCountActive threshold used by {@link #hasRequiredWorkers()}
     * @param executionMaxWait how long {@link #waitForMinimumWorkers()} waits before failing
     * @param coordinatorMinCount coordinator count at which {@link #waitForMinimumCoordinators()} completes
     * @param coordinatorMinCountActive threshold used by {@link #hasRequiredCoordinators()}
     * @param coordinatorMaxWait how long {@link #waitForMinimumCoordinators()} waits before failing
     * @param coordinatorSidecarMaxWait how long {@link #waitForMinimumCoordinatorSidecars()} waits before failing
     * @param resourceManagerMinCountActive threshold used by {@link #hasRequiredResourceManagers()}
     * @param isCoordinatorSidecarEnabled when false, {@link #waitForMinimumCoordinatorSidecars()} never waits
     * @throws NullPointerException if {@code nodeManager} or any duration is null
     * @throws IllegalArgumentException if any count is negative
     */
    public ClusterSizeMonitor(
            InternalNodeManager nodeManager,
            boolean includeCoordinator,
            int workerMinCount,
            int workerMinCountActive,
            Duration executionMaxWait,
            int coordinatorMinCount,
            int coordinatorMinCountActive,
            Duration coordinatorMaxWait,
            Duration coordinatorSidecarMaxWait,
            int resourceManagerMinCountActive,
            boolean isCoordinatorSidecarEnabled)
    {
        this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
        this.includeCoordinator = includeCoordinator;
        checkArgument(workerMinCount >= 0, "workerMinCount is negative");
        this.workerMinCount = workerMinCount;
        checkArgument(workerMinCountActive >= 0, "workerMinCountActive is negative");
        this.workerMinCountActive = workerMinCountActive;
        this.executionMaxWait = requireNonNull(executionMaxWait, "executionMaxWait is null");
        checkArgument(coordinatorMinCount >= 0, "coordinatorMinCount is negative");
        this.coordinatorMinCount = coordinatorMinCount;
        checkArgument(coordinatorMinCountActive >= 0, "coordinatorMinCountActive is negative");
        this.coordinatorMinCountActive = coordinatorMinCountActive;
        this.coordinatorMaxWait = requireNonNull(coordinatorMaxWait, "coordinatorMaxWait is null");
        this.coordinatorSidecarMaxWait = requireNonNull(coordinatorSidecarMaxWait, "coordinatorSidecarMaxWait is null");
        checkArgument(resourceManagerMinCountActive >= 0, "resourceManagerMinCountActive is negative");
        this.resourceManagerMinCountActive = resourceManagerMinCountActive;
        this.executor = newSingleThreadScheduledExecutor(threadsNamed("node-monitor-%s"));
        this.isCoordinatorSidecarEnabled = isCoordinatorSidecarEnabled;
    }

    /**
     * Registers for node change notifications and takes an initial snapshot of the cluster.
     */
    @PostConstruct
    public void start()
    {
        nodeManager.addNodeChangeListener(listener);
        updateAllNodes(nodeManager.getAllNodes());
    }

    /**
     * Unregisters from node change notifications and shuts down the internal executor.
     */
    @PreDestroy
    public void stop()
    {
        nodeManager.removeNodeChangeListener(listener);
        executor.shutdownNow();
    }

    /**
     * @return true when the current worker count is greater than or equal to the
     * minimum active worker count.
     */
    public synchronized boolean hasRequiredWorkers()
    {
        return currentWorkerCount >= workerMinCountActive;
    }

    /**
     * @return true when the current resource manager count is greater than or equal to the
     * minimum active resource manager count.
     */
    public synchronized boolean hasRequiredResourceManagers()
    {
        return currentResourceManagerCount >= resourceManagerMinCountActive;
    }

    /**
     * @return true when the current coordinator count is greater than or equal to the
     * minimum active coordinator count.
     */
    public synchronized boolean hasRequiredCoordinators()
    {
        return currentCoordinatorCount >= coordinatorMinCountActive;
    }

    /**
     * @return true when at least one coordinator sidecar is currently active.
     */
    public synchronized boolean hasRequiredCoordinatorSidecars()
    {
        return currentCoordinatorSidecarCount > 0;
    }

    /**
     * Returns a future that completes when the minimum number of workers for the cluster has been met,
     * or fails with {@code GENERIC_INSUFFICIENT_RESOURCES} once the execution max wait has elapsed.
     * Note: caller should not add a listener using the direct executor, as this can delay the
     * notifications for other listeners.
     */
    public synchronized ListenableFuture<?> waitForMinimumWorkers()
    {
        if (currentWorkerCount >= workerMinCount) {
            return immediateFuture(null);
        }

        return registerWaiter(
                workerSizeFutures,
                executionMaxWait,
                future -> future.setException(new PrestoException(
                        GENERIC_INSUFFICIENT_RESOURCES,
                        format("Insufficient active worker nodes. Waited %s for at least %s workers, but only %s workers are active", executionMaxWait, workerMinCount, currentWorkerCount))));
    }

    /**
     * Returns a future that completes when the minimum number of coordinators has been met,
     * or fails with {@code GENERIC_INSUFFICIENT_RESOURCES} once the coordinator max wait has elapsed.
     * Note: caller should not add a listener using the direct executor, as this can delay the
     * notifications for other listeners.
     */
    public synchronized ListenableFuture<?> waitForMinimumCoordinators()
    {
        if (currentCoordinatorCount >= coordinatorMinCount) {
            return immediateFuture(null);
        }

        return registerWaiter(
                coordinatorSizeFutures,
                coordinatorMaxWait,
                // report the wait and threshold that actually apply to coordinators
                future -> future.setException(new PrestoException(
                        GENERIC_INSUFFICIENT_RESOURCES,
                        format("Insufficient active coordinator nodes. Waited %s for at least %s coordinators, but only %s coordinators are active", coordinatorMaxWait, coordinatorMinCount, currentCoordinatorCount))));
    }

    /**
     * Returns a future that completes when at least one coordinator sidecar is active,
     * or fails with {@code NO_CPP_SIDECARS} once the coordinator sidecar max wait has elapsed.
     * Completes immediately when coordinator sidecars are not enabled.
     * Note: caller should not add a listener using the direct executor, as this can delay the
     * notifications for other listeners.
     */
    public synchronized ListenableFuture<?> waitForMinimumCoordinatorSidecars()
    {
        if (currentCoordinatorSidecarCount > 0 || !isCoordinatorSidecarEnabled) {
            return immediateFuture(null);
        }

        return registerWaiter(
                coordinatorSidecarSizeFutures,
                coordinatorSidecarMaxWait,
                future -> future.setException(new PrestoException(
                        NO_CPP_SIDECARS,
                        format("Insufficient active coordinator sidecar nodes. Waited %s for at least 1 coordinator sidecars, but only 0 coordinator sidecars are active", coordinatorSidecarMaxWait))));
    }

    /**
     * Creates a pending future, records it in {@code waiters}, and schedules
     * {@code failOnTimeout} to run against it after {@code maxWait}.
     * Once the future finishes for any reason, the timeout task is cancelled
     * and the future is dropped from {@code waiters}.
     */
    private synchronized ListenableFuture<?> registerWaiter(List<SettableFuture<?>> waiters, Duration maxWait, Consumer<SettableFuture<?>> failOnTimeout)
    {
        SettableFuture<?> future = SettableFuture.create();
        waiters.add(future);

        // if future does not finish in wait period, complete with an exception
        ScheduledFuture<?> timeoutTask = executor.schedule(
                () -> {
                    // hold the monitor so the failure message reads a consistent node count
                    synchronized (this) {
                        failOnTimeout.accept(future);
                    }
                },
                maxWait.toMillis(),
                MILLISECONDS);

        // remove future if finished (e.g., canceled, timed out)
        future.addListener(() -> {
            timeoutTask.cancel(true);
            synchronized (this) {
                waiters.remove(future);
            }
        }, executor);

        return future;
    }

    /**
     * Detaches every pending future from {@code waiters} and completes them on the executor,
     * so that listeners attached to those futures do not run while the monitor is held.
     */
    private synchronized void completeWaiters(List<SettableFuture<?>> waiters)
    {
        if (waiters.isEmpty()) {
            return;
        }
        List<SettableFuture<?>> pending = ImmutableList.copyOf(waiters);
        waiters.clear();
        executor.submit(() -> pending.forEach(future -> future.set(null)));
    }

    /**
     * Recomputes all node counts from the given snapshot and releases any waiters whose
     * minimum has now been reached.
     */
    private synchronized void updateAllNodes(AllNodes allNodes)
    {
        if (includeCoordinator) {
            currentWorkerCount = allNodes.getActiveNodes().size();
        }
        else {
            // coordinators and resource managers do not count as workers
            Set<Node> activeNodes = new HashSet<>(allNodes.getActiveNodes());
            activeNodes.removeAll(allNodes.getActiveCoordinators());
            activeNodes.removeAll(allNodes.getActiveResourceManagers());
            currentWorkerCount = activeNodes.size();
        }
        currentCoordinatorCount = allNodes.getActiveCoordinators().size();
        currentResourceManagerCount = allNodes.getActiveResourceManagers().size();
        currentCoordinatorSidecarCount = allNodes.getActiveCoordinatorSidecars().size();

        if (currentWorkerCount >= workerMinCount) {
            completeWaiters(workerSizeFutures);
        }
        if (currentCoordinatorCount >= coordinatorMinCount) {
            completeWaiters(coordinatorSizeFutures);
        }
        // same condition as waitForMinimumCoordinatorSidecars(): any positive count satisfies waiters
        if (currentCoordinatorSidecarCount > 0) {
            completeWaiters(coordinatorSidecarSizeFutures);
        }
    }
}