NodeQueueLoadMonitor.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;

import org.apache.commons.math3.util.Precision;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED;

/**
 * The NodeQueueLoadMonitor keeps track of load metrics (such as queue length
 * and total wait time) associated with Container Queues on the Node Manager.
 * It uses this information to periodically sort the Nodes from least to most
 * loaded.
 *
 * <p>Locking: {@code sortedNodesLock} guards {@code sortedNodes}, which is
 * rewritten by {@code computeTask} under the write lock and read by
 * {@link #selectLeastLoadedNodes(int)} under the read lock.
 * {@code clusterNodesLock} serializes node update/removal (write lock) against
 * {@link #sortNodes(boolean)} (read lock). The {@code select*Node} methods read
 * the concurrent maps directly without taking either lock.</p>
 */
public class NodeQueueLoadMonitor implements ClusterMonitor {

  protected final static Logger LOG = LoggerFactory.
      getLogger(NodeQueueLoadMonitor.class);

  /**
   * Number of least-loaded nodes from which {@link #selectAnyNode} picks a
   * candidate.
   */
  protected int numNodesForAnyAllocation =
      DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED;

  /**
   * The comparator used to specify the metric against which the load
   * of two Nodes is compared.
   *
   * <p>NOTE: {@code clusterResource} is an instance field of the enum
   * constant, so it is shared by every monitor that uses the same constant.
   * The most recent {@link #setClusterResource(Resource)} call wins.</p>
   */
  public enum LoadComparator implements Comparator<ClusterNode> {
    /**
     * This policy only considers queue length.
     * When allocating, increments queue length without looking at resources
     * available on the node, and when sorting, also only sorts by queue length.
     */
    QUEUE_LENGTH,
    /**
     * This policy only considers the wait time of containers in the queue.
     * Neither looks at resources nor at queue length.
     */
    QUEUE_WAIT_TIME,
    /**
     * This policy considers both queue length and resources.
     * When allocating, first decrements resources available on a node.
     * If resources are available, does not place OContainers on the node queue.
     * When sorting, it first sorts by queue length,
     * then by available resources.
     */
    QUEUE_LENGTH_THEN_RESOURCES;

    private Resource clusterResource = Resources.none();
    private final DominantResourceCalculator resourceCalculator =
        new DominantResourceCalculator();

    /**
     * Whether the normalized (min-ratio) resource comparison may be used.
     * It requires a known cluster resource in which no major resource is zero
     * or negative. Presumably this avoids normalizing against a zero or
     * negative total.
     *
     * @return true if the min-ratio comparison should be performed
     */
    private boolean shouldPerformMinRatioComputation() {
      if (clusterResource == null) {
        return false;
      }

      return !resourceCalculator.isAnyMajorResourceZeroOrNegative(
          clusterResource);
    }

    /**
     * Compares queue length of nodes first (shortest first),
     * then compares available resources normalized
     * over cluster resources (most available resources first),
     * then raw available vcores, then raw available memory.
     * @param o1 the first ClusterNode
     * @param o2 the second ClusterNode
     * @return a negative, zero or positive value as o1 sorts before, together
     *         with, or after o2
     */
    private int compareQueueLengthThenResources(
        final ClusterNode o1, final ClusterNode o2) {
      // Integer.compare instead of subtraction: subtraction can overflow.
      int diff = Integer.compare(o1.getQueueLength(), o2.getQueueLength());
      if (diff != 0) {
        return diff;
      }

      final Resource availableResource1 = o1.getAvailableResource();
      final Resource availableResource2 = o2.getAvailableResource();

      // Cluster resource should be valid before performing min-ratio logic
      // Use raw available resource comparison otherwise
      if (shouldPerformMinRatioComputation()) {
        // Takes the least available resource of the two nodes,
        // normalized to the overall cluster resource
        final float availableRatio1 =
            resourceCalculator.minRatio(availableResource1, clusterResource);
        final float availableRatio2 =
            resourceCalculator.minRatio(availableResource2, clusterResource);

        // The one with more available resources should be placed first
        diff = Precision.compareTo(
            availableRatio2, availableRatio1, Precision.EPSILON);
      }

      if (diff == 0) {
        // Compare absolute value if ratios are the same (more vcores first)
        diff = Integer.compare(availableResource2.getVirtualCores(),
            availableResource1.getVirtualCores());
      }

      if (diff == 0) {
        // More available memory first
        diff = Long.compare(availableResource2.getMemorySize(),
            availableResource1.getMemorySize());
      }

      return diff;
    }

    /**
     * Orders two nodes by this policy's metric. Ties are broken by timestamp,
     * with the larger timestamp first.
     * @param o1 the first ClusterNode
     * @param o2 the second ClusterNode
     * @return a negative, zero or positive value as o1 sorts before, together
     *         with, or after o2
     */
    @Override
    public int compare(ClusterNode o1, ClusterNode o2) {
      int diff;
      switch (this) {
      case QUEUE_LENGTH_THEN_RESOURCES:
        diff = compareQueueLengthThenResources(o1, o2);
        break;
      case QUEUE_WAIT_TIME:
      case QUEUE_LENGTH:
      default:
        diff = Integer.compare(getMetric(o1), getMetric(o2));
        break;
      }

      if (diff == 0) {
        // Long.compare rather than "(int) (t2 - t1)": narrowing the long
        // difference can truncate it and flip its sign. That violates the
        // Comparator contract, and Arrays.sort may then throw.
        return Long.compare(o2.getTimestamp(), o1.getTimestamp());
      }
      return diff;
    }

    /**
     * Sets the total cluster resource used to normalize available resources.
     * @param clusterResource the aggregate capability of the tracked nodes
     */
    @VisibleForTesting
    void setClusterResource(Resource clusterResource) {
      this.clusterResource = clusterResource;
    }

    /**
     * @return the resource calculator used for ratio computations and
     *         resource-aware allocation
     */
    public ResourceCalculator getResourceCalculator() {
      return resourceCalculator;
    }

    /**
     * Returns the scalar load metric of a node for this policy.
     * @param c the node
     * @return the queue wait time for {@link #QUEUE_WAIT_TIME}, otherwise the
     *         queue length
     */
    public int getMetric(ClusterNode c) {
      switch (this) {
      case QUEUE_WAIT_TIME:
        return c.getQueueWaitTime();
      case QUEUE_LENGTH:
      case QUEUE_LENGTH_THEN_RESOURCES:
      default:
        return c.getQueueLength();
      }
    }

    /**
     * Increment the metric by a delta if it is below the threshold.
     * For {@link #QUEUE_WAIT_TIME} there is no threshold: nothing is
     * incremented and true is always returned.
     * @param c ClusterNode
     * @param incrementSize increment size
     * @param requested the requested resource
     * @return true if the metric was below threshold and was incremented
     *         (always true for QUEUE_WAIT_TIME).
     */
    public boolean compareAndIncrement(
        ClusterNode c, int incrementSize, Resource requested) {
      switch (this) {
      case QUEUE_LENGTH_THEN_RESOURCES:
        return c.compareAndIncrementAllocation(
            incrementSize, resourceCalculator, requested);
      case QUEUE_WAIT_TIME:
        // for queue wait time, we don't have any threshold.
        return true;
      case QUEUE_LENGTH:
      default:
        return c.compareAndIncrementAllocation(incrementSize);
      }
    }

    /**
     * Whether we should be placing OContainers on a node.
     * @param cn the clusterNode
     * @return whether we should be placing OContainers on a node.
     */
    public boolean isNodeAvailable(final ClusterNode cn) {
      int queueCapacity = cn.getQueueCapacity();
      int queueLength = cn.getQueueLength();
      if (this == LoadComparator.QUEUE_LENGTH_THEN_RESOURCES) {
        // Without a positive queue capacity only an empty queue is accepted.
        if (queueCapacity <= 0) {
          return queueLength <= 0;
        } else {
          return queueLength < queueCapacity;
        }
      }
      // In the special case where queueCapacity is 0 for the node,
      // the container can be allocated on the node but will be rejected there
      return queueCapacity <= 0 || queueLength < queueCapacity;
    }
  }

  /** Runs {@link #computeTask} periodically; null for the test constructor. */
  private final ScheduledExecutorService scheduledExecutor;

  /** Node ids ordered least to most loaded; guarded by sortedNodesLock. */
  protected final List<NodeId> sortedNodes;
  protected final Map<NodeId, ClusterNode> clusterNodes =
      new ConcurrentHashMap<>();
  protected final Map<String, RMNode> nodeByHostName =
      new ConcurrentHashMap<>();
  protected final Map<String, Set<NodeId>> nodeIdsByRack =
      new ConcurrentHashMap<>();
  protected final LoadComparator comparator;
  protected QueueLimitCalculator thresholdCalculator;
  protected ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock();
  protected ReentrantReadWriteLock clusterNodesLock =
      new ReentrantReadWriteLock();

  /**
   * Periodic task that re-sorts the nodes and refreshes the threshold
   * calculator while holding the sortedNodes write lock. A sorting failure is
   * logged rather than propagated, so the threshold update still runs.
   */
  Runnable computeTask = new Runnable() {
    @Override
    public void run() {
      ReentrantReadWriteLock.WriteLock writeLock = sortedNodesLock.writeLock();
      writeLock.lock();
      try {
        try {
          updateSortedNodes();
        } catch (Exception ex) {
          LOG.warn("Got Exception while sorting nodes..", ex);
        }
        if (thresholdCalculator != null) {
          thresholdCalculator.update();
        }
      } finally {
        writeLock.unlock();
      }
    }
  };

  /**
   * Creates a monitor without a background sorting thread (for tests).
   * @param comparator the load comparison policy
   */
  @VisibleForTesting
  NodeQueueLoadMonitor(LoadComparator comparator) {
    this.sortedNodes = new ArrayList<>();
    this.comparator = comparator;
    this.scheduledExecutor = null;
  }

  /**
   * Creates a monitor that re-sorts nodes every nodeComputationInterval ms.
   * @param nodeComputationInterval sorting period (and initial delay) in ms
   * @param comparator the load comparison policy
   * @param numNodes number of least-loaded nodes considered by
   *        {@link #selectAnyNode}
   */
  public NodeQueueLoadMonitor(long nodeComputationInterval,
      LoadComparator comparator, int numNodes) {
    this.sortedNodes = new ArrayList<>();
    this.comparator = comparator;
    this.numNodesForAnyAllocation = numNodes;
    // Schedule last: the task may start on another thread as soon as it is
    // submitted, so every field should be assigned before then.
    this.scheduledExecutor = Executors.newScheduledThreadPool(1);
    this.scheduledExecutor.scheduleAtFixedRate(computeTask,
        nodeComputationInterval, nodeComputationInterval,
        TimeUnit.MILLISECONDS);
  }

  /**
   * Replaces {@link #sortedNodes} with the current non-full nodes in load
   * order. This method does not lock sortedNodes itself; computeTask calls it
   * while holding the sortedNodes write lock.
   */
  protected void updateSortedNodes() {
    List<NodeId> nodeIds = sortNodes(true).stream()
        .map(n -> n.nodeId)
        .collect(Collectors.toList());
    sortedNodes.clear();
    sortedNodes.addAll(nodeIds);
  }

  /**
   * @return the live sorted list (not a copy, and read without locking)
   */
  List<NodeId> getSortedNodes() {
    return sortedNodes;
  }

  public QueueLimitCalculator getThresholdCalculator() {
    return thresholdCalculator;
  }

  /** Stops the periodic sorting task, if one was started. */
  public void stop() {
    if (scheduledExecutor != null) {
      scheduledExecutor.shutdown();
    }
  }

  Map<NodeId, ClusterNode> getClusterNodes() {
    return clusterNodes;
  }

  Comparator<ClusterNode> getComparator() {
    return comparator;
  }

  /**
   * Creates the queue-limit calculator that computeTask refreshes after each
   * sort.
   * @param sigma passed through to QueueLimitCalculator
   * @param limitMin passed through to QueueLimitCalculator
   * @param limitMax passed through to QueueLimitCalculator
   */
  public void initThresholdCalculator(float sigma, int limitMin, int limitMax) {
    this.thresholdCalculator =
        new QueueLimitCalculator(this, sigma, limitMin, limitMax);
  }

  @Override
  public void addNode(List<NMContainerStatus> containerStatuses,
      RMNode rmNode) {
    this.nodeByHostName.put(rmNode.getHostName(), rmNode);
    addIntoNodeIdsByRack(rmNode);
    // Ignoring this currently : at least one NODE_UPDATE heartbeat is
    // required to ensure node eligibility.
  }

  @Override
  public void removeNode(RMNode removedRMNode) {
    LOG.info("Node delete event for: {}", removedRMNode.getNode().getName());
    this.nodeByHostName.remove(removedRMNode.getHostName());
    removeFromNodeIdsByRack(removedRMNode);
    ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
    writeLock.lock();
    ClusterNode node;
    try {
      node = this.clusterNodes.remove(removedRMNode.getNodeID());
      onNodeRemoved(node);
    } finally {
      writeLock.unlock();
    }
    if (node != null) {
      LOG.debug("Delete ClusterNode: {}", removedRMNode.getNodeID());
    } else {
      LOG.debug("Node not in list!");
    }
  }

  /**
   * Provide an integration point for extended class.
   * Called from {@link #removeNode} while holding the clusterNodes write lock.
   * @param node the node removed; null if the node was not being tracked
   */
  protected void onNodeRemoved(ClusterNode node) {
  }

  @Override
  public void updateNode(RMNode rmNode) {
    LOG.debug("Node update event from: {}", rmNode.getNodeID());
    OpportunisticContainersStatus opportunisticContainersStatus =
        rmNode.getOpportunisticContainersStatus();
    if (opportunisticContainersStatus == null) {
      opportunisticContainersStatus =
          OpportunisticContainersStatus.newInstance();
    }

    // Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node
    // UNLESS comparator is based on queue length.
    ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
    writeLock.lock();
    try {
      ClusterNode clusterNode = this.clusterNodes.get(rmNode.getNodeID());
      if (clusterNode == null) {
        onNewNodeAdded(rmNode, opportunisticContainersStatus);
      } else {
        onExistingNodeUpdated(rmNode, clusterNode, opportunisticContainersStatus);
      }
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Starts tracking a node seen for the first time, unless it is
   * decommissioning or reports no wait time (-1) under QUEUE_WAIT_TIME.
   * Called from {@link #updateNode} with the clusterNodes write lock held.
   * @param rmNode the reporting node
   * @param status its opportunistic container status (never null here)
   */
  protected void onNewNodeAdded(
      RMNode rmNode, OpportunisticContainersStatus status) {
    int opportQueueCapacity = status.getOpportQueueCapacity();
    int estimatedQueueWaitTime = status.getEstimatedQueueWaitTime();
    int waitQueueLength = status.getWaitQueueLength();

    if (rmNode.getState() != NodeState.DECOMMISSIONING &&
        (estimatedQueueWaitTime != -1 ||
            comparator == LoadComparator.QUEUE_LENGTH ||
            comparator == LoadComparator.QUEUE_LENGTH_THEN_RESOURCES)) {
      final ClusterNode.Properties properties =
          ClusterNode.Properties.newInstance()
              .setQueueWaitTime(estimatedQueueWaitTime)
              .setQueueLength(waitQueueLength)
              .setNodeLabels(rmNode.getNodeLabels())
              .setCapability(rmNode.getTotalCapability())
              .setAllocatedResource(rmNode.getAllocatedContainerResource())
              .setQueueCapacity(opportQueueCapacity)
              .updateTimestamp();

      this.clusterNodes.put(rmNode.getNodeID(),
          new ClusterNode(rmNode.getNodeID()).setProperties(properties));

      LOG.info(
          "Inserting ClusterNode [{}] with queue wait time [{}] and "
              + "wait queue length [{}]",
          rmNode.getNode(),
          estimatedQueueWaitTime,
          waitQueueLength
      );
    } else {
      LOG.warn(
          "IGNORING ClusterNode [{}] with queue wait time [{}] and "
              + "wait queue length [{}]",
          rmNode.getNode(),
          estimatedQueueWaitTime,
          waitQueueLength
      );
    }
  }

  /**
   * Refreshes the properties of an already tracked node. The node is dropped
   * instead when it is decommissioning or reports no wait time (-1) under
   * QUEUE_WAIT_TIME. The queue capacity set at insertion is not refreshed here.
   * Called from {@link #updateNode} with the clusterNodes write lock held.
   * @param rmNode the reporting node
   * @param clusterNode the currently tracked state for that node
   * @param status its opportunistic container status (never null here)
   */
  protected void onExistingNodeUpdated(
      RMNode rmNode, ClusterNode clusterNode,
      OpportunisticContainersStatus status) {

    int estimatedQueueWaitTime = status.getEstimatedQueueWaitTime();
    int waitQueueLength = status.getWaitQueueLength();

    if (rmNode.getState() != NodeState.DECOMMISSIONING &&
        (estimatedQueueWaitTime != -1 ||
            comparator == LoadComparator.QUEUE_LENGTH ||
            comparator == LoadComparator.QUEUE_LENGTH_THEN_RESOURCES)) {
      final ClusterNode.Properties properties =
          ClusterNode.Properties.newInstance()
              .setQueueWaitTime(estimatedQueueWaitTime)
              .setQueueLength(waitQueueLength)
              .setNodeLabels(rmNode.getNodeLabels())
              .setCapability(rmNode.getTotalCapability())
              .setAllocatedResource(rmNode.getAllocatedContainerResource())
              .updateTimestamp();

      clusterNode.setProperties(properties);

      LOG.debug("Updating ClusterNode [{}] with queue wait time [{}] and"
              + " wait queue length [{}]", rmNode.getNodeID(),
          estimatedQueueWaitTime, waitQueueLength);

    } else {
      // NOTE(review): unlike removeNode, this removal path does not invoke
      // onNodeRemoved(); confirm whether subclasses rely on that hook.
      this.clusterNodes.remove(rmNode.getNodeID());
      LOG.info("Deleting ClusterNode [{}] with queue wait time [{}] and "
              + "wait queue length [{}]", rmNode.getNodeID(),
          clusterNode.getQueueWaitTime(), clusterNode.getQueueLength());
    }
  }

  @Override
  public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
    LOG.debug("Node resource update event from: {}", rmNode.getNodeID());
    // Ignoring this currently.
  }

  /**
   * Returns all Node Ids as ordered list from Least to Most Loaded.
   * @return ordered list of nodes
   */
  public List<NodeId> selectNodes() {
    return selectLeastLoadedNodes(-1);
  }

  /**
   * Returns 'K' of the least Loaded Node Ids as ordered list.
   * @param k max number of nodes to return; a negative value returns all
   * @return a new, independent ordered list of nodes
   */
  public List<NodeId> selectLeastLoadedNodes(int k) {
    ReentrantReadWriteLock.ReadLock readLock = sortedNodesLock.readLock();
    readLock.lock();
    try {
      // Copy only the prefix that is returned; the copy isolates the caller
      // from later re-sorts of sortedNodes.
      final List<NodeId> prefix = (k >= 0 && k < this.sortedNodes.size())
          ? this.sortedNodes.subList(0, k)
          : this.sortedNodes;
      return new ArrayList<>(prefix);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Selects the node as specified by hostName for resource allocation,
   * unless the node has been blacklisted.
   * @param hostName the hostname of the node for local resource allocation
   * @param blacklist the blacklisted nodes
   * @param request the requested resource
   * @return the selected node, null if the node is full or is blacklisted
   */
  public RMNode selectLocalNode(
      String hostName, Set<String> blacklist, Resource request) {
    if (blacklist.contains(hostName)) {
      return null;
    }
    RMNode node = nodeByHostName.get(hostName);
    if (node != null) {
      ClusterNode clusterNode = clusterNodes.get(node.getNodeID());
      if (clusterNode != null && comparator
          .compareAndIncrement(clusterNode, 1, request)) {
        return node;
      }
    }
    return null;
  }

  /**
   * Selects a node from the rack as specified by rackName
   * for resource allocation, excluding blacklisted nodes.
   * @param rackName the rack name for rack-local resource allocation
   * @param blacklist the blacklisted nodes
   * @param request the requested resource
   * @return the selected node, null if no suitable nodes
   */
  public RMNode selectRackLocalNode(
      String rackName, Set<String> blacklist, Resource request) {
    Set<NodeId> nodesOnRack = nodeIdsByRack.get(rackName);
    if (nodesOnRack != null) {
      for (NodeId nodeId : nodesOnRack) {
        if (!blacklist.contains(nodeId.getHost())) {
          ClusterNode node = clusterNodes.get(nodeId);
          if (node != null &&
              comparator.compareAndIncrement(node, 1, request)) {
            return nodeByHostName.get(nodeId.getHost());
          }
        }
      }
    }
    return null;
  }

  /**
   * Selects a node from all ClusterNodes for resource allocation,
   * excluding blacklisted nodes. The scan starts at a random candidate and
   * wraps around once.
   * @param blacklist the blacklisted nodes
   * @param request the requested resource
   * @return the selected node, null if no suitable nodes
   */
  public RMNode selectAnyNode(Set<String> blacklist, Resource request) {
    List<NodeId> nodeIds = getCandidatesForSelectAnyNode();
    int size = nodeIds.size();
    if (size <= 0) {
      return null;
    }
    // ThreadLocalRandom avoids allocating and seeding a new Random per call.
    final Random rand = ThreadLocalRandom.current();
    int startIndex = rand.nextInt(size);
    for (int i = 0; i < size; ++i) {
      int index = (i + startIndex) % size;
      NodeId nodeId = nodeIds.get(index);
      if (nodeId != null && !blacklist.contains(nodeId.getHost())) {
        ClusterNode node = clusterNodes.get(nodeId);
        if (node != null && comparator.compareAndIncrement(
            node, 1, request)) {
          return nodeByHostName.get(nodeId.getHost());
        }
      }
    }
    return null;
  }

  /**
   * @return the candidates scanned by {@link #selectAnyNode}: the
   *         numNodesForAnyAllocation least-loaded nodes
   */
  protected List<NodeId> getCandidatesForSelectAnyNode() {
    return selectLeastLoadedNodes(numNodesForAnyAllocation);
  }

  /**
   * Removes the node from its rack's id set. An emptied set is kept in the map.
   * @param removedNode the node being removed
   */
  protected void removeFromNodeIdsByRack(RMNode removedNode) {
    nodeIdsByRack.computeIfPresent(removedNode.getRackName(),
        (k, v) -> {
          v.remove(removedNode.getNodeID());
          return v;
        });
  }

  /**
   * Adds the node to its rack's id set, creating a concurrent set if needed.
   * @param addedNode the node being added
   */
  protected void addIntoNodeIdsByRack(RMNode addedNode) {
    nodeIdsByRack
        .computeIfAbsent(addedNode.getRackName(),
            k -> ConcurrentHashMap.newKeySet())
        .add(addedNode.getNodeID());
  }

  /**
   * Sorts the tracked nodes with {@link #comparator}. As a side effect, this
   * first sets the comparator's cluster resource to the sum of the nodes'
   * capabilities.
   * @param excludeFullNodes whether to drop nodes for which
   *        {@link LoadComparator#isNodeAvailable} is false
   * @return a new list of nodes ordered least to most loaded
   */
  protected List<ClusterNode> sortNodes(boolean excludeFullNodes) {
    ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock();
    readLock.lock();
    try {
      // Snapshot with toArray so the array is sized by the iteration itself.
      // clusterNodes is exposed to subclasses and tests, so size() and a later
      // iteration are not guaranteed to agree. A mismatch would overflow the
      // array or leave null slots.
      final ClusterNode[] nodes =
          this.clusterNodes.values().toArray(new ClusterNode[0]);
      final Resource clusterResource = Resource.newInstance(Resources.none());
      for (final ClusterNode node : nodes) {
        Resources.addTo(clusterResource, node.getCapability());
      }

      comparator.setClusterResource(clusterResource);

      Arrays.sort(nodes, comparator);
      final List<ClusterNode> retList = new ArrayList<>(nodes.length);
      for (final ClusterNode cNode : nodes) {
        if (!excludeFullNodes || comparator.isNodeAvailable(cNode)) {
          retList.add(cNode);
        }
      }
      return retList;
    } finally {
      readLock.unlock();
    }
  }

}