TestResourceTrackerService.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager;

import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.XMLUtils;
import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore;

import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.OutputKeys;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.AttributeValue;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

public class TestResourceTrackerService extends NodeLabelTestBase {

  /** Scratch directory that holds the host list files written by the tests. */
  private static final File TEMP_DIR = new File(System.getProperty(
      "test.build.data", "/tmp"), "decommision");
  /** Plain-text hosts file; tests point the include or exclude path at it. */
  private final File hostFile = new File(TEMP_DIR, "hostFile.txt");
  /** Second plain-text hosts file, named for exclude lists. */
  private final File excludeHostFile =
      new File(TEMP_DIR, "excludeHostFile.txt");
  /** XML exclude hosts file; entries may carry a per-host timeout. */
  private final File excludeHostXmlFile =
      new File(TEMP_DIR, "excludeHostFile.xml");

  /** RM instance created by the individual tests. */
  private MockRM rm;

  /**
   * Verifies that the RM picks up the NM heartbeat interval from its
   * configuration and returns that interval to every registered NM in the
   * heartbeat response.
   */
  @Test
  @Timeout(value = 50)
  public void testGetNextHeartBeatInterval() throws Exception {
    final long expectedIntervalMs = 4000L;
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, "4000");

    rm = new MockRM(conf);
    rm.start();

    MockNM firstNode = rm.registerNode("host1:1234", 5120);
    MockNM secondNode = rm.registerNode("host2:5678", 10240);

    // Each NM, in registration order, must be handed the configured interval.
    for (MockNM node : Arrays.asList(firstNode, secondNode)) {
      NodeHeartbeatResponse response = node.nodeHeartbeat(true);
      assertEquals(expectedIntervalMs, response.getNextHeartBeatInterval());
    }
  }

  /**
   * Decommissioning using a pre-configured include hosts file.
   *
   * <p>All three NMs start out in the include file. The file is then rewritten
   * to list only host1 and localhost (by IP, to check that addresses are
   * matched as well as host names). After a refresh host2 must be told to shut
   * down and the shutdown-NM count must grow by one, while host1 and localhost
   * keep running normally.
   */
  @Test
  public void testDecommissionWithIncludeHosts() throws Exception {

    writeToHostsFile("localhost", "host1", "host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
        .getAbsolutePath());

    rm = new MockRM(conf);
    rm.start();

    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:5678", 10240);
    MockNM nm3 = rm.registerNode("localhost:4433", 1024);

    ClusterMetrics metrics = ClusterMetrics.getMetrics();
    assertNotNull(metrics);
    // Nodes dropped from the include list are verified below through the
    // shutdown counter (checkShutdownNMCount / getNumShutdownNMs), so the
    // baseline has to be taken from that same counter.
    int metricCount = metrics.getNumShutdownNMs();

    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    nodeHeartbeat = nm2.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    nodeHeartbeat = nm3.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());

    // To test that IPs also work
    String ip = NetUtils.normalizeHostName("localhost");
    writeToHostsFile("host1", ip);

    rm.getNodesListManager().refreshNodes(conf);

    checkShutdownNMCount(rm, ++metricCount);

    nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());

    assertEquals(metricCount, ClusterMetrics.getMetrics().getNumShutdownNMs());

    nodeHeartbeat = nm2.nodeHeartbeat(true);
    assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction(),
        "Node is not decommisioned.");

    nodeHeartbeat = nm3.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    assertEquals(metricCount, ClusterMetrics.getMetrics().getNumShutdownNMs());
    rm.stop();
  }

  /**
   * Decommissioning using a pre-configured exclude hosts file.
   *
   * <p>Starts with an empty exclude file, then excludes host2 and localhost
   * (listed by IP, to check that addresses are matched as well as host
   * names). After a refresh both must be told to shut down and the
   * decommissioned-NM count must grow by two. The exclude file is then
   * emptied again and localhost re-registers, after which only one extra
   * node may remain counted as decommissioned.
   */
  @Test
  public void testDecommissionWithExcludeHosts() throws Exception {
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
        .getAbsolutePath());

    writeToHostsFile("");
    rm = new MockRM(conf);
    rm.start();

    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:5678", 10240);
    MockNM nm3 = rm.registerNode("localhost:4433", 1024);

    int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    nodeHeartbeat = nm2.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    rm.drainEvents();

    // To test that IPs also work
    String ip = NetUtils.normalizeHostName("localhost");
    writeToHostsFile("host2", ip);

    rm.getNodesListManager().refreshNodes(conf);

    checkDecommissionedNMCount(rm, metricCount + 2);

    nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    nodeHeartbeat = nm2.nodeHeartbeat(true);
    assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction(),
        "Excluded node host2 was not told to shut down");

    nodeHeartbeat = nm3.nodeHeartbeat(true);
    assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction(),
        "Excluded node localhost was not told to shut down");
    rm.drainEvents();

    // Clear the exclude list so that localhost may rejoin.
    writeToHostsFile("");
    rm.getNodesListManager().refreshNodes(conf);

    nm3 = rm.registerNode("localhost:4433", 1024);
    nodeHeartbeat = nm3.nodeHeartbeat(true);
    rm.drainEvents();
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    // Only one node is still counted as decommissioned: localhost rejoined
    // after the exclude file was cleared, host2 never re-registered.
    checkDecommissionedNMCount(rm, metricCount + 1);
  }

  /**
   * Graceful decommission node with no running application.
   *
   * <p>host2 and host3 are gracefully decommissioned while idle. They must
   * reach DECOMMISSIONED and be told to shut down, the decommissioned-NM
   * count must grow by two, and host1 must be unaffected.
   */
  @Test
  public void testGracefulDecommissionNoApp() throws Exception {
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
        .getAbsolutePath());

    writeToHostsFile("");
    rm = new MockRM(conf);
    rm.start();

    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:5678", 10240);
    MockNM nm3 = rm.registerNode("host3:4433", 5120);

    int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
    NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
    NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
    NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true);

    assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction());
    assertEquals(NodeAction.NORMAL, nodeHeartbeat2.getNodeAction());
    assertEquals(NodeAction.NORMAL, nodeHeartbeat3.getNodeAction());

    rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
    rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);

    // Graceful decommission both host2 and host3.
    writeToHostsFile("host2", "host3");
    rm.getNodesListManager().refreshNodes(conf, true);

    rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
    rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING);

    // The idle DECOMMISSIONING nodes are expected to finish decommissioning
    // after this heartbeat and to be told to shut down.
    nodeHeartbeat1 = nm1.nodeHeartbeat(true);
    nodeHeartbeat2 = nm2.nodeHeartbeat(true);
    nodeHeartbeat3 = nm3.nodeHeartbeat(true);

    checkDecommissionedNMCount(rm, metricCount + 2);
    rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONED);
    rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONED);

    assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction());
    assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction());
    assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
  }

  /**
   * Graceful decommission timeout resolution with an XML exclude hosts file.
   *
   * <p>A timeout given for a host in the exclude file must be used as-is. A
   * host listed without one must get the configured
   * {@link YarnConfiguration#RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT} default.
   * After that default is changed in the configuration, a later refresh must
   * apply the new value to the newly decommissioned host.
   */
  @Test
  public void testGracefulDecommissionDefaultTimeoutResolution()
      throws Exception {
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostXmlFile
        .getAbsolutePath());

    writeToHostsXmlFile(excludeHostXmlFile, Pair.of("", null));
    rm = new MockRM(conf);
    rm.start();

    int nodeMemory = 1024;
    MockNM nm1 = rm.registerNode("host1:1234", nodeMemory);
    MockNM nm2 = rm.registerNode("host2:5678", nodeMemory);
    MockNM nm3 = rm.registerNode("host3:9101", nodeMemory);

    NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
    NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
    NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true);

    assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction());
    assertEquals(NodeAction.NORMAL, nodeHeartbeat2.getNodeAction());
    assertEquals(NodeAction.NORMAL, nodeHeartbeat3.getNodeAction());

    rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
    rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
    rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);

    // Graceful decommission both host1 and host2, with
    // non default timeout for host1
    final Integer nm1DecommissionTimeout = 20;
    writeToHostsXmlFile(
        excludeHostXmlFile,
        Pair.of(nm1.getNodeId().getHost(), nm1DecommissionTimeout),
        Pair.of(nm2.getNodeId().getHost(), null));
    rm.getNodesListManager().refreshNodes(conf, true);
    rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
    rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
    assertEquals(nm1DecommissionTimeout, rm.getDecommissioningTimeout(nm1.getNodeId()));
    Integer defaultDecTimeout =
        conf.getInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
            YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
    assertEquals(defaultDecTimeout, rm.getDecommissioningTimeout(nm2.getNodeId()));

    // Graceful decommission host3 with a new default timeout
    final Integer newDefaultDecTimeout = defaultDecTimeout + 10;
    writeToHostsXmlFile(
        excludeHostXmlFile, Pair.of(nm3.getNodeId().getHost(), null));
    conf.setInt(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
        newDefaultDecTimeout);
    rm.getNodesListManager().refreshNodes(conf, true);
    rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING);
    assertEquals(newDefaultDecTimeout, rm.getDecommissioningTimeout(nm3.getNodeId()));
  }

  /**
   * Graceful decommission node with running application.
   *
   * <p>host1 runs the AM of a live application and host3 is idle. After both
   * are gracefully decommissioned, host3 must reach DECOMMISSIONED while host1
   * stays DECOMMISSIONING. host1 keeps receiving NORMAL after its container
   * completes, as long as the application is still running. It only receives
   * SHUTDOWN and becomes DECOMMISSIONED once the application has finished.
   */
  @Test
  public void testGracefulDecommissionWithApp() throws Exception {
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());

    writeToHostsFile("");
    rm = new MockRM(conf);
    rm.start();

    MockNM busyNm = rm.registerNode("host1:1234", 10240);
    // host2 is registered but never excluded; the test does not touch it again.
    rm.registerNode("host2:5678", 20480);
    MockNM idleNm = rm.registerNode("host3:4433", 10240);
    NodeId busyId = busyNm.getNodeId();
    NodeId idleId = idleNm.getNodeId();
    rm.waitForState(busyId, NodeState.RUNNING);
    rm.waitForState(idleId, NodeState.RUNNING);

    // Launch an app whose AM lands on host1, then heartbeat host1 with a
    // RUNNING container of that attempt.
    RMApp app = MockRMAppSubmitter.submitWithMemory(2000, rm);
    MockAM am = MockRM.launchAndRegisterAM(app, rm, busyNm);
    ApplicationAttemptId attemptId =
        app.getCurrentAppAttempt().getAppAttemptId();
    busyNm.nodeHeartbeat(attemptId, 2, ContainerState.RUNNING);
    idleNm.nodeHeartbeat(true);

    // Exclude host1 and host3 with graceful decommissioning.
    writeToHostsFile("host1", "host3");
    rm.getNodesListManager().refreshNodes(conf, true);
    rm.waitForState(busyId, NodeState.DECOMMISSIONING);
    rm.waitForState(idleId, NodeState.DECOMMISSIONING);

    // The busy node has to wait for its containers; the idle one can finish.
    busyNm.nodeHeartbeat(true);
    idleNm.nodeHeartbeat(true);
    rm.waitForState(busyId, NodeState.DECOMMISSIONING);
    rm.waitForState(idleId, NodeState.DECOMMISSIONED);
    busyNm.nodeHeartbeat(attemptId, 2, ContainerState.RUNNING);

    // Container on host1 completes while the app is still RUNNING -> NORMAL.
    NodeHeartbeatResponse busyResponse =
        busyNm.nodeHeartbeat(attemptId, 2, ContainerState.COMPLETE);
    assertEquals(NodeAction.NORMAL, busyResponse.getNodeAction());

    // Once the app has finished, host1 is shut down and fully decommissioned.
    MockRM.finishAMAndVerifyAppState(app, rm, busyNm, am);
    rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
    busyResponse = busyNm.nodeHeartbeat(attemptId, 2, ContainerState.COMPLETE);
    assertEquals(NodeAction.SHUTDOWN, busyResponse.getNodeAction());
    rm.waitForState(busyId, NodeState.DECOMMISSIONED);
  }

  /**
   * Test graceful decommission of node when an AM container is scheduled on a
   * node just before it is gracefully decommissioned.
   *
   * <p>The AM is allocated on host1, but the node is put into DECOMMISSIONING
   * before it has reported the AM container. host1 must then stay
   * DECOMMISSIONING until the application finishes, while the idle host3
   * reaches DECOMMISSIONED.
   */
  @Test
  @Timeout(value = 60)
  public void testGracefulDecommissionAfterAMContainerAlloc() throws Exception {
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());

    writeToHostsFile("");
    rm = new MockRM(conf);
    rm.start();

    MockNM amNm = rm.registerNode("host1:1234", 10240);
    MockNM spareNm = rm.registerNode("host2:5678", 20480);
    MockNM idleNm = rm.registerNode("host3:4433", 10240);

    NodeId amNodeId = amNm.getNodeId();
    NodeId spareNodeId = spareNm.getNodeId();
    NodeId idleNodeId = idleNm.getNodeId();
    for (NodeId nodeId : Arrays.asList(amNodeId, spareNodeId, idleNodeId)) {
      rm.waitForState(nodeId, NodeState.RUNNING);
    }

    // Submit an app and allocate its AM on host1 without registering it yet.
    RMApp app = MockRMAppSubmitter.submitWithMemory(2000, rm);
    MockAM am = MockRM.launchAM(app, rm, amNm);

    // Reproduce the race: decommission host1 (and host3) before host1 has
    // sent any heartbeat mentioning the AM container.
    writeToHostsFile("host1", "host3");
    rm.getNodesListManager().refreshNodes(conf, true);
    rm.waitForState(amNodeId, NodeState.DECOMMISSIONING);
    rm.waitForState(idleNodeId, NodeState.DECOMMISSIONING);

    // First heartbeat from host1 that reports the AM container.
    ApplicationAttemptId attemptId =
        app.getCurrentAppAttempt().getAppAttemptId();
    amNm.nodeHeartbeat(attemptId, 1, ContainerState.RUNNING);
    idleNm.nodeHeartbeat(true);

    // host1 is held in DECOMMISSIONING by its running container.
    rm.waitForState(amNodeId, NodeState.DECOMMISSIONING);
    rm.waitForState(idleNodeId, NodeState.DECOMMISSIONED);

    // Drive the application to completion; host1 may then finish too.
    am.registerAppAttempt();
    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
    MockRM.finishAMAndVerifyAppState(app, rm, amNm, am);
    amNm.nodeHeartbeat(attemptId, 1, ContainerState.COMPLETE);
    rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
    rm.waitForState(amNodeId, NodeState.DECOMMISSIONED);
  }


  /**
   * Decommissioning using a post-configured include hosts file.
   *
   * <p>The RM starts without an include path. One listing only host1 is
   * added to the configuration afterwards. After a refresh the shutdown-NM
   * count must grow by one, host1 must keep running, and host2 must appear
   * in the inactive node list in SHUTDOWN state.
   */
  @Test
  public void testAddNewIncludePathToConfiguration() throws Exception {
    Configuration conf = new Configuration();
    rm = new MockRM(conf);
    rm.start();
    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:5678", 10240);
    ClusterMetrics metrics = ClusterMetrics.getMetrics();
    assertNotNull(metrics);
    int initialMetricCount = metrics.getNumShutdownNMs();
    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    nodeHeartbeat = nm2.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    writeToHostsFile("host1");
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
        .getAbsolutePath());
    rm.getNodesListManager().refreshNodes(conf);
    checkShutdownNMCount(rm, ++initialMetricCount);
    nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL,
        nodeHeartbeat.getNodeAction(), "Node should not have been shutdown.");
    // Check presence first so a missing entry fails with a clear message
    // instead of a NullPointerException on getState().
    RMNode inactiveNode =
        rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId());
    assertNotNull(inactiveNode, "Node should be in the inactive node list");
    NodeState nodeState = inactiveNode.getState();
    assertEquals(NodeState.SHUTDOWN, nodeState,
        "Node should have been shutdown but is in state " + nodeState);
  }
  
  /**
   * Decommissioning using a post-configured exclude hosts file.
   *
   * <p>The RM starts without an exclude path. One listing host2 is added to
   * the configuration afterwards. After a refresh the decommissioned-NM count
   * must grow by one, host1 must keep running, and host2 must be told to shut
   * down.
   */
  @Test
  public void testAddNewExcludePathToConfiguration() throws Exception {
    Configuration conf = new Configuration();
    rm = new MockRM(conf);
    rm.start();
    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:5678", 10240);
    ClusterMetrics metrics = ClusterMetrics.getMetrics();
    assertNotNull(metrics);
    int initialMetricCount = metrics.getNumDecommisionedNMs();
    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    nodeHeartbeat = nm2.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    writeToHostsFile("host2");
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
        .getAbsolutePath());
    rm.getNodesListManager().refreshNodes(conf);
    checkDecommissionedNMCount(rm, ++initialMetricCount);
    nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL,
        nodeHeartbeat.getNodeAction(), "Node should not have been decommissioned.");
    nodeHeartbeat = nm2.nodeHeartbeat(true);
    assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction(),
        "Node should have been decommissioned but got node action "
        + nodeHeartbeat.getNodeAction());
  }

  /**
   * A node listed in the include hosts file registers successfully and is
   * answered with {@link NodeAction#NORMAL}.
   */
  @Test
  public void testNodeRegistrationSuccess() throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
        .getAbsolutePath());
    rm = new MockRM(conf);
    rm.start();

    ResourceTrackerService resourceTrackerService =
        rm.getResourceTrackerService();
    RegisterNodeManagerRequest req = Records.newRecord(
        RegisterNodeManagerRequest.class);
    NodeId nodeId = NodeId.newInstance("host2", 1234);
    Resource capability = Resources.createResource(1024);
    req.setResource(capability);
    req.setNodeId(nodeId);
    req.setHttpPort(1234);
    req.setNMVersion(YarnVersionInfo.getVersion());
    // host2 is in the include file, so this registration must be accepted.
    RegisterNodeManagerResponse response =
        resourceTrackerService.registerNodeManager(req);
    assertEquals(NodeAction.NORMAL, response.getNodeAction());
  }

  /**
   * With distributed node label configuration, labels that an NM sends at
   * registration and that already exist in the cluster must be accepted and
   * applied to the node.
   */
  @Test
  public void testNodeRegistrationWithLabels() throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
        YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);

    final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

    rm = new MockRM(conf) {
      @Override
      protected RMNodeLabelsManager createNodeLabelManager() {
        return nodeLabelsMgr;
      }
    };
    rm.start();

    try {
      nodeLabelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("A", "B", "C"));
    } catch (IOException e) {
      // fail() throws, so attach the cause instead of printing it afterwards.
      fail("Caught Exception while initializing", e);
    }

    ResourceTrackerService resourceTrackerService =
        rm.getResourceTrackerService();
    RegisterNodeManagerRequest registerReq =
        Records.newRecord(RegisterNodeManagerRequest.class);
    NodeId nodeId = NodeId.newInstance("host2", 1234);
    Resource capability = Resources.createResource(1024);
    registerReq.setResource(capability);
    registerReq.setNodeId(nodeId);
    registerReq.setHttpPort(1234);
    registerReq.setNMVersion(YarnVersionInfo.getVersion());
    registerReq.setNodeLabels(toSet(NodeLabel.newInstance("A")));
    RegisterNodeManagerResponse response =
        resourceTrackerService.registerNodeManager(registerReq);

    assertEquals(NodeAction.NORMAL, response.getNodeAction(),
        "Action should be normal on valid Node Labels");
    assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
        NodeLabelsUtils.convertToStringSet(registerReq.getNodeLabels()));
    assertTrue(response.getAreNodeLabelsAcceptedByRM(),
        "Valid Node Labels were not accepted by RM");
    rm.stop();
  }

  /**
   * With distributed node label configuration, labels that an NM sends at
   * registration but that the cluster does not know must be rejected. The
   * node still registers with {@link NodeAction#NORMAL}, but no labels are
   * applied and the response carries a diagnostics message.
   */
  @Test
  public void testNodeRegistrationWithInvalidLabels() throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
        YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);

    final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

    rm = new MockRM(conf) {
      @Override
      protected RMNodeLabelsManager createNodeLabelManager() {
        return nodeLabelsMgr;
      }
    };
    rm.start();

    try {
      nodeLabelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("X", "Y", "Z"));
    } catch (IOException e) {
      // fail() throws, so attach the cause instead of printing it afterwards.
      fail("Caught Exception while initializing", e);
    }

    ResourceTrackerService resourceTrackerService =
        rm.getResourceTrackerService();
    RegisterNodeManagerRequest registerReq =
        Records.newRecord(RegisterNodeManagerRequest.class);
    NodeId nodeId = NodeId.newInstance("host2", 1234);
    Resource capability = Resources.createResource(1024);
    registerReq.setResource(capability);
    registerReq.setNodeId(nodeId);
    registerReq.setHttpPort(1234);
    registerReq.setNMVersion(YarnVersionInfo.getVersion());
    // A, B and C are not among the cluster labels (X, Y, Z) added above.
    registerReq.setNodeLabels(toNodeLabelSet("A", "B", "C"));
    RegisterNodeManagerResponse response =
        resourceTrackerService.registerNodeManager(registerReq);

    assertEquals(NodeAction.NORMAL, response.getNodeAction(),
        "On Invalid Node Labels action is expected to be normal");
    assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
    assertNotNull(response.getDiagnosticsMessage());
    assertFalse(response.getAreNodeLabelsAcceptedByRM(),
        "Node Labels should not accepted by RM If Invalid");

    rm.stop();
  }

  /**
   * With distributed node label configuration, a label with invalid syntax
   * ({@code #Y}) sent at registration must be rejected. The node still
   * registers with {@link NodeAction#NORMAL}, but no labels are applied and
   * the response carries a diagnostics message.
   */
  @Test
  public void testNodeRegistrationWithInvalidLabelsSyntax() throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
        YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);

    final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

    rm = new MockRM(conf) {
      @Override
      protected RMNodeLabelsManager createNodeLabelManager() {
        return nodeLabelsMgr;
      }
    };
    rm.start();

    try {
      nodeLabelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("X", "Y", "Z"));
    } catch (IOException e) {
      // fail() throws, so attach the cause instead of printing it afterwards.
      fail("Caught Exception while initializing", e);
    }

    ResourceTrackerService resourceTrackerService =
        rm.getResourceTrackerService();
    RegisterNodeManagerRequest req =
        Records.newRecord(RegisterNodeManagerRequest.class);
    NodeId nodeId = NodeId.newInstance("host2", 1234);
    Resource capability = Resources.createResource(1024);
    req.setResource(capability);
    req.setNodeId(nodeId);
    req.setHttpPort(1234);
    req.setNMVersion(YarnVersionInfo.getVersion());
    req.setNodeLabels(toNodeLabelSet("#Y"));
    RegisterNodeManagerResponse response =
        resourceTrackerService.registerNodeManager(req);

    assertEquals(NodeAction.NORMAL, response.getNodeAction(),
        "On Invalid Node Labels action is expected to be normal");
    assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
    assertNotNull(response.getDiagnosticsMessage());
    assertFalse(response.getAreNodeLabelsAcceptedByRM(),
        "Node Labels should not accepted by RM If Invalid");

    rm.stop();
  }

  /**
   * Verifies that with the default (central) node-label configuration the RM
   * ignores labels a node sends at registration: the node registers with a
   * NORMAL action, no labels are recorded for it, and the labels-accepted
   * flag is false.
   */
  @Test
  public void testNodeRegistrationWithCentralLabelConfig() throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
        YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE);

    final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

    rm = new MockRM(conf) {
      @Override
      protected RMNodeLabelsManager createNodeLabelManager() {
        return nodeLabelsMgr;
      }
    };
    rm.start();
    // Any IOException propagates and fails the test with its real cause.
    nodeLabelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("A", "B", "C"));
    ResourceTrackerService resourceTrackerService =
        rm.getResourceTrackerService();
    RegisterNodeManagerRequest req =
        Records.newRecord(RegisterNodeManagerRequest.class);
    NodeId nodeId = NodeId.newInstance("host2", 1234);
    Resource capability = Resources.createResource(1024);
    req.setResource(capability);
    req.setNodeId(nodeId);
    req.setHttpPort(1234);
    req.setNMVersion(YarnVersionInfo.getVersion());
    req.setNodeLabels(toNodeLabelSet("A"));
    RegisterNodeManagerResponse response =
        resourceTrackerService.registerNodeManager(req);
    // registered to RM with central label config
    assertEquals(NodeAction.NORMAL, response.getNodeAction());
    assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
    assertFalse(response.getAreNodeLabelsAcceptedByRM(),
        "Node Labels should not accepted by RM If its configured with " +
        "Central configuration");
    if (rm != null) {
      rm.stop();
    }
  }

  /**
   * Verifies that valid distributed node attributes sent at registration are
   * stored by the RM's node attributes manager and acknowledged in the
   * response. Uses a fresh temporary directory as the file-system attribute
   * store root.
   */
  @Test
  public void testNodeRegistrationWithAttributes() throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
    // createTempFile yields a uniquely named regular file; swap it for a
    // directory of the same name. Check both steps so a setup failure is
    // reported here instead of later as an attribute-store error.
    File tempDir = File.createTempFile("nattr", ".tmp");
    assertTrue(tempDir.delete(), "Could not delete temp file " + tempDir);
    assertTrue(tempDir.mkdirs(), "Could not create temp dir " + tempDir);
    // NOTE: deleteOnExit only removes the directory if it is empty at JVM
    // exit; files written by the attribute store keep it alive.
    tempDir.deleteOnExit();
    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
        tempDir.getAbsolutePath());
    rm = new MockRM(conf);
    rm.start();

    ResourceTrackerService resourceTrackerService =
        rm.getResourceTrackerService();
    RegisterNodeManagerRequest registerReq =
        Records.newRecord(RegisterNodeManagerRequest.class);
    NodeId nodeId = NodeId.newInstance("host2", 1234);
    Resource capability = Resources.createResource(1024);
    NodeAttribute nodeAttribute1 = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1",
            NodeAttributeType.STRING, "V1");
    NodeAttribute nodeAttribute2 = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
            NodeAttributeType.STRING, "V2");
    registerReq.setResource(capability);
    registerReq.setNodeId(nodeId);
    registerReq.setHttpPort(1234);
    registerReq.setNMVersion(YarnVersionInfo.getVersion());
    registerReq.setNodeAttributes(toSet(nodeAttribute1, nodeAttribute2));
    RegisterNodeManagerResponse response =
        resourceTrackerService.registerNodeManager(registerReq);

    assertEquals(NodeAction.NORMAL, response.getNodeAction(),
        "Action should be normal on valid Node Attributes");
    assertTrue(NodeLabelUtil.isNodeAttributesEquals(
        rm.getRMContext().getNodeAttributesManager()
            .getAttributesForNode(nodeId.getHost()).keySet(),
        registerReq.getNodeAttributes()));
    assertTrue(response.getAreNodeAttributesAcceptedByRM(),
        "Valid Node Attributes were not accepted by RM");

    if (rm != null) {
      rm.stop();
    }
  }

  /**
   * Registers the same node three times, each time pairing one valid
   * distributed attribute with one invalid attribute (bad prefix, bad name,
   * bad value). After every attempt the RM must hold no attributes for the
   * host, the response must match the common "invalid attributes" outcome,
   * and the diagnostics message must describe the specific violation.
   */
  @Test
  public void testNodeRegistrationWithInvalidAttributes() throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
        TEMP_DIR.getAbsolutePath());
    rm = new MockRM(conf);
    rm.start();

    final ResourceTrackerService tracker = rm.getResourceTrackerService();
    final NodeId nodeId = NodeId.newInstance("host2", 1234);
    final String host = nodeId.getHost();

    // One valid attribute plus three variants that each break one rule.
    final NodeAttribute valid = NodeAttribute.newInstance(
        NodeAttribute.PREFIX_DISTRIBUTED, "Attr1", NodeAttributeType.STRING, "V1");
    final NodeAttribute badPrefix = NodeAttribute.newInstance(
        "_P", "Attr1", NodeAttributeType.STRING, "V2");
    final NodeAttribute badName = NodeAttribute.newInstance(
        NodeAttribute.PREFIX_DISTRIBUTED, "_N", NodeAttributeType.STRING, "V2");
    final NodeAttribute badValue = NodeAttribute.newInstance(
        NodeAttribute.PREFIX_DISTRIBUTED, "Attr2", NodeAttributeType.STRING, "...");

    final RegisterNodeManagerRequest registerReq =
        Records.newRecord(RegisterNodeManagerRequest.class);
    registerReq.setResource(Resources.createResource(1024));
    registerReq.setNodeId(nodeId);
    registerReq.setHttpPort(1234);
    registerReq.setNMVersion(YarnVersionInfo.getVersion());

    final NodeAttributesManager attributesMgr =
        rm.getRMContext().getNodeAttributesManager();

    // Attempt 1: prefix is not the distributed prefix.
    registerReq.setNodeAttributes(toSet(valid, badPrefix));
    RegisterNodeManagerResponse resp = tracker.registerNodeManager(registerReq);
    assertEquals(0, attributesMgr.getAttributesForNode(host).size());
    assertRegisterResponseForInvalidAttributes(resp);
    assertTrue(resp.getDiagnosticsMessage()
        .endsWith("attributes in HB must have prefix nm.yarn.io"));

    // Attempt 2: attribute name contains a disallowed character.
    registerReq.setNodeAttributes(toSet(valid, badName));
    resp = tracker.registerNodeManager(registerReq);
    assertEquals(0, attributesMgr.getAttributesForNode(host).size());
    assertRegisterResponseForInvalidAttributes(resp);
    assertTrue(resp.getDiagnosticsMessage()
        .startsWith("attribute name should only contains"));

    // Attempt 3: attribute value contains disallowed characters.
    registerReq.setNodeAttributes(toSet(valid, badValue));
    resp = tracker.registerNodeManager(registerReq);
    assertEquals(0, attributesMgr.getAttributesForNode(host).size());
    assertRegisterResponseForInvalidAttributes(resp);
    assertTrue(resp.getDiagnosticsMessage()
        .startsWith("attribute value should only contains"));

    if (rm != null) {
      rm.stop();
    }
  }

  /**
   * Asserts the common outcome of a registration that carried invalid node
   * attributes. The node is still told to proceed normally, a diagnostics
   * message is present, and the RM reports the attributes as not accepted.
   *
   * @param response the RM's reply to the registration request
   */
  private void assertRegisterResponseForInvalidAttributes(
      RegisterNodeManagerResponse response) {
    assertEquals(NodeAction.NORMAL, response.getNodeAction(),
        "On Invalid Node Attributes action is expected to be normal");
    assertNotNull(response.getDiagnosticsMessage());
    // The requests checked here carry attributes, not labels, so the
    // attributes flag is the one that reflects attribute validation.
    assertFalse(response.getAreNodeAttributesAcceptedByRM(),
        "Node Attributes should not be accepted by RM If Invalid");
  }

  /**
   * Builds a minimal heartbeat {@link NodeStatus} for {@code nodeId}. The
   * status has response id 0, no container statuses and no keep-alive
   * applications.
   *
   * @param nodeId the node the status describes
   * @return a freshly created status record
   */
  private NodeStatus getNodeStatusObject(NodeId nodeId) {
    final NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
    nodeStatus.setResponseId(0);
    nodeStatus.setNodeId(nodeId);
    nodeStatus.setKeepAliveApplications(Collections.emptyList());
    nodeStatus.setContainersStatuses(Collections.emptyList());
    return nodeStatus;
  }

  /**
   * Verifies distributed node labels reported in heartbeats. A node registers
   * with label "A" and then heartbeats with "B"; the update is applied and
   * acknowledged. A following heartbeat with no labels (null) leaves the
   * node's labels unchanged and reports them as not accepted.
   */
  @Test
  public void testNodeHeartBeatWithLabels() throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
        YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);

    final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

    rm = new MockRM(conf) {
      @Override
      protected RMNodeLabelsManager createNodeLabelManager() {
        return nodeLabelsMgr;
      }
    };
    rm.start();
    // adding valid labels; any IOException propagates and fails the test
    // with its real cause
    nodeLabelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("A", "B", "C"));

    // Registering of labels and other required info to RM
    ResourceTrackerService resourceTrackerService =
        rm.getResourceTrackerService();
    RegisterNodeManagerRequest registerReq =
        Records.newRecord(RegisterNodeManagerRequest.class);
    NodeId nodeId = NodeId.newInstance("host2", 1234);
    Resource capability = Resources.createResource(1024);
    registerReq.setResource(capability);
    registerReq.setNodeId(nodeId);
    registerReq.setHttpPort(1234);
    registerReq.setNMVersion(YarnVersionInfo.getVersion());
    registerReq.setNodeLabels(toNodeLabelSet("A")); // Node register label
    RegisterNodeManagerResponse registerResponse =
        resourceTrackerService.registerNodeManager(registerReq);

    // modification of labels during heartbeat
    NodeHeartbeatRequest heartbeatReq =
        Records.newRecord(NodeHeartbeatRequest.class);
    heartbeatReq.setNodeLabels(toNodeLabelSet("B")); // Node heartbeat label update
    NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
    heartbeatReq.setNodeStatus(nodeStatusObject);
    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
        .getNMTokenMasterKey());
    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
        .getContainerTokenMasterKey());
    NodeHeartbeatResponse nodeHeartbeatResponse =
        resourceTrackerService.nodeHeartbeat(heartbeatReq);

    assertEquals(NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction(),
        "Node action should be NORMAL on a valid label update");
    assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
        NodeLabelsUtils.convertToStringSet(heartbeatReq.getNodeLabels()));
    assertTrue(nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM(),
        "Valid Node Labels were not accepted by RM");

    // After modification of labels next heartbeat sends null informing no update
    Set<String> oldLabels = nodeLabelsMgr.getNodeLabels().get(nodeId);
    int responseId = nodeStatusObject.getResponseId();
    heartbeatReq =
        Records.newRecord(NodeHeartbeatRequest.class);
    heartbeatReq.setNodeLabels(null); // Node heartbeat label update
    nodeStatusObject = getNodeStatusObject(nodeId);
    nodeStatusObject.setResponseId(responseId + 1);
    heartbeatReq.setNodeStatus(nodeStatusObject);
    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
        .getNMTokenMasterKey());
    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
        .getContainerTokenMasterKey());
    nodeHeartbeatResponse = resourceTrackerService.nodeHeartbeat(heartbeatReq);

    assertEquals(NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction(),
        "Node action should be NORMAL when no label update is sent");
    assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
        oldLabels);
    assertFalse(nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM(),
        "Node Labels should not accepted by RM");
    rm.stop();
  }

  /**
   * Verifies that distributed node attributes sent in heartbeats are applied
   * by the RM. A first heartbeat sets host=host2, and a second replaces it
   * with host=host3.
   */
  @Test
  public void testNodeHeartbeatWithNodeAttributes() throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
        TEMP_DIR.getAbsolutePath());
    rm = new MockRM(conf);
    rm.start();

    // Register to RM
    ResourceTrackerService resourceTrackerService =
        rm.getResourceTrackerService();
    RegisterNodeManagerRequest registerReq =
        Records.newRecord(RegisterNodeManagerRequest.class);
    NodeId nodeId = NodeId.newInstance("host2", 1234);
    Resource capability = Resources.createResource(1024);
    registerReq.setResource(capability);
    registerReq.setNodeId(nodeId);
    registerReq.setHttpPort(1234);
    registerReq.setNMVersion(YarnVersionInfo.getVersion());
    RegisterNodeManagerResponse registerResponse =
        resourceTrackerService.registerNodeManager(registerReq);

    Set<NodeAttribute> nodeAttributes = new HashSet<>();
    nodeAttributes.add(NodeAttribute.newInstance(
        NodeAttribute.PREFIX_DISTRIBUTED, "host",
        NodeAttributeType.STRING, "host2"));

    // Set node attributes in HB.
    NodeHeartbeatRequest heartbeatReq =
        Records.newRecord(NodeHeartbeatRequest.class);
    NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
    int responseId = nodeStatusObject.getResponseId();
    heartbeatReq.setNodeStatus(nodeStatusObject);
    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
        .getNMTokenMasterKey());
    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
        .getContainerTokenMasterKey());
    heartbeatReq.setNodeAttributes(nodeAttributes);
    resourceTrackerService.nodeHeartbeat(heartbeatReq);

    // Ensure RM gets correct node attributes update.
    NodeAttributesManager attributeManager =
        rm.getRMContext().getNodeAttributesManager();
    Map<NodeAttribute, AttributeValue> attrs = attributeManager
        .getAttributesForNode(nodeId.getHost());
    assertEquals(1, attrs.size());
    NodeAttribute na = attrs.keySet().iterator().next();
    assertEquals("host", na.getAttributeKey().getAttributeName());
    assertEquals("host2", na.getAttributeValue());
    assertEquals(NodeAttributeType.STRING, na.getAttributeType());

    // Send another HB to RM with updated node attribute
    nodeAttributes.clear();
    nodeAttributes.add(NodeAttribute.newInstance(
        NodeAttribute.PREFIX_DISTRIBUTED, "host",
        NodeAttributeType.STRING, "host3"));
    nodeStatusObject = getNodeStatusObject(nodeId);
    nodeStatusObject.setResponseId(++responseId);
    heartbeatReq.setNodeStatus(nodeStatusObject);
    heartbeatReq.setNodeAttributes(nodeAttributes);
    resourceTrackerService.nodeHeartbeat(heartbeatReq);

    // Make sure RM gets the updated attribute
    attrs = attributeManager.getAttributesForNode(nodeId.getHost());
    assertEquals(1, attrs.size());
    na = attrs.keySet().iterator().next();
    assertEquals("host", na.getAttributeKey().getAttributeName());
    assertEquals("host3", na.getAttributeValue());
    assertEquals(NodeAttributeType.STRING, na.getAttributeType());

    // Stop the RM like the sibling tests do.
    rm.stop();
  }

  /**
   * Sends heartbeats that each pair a valid distributed attribute with an
   * invalid one (bad prefix, bad name, bad value). Each must be rejected as a
   * whole, leaving no attributes for the host. A final heartbeat carrying a
   * single valid attribute (host=host3) must then be applied.
   */
  @Test
  public void testNodeHeartbeatWithInvalidNodeAttributes() throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
        TEMP_DIR.getAbsolutePath());
    rm = new MockRM(conf);
    rm.start();

    // Register to RM
    ResourceTrackerService resourceTrackerService =
        rm.getResourceTrackerService();
    RegisterNodeManagerRequest registerReq =
        Records.newRecord(RegisterNodeManagerRequest.class);
    NodeId nodeId = NodeId.newInstance("host2", 1234);
    Resource capability = Resources.createResource(1024);
    registerReq.setResource(capability);
    registerReq.setNodeId(nodeId);
    registerReq.setHttpPort(1234);
    registerReq.setNMVersion(YarnVersionInfo.getVersion());
    RegisterNodeManagerResponse registerResponse =
        resourceTrackerService.registerNodeManager(registerReq);

    NodeAttribute validNodeAttribute = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "host",
            NodeAttributeType.STRING, "host2");
    NodeAttribute invalidPrefixNodeAttribute = NodeAttribute
        .newInstance("_P", "Attr1",
            NodeAttributeType.STRING, "V2");
    NodeAttribute invalidNameNodeAttribute = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "_N",
            NodeAttributeType.STRING, "V2");
    NodeAttribute invalidValueNodeAttribute = NodeAttribute
        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
            NodeAttributeType.STRING, "...");

    // Common heartbeat fields; the status object is reused and only its
    // response id is bumped before each subsequent heartbeat.
    NodeHeartbeatRequest heartbeatReq =
        Records.newRecord(NodeHeartbeatRequest.class);
    NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
    int responseId = nodeStatusObject.getResponseId();
    heartbeatReq.setNodeStatus(nodeStatusObject);
    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
        .getNMTokenMasterKey());
    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
        .getContainerTokenMasterKey());

    // Send first HB to RM with invalid prefix node attributes
    heartbeatReq.setNodeAttributes(
        toSet(validNodeAttribute, invalidPrefixNodeAttribute));
    NodeHeartbeatResponse response =
        resourceTrackerService.nodeHeartbeat(heartbeatReq);
    assertEquals(0, rm.getRMContext().getNodeAttributesManager()
        .getAttributesForNode(nodeId.getHost()).size());
    assertNodeHeartbeatResponseForInvalidAttributes(response);
    assertTrue(response.getDiagnosticsMessage()
        .endsWith("attributes in HB must have prefix nm.yarn.io"));

    // Send another HB to RM with invalid name node attributes
    nodeStatusObject.setResponseId(++responseId);
    heartbeatReq
        .setNodeAttributes(toSet(validNodeAttribute, invalidNameNodeAttribute));
    response = resourceTrackerService.nodeHeartbeat(heartbeatReq);
    assertEquals(0, rm.getRMContext().getNodeAttributesManager()
        .getAttributesForNode(nodeId.getHost()).size());
    assertNodeHeartbeatResponseForInvalidAttributes(response);
    assertTrue(response.getDiagnosticsMessage()
        .startsWith("attribute name should only contains"));

    // Send another HB to RM with invalid value node attributes
    nodeStatusObject.setResponseId(++responseId);
    heartbeatReq.setNodeAttributes(
        toSet(validNodeAttribute, invalidValueNodeAttribute));
    response = resourceTrackerService.nodeHeartbeat(heartbeatReq);
    assertEquals(0, rm.getRMContext().getNodeAttributesManager()
        .getAttributesForNode(nodeId.getHost()).size());
    assertNodeHeartbeatResponseForInvalidAttributes(response);
    assertTrue(response.getDiagnosticsMessage()
        .startsWith("attribute value should only contains"));

    // Send another HB to RM with updated node attribute
    NodeAttribute updatedNodeAttribute = NodeAttribute.newInstance(
        NodeAttribute.PREFIX_DISTRIBUTED, "host",
        NodeAttributeType.STRING, "host3");
    nodeStatusObject.setResponseId(++responseId);
    heartbeatReq.setNodeAttributes(toSet(updatedNodeAttribute));
    resourceTrackerService.nodeHeartbeat(heartbeatReq);

    // Make sure RM gets the updated attribute
    NodeAttributesManager attributeManager =
        rm.getRMContext().getNodeAttributesManager();
    Map<NodeAttribute, AttributeValue> attrs =
        attributeManager.getAttributesForNode(nodeId.getHost());
    assertEquals(1, attrs.size());
    NodeAttribute na = attrs.keySet().iterator().next();
    assertEquals("host", na.getAttributeKey().getAttributeName());
    assertEquals("host3", na.getAttributeValue());
    assertEquals(NodeAttributeType.STRING, na.getAttributeType());

    // Stop the RM like the sibling tests do.
    rm.stop();
  }

  /**
   * Asserts the common outcome of a heartbeat that carried invalid node
   * attributes. The node is still told to proceed normally, a diagnostics
   * message is present, and the RM reports the attributes as not accepted.
   *
   * @param response the RM's reply to the heartbeat
   */
  private void assertNodeHeartbeatResponseForInvalidAttributes(
      NodeHeartbeatResponse response) {
    assertEquals(NodeAction.NORMAL, response.getNodeAction(),
        "On Invalid Node Attributes action is expected to be normal");
    assertNotNull(response.getDiagnosticsMessage());
    // The heartbeats checked here carry attributes, not labels, so the
    // attributes flag is the one that reflects attribute validation.
    assertFalse(response.getAreNodeAttributesAcceptedByRM(),
        "Node Attributes should not be accepted by RM If Invalid");
  }

  /**
   * Verifies the RM calls {@code replaceNodeAttributes} only when the
   * distributed attributes a node reports in its heartbeat differ from what
   * the RM already holds. This must still hold after centralized attributes
   * have been added for the same host. Every {@code replaceNodeAttributes}
   * call on the spied manager is counted and delegated to the real manager.
   */
  @Test
  public void testNodeHeartbeatOnlyUpdateNodeAttributesIfNeeded()
      throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
        NullNodeAttributeStore.class, NodeAttributeStore.class);
    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
        TEMP_DIR.getAbsolutePath());
    rm = new MockRM(conf);
    rm.start();

    // spy node attributes manager
    NodeAttributesManager tmpAttributeManager =
        rm.getRMContext().getNodeAttributesManager();
    NodeAttributesManager spyAttributeManager = spy(tmpAttributeManager);
    rm.getRMContext().setNodeAttributesManager(spyAttributeManager);
    AtomicInteger count = new AtomicInteger(0);
    Mockito.doAnswer(new Answer<Object>() {
      @Override
      @SuppressWarnings("unchecked") // argument 1 is the attribute mapping
      public Object answer(InvocationOnMock invocation) throws Exception {
        count.incrementAndGet();
        tmpAttributeManager
            .replaceNodeAttributes((String) invocation.getArguments()[0],
                (Map<String, Set<NodeAttribute>>) invocation.getArguments()[1]);
        return null;
      }
    }).when(spyAttributeManager)
        .replaceNodeAttributes(Mockito.any(String.class),
            Mockito.any(Map.class));

    // Register to RM
    ResourceTrackerService resourceTrackerService =
        rm.getResourceTrackerService();
    RegisterNodeManagerRequest registerReq =
        Records.newRecord(RegisterNodeManagerRequest.class);
    NodeId nodeId = NodeId.newInstance("host2", 1234);
    Resource capability = Resources.createResource(1024);
    registerReq.setResource(capability);
    registerReq.setNodeId(nodeId);
    registerReq.setHttpPort(1234);
    registerReq.setNMVersion(YarnVersionInfo.getVersion());
    RegisterNodeManagerResponse registerResponse =
        resourceTrackerService.registerNodeManager(registerReq);

    Set<NodeAttribute> nodeAttributes = new HashSet<>();
    nodeAttributes.add(NodeAttribute.newInstance(
        NodeAttribute.PREFIX_DISTRIBUTED, "host",
        NodeAttributeType.STRING, "host2"));

    // Set node attributes in HB.
    NodeHeartbeatRequest heartbeatReq =
        Records.newRecord(NodeHeartbeatRequest.class);
    NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
    int responseId = nodeStatusObject.getResponseId();
    heartbeatReq.setNodeStatus(nodeStatusObject);
    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
        .getNMTokenMasterKey());
    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
        .getContainerTokenMasterKey());
    heartbeatReq.setNodeAttributes(nodeAttributes);
    resourceTrackerService.nodeHeartbeat(heartbeatReq);

    // Ensure RM gets correct node attributes update.
    Map<NodeAttribute, AttributeValue> attrs = spyAttributeManager
        .getAttributesForNode(nodeId.getHost());
    assertEquals(1, attrs.size());
    NodeAttribute na = attrs.keySet().iterator().next();
    assertEquals("host", na.getAttributeKey().getAttributeName());
    assertEquals("host2", na.getAttributeValue());
    assertEquals(NodeAttributeType.STRING, na.getAttributeType());
    assertEquals(1, count.get());

    // Send HBs to RM with the same node attributes
    nodeStatusObject.setResponseId(++responseId);
    heartbeatReq.setNodeStatus(nodeStatusObject);
    resourceTrackerService.nodeHeartbeat(heartbeatReq);

    nodeStatusObject.setResponseId(++responseId);
    heartbeatReq.setNodeStatus(nodeStatusObject);
    resourceTrackerService.nodeHeartbeat(heartbeatReq);

    // Make sure RM updated node attributes once
    assertEquals(1, count.get());

    // Send another HB to RM with updated node attributes
    nodeAttributes.clear();
    nodeAttributes.add(NodeAttribute.newInstance(
        NodeAttribute.PREFIX_DISTRIBUTED, "host",
        NodeAttributeType.STRING, "host3"));
    nodeStatusObject.setResponseId(++responseId);
    heartbeatReq.setNodeStatus(nodeStatusObject);
    heartbeatReq.setNodeAttributes(nodeAttributes);
    resourceTrackerService.nodeHeartbeat(heartbeatReq);

    // Make sure RM gets the updated attribute
    attrs = spyAttributeManager.getAttributesForNode(nodeId.getHost());
    assertEquals(1, attrs.size());
    na = attrs.keySet().iterator().next();
    assertEquals("host", na.getAttributeKey().getAttributeName());
    assertEquals("host3", na.getAttributeValue());
    assertEquals(NodeAttributeType.STRING, na.getAttributeType());

    // Make sure RM updated node attributes twice
    assertEquals(2, count.get());

    // Add centralized attributes
    Map<String, Set<NodeAttribute>> nodeAttributeMapping = ImmutableMap
        .of(nodeId.getHost(), ImmutableSet.of(NodeAttribute.newInstance(
            NodeAttribute.PREFIX_CENTRALIZED, "centAttr",
            NodeAttributeType.STRING, "x")));
    spyAttributeManager.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
        nodeAttributeMapping);

    // Make sure RM updated node attributes three times
    assertEquals(3, count.get());

    // Send another HB to RM with non-updated node attributes
    nodeAttributes.clear();
    nodeAttributes.add(NodeAttribute.newInstance(
        NodeAttribute.PREFIX_DISTRIBUTED, "host",
        NodeAttributeType.STRING, "host3"));
    nodeStatusObject.setResponseId(++responseId);
    heartbeatReq.setNodeStatus(nodeStatusObject);
    heartbeatReq.setNodeAttributes(nodeAttributes);
    resourceTrackerService.nodeHeartbeat(heartbeatReq);

    // Make sure RM still updated node attributes three times
    assertEquals(3, count.get());

    // Send another HB to RM with updated node attributes
    nodeAttributes.clear();
    nodeAttributes.add(NodeAttribute.newInstance(
        NodeAttribute.PREFIX_DISTRIBUTED, "host",
        NodeAttributeType.STRING, "host4"));
    nodeStatusObject.setResponseId(++responseId);
    heartbeatReq.setNodeStatus(nodeStatusObject);
    heartbeatReq.setNodeAttributes(nodeAttributes);
    resourceTrackerService.nodeHeartbeat(heartbeatReq);

    // Make sure RM gets the updated attribute
    attrs = spyAttributeManager.getAttributesForNode(nodeId.getHost());
    assertEquals(2, attrs.size());
    for (NodeAttribute attr : attrs.keySet()) {
      assertEquals(NodeAttributeType.STRING, attr.getAttributeType());
      // Compare prefixes by value, not identity. With == an equal but
      // distinct String instance would silently skip both branches.
      String prefix = attr.getAttributeKey().getAttributePrefix();
      if (NodeAttribute.PREFIX_DISTRIBUTED.equals(prefix)) {
        assertEquals("host", attr.getAttributeKey().getAttributeName());
        assertEquals("host4", attr.getAttributeValue());
      } else if (NodeAttribute.PREFIX_CENTRALIZED.equals(prefix)) {
        assertEquals("centAttr", attr.getAttributeKey().getAttributeName());
        assertEquals("x", attr.getAttributeValue());
      } else {
        fail("Unexpected node attribute prefix: " + prefix);
      }
    }

    // Make sure RM updated node attributes four times
    assertEquals(4, count.get());

    if (rm != null) {
      rm.stop();
    }
  }

  /**
   * Verifies that, with distributed node-label configuration, a heartbeat
   * carrying an invalid label ("#C") is rejected without disrupting the node.
   * The action stays NORMAL, the labels-accepted flag is false, and a
   * diagnostics message is returned.
   */
  @Test
  public void testNodeHeartBeatWithInvalidLabels() throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
        YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);

    final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

    rm = new MockRM(conf) {
      @Override
      protected RMNodeLabelsManager createNodeLabelManager() {
        return nodeLabelsMgr;
      }
    };
    rm.start();

    // Any IOException propagates and fails the test with its real cause.
    nodeLabelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("A", "B", "C"));

    ResourceTrackerService resourceTrackerService =
        rm.getResourceTrackerService();
    RegisterNodeManagerRequest registerReq =
        Records.newRecord(RegisterNodeManagerRequest.class);
    NodeId nodeId = NodeId.newInstance("host2", 1234);
    Resource capability = Resources.createResource(1024);
    registerReq.setResource(capability);
    registerReq.setNodeId(nodeId);
    registerReq.setHttpPort(1234);
    registerReq.setNMVersion(YarnVersionInfo.getVersion());
    registerReq.setNodeLabels(toNodeLabelSet("A"));
    RegisterNodeManagerResponse registerResponse =
        resourceTrackerService.registerNodeManager(registerReq);

    NodeHeartbeatRequest heartbeatReq =
        Records.newRecord(NodeHeartbeatRequest.class);
    heartbeatReq.setNodeLabels(toNodeLabelSet("B", "#C")); // Invalid heart beat labels
    heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId));
    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
        .getNMTokenMasterKey());
    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
        .getContainerTokenMasterKey());
    NodeHeartbeatResponse nodeHeartbeatResponse =
        resourceTrackerService.nodeHeartbeat(heartbeatReq);

    // response should be NORMAL when RM heartbeat labels are rejected
    assertEquals(NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction(),
        "Response should be NORMAL when RM heartbeat labels"
        + " are rejected");
    assertFalse(nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM());
    assertNotNull(nodeHeartbeatResponse.getDiagnosticsMessage());
    rm.stop();
  }

  /**
   * Verifies that with the default (central) node-label configuration the RM
   * ignores otherwise valid labels a node sends in its heartbeat. The action
   * stays NORMAL, no labels are recorded for the node, and the
   * labels-accepted flag is false.
   */
  @Test
  public void testNodeHeartbeatWithCentralLabelConfig() throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
        YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE);

    final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

    rm = new MockRM(conf) {
      @Override
      protected RMNodeLabelsManager createNodeLabelManager() {
        return nodeLabelsMgr;
      }
    };
    rm.start();

    ResourceTrackerService resourceTrackerService =
        rm.getResourceTrackerService();
    RegisterNodeManagerRequest req =
        Records.newRecord(RegisterNodeManagerRequest.class);
    NodeId nodeId = NodeId.newInstance("host2", 1234);
    Resource capability = Resources.createResource(1024);
    req.setResource(capability);
    req.setNodeId(nodeId);
    req.setHttpPort(1234);
    req.setNMVersion(YarnVersionInfo.getVersion());
    req.setNodeLabels(toNodeLabelSet("A", "B", "C"));
    RegisterNodeManagerResponse registerResponse =
        resourceTrackerService.registerNodeManager(req);

    NodeHeartbeatRequest heartbeatReq =
        Records.newRecord(NodeHeartbeatRequest.class);
    heartbeatReq.setNodeLabels(toNodeLabelSet("B")); // Valid heart beat labels
    heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId));
    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
        .getNMTokenMasterKey());
    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
        .getContainerTokenMasterKey());
    NodeHeartbeatResponse nodeHeartbeatResponse =
        resourceTrackerService.nodeHeartbeat(heartbeatReq);

    // response should be ok but the RMacceptNodeLabelsUpdate should be false
    assertEquals(NodeAction.NORMAL,
        nodeHeartbeatResponse.getNodeAction());
    // no change in the labels,
    assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
    // heartbeat labels rejected because labels are configured centrally
    assertFalse(nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM(),
        "Node Labels should not be accepted by RM if it is configured with "
        + "Central configuration");
    if (rm != null) {
      rm.stop();
    }
  }

  /**
   * An NM whose version is older than the RM's must be told to shut down when
   * the RM's minimum NM version is configured as "EqualToRM".
   */
  @Test
  public void testNodeRegistrationVersionLessThanRM() throws Exception {
    writeToHostsFile("host2");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.set(YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, "EqualToRM");
    rm = new MockRM(conf);
    rm.start();
    final String oldNmVersion = "1.9.9";

    RegisterNodeManagerRequest registration =
        Records.newRecord(RegisterNodeManagerRequest.class);
    registration.setResource(Resources.createResource(1024));
    registration.setNodeId(NodeId.newInstance("host2", 1234));
    registration.setHttpPort(1234);
    registration.setNMVersion(oldNmVersion);

    // host2 is on the include list, so the version check is what rejects it.
    RegisterNodeManagerResponse response =
        rm.getResourceTrackerService().registerNodeManager(registration);
    assertEquals(NodeAction.SHUTDOWN, response.getNodeAction());

    String expectedFragment = "Disallowed NodeManager Version " + oldNmVersion
        + ", is less than the minimum version ";
    assertTrue(response.getDiagnosticsMessage().contains(expectedFragment),
        "Diagnostic message did not contain: 'Disallowed NodeManager " +
        "Version "+ oldNmVersion + ", is less than the minimum version'");
  }

  /**
   * An NM whose host is absent from the include list is refused registration
   * with a SHUTDOWN action and an explanatory diagnostic message.
   */
  @Test
  public void testNodeRegistrationFailure() throws Exception {
    // Only host1 is allowed; the NM below registers from host2.
    writeToHostsFile("host1");
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    rm = new MockRM(conf);
    rm.start();

    RegisterNodeManagerRequest registration =
        Records.newRecord(RegisterNodeManagerRequest.class);
    registration.setNodeId(NodeId.newInstance("host2", 1234));
    registration.setHttpPort(1234);

    RegisterNodeManagerResponse response =
        rm.getResourceTrackerService().registerNodeManager(registration);
    assertEquals(NodeAction.SHUTDOWN, response.getNodeAction());
    assertEquals(
        "Disallowed NodeManager from  host2, Sending SHUTDOWN signal to the NodeManager.",
        response.getDiagnosticsMessage());
  }

  /**
   * The RM identifier returned to a registering NM must equal the RM's
   * cluster timestamp.
   */
  @Test
  public void testSetRMIdentifierInRegistration() throws Exception {
    rm = new MockRM(new Configuration());
    rm.start();

    MockNM nodeManager =
        new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
    long registeredId = nodeManager.registerNode().getRMIdentifier();

    assertEquals(ResourceManager.getClusterTimeStamp(), registeredId);
  }

  /**
   * With scheduler minimums of 2048 MB and 4 vcores, an NM may register only
   * if its capability meets both minimums; otherwise it is told to shut down.
   */
  @Test
  public void testNodeRegistrationWithMinimumAllocations() throws Exception {
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "2048");
    conf.set(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "4");
    rm = new MockRM(conf);
    rm.start();

    ResourceTrackerService tracker = rm.getResourceTrackerService();
    RegisterNodeManagerRequest request =
        Records.newRecord(RegisterNodeManagerRequest.class);
    request.setNodeId(BuilderUtils.newNodeId("host", 1234));

    // Each row is {memoryMB, vcores}; only the last one meets both minimums.
    int[][] capabilities = {{1024, 1}, {2048, 1}, {1024, 4}, {2048, 4}};
    NodeAction[] expectedActions = {NodeAction.SHUTDOWN, NodeAction.SHUTDOWN,
        NodeAction.SHUTDOWN, NodeAction.NORMAL};

    // The same Resource object is mutated and re-attached for every attempt.
    Resource capability = Resources.createResource(1024);
    for (int i = 0; i < capabilities.length; i++) {
      capability.setMemorySize(capabilities[i][0]);
      capability.setVirtualCores(capabilities[i][1]);
      request.setResource(capability);
      RegisterNodeManagerResponse response =
          tracker.registerNodeManager(request);
      assertEquals(expectedActions[i], response.getNodeAction());
    }
  }

  /**
   * An NM whose heartbeat response id lags far behind the RM's is told to
   * RESYNC, and the rebooted-NM metric is incremented.
   */
  @Test
  public void testReboot() throws Exception {
    Configuration conf = new Configuration();
    rm = new MockRM(conf);
    rm.start();

    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:1234", 2048);

    int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs();
    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());

    // A response id of -100 is far behind the RM's (0, per the expected
    // diagnostic below), so the RM asks this NM to resync.
    nodeHeartbeat = nm2.nodeHeartbeat(
      new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
    assertEquals(NodeAction.RESYNC, nodeHeartbeat.getNodeAction());
    assertEquals("Too far behind rm response id:0 nm response id:-100",
      nodeHeartbeat.getDiagnosticsMessage());
    checkRebootedNMCount(rm, ++initialMetricCount);
  }

  /**
   * With timeline service v2 enabled, each NM heartbeat response must carry
   * collector data only for applications running on that node: nm1 sees
   * app1's collector and nm2 sees app2's. app2's collector data is set three
   * times, and the test expects the address from the last
   * {@code setCollectorData} call to be reported.
   */
  @Test
  public void testNodeHeartbeatForAppCollectorsMap() throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    // set version to 2
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
    // enable aux-service based timeline collectors
    conf.set(YarnConfiguration.NM_AUX_SERVICES, "timeline_collector");
    conf.set(YarnConfiguration.NM_AUX_SERVICES + "."
        + "timeline_collector" + ".class",
        PerNodeTimelineCollectorsAuxService.class.getName());
    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
        FileSystemTimelineWriterImpl.class, TimelineWriter.class);

    rm = new MockRM(conf);
    rm.start();

    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:1234", 2048);

    // Initial heartbeats; their responses are not inspected.
    nm1.nodeHeartbeat(true);
    nm2.nodeHeartbeat(true);

    RMNodeImpl node1 =
        (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());

    RMNodeImpl node2 =
        (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId());

    RMAppImpl app1 = (RMAppImpl) MockRMAppSubmitter.submitWithMemory(1024, rm);
    String collectorAddr1 = "1.2.3.4:5";
    app1.setCollectorData(AppCollectorData.newInstance(
        app1.getApplicationId(), collectorAddr1));

    String collectorAddr2 = "5.4.3.2:1";
    RMAppImpl app2 = (RMAppImpl) MockRMAppSubmitter.submitWithMemory(1024, rm);
    app2.setCollectorData(AppCollectorData.newInstance(
        app2.getApplicationId(), collectorAddr2));

    String collectorAddr3 = "5.4.3.2:2";
    app2.setCollectorData(AppCollectorData.newInstance(
        app2.getApplicationId(), collectorAddr3, 0, 1));

    String collectorAddr4 = "5.4.3.2:3";
    app2.setCollectorData(AppCollectorData.newInstance(
        app2.getApplicationId(), collectorAddr4, 1, 0));

    // Create a running container for app1 running on nm1
    ContainerId runningContainerId1 = BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(
        app1.getApplicationId(), 0), 0);

    ContainerStatus status1 = ContainerStatus.newInstance(runningContainerId1,
        ContainerState.RUNNING, "", 0);
    List<ContainerStatus> statusList = new ArrayList<>();
    statusList.add(status1);
    NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true,
        "", System.currentTimeMillis());
    NodeStatus nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0,
        statusList, null, nodeHealth, null, null, null);
    node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));

    assertEquals(1, node1.getRunningApps().size());
    assertEquals(app1.getApplicationId(), node1.getRunningApps().get(0));

    // Create a running container for app2 running on nm2. The status carries
    // nm2's node id so that it matches the node it is delivered to.
    ContainerId runningContainerId2 = BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(
        app2.getApplicationId(), 0), 0);

    ContainerStatus status2 = ContainerStatus.newInstance(runningContainerId2,
        ContainerState.RUNNING, "", 0);
    statusList = new ArrayList<>();
    statusList.add(status2);
    nodeStatus = NodeStatus.newInstance(nm2.getNodeId(), 0,
        statusList, null, nodeHealth, null, null, null);
    node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeStatus));
    assertEquals(1, node2.getRunningApps().size());
    assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0));

    NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
    Map<ApplicationId, AppCollectorData> map1
        = nodeHeartbeat1.getAppCollectors();
    assertEquals(1, map1.size());
    assertEquals(collectorAddr1,
        map1.get(app1.getApplicationId()).getCollectorAddr());

    NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
    Map<ApplicationId, AppCollectorData> map2
        = nodeHeartbeat2.getAppCollectors();
    assertEquals(1, map2.size());
    assertEquals(collectorAddr4,
        map2.get(app2.getApplicationId()).getCollectorAddr());
  }

  /**
   * Polls up to 20 times, 100 ms apart, until the cluster-wide rebooted-NM
   * metric equals {@code count}, then asserts that it does.
   *
   * @param rm2 not used by this method; kept for signature compatibility
   * @param count expected number of rebooted NodeManagers
   * @throws InterruptedException if interrupted while waiting
   */
  private void checkRebootedNMCount(MockRM rm2, int count)
      throws InterruptedException {
    for (int attempt = 0;
         ClusterMetrics.getMetrics().getNumRebootedNMs() != count
             && attempt < 20;
         attempt++) {
      synchronized (this) {
        wait(100);
      }
    }
    assertEquals(count, ClusterMetrics.getMetrics().getNumRebootedNMs(),
        "The rebooted metrics are not updated");
  }

  /**
   * Flipping an NM's reported health moves its node into and out of the
   * UNHEALTHY state, and the unhealthy-NM metric follows.
   */
  @Test
  public void testUnhealthyNodeStatus() throws Exception {
    Configuration conf = new Configuration();
    // NOTE(review): hostFile is configured here as the *exclude* list;
    // presumably it does not name host1 at this point. Confirm intent.
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());

    rm = new MockRM(conf);
    rm.start();

    MockNM node = rm.registerNode("host1:1234", 5120);
    assertEquals(0, ClusterMetrics.getMetrics().getUnhealthyNMs());

    node.nodeHeartbeat(true);   // report healthy
    node.nodeHeartbeat(false);  // report unhealthy
    checkUnhealthyNMCount(rm, node, true, 1);

    node.nodeHeartbeat(true);   // report healthy again
    checkUnhealthyNMCount(rm, node, false, 0);
  }
  
  /**
   * Polls up to 20 times, 100 ms apart, until the RM node for {@code nm1} is
   * UNHEALTHY (when {@code health} is true) or in any other state (when
   * {@code health} is false). It then asserts that condition and that the
   * unhealthy-NM metric equals {@code count}.
   *
   * @param rm RM whose node table is inspected
   * @param nm1 NM whose node state is checked
   * @param health true if the node is expected to be UNHEALTHY
   * @param count expected value of the unhealthy-NM metric
   */
  private void checkUnhealthyNMCount(MockRM rm, MockNM nm1, boolean health,
                                     int count) throws Exception {
    int polls = 0;
    while (true) {
      boolean unhealthy = rm.getRMContext().getRMNodes()
          .get(nm1.getNodeId()).getState() == NodeState.UNHEALTHY;
      if (unhealthy == health || polls++ >= 20) {
        break;
      }
      synchronized (this) {
        wait(100);
      }
    }
    boolean nowUnhealthy = rm.getRMContext().getRMNodes()
        .get(nm1.getNodeId()).getState() == NodeState.UNHEALTHY;
    assertFalse(nowUnhealthy != health);
    assertEquals(count, ClusterMetrics.getMetrics().getUnhealthyNMs(),
        "Unhealthy metrics not incremented");
  }

  /**
   * Feeds handleNMContainerStatus completed-container reports that cannot be
   * matched to a usable AM container, for both an unmanaged and a managed AM.
   * The reports either name an attempt (id 2) that was never created, or
   * refer to a current attempt whose master container has been set to null.
   * The test then verifies that no event is dispatched.
   *
   * NOTE(review): {@code handler} is a Mockito spy of the dispatcher's event
   * handler, but the spy is never installed back into the RM, so events the
   * RM dispatches would reach the real handler rather than the spy. The
   * {@code verify(handler, never())} checks below are therefore likely to
   * pass regardless of RM behavior. Confirm, and consider wiring the spy in.
   */
  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Test
  public void testHandleContainerStatusInvalidCompletions() throws Exception {
    rm = new MockRM(new YarnConfiguration());
    rm.start();

    EventHandler handler =
        spy(rm.getRMContext().getDispatcher().getEventHandler());

    // Case 1: Unmanaged AM
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
            .withUnmanagedAM(true)
            .build();
    RMApp app = MockRMAppSubmitter.submit(rm, data);

    // Case 1.1: AppAttemptId is null
    // (the report names attempt 2, which was never created for this app)
    NMContainerStatus report =
        NMContainerStatus.newInstance(
          ContainerId.newContainerId(
            ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), 0,
          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
          "Dummy Completed", 0, Priority.newInstance(10), 1234);
    rm.getResourceTrackerService().handleNMContainerStatus(report, null);
    verify(handler, never()).handle((Event) any());

    // Case 1.2: Master container is null
    RMAppAttemptImpl currentAttempt =
        (RMAppAttemptImpl) app.getCurrentAppAttempt();
    currentAttempt.setMasterContainer(null);
    report = NMContainerStatus.newInstance(
          ContainerId.newContainerId(currentAttempt.getAppAttemptId(), 0), 0,
          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
          "Dummy Completed", 0, Priority.newInstance(10), 1234);
    rm.getResourceTrackerService().handleNMContainerStatus(report, null);
    verify(handler, never()).handle((Event)any());

    // Case 2: Managed AM
    app = MockRMAppSubmitter.submitWithMemory(1024, rm);

    // Case 2.1: AppAttemptId is null
    // (again attempt 2, which does not exist)
    report = NMContainerStatus.newInstance(
          ContainerId.newContainerId(
            ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), 0,
          ContainerState.COMPLETE, Resource.newInstance(1024, 1),
          "Dummy Completed", 0, Priority.newInstance(10), 1234);
    // NOTE(review): any exception is tolerated here, but the test does not
    // assert that one is actually thrown.
    try {
      rm.getResourceTrackerService().handleNMContainerStatus(report, null);
    } catch (Exception e) {
      // expected - ignore
    }
    verify(handler, never()).handle((Event)any());

    // Case 2.2: Master container is null
    currentAttempt =
        (RMAppAttemptImpl) app.getCurrentAppAttempt();
    currentAttempt.setMasterContainer(null);
    report = NMContainerStatus.newInstance(
      ContainerId.newContainerId(currentAttempt.getAppAttemptId(), 0), 0,
      ContainerState.COMPLETE, Resource.newInstance(1024, 1),
      "Dummy Completed", 0, Priority.newInstance(10), 1234);
    // NOTE(review): as above, an exception is tolerated but not required.
    try {
      rm.getResourceTrackerService().handleNMContainerStatus(report, null);
    } catch (Exception e) {
      // expected - ignore
    }
    verify(handler, never()).handle((Event)any());
  }

  /**
   * Exercises NM reconnects, i.e. re-registering an already known node, in
   * these situations:
   * <ul>
   *   <li>a healthy node;</li>
   *   <li>an unhealthy node;</li>
   *   <li>an unhealthy node turning healthy;</li>
   *   <li>a changed memory capability;</li>
   *   <li>a changed capability with running applications;</li>
   *   <li>a changed HTTP port.</li>
   * </ul>
   * It checks the active-NM metric, the unhealthy state, and the root queue's
   * available memory.
   */
  @Test
  public void testReconnectNode() throws Exception {
    // The scheduler event dispatcher is replaced with one whose handle() calls
    // scheduler.handle(event) directly on the calling thread (presumably so
    // scheduler state is updated synchronously with node events).
    rm = new MockRM() {
      @Override
      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
        return new EventDispatcher<SchedulerEvent>(this.scheduler,
            this.scheduler.getClass().getName()) {
          @Override
          public void handle(SchedulerEvent event) {
            scheduler.handle(event);
          }
        };
      }
    };
    rm.start();

    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:5678", 5120);
    nm1.nodeHeartbeat(true);
    nm2.nodeHeartbeat(false);
    rm.drainEvents();
    checkUnhealthyNMCount(rm, nm2, true, 1);
    final int expectedNMs = ClusterMetrics.getMetrics().getNumActiveNMs();
    QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
    // Only 5120 MB is expected to be available, presumably because nm2 is
    // unhealthy and its memory is not counted.
    // TODO Metrics incorrect in case of the FifoScheduler
    assertEquals(5120, metrics.getAvailableMB());

    // reconnect of healthy node
    nm1 = rm.registerNode("host1:1234", 5120);
    NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
    assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
    rm.drainEvents();
    assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
    checkUnhealthyNMCount(rm, nm2, true, 1);

    // reconnect of unhealthy node
    nm2 = rm.registerNode("host2:5678", 5120);
    response = nm2.nodeHeartbeat(false);
    assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
    rm.drainEvents();
    assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
    checkUnhealthyNMCount(rm, nm2, true, 1);
    
    // unhealthy node changed back to healthy
    // NOTE(review): two consecutive healthy heartbeats are sent and neither
    // response is checked.
    nm2 = rm.registerNode("host2:5678", 5120);
    response = nm2.nodeHeartbeat(true);
    response = nm2.nodeHeartbeat(true);
    rm.drainEvents();
    assertEquals(5120 + 5120, metrics.getAvailableMB());

    // reconnect of node with changed capability
    // (nm1 is reused from here on as the handle for host2:5678)
    nm1 = rm.registerNode("host2:5678", 10240);
    response = nm1.nodeHeartbeat(true);
    rm.drainEvents();
    assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
    assertEquals(5120 + 10240, metrics.getAvailableMB());

    // reconnect of node with changed capability and running applications
    List<ApplicationId> runningApps = new ArrayList<ApplicationId>();
    runningApps.add(ApplicationId.newInstance(1, 0));
    nm1 = rm.registerNode("host2:5678", 15360, 2, runningApps);
    response = nm1.nodeHeartbeat(true);
    rm.drainEvents();
    assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
    assertEquals(5120 + 15360, metrics.getAvailableMB());
    
    // reconnect healthy node changing http port
    // host1 keeps 5120 MB, so total available memory is expected unchanged.
    nm1 = new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
    nm1.setHttpPort(3);
    nm1.registerNode();
    response = nm1.nodeHeartbeat(true);
    response = nm1.nodeHeartbeat(true);
    rm.drainEvents();
    RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
    assertEquals(3, rmNode.getHttpPort());
    assertEquals(5120, rmNode.getTotalCapability().getMemorySize());
    assertEquals(5120 + 15360, metrics.getAvailableMB());

  }

  /**
   * After a healthy NM unregisters, the shutdown-NM metric is incremented.
   * The RM forgets the node, so its next heartbeat is answered with RESYNC.
   */
  @Test
  public void testNMUnregistration() throws Exception {
    Configuration conf = new Configuration();
    rm = new MockRM(conf);
    rm.start();

    ResourceTrackerService resourceTrackerService = rm
        .getResourceTrackerService();
    MockNM nm1 = rm.registerNode("host1:1234", 5120);

    int shutdownNMsCount = ClusterMetrics.getMetrics()
        .getNumShutdownNMs();
    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());

    UnRegisterNodeManagerRequest request = Records
        .newRecord(UnRegisterNodeManagerRequest.class);
    request.setNodeId(nm1.getNodeId());
    resourceTrackerService.unRegisterNodeManager(request);
    checkShutdownNMCount(rm, ++shutdownNMsCount);

    // The RM removes the node on unregistration, so a further heartbeat from
    // it is answered with a RESYNC command.
    nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertEquals(NodeAction.RESYNC, nodeHeartbeat.getNodeAction());
  }

  /**
   * An NM in the UNHEALTHY state can still unregister, and doing so
   * increments the shutdown-NM metric.
   */
  @Test
  public void testUnhealthyNMUnregistration() throws Exception {
    rm = new MockRM(new Configuration());
    rm.start();

    ResourceTrackerService tracker = rm.getResourceTrackerService();
    MockNM node = rm.registerNode("host1:1234", 5120);
    assertEquals(0, ClusterMetrics.getMetrics().getUnhealthyNMs());

    // Start healthy, then record the expected post-unregistration count.
    node.nodeHeartbeat(true);
    int expectedShutdown = ClusterMetrics.getMetrics().getNumShutdownNMs() + 1;

    // Go unhealthy, then unregister.
    node.nodeHeartbeat(false);
    checkUnhealthyNMCount(rm, node, true, 1);
    UnRegisterNodeManagerRequest unregister =
        Records.newRecord(UnRegisterNodeManagerRequest.class);
    unregister.setNodeId(node.getNodeId());
    tracker.unRegisterNodeManager(unregister);
    checkShutdownNMCount(rm, expectedShutdown);
  }

  /**
   * Exercises NM unregistration in three edge cases:
   * <ul>
   *   <li>A node the RM has never seen: the shutdown and decommissioned
   *   metrics are expected to be 0.</li>
   *   <li>A registered node dropped from the include list that heartbeats and
   *   is then unregistered: the shutdown metric increments and the
   *   decommissioned metric stays unchanged.</li>
   *   <li>The same without the heartbeat: the same metric expectations
   *   apply.</li>
   * </ul>
   */
  @Test
  public void testInvalidNMUnregistration() throws Exception {
    Configuration conf = new Configuration();
    rm = new MockRM(conf);
    rm.start();
    ResourceTrackerService tracker = rm.getResourceTrackerService();
    final int baselineDecommissioned =
        ClusterMetrics.getMetrics().getNumDecommisionedNMs();

    // Scenario A: unregister a node the RM has never seen.
    UnRegisterNodeManagerRequest unregister =
        Records.newRecord(UnRegisterNodeManagerRequest.class);
    unregister.setNodeId(BuilderUtils.newNodeId("host", 1234));
    tracker.unRegisterNodeManager(unregister);
    checkShutdownNMCount(rm, 0);
    checkDecommissionedNMCount(rm, 0);

    // Scenario B: register host1, then shrink the include list to host2 only.
    // host1's heartbeat is answered with SHUTDOWN, and then it unregisters.
    MockNM first = new MockNM("host1:1234", 5120, tracker);
    assertEquals(NodeAction.NORMAL, first.registerNode().getNodeAction());
    int expectedShutdown = ClusterMetrics.getMetrics().getNumShutdownNMs();
    writeToHostsFile("host2");
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    rm.getNodesListManager().refreshNodes(conf);
    assertEquals(NodeAction.SHUTDOWN,
        first.nodeHeartbeat(true).getNodeAction());
    checkDecommissionedNMCount(rm, baselineDecommissioned);
    unregister.setNodeId(first.getNodeId());
    tracker.unRegisterNodeManager(unregister);
    expectedShutdown++;
    checkShutdownNMCount(rm, expectedShutdown);
    checkDecommissionedNMCount(rm, baselineDecommissioned);

    // Scenario C: register host2, shrink the include list to host1 only, and
    // unregister host2 without an intervening heartbeat.
    MockNM second = new MockNM("host2:1234", 5120, tracker);
    assertEquals(NodeAction.NORMAL, second.registerNode().getNodeAction());
    writeToHostsFile("host1");
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    rm.getNodesListManager().refreshNodes(conf);
    unregister.setNodeId(second.getNodeId());
    tracker.unRegisterNodeManager(unregister);
    expectedShutdown++;
    checkShutdownNMCount(rm, expectedShutdown);
    checkDecommissionedNMCount(rm, baselineDecommissioned);
    rm.stop();
  }

  /**
   * Runs the decommissioned-metric initialization scenario first with and
   * then without an include list.
   */
  @Test
  @Timeout(value = 30)
  public void testInitDecommMetric() throws Exception {
    for (boolean hasIncludeList : new boolean[] {true, false}) {
      testInitDecommMetricHelper(hasIncludeList);
    }
  }

  /**
   * Checks that decommission state survives an RM restart and is cleared by
   * a later refresh:
   * <ol>
   *   <li>Excludes host1, optionally with an include list naming both hosts,
   *   and stops the RM.</li>
   *   <li>Starts a new RM from the same configuration and checks that it
   *   reports one decommissioned node and one inactive-node entry.</li>
   *   <li>Clears the exclude list, refreshes, and re-registers host1. It then
   *   checks that the decommissioned metric drops to 0, both nodes are
   *   active, and the inactive entry is gone.</li>
   * </ol>
   *
   * @param hasIncludeList whether to also configure an include list
   * @throws Exception on any RM or NM failure
   */
  public void testInitDecommMetricHelper(boolean hasIncludeList)
      throws Exception {
    Configuration conf = new Configuration();
    rm = new MockRM(conf);
    rm.start();
    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:5678", 10240);
    nm1.nodeHeartbeat(true);
    nm2.nodeHeartbeat(true);

    writeToHostsFile(excludeHostFile, "host1");
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
        excludeHostFile.getAbsolutePath());

    if (hasIncludeList) {
      writeToHostsFile(hostFile, "host1", "host2");
      conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
          hostFile.getAbsolutePath());
    }
    rm.getNodesListManager().refreshNodes(conf);
    rm.drainEvents();
    rm.stop();

    // rm1 is local to this method; stop it in finally so that an assertion
    // failure does not leak a running RM into later tests.
    MockRM rm1 = new MockRM(conf);
    rm1.start();
    try {
      nm1 = rm1.registerNode("host1:1234", 5120);
      nm2 = rm1.registerNode("host2:5678", 10240);
      nm1.nodeHeartbeat(true);
      nm2.nodeHeartbeat(true);
      rm1.drainEvents();
      assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs(),
          "Number of Decommissioned nodes should be 1");
      assertEquals(1, rm1.getRMContext().getInactiveRMNodes().size(),
          "The inactiveRMNodes should contain an entry for the " +
          "decommissioned node");
      writeToHostsFile(excludeHostFile, "");
      conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
          excludeHostFile.getAbsolutePath());
      rm1.getNodesListManager().refreshNodes(conf);
      nm1 = rm1.registerNode("host1:1234", 5120);
      nm1.nodeHeartbeat(true);
      nm2.nodeHeartbeat(true);
      rm1.drainEvents();
      assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs(),
          "The decommissioned nodes metric should have " +
          "decremented to 0");
      assertEquals(2, ClusterMetrics.getMetrics().getNumActiveNMs(),
          "The active nodes metric should be 2");
      assertEquals(0, rm1.getRMContext().getInactiveRMNodes().size(),
          "The inactive RMNodes entry should have been removed");
      rm1.drainEvents();
    } finally {
      rm1.stop();
    }
  }

  /**
   * Excludes host2 and host3, where host3 never registers, and checks the
   * decommissioned metric. It is expected to be 1 on the running RM and 2
   * after restarting the RM with the same configuration and refreshing the
   * host lists.
   */
  @Test
  @Timeout(value = 30)
  public void testInitDecommMetricNoRegistration() throws Exception {
    Configuration conf = new Configuration();
    rm = new MockRM(conf);
    rm.start();
    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:5678", 10240);
    nm1.nodeHeartbeat(true);
    nm2.nodeHeartbeat(true);
    //host3 will not register or heartbeat
    writeToHostsFile(excludeHostFile, "host3", "host2");
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
        excludeHostFile.getAbsolutePath());
    writeToHostsFile(hostFile, "host1", "host2");
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    rm.getNodesListManager().refreshNodes(conf);
    rm.drainEvents();
    assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs(),
        "The decommissioned nodes metric should be 1 ");
    rm.stop();

    // rm1 is local to this method; stop it in finally so that an assertion
    // failure does not leak a running RM into later tests.
    MockRM rm1 = new MockRM(conf);
    rm1.start();
    try {
      rm1.getNodesListManager().refreshNodes(conf);
      rm1.drainEvents();
      assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs(),
          "The decommissioned nodes metric should be 2 ");
    } finally {
      rm1.stop();
    }
  }

  /**
   * A node decommissioned through a graceful refresh must stay
   * DECOMMISSIONED after a later graceful refresh removes it from the
   * exclude list.
   */
  @Test
  public void testIncorrectRecommission() throws Exception {
    //Check decommissioned node not get recommissioned with graceful refresh
    Configuration conf = new Configuration();
    rm = new MockRM(conf);
    rm.start();
    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:5678", 10240);
    nm1.nodeHeartbeat(true);
    nm2.nodeHeartbeat(true);
    writeToHostsFile(excludeHostFile, "host3", "host2");
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
        excludeHostFile.getAbsolutePath());
    // NOTE(review): hostFile is written but RM_NODES_INCLUDE_FILE_PATH is not
    // set in this conf. The next write presumably replaces the exclude file
    // written above, so only host1 is excluded by the refresh below.
    writeToHostsFile(hostFile, "host1", "host2");
    writeToHostsFile(excludeHostFile, "host1");
    rm.getNodesListManager().refreshNodesGracefully(conf, null);
    rm.drainEvents();
    nm1.nodeHeartbeat(true);
    rm.drainEvents();
    RMNode inactiveNode =
        rm.getRMContext().getInactiveRMNodes().get(nm1.getNodeId());
    assertNotNull(inactiveNode, "Node " + nm1.getNodeId().getHost()
        + " should be in the inactive node list");
    assertEquals(NodeState.DECOMMISSIONED, inactiveNode.getState(),
        "Node " + nm1.getNodeId().getHost() + " should be Decommissioned");

    // Clearing the exclude list must not bring the node back.
    writeToHostsFile(excludeHostFile, "");
    rm.getNodesListManager().refreshNodesGracefully(conf, null);
    rm.drainEvents();
    inactiveNode = rm.getRMContext().getInactiveRMNodes().get(nm1.getNodeId());
    assertNotNull(inactiveNode, "Node " + nm1.getNodeId().getHost()
        + " should be in the inactive node list");
    assertEquals(NodeState.DECOMMISSIONED, inactiveNode.getState(),
        "Node " + nm1.getNodeId().getHost() + " should be Decommissioned");
    rm.stop();
  }

  /**
   * Removes nodes from all host lists and checks that the RM eventually
   * forgets them, using a regular (non-graceful) host-list refresh. Each
   * helper covers one scenario, as suggested by its name: an untracked node,
   * a lost node, a rebooted node, and an unhealthy node.
   */
  @Test
  public void testNodeRemovalNormally() throws Exception {
    testNodeRemovalUtil(false);
    testNodeRemovalUtilLost(false);
    testNodeRemovalUtilRebooted(false);
    testNodeRemovalUtilUnhealthy(false);
  }

  /**
   * Same node-removal scenarios as the non-graceful variant (untracked, lost,
   * rebooted, unhealthy), but host lists are refreshed via
   * {@code refreshNodesGracefully}.
   */
  @Test
  public void testNodeRemovalGracefully() throws Exception {
    testNodeRemovalUtil(true);
    testNodeRemovalUtilLost(true);
    testNodeRemovalUtilRebooted(true);
    testNodeRemovalUtilUnhealthy(true);
  }

  /**
   * Reloads the RM's host include/exclude lists from {@code conf}. When
   * {@code doGraceful} is true it calls {@code refreshNodesGracefully},
   * passing null as the second argument; otherwise it calls
   * {@code refreshNodes}.
   *
   * @param doGraceful whether to use the graceful refresh
   * @param conf configuration naming the host files
   * @throws Exception if the refresh fails
   */
  public void refreshNodesOption(boolean doGraceful, Configuration conf)
      throws Exception {
    if (!doGraceful) {
      rm.getNodesListManager().refreshNodes(conf);
      return;
    }
    rm.getNodesListManager().refreshNodesGracefully(conf, null);
  }

  /**
   * Node-removal scenario driver. It starts an RM with a 500 ms
   * untracked-node removal timeout, registers three NMs, and then checks
   * that:
   * <ul>
   *   <li>a node dropped from the include list becomes SHUTDOWN
   *   (DECOMMISSIONED when {@code doGraceful}) and is forgotten after the
   *   removal timeout;</li>
   *   <li>a node re-added, dropped again, and re-added before the timer
   *   expires returns to active;</li>
   *   <li>a node on the exclude list stays DECOMMISSIONED or DECOMMISSIONING
   *   and is not removed by the timer.</li>
   * </ul>
   * It finishes by delegating to {@code testNodeRemovalUtilDecomToUntracked}.
   *
   * @param doGraceful whether host-list refreshes are done gracefully
   * @throws Exception on any RM or NM failure
   */
  public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
    Configuration conf = new Configuration();
    int timeoutValue = 500;
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
    conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
        timeoutValue);
    // Never counted down in this method: await(timeout) is used purely as a
    // bounded sleep.
    CountDownLatch latch = new CountDownLatch(1);
    rm = new MockRM(conf);
    rm.init(conf);
    rm.start();
    RMContext rmContext = rm.getRMContext();
    refreshNodesOption(doGraceful, conf);
    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:5678", 10240);
    MockNM nm3 = rm.registerNode("localhost:4433", 1024);
    ClusterMetrics metrics = ClusterMetrics.getMetrics();
    // Java assert: only checked when the JVM runs with -ea.
    assert (metrics != null);

    //check all 3 nodes joined in as NORMAL
    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
    nodeHeartbeat = nm2.nodeHeartbeat(true);
    assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
    nodeHeartbeat = nm3.nodeHeartbeat(true);
    assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
    rm.drainEvents();
    // NOTE(review): many assertEquals calls in this method pass
    // (actual, expected) in reversed order, so failure messages would show
    // the two values swapped.
    assertEquals(metrics.getNumActiveNMs(), 3,
        "All 3 nodes should be active");

    //Remove nm2 from include list, should now be shutdown with timer test
    String ip = NetUtils.normalizeHostName("localhost");
    writeToHostsFile("host1", ip);
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
        .getAbsolutePath());
    refreshNodesOption(doGraceful, conf);
    if (doGraceful) {
      rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
    }
    nm1.nodeHeartbeat(true);
    nm2.nodeHeartbeat(true);
    rm.drainEvents();
    assertTrue(!rmContext.getRMNodes().containsKey(nm2.getNodeId()),
        "Node should not be in active node list");

    RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
    assertEquals(rmNode.getState(),
        doGraceful? NodeState.DECOMMISSIONED : NodeState.SHUTDOWN,
        "Node should be in inactive node list");
    assertEquals(metrics.getNumActiveNMs(), 2, "Active nodes should be 2");
    assertEquals(metrics.getNumShutdownNMs(), doGraceful? 0 : 1,
        "Shutdown nodes should be expected");

    int nodeRemovalTimeout =
        conf.getInt(
            YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
            YarnConfiguration.
                DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
    int nodeRemovalInterval =
        rmContext.getNodesListManager().getNodeRemovalCheckInterval();
    // Wait one removal-check interval plus the removal timeout, with 100 ms
    // of slack, so the removal check has a chance to run.
    long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout + 100;
    latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);

    rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
    assertEquals(rmNode, null, "Node should have been forgotten!");
    assertEquals(metrics.getNumShutdownNMs(), 0, "Shutdown nodes should be 0 now");

    //Check node removal and re-addition before timer expires
    writeToHostsFile("host1", ip, "host2");
    refreshNodesOption(doGraceful, conf);
    nm2 = rm.registerNode("host2:5678", 10240);
    rm.drainEvents();
    writeToHostsFile("host1", ip);
    refreshNodesOption(doGraceful, conf);
    rm.waitForState(nm2.getNodeId(),
                    doGraceful? NodeState.DECOMMISSIONING : NodeState.SHUTDOWN);
    nm2.nodeHeartbeat(true);
    rm.drainEvents();
    rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
    assertEquals(rmNode.getState(), doGraceful? NodeState.DECOMMISSIONED :
        NodeState.SHUTDOWN, "Node should be shutdown");
    assertEquals(metrics.getNumActiveNMs(), 2, "Active nodes should be 2");
    assertEquals(metrics.getNumShutdownNMs(), doGraceful? 0 : 1,
        "Shutdown nodes should be expected");

    //add back the node before timer expires
    // NOTE(review): if maxThreadSleeptime <= 2000 this timeout is
    // non-positive and await returns immediately. Confirm that is intended.
    latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS);
    writeToHostsFile("host1", ip, "host2");
    refreshNodesOption(doGraceful, conf);
    nm2 = rm.registerNode("host2:5678", 10240);
    nodeHeartbeat = nm2.nodeHeartbeat(true);
    rm.drainEvents();
    assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
    assertEquals(metrics.getNumShutdownNMs(), 0,
        "Shutdown nodes should be 0 now");
    assertEquals(metrics.getNumActiveNMs(), 3,
        "All 3 nodes should be active");

    //Decommission this node, check timer doesn't remove it
    writeToHostsFile("host1", "host2", ip);
    writeToHostsFile(excludeHostFile, "host2");
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile
        .getAbsolutePath());
    refreshNodesOption(doGraceful, conf);
    rm.drainEvents();
    // A graceful refresh leaves the node in the active map (DECOMMISSIONING);
    // a normal refresh moves it to the inactive map.
    rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
             rmContext.getInactiveRMNodes().get(nm2.getNodeId());
    assertTrue((rmNode.getState() == NodeState.DECOMMISSIONED) ||
        (rmNode.getState() == NodeState.DECOMMISSIONING),
        "Node should be DECOMMISSIONED or DECOMMISSIONING");
    if (rmNode.getState() == NodeState.DECOMMISSIONED) {
      assertEquals(metrics.getNumDecommisionedNMs(), 1,
          "Decommissioned/ing nodes should be 1 now");
    }
    latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);

    rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
             rmContext.getInactiveRMNodes().get(nm2.getNodeId());
    assertTrue((rmNode.getState() == NodeState.DECOMMISSIONED) ||
        (rmNode.getState() == NodeState.DECOMMISSIONING),
        "Node should be DECOMMISSIONED or DECOMMISSIONING");
    if (rmNode.getState() == NodeState.DECOMMISSIONED) {
      assertEquals(metrics.getNumDecommisionedNMs(), 1,
          "Decommissioned/ing nodes should be 1 now");
    }

    //Test decommed/ing node that transitions to untracked,timer should remove
    testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3, doGraceful);
    rm.stop();
  }

  // A helper method used by testNodeRemovalUtil to avoid exceeding
  // max allowed length.
  private void testNodeRemovalUtilDecomToUntracked(
      RMContext rmContext, Configuration conf,
      MockNM nm1, MockNM nm2, MockNM nm3, boolean doGraceful
  ) throws Exception {
    ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
    String localHost = NetUtils.normalizeHostName("localhost");

    // Phase 1: host2 is both included and excluded, so it must end up
    // DECOMMISSIONED or DECOMMISSIONING (and must not be dropped by the timer).
    writeToHostsFile("host1", localHost, "host2");
    writeToHostsFile(excludeHostFile, "host2");
    refreshNodesOption(doGraceful, conf);
    nm1.nodeHeartbeat(true);
    nm3.nodeHeartbeat(true);

    // With a graceful refresh host2 is looked up among the active nodes,
    // otherwise among the inactive ones.
    final Supplier<RMNode> host2Node;
    if (doGraceful) {
      host2Node = () -> rmContext.getRMNodes().get(nm2.getNodeId());
    } else {
      host2Node = () -> rmContext.getInactiveRMNodes().get(nm2.getNodeId());
    }
    pollingAssert(() -> host2Node.get() != null,
        "Timer for this node was not canceled!");
    final List<NodeState> expectedStates = Arrays.asList(
        NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONING);
    pollingAssert(() -> expectedStates.contains(host2Node.get().getState()),
        "Node should be in one of these states: " + expectedStates);

    // Phase 2: host2 disappears from both lists, becomes untracked and has to
    // be forgotten, with every related metric settling back.
    writeToHostsFile("host1", localHost);
    writeToHostsFile(excludeHostFile, "");
    refreshNodesOption(doGraceful, conf);
    nm2.nodeHeartbeat(true);
    pollingAssert(() -> host2Node.get() == null,
        "Node should have been forgotten!");
    pollingAssert(clusterMetrics::getNumDecommisionedNMs, 0,
        "metrics#getNumDecommisionedNMs should be 0 now");
    pollingAssert(clusterMetrics::getNumShutdownNMs, 0,
        "metrics#getNumShutdownNMs should be 0 now");
    pollingAssert(clusterMetrics::getNumActiveNMs, 2,
        "metrics#getNumActiveNMs should be 2 now");
  }

  /**
   * Verifies that a node which stops heartbeating becomes LOST, and that once
   * it is dropped from the include list it is forgotten by the RM and the
   * lost-node metric returns to zero.
   *
   * @param doGraceful whether the hosts refresh is done gracefully
   */
  private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception {
    Configuration conf = new Configuration();
    conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 2000);
    int timeoutValue = 500;
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
        excludeHostFile.getAbsolutePath());
    writeToHostsFile(hostFile, "host1", "localhost", "host2");
    writeToHostsFile(excludeHostFile, "");
    conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
        timeoutValue);

    rm = new MockRM(conf);
    rm.init(conf);
    rm.start();
    RMContext rmContext = rm.getRMContext();
    refreshNodesOption(doGraceful, conf);
    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:5678", 10240);
    MockNM nm3 = rm.registerNode("localhost:4433", 1024);
    ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
    // Unlike a Java `assert`, this check does not depend on -ea.
    assertNotNull(clusterMetrics);
    rm.drainEvents();
    // check all 3 nodes joined in as NORMAL
    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    nodeHeartbeat = nm2.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    nodeHeartbeat = nm3.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    rm.drainEvents();
    assertEquals(3, clusterMetrics.getNumActiveNMs(),
        "All 3 nodes should be active");

    // Heartbeat only nm1 and nm3 for about 20 x 200 ms, which exceeds the
    // 2000 ms NM expiry interval configured above, so nm2 should expire.
    int waitCount = 0;
    while (waitCount++ < 20) {
      synchronized (this) {
        wait(200);
      }
      nm3.nodeHeartbeat(true);
      nm1.nodeHeartbeat(true);
    }
    assertNotNull(rmContext.getInactiveRMNodes().get(nm2.getNodeId()),
        "host2 should be a lost NM!");
    assertEquals(NodeState.LOST,
        rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
        "host2 should be a lost NM!");
    assertEquals(1, clusterMetrics.getNumLostNMs(),
        "There should be 1 Lost NM!");
    assertEquals(2, clusterMetrics.getNumActiveNMs(),
        "There should be 2 Active NM!");

    int nodeRemovalTimeout =
        conf.getInt(
            YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
            YarnConfiguration.
                DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
    int nodeRemovalInterval =
        rmContext.getNodesListManager().getNodeRemovalCheckInterval();
    long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
    // Drop host2 from the include list so the lost node becomes untracked.
    writeToHostsFile(hostFile, "host1", "localhost");
    writeToHostsFile(excludeHostFile, "");
    refreshNodesOption(doGraceful, conf);
    nm1.nodeHeartbeat(true);
    nm3.nodeHeartbeat(true);
    rm.drainEvents();
    // Give the untracked-node removal at most two check periods to fire.
    waitCount = 0;
    while (rmContext.getInactiveRMNodes().get(
        nm2.getNodeId()) != null && waitCount++ < 2) {
      synchronized (this) {
        wait(maxThreadSleeptime);
        nm1.nodeHeartbeat(true);
        nm2.nodeHeartbeat(true);
      }
    }
    assertEquals(null, rmContext.getInactiveRMNodes().get(nm2.getNodeId()),
        "host2 should have been forgotten!");
    assertEquals(0, clusterMetrics.getNumLostNMs(),
        "There should be no Lost NMs!");
    assertEquals(2, clusterMetrics.getNumActiveNMs(),
        "There should be 2 Active NM!");
    rm.stop();
  }

  /**
   * Verifies that a REBOOTED node which is dropped from the include list is
   * forgotten by the RM and the rebooted-node metric returns to zero.
   *
   * @param doGraceful whether the hosts refresh is done gracefully
   */
  private void testNodeRemovalUtilRebooted(boolean doGraceful)
      throws Exception {
    Configuration conf = new Configuration();
    int timeoutValue = 500;
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
        excludeHostFile.getAbsolutePath());
    writeToHostsFile(hostFile, "host1", "localhost", "host2");
    writeToHostsFile(excludeHostFile, "");
    conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
        timeoutValue);

    rm = new MockRM(conf);
    rm.init(conf);
    rm.start();
    RMContext rmContext = rm.getRMContext();
    refreshNodesOption(doGraceful, conf);
    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:5678", 10240);
    MockNM nm3 = rm.registerNode("localhost:4433", 1024);
    ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
    // Unlike a Java `assert`, this check does not depend on -ea.
    assertNotNull(clusterMetrics);
    // Heartbeat nm2 with -100 (presumably an out-of-sync responseId) so that
    // the RM marks it REBOOTED, as asserted below.
    nm2.nodeHeartbeat(
        new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
    rm.drainEvents();
    rm.drainEvents();

    assertNotNull(rmContext.getInactiveRMNodes().get(nm2.getNodeId()),
        "host2 should be a rebooted NM!");
    assertEquals(NodeState.REBOOTED,
        rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
        "host2 should be a rebooted NM!");
    assertEquals(1, clusterMetrics.getNumRebootedNMs(),
        "There should be 1 Rebooted NM!");
    assertEquals(2, clusterMetrics.getNumActiveNMs(),
        "There should be 2 Active NM!");

    int nodeRemovalTimeout =
        conf.getInt(
            YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
            YarnConfiguration.
                DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
    int nodeRemovalInterval =
        rmContext.getNodesListManager().getNodeRemovalCheckInterval();
    long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
    // Drop host2 from the include list so the rebooted node becomes untracked.
    writeToHostsFile(hostFile, "host1", "localhost");
    writeToHostsFile(excludeHostFile, "");
    refreshNodesOption(doGraceful, conf);
    nm1.nodeHeartbeat(true);
    nm2.nodeHeartbeat(true);
    nm3.nodeHeartbeat(true);
    rm.drainEvents();
    // Give the untracked-node removal at most two check periods to fire.
    int waitCount = 0;
    while (rmContext.getInactiveRMNodes().get(
        nm2.getNodeId()) != null && waitCount++ < 2) {
      synchronized (this) {
        wait(maxThreadSleeptime);
      }
    }
    assertEquals(null, rmContext.getInactiveRMNodes().get(nm2.getNodeId()),
        "host2 should have been forgotten!");
    assertEquals(0, clusterMetrics.getNumRebootedNMs(),
        "There should be no Rebooted NMs!");
    assertEquals(2, clusterMetrics.getNumActiveNMs(),
        "There should be 2 Active NM!");
    rm.stop();
  }

  /**
   * Verifies that an UNHEALTHY node which is dropped from the include list
   * leaves the active and unhealthy counts and, for a non-graceful refresh,
   * becomes SHUTDOWN. It must then be forgotten with no shutdown nodes left
   * in the cluster metrics.
   *
   * @param doGraceful whether the hosts refresh is done gracefully
   */
  private void testNodeRemovalUtilUnhealthy(boolean doGraceful)
      throws Exception {
    Configuration conf = new Configuration();
    int timeoutValue = 500;
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
        hostFile.getAbsolutePath());
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
        excludeHostFile.getAbsolutePath());
    writeToHostsFile(hostFile, "host1", "localhost", "host2");
    writeToHostsFile(excludeHostFile, "");
    conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
        timeoutValue);

    rm = new MockRM(conf);
    rm.init(conf);
    rm.start();
    RMContext rmContext = rm.getRMContext();
    refreshNodesOption(doGraceful, conf);
    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    MockNM nm2 = rm.registerNode("host2:5678", 10240);
    MockNM nm3 = rm.registerNode("localhost:4433", 1024);
    ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
    // Unlike a Java `assert`, this check does not depend on -ea.
    assertNotNull(clusterMetrics);
    rm.drainEvents();
    // check all 3 nodes joined in as NORMAL
    NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    nodeHeartbeat = nm2.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    nodeHeartbeat = nm3.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
    rm.drainEvents();
    assertEquals(3, clusterMetrics.getNumActiveNMs(),
        "All 3 nodes should be active");
    // nm2 reports itself unhealthy; nm1 and nm3 stay healthy
    nm1.nodeHeartbeat(true);
    nm2.nodeHeartbeat(false);
    nm3.nodeHeartbeat(true);
    checkUnhealthyNMCount(rm, nm2, true, 1);
    // Drop host2 from the include list
    writeToHostsFile(hostFile, "host1", "localhost");
    writeToHostsFile(excludeHostFile, "");
    refreshNodesOption(doGraceful, conf);
    nm1.nodeHeartbeat(true);
    nm2.nodeHeartbeat(false);
    nm3.nodeHeartbeat(true);
    rm.drainEvents();
    if (!doGraceful) {
      assertNotNull(rmContext.getInactiveRMNodes().get(nm2.getNodeId()),
          "host2 should be a shutdown NM!");
      assertEquals(NodeState.SHUTDOWN,
          rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
          "host2 should be a shutdown NM!");
    }
    assertEquals(2, clusterMetrics.getNumActiveNMs(),
        "There should be 2 Active NM!");
    if (!doGraceful) {
      assertEquals(1, clusterMetrics.getNumShutdownNMs(),
          "There should be 1 Shutdown NM!");
    }
    assertEquals(0, clusterMetrics.getUnhealthyNMs(),
        "There should be 0 Unhealthy NM!");
    int nodeRemovalTimeout =
        conf.getInt(
            YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
            YarnConfiguration.
                DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
    int nodeRemovalInterval =
        rmContext.getNodesListManager().getNodeRemovalCheckInterval();
    long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
    // Give the untracked-node removal at most two check periods to fire.
    int waitCount = 0;
    while (rmContext.getInactiveRMNodes().get(
        nm2.getNodeId()) != null && waitCount++ < 2) {
      synchronized (this) {
        wait(maxThreadSleeptime);
      }
    }
    assertEquals(null, rmContext.getInactiveRMNodes().get(nm2.getNodeId()),
        "host2 should have been forgotten!");
    // host2 was SHUTDOWN, never REBOOTED, so the shutdown counter is the one
    // that must drop back to 0. The old getNumRebootedNMs() check could never
    // fail here. Poll, as the DecomToUntracked helper does, in case the
    // metric update lags the inactive-map removal.
    pollingAssert(clusterMetrics::getNumShutdownNMs, 0,
        "There should be no Shutdown NMs!");
    assertEquals(2, clusterMetrics.getNumActiveNMs(),
        "There should be 2 Active NM!");
    rm.stop();
  }

  /**
   * Creates {@code file}, and {@code TEMP_DIR} beforehand, unless the file is
   * already present. An existing file is left untouched.
   */
  private void ensureFileExists(File file) throws IOException {
    if (file.exists()) {
      return;
    }
    TEMP_DIR.mkdirs();
    file.createNewFile();
  }

  /**
   * Overwrites the include-hosts file ({@code hostFile}) with {@code hosts}.
   */
  private void writeToHostsFile(String... hosts) throws IOException {
    final File includeFile = hostFile;
    writeToHostsFile(includeFile, hosts);
  }

  /**
   * Overwrites {@code file} with the given host names, one per line, each
   * terminated by {@code '\n'}. The file is created first if it is missing.
   *
   * @param file  hosts file to (re)write
   * @param hosts host entries to write, in order; an empty string yields an
   *              empty line
   * @throws IOException if the file cannot be created or written
   */
  private void writeToHostsFile(File file, String... hosts)
      throws IOException {
    ensureFileExists(file);
    // try-with-resources closes the stream on every path. An explicit charset
    // keeps the bytes independent of the platform default encoding.
    try (FileOutputStream fStream = new FileOutputStream(file)) {
      for (String host : hosts) {
        fStream.write(host.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        fStream.write('\n');
      }
    }
  }

  /**
   * Overwrites {@code file} with an indented XML hosts document of the form
   * {@code <hosts><host><name>..</name>[<timeout>..</timeout>]</host>..</hosts>}.
   * A {@code <timeout>} element is emitted only for entries whose right-hand
   * value is non-null.
   *
   * @param file             target file, created if missing
   * @param hostsAndTimeouts (host name, optional timeout) pairs, in order
   */
  private void writeToHostsXmlFile(
      File file, Pair<String, Integer>... hostsAndTimeouts) throws Exception {
    ensureFileExists(file);
    DocumentBuilderFactory builderFactory =
        XMLUtils.newSecureDocumentBuilderFactory();
    Document document = builderFactory.newDocumentBuilder().newDocument();
    Element root = document.createElement("hosts");
    document.appendChild(root);
    for (Pair<String, Integer> entry : hostsAndTimeouts) {
      // Build each <host> completely, then attach it under <hosts>.
      Element hostElement = document.createElement("host");
      Element nameElement = document.createElement("name");
      nameElement.appendChild(document.createTextNode(entry.getLeft()));
      hostElement.appendChild(nameElement);
      Integer timeout = entry.getRight();
      if (timeout != null) {
        Element timeoutElement = document.createElement("timeout");
        timeoutElement.appendChild(document.createTextNode(timeout.toString()));
        hostElement.appendChild(timeoutElement);
      }
      root.appendChild(hostElement);
    }
    TransformerFactory factory = XMLUtils.newSecureTransformerFactory();
    Transformer serializer = factory.newTransformer();
    serializer.setOutputProperty(OutputKeys.INDENT, "yes");
    serializer.transform(new DOMSource(document), new StreamResult(file));
  }

  /**
   * Waits up to 20 polls, 100 ms apart, for the cluster's decommissioned-NM
   * count to reach {@code count}, then asserts it.
   *
   * @param rm    unused by this helper
   * @param count expected number of decommissioned NMs
   * @throws InterruptedException if interrupted while waiting
   */
  private void checkDecommissionedNMCount(MockRM rm, int count)
      throws InterruptedException {
    int waitCount = 0;
    while (ClusterMetrics.getMetrics().getNumDecommisionedNMs() != count
        && waitCount++ < 20) {
      synchronized (this) {
        wait(100);
      }
    }
    // A single assertion with a message. The original also had an identical
    // message-less duplicate.
    assertEquals(count,
        ClusterMetrics.getMetrics().getNumDecommisionedNMs(),
        "The decommisioned metrics are not updated");
  }

  /**
   * Polls (20 times, 100 ms apart) until the cluster's shutdown-NM count
   * equals {@code count}, then asserts that value.
   *
   * @param rm    unused by this helper
   * @param count expected number of shutdown NMs
   * @throws InterruptedException if interrupted while waiting
   */
  private void checkShutdownNMCount(MockRM rm, int count)
      throws InterruptedException {
    for (int attempt = 0;
        ClusterMetrics.getMetrics().getNumShutdownNMs() != count
            && attempt < 20;
        attempt++) {
      synchronized (this) {
        wait(100);
      }
    }
    assertEquals(count,
        ClusterMetrics.getMetrics().getNumShutdownNMs(),
        "The shutdown metrics are not updated");
  }

  /**
   * Per-test cleanup. It deletes the include-hosts file, destroys the cluster
   * metrics, stops the shared {@code rm} if one was created, and shuts down
   * the default metrics system when a "ClusterMetrics" source is still
   * registered.
   */
  @AfterEach
  public void tearDown() {
    boolean haveHostFile = hostFile != null && hostFile.exists();
    if (haveHostFile) {
      hostFile.delete();
    }

    ClusterMetrics.destroy();
    if (rm != null) {
      rm.stop();
    }

    MetricsSystem metricsSystem = DefaultMetricsSystem.instance();
    boolean clusterMetricsRegistered =
        metricsSystem.getSource("ClusterMetrics") != null;
    if (clusterMetricsRegistered) {
      DefaultMetricsSystem.shutdown();
    }
  }

  /**
   * Registers an NM that reports two OPPORTUNISTIC containers and one
   * GUARANTEED container for an unmanaged-AM application. Checks that all
   * three appear as live containers with their execution types preserved.
   * Also checks that only the GUARANTEED container's resources
   * (Resource.newInstance(2048, 1)) are charged to the attempt, its queue
   * metrics and the scheduler node.
   */
  @SuppressWarnings("unchecked")
  @Test
  public void testHandleOpportunisticContainerStatus() throws Exception{
    final DrainDispatcher dispatcher = new DrainDispatcher();
    YarnConfiguration conf = new YarnConfiguration();
    // Recovery and work-preserving recovery are enabled. This is presumably
    // what lets containers reported at registration be picked up by the
    // scheduler.
    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
        true);
    // Use a DrainDispatcher so the test can await event processing.
    rm = new MockRM(conf){
      @Override
      protected Dispatcher createDispatcher() {
        return dispatcher;
      }
    };

    rm.start();
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
            .withUnmanagedAM(true)
            .build();
    RMApp app = MockRMAppSubmitter.submit(rm, data);
    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
        .getAppAttemptId();

    ResourceTrackerService resourceTrackerService =
        rm.getResourceTrackerService();
    // Wait until the scheduler knows about the attempt.
    // NOTE(review): this poll is unbounded and the test has no @Timeout.
    SchedulerApplicationAttempt applicationAttempt = null;
    while (applicationAttempt == null) {
      applicationAttempt =
          ((AbstractYarnScheduler)rm.getRMContext().getScheduler())
          .getApplicationAttempt(appAttemptId);
      Thread.sleep(100);
    }

    // Before the NM registers, nothing is charged to the attempt or queue.
    Resource currentConsumption = applicationAttempt.getCurrentConsumption();
    assertEquals(Resource.newInstance(0, 0), currentConsumption);
    Resource allocResources =
        applicationAttempt.getQueue().getMetrics().getAllocatedResources();
    assertEquals(Resource.newInstance(0, 0), allocResources);

    RegisterNodeManagerRequest req = Records.newRecord(
        RegisterNodeManagerRequest.class);
    NodeId nodeId = NodeId.newInstance("host2", 1234);
    Resource capability = Resources.createResource(1024);

    NodeStatus mockNodeStatus = createMockNodeStatus();

    req.setResource(capability);
    req.setNodeId(nodeId);
    req.setHttpPort(1234);
    req.setNMVersion(YarnVersionInfo.getVersion());
    req.setNodeStatus(mockNodeStatus);
    // Three RUNNING containers: c1 and c2 are OPPORTUNISTIC, c3 is GUARANTEED.
    ContainerId c1 = ContainerId.newContainerId(appAttemptId, 1);
    ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2);
    ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3);
    NMContainerStatus queuedOpp =
        NMContainerStatus.newInstance(c1, 1, ContainerState.RUNNING,
            Resource.newInstance(1024, 1), "Dummy Queued OC",
            ContainerExitStatus.INVALID, Priority.newInstance(5), 1234, "",
            ExecutionType.OPPORTUNISTIC, -1);
    NMContainerStatus runningOpp =
        NMContainerStatus.newInstance(c2, 1, ContainerState.RUNNING,
            Resource.newInstance(2048, 1), "Dummy Running OC",
            ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "",
            ExecutionType.OPPORTUNISTIC, -1);
    NMContainerStatus runningGuar =
        NMContainerStatus.newInstance(c3, 1, ContainerState.RUNNING,
            Resource.newInstance(2048, 1), "Dummy Running GC",
            ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "",
            ExecutionType.GUARANTEED, -1);
    req.setContainerStatuses(Arrays.asList(queuedOpp, runningOpp, runningGuar));
    // Register the node together with the three container statuses above.
    // The registration itself is expected to succeed (NORMAL).
    RegisterNodeManagerResponse response =
        resourceTrackerService.registerNodeManager(req);
    // NOTE(review): a fixed 2 s sleep between drains, presumably to let the
    // reported containers be processed; polling would be more robust.
    dispatcher.await();
    Thread.sleep(2000);
    dispatcher.await();
    assertEquals(NodeAction.NORMAL, response.getNodeAction());

    // All three reported containers are live and keep their execution type.
    Collection<RMContainer> liveContainers = applicationAttempt
        .getLiveContainers();
    assertEquals(3, liveContainers.size());
    Iterator<RMContainer> iter = liveContainers.iterator();
    while (iter.hasNext()) {
      RMContainer rc = iter.next();
      assertEquals(
          rc.getContainerId().equals(c3) ?
              ExecutionType.GUARANTEED : ExecutionType.OPPORTUNISTIC,
          rc.getExecutionType());
    }

    // Should only include GUARANTEED resources
    currentConsumption = applicationAttempt.getCurrentConsumption();
    assertEquals(Resource.newInstance(2048, 1), currentConsumption);
    allocResources =
        applicationAttempt.getQueue().getMetrics().getAllocatedResources();
    assertEquals(Resource.newInstance(2048, 1), allocResources);

    // The scheduler node likewise only accounts for the GUARANTEED container.
    SchedulerNode schedulerNode =
        rm.getRMContext().getScheduler().getSchedulerNode(nodeId);
    assertNotNull(schedulerNode);
    Resource nodeResources = schedulerNode.getAllocatedResource();
    assertEquals(Resource.newInstance(2048, 1), nodeResources);
  }

  /**
   * An NM that reports COMPLETE statuses for two containers the RM never
   * allocated must eventually be told, across heartbeat responses, to remove
   * both of them.
   */
  @Test
  @Timeout(value = 60)
  public void testNodeHeartBeatResponseForUnknownContainerCleanUp()
      throws Exception {
    Configuration conf = new Configuration();
    rm = new MockRM(conf);
    rm.init(conf);
    rm.start();

    MockNM nm1 = rm.registerNode("host1:1234", 5120);
    rm.drainEvents();

    // initial heartbeat
    nm1.nodeHeartbeat(true);

    // COMPLETE statuses for containers #2 and #3 of an attempt the RM has
    // not allocated anything for.
    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
    ApplicationAttemptId attemptId =
        BuilderUtils.newApplicationAttemptId(appId, 1);
    List<ContainerStatus> unknownStatuses = new ArrayList<>();
    for (int containerNum : new int[] {2, 3}) {
      unknownStatuses.add(ContainerStatus.newInstance(
          BuilderUtils.newContainerId(attemptId, containerNum),
          ContainerState.COMPLETE, "", -1));
    }
    Map<ApplicationId, List<ContainerStatus>> statusesByApp = new HashMap<>();
    statusesByApp.put(attemptId.getApplicationId(), unknownStatuses);

    // Make the application itself known to the RM via a mocked RMApp.
    RMApp mockedApp = mock(RMApp.class);
    when(mockedApp.getApplicationId()).thenReturn(appId);
    rm.getRMContext().getRMApps().put(appId, mockedApp);

    // Report the unknown containers.
    nm1.nodeHeartbeat(statusesByApp, true);
    rm.drainEvents();

    // Heartbeat until both containers have been handed back for removal;
    // the @Timeout above bounds this loop.
    int removedSoFar = 0;
    do {
      NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
      rm.drainEvents();
      removedSoFar +=
          heartbeatResponse.getContainersToBeRemovedFromNM().size();
    } while (removedSoFar != 2);
  }

  /**
   * Once both the NM and the RM's last heartbeat response sit at
   * Integer.MAX_VALUE, the next two heartbeats must carry response ids 0 and
   * 1 while the node stays NORMAL.
   */
  @Test
  public void testResponseIdOverflow() throws Exception {
    Configuration config = new Configuration();
    rm = new MockRM(config);
    rm.start();

    MockNM nodeManager = rm.registerNode("host1:1234", 5120);
    NodeHeartbeatResponse response = nodeManager.nodeHeartbeat(true);
    assertEquals(NodeAction.NORMAL, response.getNodeAction());

    // Push both sides to the largest possible responseId.
    rm.getRMContext().getRMNodes().get(nodeManager.getNodeId())
        .getLastNodeHeartBeatResponse().setResponseId(Integer.MAX_VALUE);
    nodeManager.setResponseId(Integer.MAX_VALUE);

    // Two more heartbeats: ids 0, then 1.
    for (int expectedId = 0; expectedId <= 1; expectedId++) {
      response = nodeManager.nodeHeartbeat(true);
      assertEquals(NodeAction.NORMAL, response.getNodeAction());
      assertEquals(expectedId, response.getResponseId());
    }
  }

  /**
   * With the IP/hostname registration check enabled, an NM whose hostname
   * cannot be resolved is told to SHUTDOWN, while "localhost" registers
   * NORMALly.
   */
  @Test
  public void testNMIpHostNameResolution() throws Exception {
    Configuration conf = new Configuration();
    conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
        "localhost:" + ServerSocketUtil.getPort(10000, 10));
    conf.setBoolean(YarnConfiguration.RM_NM_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
        true);
    MockRM mockRM = new MockRM(conf) {
      @Override
      protected ResourceTrackerService createResourceTrackerService() {
        return new ResourceTrackerService(getRMContext(), nodesListManager,
            this.nmLivelinessMonitor,
            rmContext.getContainerTokenSecretManager(),
            rmContext.getNMTokenSecretManager()) {
        };
      }
    };
    mockRM.start();
    // mockRM is a local, so tearDown() (which only stops the `rm` field) will
    // not stop it. Stop it here even if an assertion fails, so the RM and its
    // tracker port are not leaked into later tests.
    try {
      ResourceTracker rmTracker =
          ServerRMProxy.createRMProxy(mockRM.getConfig(), ResourceTracker.class);
      RegisterNodeManagerResponse response = rmTracker.registerNodeManager(
          RegisterNodeManagerRequest.newInstance(
              NodeId.newInstance("host1" + System.currentTimeMillis(), 1234),
              1236, Resource.newInstance(10000, 10), "2", new ArrayList<>(),
              new ArrayList<>()));

      assertEquals(NodeAction.SHUTDOWN,
          response.getNodeAction(), "Shutdown signal should be received");
      assertTrue(response.getDiagnosticsMessage()
          .contains("hostname cannot be resolved "), "Diagnostic Message");
      // Test success
      rmTracker =
          ServerRMProxy.createRMProxy(mockRM.getConfig(), ResourceTracker.class);
      response = rmTracker.registerNodeManager(RegisterNodeManagerRequest
          .newInstance(NodeId.newInstance("localhost", 1234), 1236,
              Resource.newInstance(10000, 10), "2", new ArrayList<>(),
              new ArrayList<>()));
      assertEquals(NodeAction.NORMAL,
          response.getNodeAction(), "Successfull registration");
    } finally {
      mockRM.stop();
    }
  }

  /**
   * Re-evaluates {@code supplier} every 100 ms via
   * {@link GenericTestUtils#waitFor}, giving it up to 10_000 ms to become
   * {@code true}. Otherwise the wait fails with {@code message}.
   */
  private void pollingAssert(Supplier<Boolean> supplier, String message)
      throws InterruptedException, TimeoutException {
    final int pollIntervalMillis = 100;
    final int maxWaitMillis = 10_000;
    GenericTestUtils.waitFor(supplier, pollIntervalMillis, maxWaitMillis,
        message);
  }

  /**
   * Polls until {@code supplier} yields a value equal to {@code expected}
   * (compared with {@link Objects#equals}). Uses the same 100 ms interval and
   * 10_000 ms limit as the boolean overload, to which it delegates.
   */
  private <T> void pollingAssert(Supplier<T> supplier, T expected, String message)
      throws InterruptedException, TimeoutException {
    Supplier<Boolean> matchesExpected =
        () -> Objects.equals(supplier.get(), expected);
    pollingAssert(matchesExpected, message);
  }

  /**
   * A {@link NodeAttributeStore} for tests that neither persists nor recovers
   * anything: every operation returns immediately without side effects.
   */
  public static class NullNodeAttributeStore implements NodeAttributeStore {

    @Override
    public void init(Configuration configuration, NodeAttributesManager mgr) {
      // nothing to initialise
    }

    @Override
    public void recover() {
      // nothing was stored, so nothing to recover
    }

    @Override
    public void replaceNodeAttributes(List<NodeToAttributes> nodeToAttribute) {
      // intentionally discarded
    }

    @Override
    public void addNodeAttributes(List<NodeToAttributes> nodeToAttribute) {
      // intentionally discarded
    }

    @Override
    public void removeNodeAttributes(List<NodeToAttributes> nodeToAttribute) {
      // intentionally discarded
    }

    @Override
    public void close() {
      // no resources held
    }
  }

  /**
   * System credentials are only shipped in a heartbeat response when the
   * RM's token sequence number differs from the one the NM reports.
   */
  @Test
  @Timeout(value = 5)
  public void testSystemCredentialsAfterTokenSequenceNoChange()
      throws Exception {

    Configuration conf = new Configuration();

    RMContext rmContext = mock(RMContextImpl.class);

    Dispatcher dispatcher = new InlineDispatcher();
    when(rmContext.getDispatcher()).thenReturn(dispatcher);

    NodeId nodeId = NodeId.newInstance("localhost", 1234);
    ConcurrentMap<NodeId, RMNode> rmNodes =
        new ConcurrentHashMap<NodeId, RMNode>();
    RMNode rmNode = MockNodes.newNodeInfo(1, Resource.newInstance(1024, 1), 1,
        "localhost", 1234, rmContext);
    rmNodes.put(nodeId, rmNode);
    when(rmContext.getRMNodes()).thenReturn(rmNodes);

    ConcurrentMap<NodeId, RMNode> inactiveNodes =
        new ConcurrentHashMap<NodeId, RMNode>();
    when(rmContext.getInactiveRMNodes()).thenReturn(inactiveNodes);
    when(rmContext.getConfigurationProvider())
        .thenReturn(new LocalConfigurationProvider());

    dispatcher.register(SchedulerEventType.class,
        new InlineDispatcher.EmptyEventHandler());
    dispatcher.register(RMNodeEventType.class,
        new NodeEventDispatcher(rmContext));

    NMLivelinessMonitor nmLivelinessMonitor =
        new NMLivelinessMonitor(dispatcher);
    nmLivelinessMonitor.init(conf);
    nmLivelinessMonitor.start();
    NodesListManager nodesListManager = new NodesListManager(rmContext);
    nodesListManager.init(conf);
    RMContainerTokenSecretManager containerTokenSecretManager =
        new RMContainerTokenSecretManager(conf);
    containerTokenSecretManager.start();
    NMTokenSecretManagerInRM nmTokenSecretManager =
        new NMTokenSecretManagerInRM(conf);
    nmTokenSecretManager.start();
    ResourceTrackerService resourceTrackerService = new ResourceTrackerService(
        rmContext, nodesListManager, nmLivelinessMonitor,
        containerTokenSecretManager, nmTokenSecretManager);

    try {
      resourceTrackerService.init(conf);
      resourceTrackerService.start();

      RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

      RegisterNodeManagerRequest request =
          recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
      request.setNodeId(nodeId);
      request.setHttpPort(1234);
      request.setResource(Resources.createResource(1024));
      resourceTrackerService.registerNodeManager(request);

      org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus =
          recordFactory.newRecordInstance(
              org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
      nodeStatus.setNodeId(nodeId);
      nodeStatus.setResponseId(0);
      nodeStatus.setNodeHealthStatus(
          recordFactory.newRecordInstance(NodeHealthStatus.class));
      nodeStatus.getNodeHealthStatus().setIsNodeHealthy(true);

      NodeHeartbeatRequest request1 =
          recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
      request1.setNodeStatus(nodeStatus);

      // Set NM's token sequence no as 1
      request1.setTokenSequenceNo(1);

      // Set RM's token sequence no as 1
      when(rmContext.getTokenSequenceNo()).thenReturn((long) 1);

      // Populate SystemCredentialsForApps
      final ApplicationId appId = ApplicationId.newInstance(1234, 1);
      Credentials app1Cred = new Credentials();

      Token<DelegationTokenIdentifier> token =
          new Token<DelegationTokenIdentifier>();
      token.setKind(new Text("kind1"));
      app1Cred.addToken(new Text("token1"), token);
      Token<DelegationTokenIdentifier> token2 =
          new Token<DelegationTokenIdentifier>();
      token2.setKind(new Text("kind2"));
      app1Cred.addToken(new Text("token2"), token2);

      DataOutputBuffer dob = new DataOutputBuffer();
      app1Cred.writeTokenStorageToStream(dob);
      ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

      SystemCredentialsForAppsProto systemCredentialsForAppsProto =
          YarnServerBuilderUtils.newSystemCredentialsForAppsProto(appId,
              byteBuffer);

      ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto> systemCredentialsForApps =
          new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>(1);

      systemCredentialsForApps.put(appId, systemCredentialsForAppsProto);

      when(rmContext.getSystemCredentialsForApps())
          .thenReturn(systemCredentialsForApps);

      // first ping
      NodeHeartbeatResponse response =
          resourceTrackerService.nodeHeartbeat(request1);

      // Though SystemCredentialsForApps size is 1, it is not being sent as part
      // of response as there is no difference between NM's and RM's token
      // sequence no
      assertEquals(1, rmContext.getTokenSequenceNo());
      assertEquals(1, rmContext.getSystemCredentialsForApps().size());
      assertEquals(1, response.getTokenSequenceNo());
      assertEquals(0, response.getSystemCredentialsForApps().size());

      // Set RM's token sequence no as 2
      when(rmContext.getTokenSequenceNo()).thenReturn((long) 2);

      // Ensure new heartbeat has been sent to avoid duplicate issues
      nodeStatus.setResponseId(1);
      request1.setNodeStatus(nodeStatus);

      // second ping
      NodeHeartbeatResponse response1 =
          resourceTrackerService.nodeHeartbeat(request1);

      // Since NM's and RM's token sequence no is different, response should
      // contain SystemCredentialsForApps
      assertEquals(2, response1.getTokenSequenceNo());
      assertEquals(1, response1.getSystemCredentialsForApps().size());
    } finally {
      // Previously only the tracker was closed, and only on success. Stop
      // everything this test started, even on failure, so nothing these
      // components run in the background outlives the test.
      resourceTrackerService.close();
      nodesListManager.stop();
      nmTokenSecretManager.stop();
      containerTokenSecretManager.stop();
      nmLivelinessMonitor.stop();
    }
  }

  /**
   * Decommissioning without pre-configured include hosts file.
   *
   * <p>With untracked-node removal enabled and no include path configured,
   * nodes that become DECOMMISSIONED (and are then removed from the exclude
   * file), LOST or SHUTDOWN must each be dropped from the inactive node map
   * once the 500ms untracked removal timeout elapses.
   */
  @Test
  public void testDecommissionWithoutIncludeFile() throws Exception {
    // clear exclude hosts
    writeToHostsFile(excludeHostFile, "");
    // init conf:
    // (1) set untracked removal timeout to 500ms
    // (2) set exclude path (no include path)
    // (3) enable node untracked without pre-configured include path
    Configuration conf = new Configuration();
    conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
        500);
    conf.setBoolean(
        YarnConfiguration.RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH, true);
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
        excludeHostFile.getAbsolutePath());

    rm = new MockRM(conf);
    rm.start();
    MockNM nm1 = rm.registerNode("host1:1234", 10240);
    MockNM nm2 = rm.registerNode("host2:1234", 10240);
    MockNM nm3 = rm.registerNode("host3:1234", 10240);
    MockNM nm4 = rm.registerNode("host4:1234", 10240);
    assertEquals(4, rm.getRMContext().getRMNodes().size());
    assertEquals(0, rm.getRMContext().getInactiveRMNodes().size());

    // decommission nm1 via adding nm1 into exclude hosts
    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
    writeToHostsFile(excludeHostFile, "host1");
    rm.getNodesListManager().refreshNodes(conf);
    rm.drainEvents();
    // expected value first so failure messages read correctly
    assertEquals(NodeState.DECOMMISSIONED, rmNode1.getState());
    assertEquals(3, rm.getRMContext().getRMNodes().size());
    assertEquals(1, rm.getRMContext().getInactiveRMNodes().size());
    assertEquals(Sets.newHashSet(nm1.getNodeId()),
        rm.getRMContext().getInactiveRMNodes().keySet());

    // remove nm1 from exclude hosts, so that it will be marked as untracked
    // and removed from inactive nodes after the timeout
    writeToHostsFile(excludeHostFile, "");
    rm.getNodesListManager().refreshNodes(conf);
    // confirmed that nm1 should be removed from inactive nodes in 1 second
    GenericTestUtils.waitFor(
        () -> rm.getRMContext().getInactiveRMNodes().size() == 0, 100, 1000);

    // lost nm2
    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
    rm.getRMContext().getDispatcher().getEventHandler()
        .handle(new RMNodeEvent(nm2.getNodeId(), RMNodeEventType.EXPIRE));
    rm.drainEvents();
    assertEquals(NodeState.LOST, rmNode2.getState());
    assertEquals(2, rm.getRMContext().getRMNodes().size());
    assertEquals(1, rm.getRMContext().getInactiveRMNodes().size());
    assertEquals(Sets.newHashSet(nm2.getNodeId()),
        rm.getRMContext().getInactiveRMNodes().keySet());
    // confirmed that nm2 should be removed from inactive nodes in 1 second
    GenericTestUtils.waitFor(
        () -> rm.getRMContext().getInactiveRMNodes().size() == 0, 100, 1000);

    // shutdown nm3
    RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
    rm.getRMContext().getDispatcher().getEventHandler()
        .handle(new RMNodeEvent(nm3.getNodeId(), RMNodeEventType.SHUTDOWN));
    rm.drainEvents();
    assertEquals(NodeState.SHUTDOWN, rmNode3.getState());
    assertEquals(1, rm.getRMContext().getRMNodes().size());
    assertEquals(1, rm.getRMContext().getInactiveRMNodes().size());
    assertEquals(Sets.newHashSet(nm3.getNodeId()),
        rm.getRMContext().getInactiveRMNodes().keySet());
    // confirmed that nm3 should be removed from inactive nodes in 1 second
    GenericTestUtils.waitFor(
        () -> rm.getRMContext().getInactiveRMNodes().size() == 0, 100, 1000);

    // nm4 is still active node at last
    assertEquals(Sets.newHashSet(nm4.getNodeId()),
        rm.getRMContext().getRMNodes().keySet());

    rm.close();
  }

  /**
   * Decommissioning with selective states for untracked nodes.
   *
   * <p>Only untracked nodes in the configured selective states
   * (DECOMMISSIONED and SHUTDOWN here) are removed from the inactive node map
   * after the untracked removal timeout; a LOST node must remain.
   */
  @Test
  public void testDecommissionWithSelectiveStates() throws Exception {
    // clear exclude hosts
    writeToHostsFile(excludeHostFile, "");
    // init conf:
    // (1) set untracked removal timeout to 500ms
    // (2) enable node untracked without pre-configured include path
    // (3) only remove untracked nodes in DECOMMISSIONED or SHUTDOWN state
    // (4) set exclude path (no include path)
    Configuration conf = new Configuration();
    conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC, 500);
    conf.setBoolean(YarnConfiguration.RM_ENABLE_NODE_UNTRACKED_WITHOUT_INCLUDE_PATH, true);
    conf.setStrings(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_NODE_SELECTIVE_STATES_TO_REMOVE,
        "DECOMMISSIONED", "SHUTDOWN");
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile.getAbsolutePath());

    rm = new MockRM(conf);
    rm.start();
    MockNM nm1 = rm.registerNode("host1:1234", 10240);
    MockNM nm2 = rm.registerNode("host2:1234", 10240);
    MockNM nm3 = rm.registerNode("host3:1234", 10240);
    MockNM nm4 = rm.registerNode("host4:1234", 10240);
    assertEquals(4, rm.getRMContext().getRMNodes().size());
    assertEquals(0, rm.getRMContext().getInactiveRMNodes().size());

    // decommission nm1 via adding nm1 into exclude hosts
    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
    writeToHostsFile(excludeHostFile, "host1");
    rm.getNodesListManager().refreshNodes(conf);
    rm.drainEvents();
    assertEquals(NodeState.DECOMMISSIONED, rmNode1.getState());
    assertEquals(3, rm.getRMContext().getRMNodes().size());
    assertEquals(1, rm.getRMContext().getInactiveRMNodes().size());
    assertEquals(new HashSet<>(Arrays.asList(nm1.getNodeId())),
        rm.getRMContext().getInactiveRMNodes().keySet());

    // remove nm1 from exclude hosts, so that it will be marked as untracked
    // and removed from inactive nodes after the timeout
    writeToHostsFile(excludeHostFile, "");
    rm.getNodesListManager().refreshNodes(conf);
    // confirmed that nm1 should be removed from inactive nodes in 1 second
    GenericTestUtils.waitFor(() -> rm.getRMContext().getInactiveRMNodes().size() == 0,
        100, 1000);

    // lost nm2
    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
    rm.getRMContext().getDispatcher().getEventHandler()
        .handle(new RMNodeEvent(nm2.getNodeId(), RMNodeEventType.EXPIRE));
    rm.drainEvents();
    assertEquals(NodeState.LOST, rmNode2.getState());
    assertEquals(2, rm.getRMContext().getRMNodes().size());
    // nm2 is now the only inactive node. LOST is not a selective state, so it
    // must never be removed; that is verified at the end of the test, after
    // nm3's removal timeout has also elapsed.
    assertEquals(new HashSet<>(Arrays.asList(nm2.getNodeId())),
        rm.getRMContext().getInactiveRMNodes().keySet());

    // shutdown nm3
    RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
    rm.getRMContext().getDispatcher().getEventHandler()
        .handle(new RMNodeEvent(nm3.getNodeId(), RMNodeEventType.SHUTDOWN));
    rm.drainEvents();
    assertEquals(NodeState.SHUTDOWN, rmNode3.getState());
    assertEquals(1, rm.getRMContext().getRMNodes().size());
    assertEquals(2, rm.getRMContext().getInactiveRMNodes().size());
    // confirmed that nm3 should be removed from inactive nodes in 1 second
    GenericTestUtils.waitFor(() -> rm.getRMContext().getInactiveRMNodes().size() == 1,
        100, 1000);

    // nm4 is still active node at last
    assertEquals(new HashSet<>(Arrays.asList(nm4.getNodeId())),
        rm.getRMContext().getRMNodes().keySet());

    // nm2 is still inactive node at last, not removed
    assertEquals(new HashSet<>(Arrays.asList(nm2.getNodeId())),
        rm.getRMContext().getInactiveRMNodes().keySet());

    rm.close();
  }

  /**
   * Test case to verify the behavior of ResourceManager when unregistered nodes
   * are marked as 'LOST' and node metrics are correctly updated in the system.
   *
   * <p>With {@code ENABLE_TRACKING_FOR_UNREGISTERED_NODES} enabled, included
   * hosts that have not registered are expected to show up as LOST inactive
   * nodes. An excluded host is counted as decommissioned. A LOST host that
   * later registers is counted as active again.
   *
   * @throws Exception if any unexpected behavior occurs
   */
  @Test
  public void testMarkUnregisteredNodesAsLost() throws Exception {
    Configuration conf = new Configuration();

    // Include test_host1..test_host4 ...
    writeToHostsFile(hostFile, "test_host1", "test_host2", "test_host3", "test_host4");
    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile.getAbsolutePath());

    // ... but exclude test_host4, so it should be counted as decommissioned.
    writeToHostsFile(excludeHostFile, "test_host4");
    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile.getAbsolutePath());

    // Track included hosts that have not registered with the RM.
    conf.setBoolean(YarnConfiguration.ENABLE_TRACKING_FOR_UNREGISTERED_NODES, true);

    rm = new MockRM(conf);
    RMContext rmContext = rm.getRMContext();
    ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
    assertNotNull(clusterMetrics);
    rm.start();

    // Only test_host1 registers; the short sleeps give asynchronous event
    // processing time to complete before and after registration.
    TimeUnit.MILLISECONDS.sleep(50);
    MockNM nm1 = rm.registerNode("test_host1:1234", 5120);
    nm1.nodeHeartbeat(true);
    TimeUnit.MILLISECONDS.sleep(50);

    // test_host3 never registered, so it must be tracked as a LOST node.
    // Check presence first so a missing entry fails cleanly instead of NPE.
    RMNode lostNode3 = rmContext.getInactiveRMNodes().get(
        rm.getNodesListManager().createLostNodeId("test_host3"));
    assertNotNull(lostNode3, "test_host3 should be tracked as an inactive NM!");
    assertEquals(NodeState.LOST, lostNode3.getState(),
        "test_host3 should be a lost NM!");

    // test_host2 and test_host3 are lost, test_host1 is active and
    // test_host4 is decommissioned.
    assertEquals(2, clusterMetrics.getNumLostNMs(), "There should be 2 Lost NM!");
    assertEquals(1, clusterMetrics.getNumActiveNMs(), "There should be 1 Active NM!");
    assertEquals(1, clusterMetrics.getNumDecommisionedNMs(),
        "There should be 1 Decommissioned NM!");

    // Registering test_host3 should move it from lost to active.
    MockNM nm3 = rm.registerNode("test_host3:5678", 10240);
    nm3.nodeHeartbeat(true);
    TimeUnit.MILLISECONDS.sleep(50);

    assertEquals(1, clusterMetrics.getNumLostNMs(),
        "There should be 1 Lost NM!");
    assertEquals(2, clusterMetrics.getNumActiveNMs(),
        "There should be 2 Active NM!");

    rm.close();
  }
}