TestMiniYarnClusterNodeUtilization.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster.CustomNodeManager;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestMiniYarnClusterNodeUtilization {
  // Mini YARN cluster setup
  private static final int NUM_RM = 1;
  private static final int NUM_NM = 1;

  // Values for the first round
  private static final int CONTAINER_PMEM_1 = 1024;
  private static final int CONTAINER_VMEM_1 = 2048;
  private static final float CONTAINER_CPU_1 = 11.0f;

  private static final int NODE_PMEM_1 = 10240;
  private static final int NODE_VMEM_1 = 20480;
  private static final float NODE_CPU_1 = 51.0f;

  // Values for the second round
  private static final int CONTAINER_PMEM_2 = 2048;
  private static final int CONTAINER_VMEM_2 = 4096;
  private static final float CONTAINER_CPU_2 = 22.0f;

  private static final int NODE_PMEM_2 = 20480;
  private static final int NODE_VMEM_2 = 40960;
  private static final float NODE_CPU_2 = 61.0f;

  private MiniYARNCluster cluster;
  private CustomNodeManager nm;

  private Configuration conf;

  private NodeStatus nodeStatus;

  @BeforeEach
  public void setup() {
    conf = new YarnConfiguration();
    conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
    String name = TestMiniYarnClusterNodeUtilization.class.getName();
    cluster = new MiniYARNCluster(name, NUM_RM, NUM_NM, 1, 1);
    cluster.init(conf);
    cluster.start();
    assertFalse(-1 == cluster.getActiveRMIndex(), "RM never turned active");

    nm = (CustomNodeManager)cluster.getNodeManager(0);
    nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), 0,
        CONTAINER_PMEM_1, CONTAINER_VMEM_1, CONTAINER_CPU_1,
        NODE_PMEM_1, NODE_VMEM_1, NODE_CPU_1);
    nm.setNodeStatus(nodeStatus);
  }

  /**
   * Simulates a NM heartbeat using the simulated NodeStatus fixture. Verify
   * both the RMNode and SchedulerNode have been updated with the new
   * utilization.
   */
  @Test
  @Timeout(60000)
  void testUpdateNodeUtilization()
      throws InterruptedException, IOException, YarnException {
    assertTrue(cluster.waitForNodeManagersToConnect(10000),
        "NMs fail to connect to the RM");

    // Give the heartbeat time to propagate to the RM
    verifySimulatedUtilization();

    // Alter utilization
    nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), 0,
        CONTAINER_PMEM_2, CONTAINER_VMEM_2, CONTAINER_CPU_2,
        NODE_PMEM_2, NODE_VMEM_2, NODE_CPU_2);
    nm.setNodeStatus(nodeStatus);

    // Give the heartbeat time to propagate to the RM
    verifySimulatedUtilization();
  }

  /**
   * Trigger the NM to send a heartbeat using the simulated NodeStatus fixture.
   * Verify both the RMNode and SchedulerNode have been updated with the new
   * utilization.
   */
  @Test
  @Timeout(60000)
  void testMockNodeStatusHeartbeat()
      throws InterruptedException, YarnException {
    assertTrue(cluster.waitForNodeManagersToConnect(10000),
        "NMs fail to connect to the RM");

    NodeStatusUpdater updater = nm.getNodeStatusUpdater();
    updater.sendOutofBandHeartBeat();

    // Give the heartbeat time to propagate to the RM
    verifySimulatedUtilization();

    // Alter utilization
    nodeStatus = createNodeStatus(nm.getNMContext().getNodeId(), 0,
        CONTAINER_PMEM_2, CONTAINER_VMEM_2, CONTAINER_CPU_2,
        NODE_PMEM_2, NODE_VMEM_2, NODE_CPU_2);
    nm.setNodeStatus(nodeStatus);
    updater.sendOutofBandHeartBeat();

    verifySimulatedUtilization();
  }

  /**
   * Create a NodeStatus test vector.
   * @param nodeId Node identifier.
   * @param responseId Response identifier.
   * @param containerPMem Virtual memory of the container.
   * @param containerVMem Physical memory of the container.
   * @param containerCPU CPU percentage of the container.
   * @param nodePMem Physical memory of the node.
   * @param nodeVMem Virtual memory of the node.
   * @param nodeCPU CPU percentage of the node.
   */
  private NodeStatus createNodeStatus(
      NodeId nodeId,
      int responseId,
      int containerPMem,
      int containerVMem,
      float containerCPU,
      int nodePMem,
      int nodeVMem,
      float nodeCPU) {

    // Fake node status with fake utilization
    ResourceUtilization containersUtilization =
        ResourceUtilization.newInstance(containerPMem, containerVMem,
            containerCPU);
    ResourceUtilization nodeUtilization =
        ResourceUtilization.newInstance(nodePMem, nodeVMem, nodeCPU);
    NodeStatus status = NodeStatus.newInstance(
        nodeId,
        responseId,
        new ArrayList<ContainerStatus>(),
        null,
        NodeHealthStatus.newInstance(true, null, 0),
        containersUtilization,
        nodeUtilization,
        null);

    return status;
  }

  /**
   * Verify both the RMNode and SchedulerNode have been updated with the test
   * fixture utilization data.
   */
  private void verifySimulatedUtilization() throws InterruptedException {
    ResourceManager rm = cluster.getResourceManager(0);
    RMContext rmContext = rm.getRMContext();

    ResourceUtilization containersUtilization =
        nodeStatus.getContainersUtilization();
    ResourceUtilization nodeUtilization =
        nodeStatus.getNodeUtilization();

    // Give the heartbeat time to propagate to the RM (max 10 seconds)
    // We check if the nodeUtilization is up to date
    for (int i = 0; i < 100; i++) {
      for (RMNode ni : rmContext.getRMNodes().values()) {
        if (ni.getNodeUtilization() != null) {
          if (ni.getNodeUtilization().equals(nodeUtilization)) {
            break;
          }
        }
      }
      Thread.sleep(100);
    }

    // Verify the data is readable from the RM and scheduler nodes
    for (RMNode ni : rmContext.getRMNodes().values()) {
      ResourceUtilization cu = ni.getAggregatedContainersUtilization();
      assertEquals(containersUtilization, cu, "Containers Utillization not propagated to RMNode");

      ResourceUtilization nu = ni.getNodeUtilization();
      assertEquals(nodeUtilization, nu, "Node Utillization not propagated to RMNode");

      SchedulerNode scheduler = rmContext.getScheduler().getSchedulerNode(ni.getNodeID());
      cu = scheduler.getAggregatedContainersUtilization();
      assertEquals(containersUtilization, cu,
          "Containers Utillization not propagated to SchedulerNode");

      nu = scheduler.getNodeUtilization();
      assertEquals(nodeUtilization, nu, "Node Utillization not propagated to SchedulerNode");
    }
  }
}