TestCapacitySchedulerNodeLabelUpdate.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;

public class TestCapacitySchedulerNodeLabelUpdate {
  // Memory size used for a "1 GB" request; the tests pass it wherever a
  // memory amount is expected (presumably megabytes — confirm).
  private final int GB = 1024;

  // Base YARN configuration; re-created by setUp() before every test.
  private YarnConfiguration conf;
  // Queue path of the scheduler's root queue, used by the queue-setup helpers.
  private static final QueuePath ROOT = new QueuePath(CapacitySchedulerConfiguration.ROOT);
  
  // Label manager created in setUp(); the tests return it from their
  // MockRM#createNodeLabelManager overrides.
  RMNodeLabelsManager mgr;

  /**
   * Creates a fresh configuration that selects the {@link CapacityScheduler}
   * and a newly initialised {@link NullRMNodeLabelsManager} before each test.
   *
   * @throws Exception if the label manager cannot be initialised
   */
  @BeforeEach
  public void setUp() throws Exception {
    this.conf = new YarnConfiguration();
    this.conf.setClass(
        YarnConfiguration.RM_SCHEDULER,
        CapacityScheduler.class,
        ResourceScheduler.class);

    this.mgr = new NullRMNodeLabelsManager();
    this.mgr.init(this.conf);
  }
  
  /**
   * Builds a capacity-scheduler configuration with a single queue
   * {@code root.a}. Both the root and {@code a} get 100 capacity for each of
   * the labels x, y and z, and {@code a} may access all three labels.
   *
   * @param config base configuration handed to the
   *               {@link CapacitySchedulerConfiguration} constructor
   * @return the configuration carrying the queue and label settings
   */
  private Configuration getConfigurationWithQueueLabels(Configuration config) {
    final CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration(config);
    final String[] labels = {"x", "y", "z"};

    // root has exactly one child, "a", and full capacity on every label
    csConf.setQueues(ROOT, new String[] {"a"});
    for (String label : labels) {
      csConf.setCapacityByLabel(ROOT, label, 100);
    }

    // queue "a" takes everything the root offers
    final QueuePath queueA =
        new QueuePath(CapacitySchedulerConfiguration.ROOT + ".a");
    csConf.setCapacity(queueA, 100);
    csConf.setAccessibleNodeLabels(queueA, ImmutableSet.of("x", "y", "z"));
    for (String label : labels) {
      csConf.setCapacityByLabel(queueA, label, 100);
    }

    return csConf;
  }

  /**
   * Builds a capacity-scheduler configuration with two top-level queues,
   * {@code a} and {@code b}, where {@code a} is split into the leaf queues
   * {@code a1}..{@code a4} with differing accessible labels and per-label
   * capacities.
   *
   * @param config base configuration handed to the
   *               {@link CapacitySchedulerConfiguration} constructor
   * @return the configuration carrying the queue hierarchy and label settings
   */
  private Configuration getConfigurationWithSubQueueLabels(
      Configuration config) {
    final CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration(config);

    // root -> {a, b}; the root gets 100 capacity on labels x and y
    csConf.setQueues(ROOT, new String[] {"a", "b"});
    csConf.setCapacityByLabel(ROOT, "x", 100);
    csConf.setCapacityByLabel(ROOT, "y", 100);

    final String prefixA = CapacitySchedulerConfiguration.ROOT + ".a";
    final QueuePath queueA = new QueuePath(prefixA);
    final QueuePath queueB =
        new QueuePath(CapacitySchedulerConfiguration.ROOT + ".b");
    final QueuePath queueA1 = new QueuePath(prefixA + ".a1");
    final QueuePath queueA2 = new QueuePath(prefixA + ".a2");
    final QueuePath queueA3 = new QueuePath(prefixA + ".a3");
    final QueuePath queueA4 = new QueuePath(prefixA + ".a4");

    // a -> {a1, a2, a3, a4}; default-partition capacities
    csConf.setQueues(queueA, new String[] {"a1", "a2", "a3", "a4"});
    csConf.setCapacity(queueA, 50);
    csConf.setCapacity(queueB, 50);
    csConf.setCapacity(queueA1, 40);
    csConf.setCapacity(queueA2, 20);
    csConf.setCapacity(queueA3, 20);
    csConf.setCapacity(queueA4, 20);

    // labels each queue is allowed to use (b is left unconfigured)
    csConf.setAccessibleNodeLabels(queueA, ImmutableSet.of("x", "y", "z"));
    csConf.setAccessibleNodeLabels(queueA1, ImmutableSet.of("x", "y"));
    csConf.setAccessibleNodeLabels(queueA2, ImmutableSet.of("y"));
    csConf.setAccessibleNodeLabels(queueA3, ImmutableSet.of("x", "y", "z"));
    csConf.setAccessibleNodeLabels(queueA4, ImmutableSet.of("x", "y"));

    // per-label capacities of the two top-level queues
    csConf.setCapacityByLabel(queueA, "x", 50);
    csConf.setCapacityByLabel(queueA, "y", 50);
    csConf.setCapacityByLabel(queueA, "z", 50);
    csConf.setCapacityByLabel(queueB, "x", 50);
    csConf.setCapacityByLabel(queueB, "y", 50);
    csConf.setCapacityByLabel(queueB, "z", 50);

    // per-label capacities of a's children
    csConf.setCapacityByLabel(queueA1, "x", 50);
    csConf.setCapacityByLabel(queueA3, "x", 25);
    csConf.setCapacityByLabel(queueA4, "x", 25);
    csConf.setCapacityByLabel(queueA1, "y", 25);
    csConf.setCapacityByLabel(queueA2, "y", 25);
    csConf.setCapacityByLabel(queueA4, "y", 50);
    csConf.setCapacityByLabel(queueA3, "z", 50);
    csConf.setCapacityByLabel(queueA4, "z", 50);

    return csConf;
  }

  /**
   * Collects the given strings into a set.
   *
   * @param elements the values to put into the set
   * @return the set produced by {@code Sets.newHashSet(elements)}
   */
  private Set<String> toSet(String... elements) {
    return Sets.newHashSet(elements);
  }
  
  /**
   * Asserts the memory used by a queue for {@code RMNodeLabelsManager.NO_LABEL}.
   *
   * @param rm        resource manager whose scheduler is inspected
   * @param queueName name of the queue to check
   * @param memory    expected used memory
   */
  private void checkUsedResource(MockRM rm, String queueName, int memory) {
    final String noLabel = RMNodeLabelsManager.NO_LABEL;
    checkUsedResource(rm, queueName, memory, noLabel);
  }

  /**
   * Asserts the AM-used memory of a queue for
   * {@code RMNodeLabelsManager.NO_LABEL}.
   *
   * @param rm        resource manager whose scheduler is inspected
   * @param queueName name of the queue to check
   * @param memory    expected AM-used memory
   */
  private void checkAMUsedResource(MockRM rm, String queueName, int memory) {
    final String noLabel = RMNodeLabelsManager.NO_LABEL;
    checkAMUsedResource(rm, queueName, memory, noLabel);
  }

  /**
   * Asserts the used capacity of a queue for
   * {@code RMNodeLabelsManager.NO_LABEL}; the expected value is
   * {@code capacity / total}.
   *
   * @param rm        resource manager whose scheduler is inspected
   * @param queueName name of the queue to check
   * @param capacity  numerator of the expected used-capacity ratio
   * @param total     denominator of the expected used-capacity ratio
   */
  private void checkUsedCapacity(MockRM rm, String queueName, int capacity,
      int total) {
    final String noLabel = RMNodeLabelsManager.NO_LABEL;
    checkUsedCapacity(rm, queueName, capacity, total, noLabel);
  }

  /**
   * Asserts the memory a queue reports as used for the given label.
   *
   * @param rm        resource manager whose scheduler is inspected
   * @param queueName name of the queue to check
   * @param memory    expected used memory
   * @param label     label whose usage is read
   */
  private void checkUsedResource(MockRM rm, String queueName, int memory,
      String label) {
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    long usedMemory = cs.getQueue(queueName)
        .getQueueResourceUsage()
        .getUsed(label)
        .getMemorySize();
    assertEquals(memory, usedMemory);
  }

  /**
   * Asserts the used capacity a queue reports for the given label, within a
   * tolerance of 0.0001. The expected value is {@code capacity / total}
   * computed in floating point.
   *
   * @param rm        resource manager whose scheduler is inspected
   * @param queueName name of the queue to check
   * @param capacity  numerator of the expected used-capacity ratio
   * @param total     denominator of the expected used-capacity ratio
   * @param label     label whose used capacity is read
   */
  private void checkUsedCapacity(MockRM rm, String queueName, int capacity,
      int total, String label) {
    final float epsilon = 0.0001f;
    final float expectedRatio = (float) capacity / total;

    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    CSQueue csQueue = cs.getQueue(queueName);
    assertEquals(expectedRatio,
        csQueue.getQueueCapacities().getUsedCapacity(label), epsilon);
  }

  /**
   * Asserts the AM-used memory a queue reports for the given label.
   *
   * @param rm        resource manager whose scheduler is inspected
   * @param queueName name of the queue to check
   * @param memory    expected AM-used memory
   * @param label     label whose AM usage is read
   */
  private void checkAMUsedResource(MockRM rm, String queueName, int memory,
      String label) {
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    long amUsedMemory = cs.getQueue(queueName)
        .getQueueResourceUsage()
        .getAMUsed(label)
        .getMemorySize();
    assertEquals(memory, amUsedMemory);
  }

  /**
   * Asserts the memory a user of a leaf queue has in use on a partition.
   *
   * @param rm        resource manager whose scheduler is inspected
   * @param queueName name of the queue, which is cast to {@link LeafQueue}
   * @param userName  user whose usage is read
   * @param partition partition (label) whose usage is read
   * @param memory    expected used memory
   */
  private void checkUserUsedResource(MockRM rm, String queueName,
      String userName, String partition, int memory) {
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    LeafQueue leaf = (LeafQueue) cs.getQueue(queueName);
    UsersManager.User queueUser = leaf.getUser(userName);
    long usedMemory =
        queueUser.getResourceUsage().getUsed(partition).getMemorySize();
    assertEquals(memory, usedMemory);
  }

  /**
   * Submits an app to queue "a" with its AM launched through unlabeled node
   * h3, requests one more 1 GB container on partition x (waited for on h1),
   * and verifies that both the application usage report and the per-user
   * usage of the queue account for the two containers.
   *
   * @throws Exception if the mock cluster cannot be set up or driven
   */
  @Test
  @Timeout(value = 60)
  public void testRequestContainerAfterNodePartitionUpdated()
      throws Exception {
    // register the cluster node labels
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
        "z"));

    // set mapping:
    // h1 -> x
    // h2 -> y
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));

    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };
    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    // stop the RM even when an assertion below fails
    try {
      MockNM nm1 = rm.registerNode("h1:1234", 2048);
      rm.registerNode("h2:1234", 2048);
      MockNM nm3 = rm.registerNode("h3:1234", 2048);

      // launch an app in queue a; its AM is launched through h3, which has
      // no label
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("a")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm, data);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3);

      // only the AM container is accounted for so far
      ApplicationResourceUsageReport appResourceUsageReport =
          rm.getResourceScheduler().getAppResourceUsageReport(
              am1.getApplicationAttemptId());
      assertEquals(1024, appResourceUsageReport.getUsedResources()
          .getMemorySize());
      assertEquals(1, appResourceUsageReport.getUsedResources()
          .getVirtualCores());

      // request a container on partition x and wait for it on h1
      am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
      ContainerId containerId =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
      assertTrue(rm.waitForState(nm1, containerId,
          RMContainerState.ALLOCATED));

      // the report now covers the AM plus the new container
      appResourceUsageReport =
          rm.getResourceScheduler().getAppResourceUsageReport(
              am1.getApplicationAttemptId());
      assertEquals(2048, appResourceUsageReport.getUsedResources()
          .getMemorySize());
      assertEquals(2, appResourceUsageReport.getUsedResources()
          .getVirtualCores());

      // the per-user view of queue a must agree with the app report
      LeafQueue queue =
          (LeafQueue) ((CapacityScheduler) rm.getResourceScheduler())
              .getQueue("a");
      boolean userFound = false;
      for (UserInfo userInfo : queue.getUsersManager().getUsersInfo()) {
        if (userInfo.getUsername().equals("user")) {
          userFound = true;
          ResourceInfo resourcesUsed = userInfo.getResourcesUsed();
          assertEquals(2048, resourcesUsed.getMemorySize());
          assertEquals(2, resourcesUsed.getvCores());
        }
      }
      // without this the loop would pass silently if the user were missing
      assertTrue(userFound, "No usage info found for user 'user' in queue a");
    } finally {
      rm.stop();
    }
  }

  /**
   * Checks that queue, user and application-attempt usage follow node h1 as
   * its partition changes (x -> z -> y -> no label) while it hosts an
   * allocated container, and that all usage drops to zero once both
   * containers are completed.
   */
  @Test
  public void testResourceUsageWhenNodeUpdatesPartition()
      throws Exception {
    // register the cluster node labels
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
    
    // set mapping:
    // h1 -> x
    // h2 -> y
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));

    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    MockNM nm1 = rm.registerNode("h1:1234", 8000);
    // NOTE(review): nm2 is never read; the registration itself is what matters.
    MockNM nm2 = rm.registerNode("h2:1234", 8000);
    MockNM nm3 = rm.registerNode("h3:1234", 8000);

    ContainerId containerId1;
    ContainerId containerId2;

    // launch an app in queue a; its AM (container 1) is launched through h3,
    // which has no label
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("a")
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm, data);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3);

    // request a container on partition x and wait for it (container 2) on h1
    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
    containerId1 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
    containerId2 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    assertTrue(rm.waitForState(nm1, containerId2,
        RMContainerState.ALLOCATED));
    
    // check used resource:
    // queue-a used x=1G, ""=1G
    checkUsedResource(rm, "a", 1024, "x");
    checkUsedResource(rm, "a", 1024);
    checkUsedCapacity(rm, "a", 1024, 8000, "x");
    checkUsedCapacity(rm, "a", 1024, 8000);
    
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());
    
    // change h1's label to z: update the label manager, then notify the
    // scheduler directly
    mgr.replaceLabelsOnNode(ImmutableMap.of(nm1.getNodeId(), toSet("z")));
    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
        toSet("z"))));
    // NOTE(review): fixed sleep, presumably to let asynchronous label
    // updates settle — confirm whether it is still required.
    Thread.sleep(100);
    // the 1G that was accounted under x must now show up under z
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 1024, "z");
    checkUsedResource(rm, "a", 1024);
    checkUsedCapacity(rm, "a", 0, 8000, "x");
    checkUsedCapacity(rm, "a", 1024, 8000, "z");
    checkUsedCapacity(rm, "a", 1024, 8000);
    checkUsedResource(rm, "root", 0, "x");
    checkUsedResource(rm, "root", 1024, "z");
    checkUsedResource(rm, "root", 1024);
    checkUserUsedResource(rm, "a", "user", "x", 0);
    checkUserUsedResource(rm, "a", "user", "z", 1024);
    assertEquals(0,
        app.getAppAttemptResourceUsage().getUsed("x").getMemorySize());
    assertEquals(1024,
        app.getAppAttemptResourceUsage().getUsed("z").getMemorySize());
    
    // change h1's label to y
    mgr.replaceLabelsOnNode(ImmutableMap.of(nm1.getNodeId(), toSet("y")));
    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
        toSet("y"))));
    Thread.sleep(100);
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 1024, "y");
    checkUsedResource(rm, "a", 0, "z");
    checkUsedResource(rm, "a", 1024);
    checkUsedCapacity(rm, "a", 0, 8000, "x");
    // partition y now holds h1 and h2, hence the 2 x 8000 denominator
    checkUsedCapacity(rm, "a", 1024, 16000, "y");
    checkUsedCapacity(rm, "a", 0, 8000, "z");
    checkUsedCapacity(rm, "a", 1024, 8000);
    checkUsedResource(rm, "root", 0, "x");
    checkUsedResource(rm, "root", 1024, "y");
    checkUsedResource(rm, "root", 0, "z");
    checkUsedResource(rm, "root", 1024);
    checkUserUsedResource(rm, "a", "user", "x", 0);
    checkUserUsedResource(rm, "a", "user", "y", 1024);
    checkUserUsedResource(rm, "a", "user", "z", 0);
    assertEquals(0,
        app.getAppAttemptResourceUsage().getUsed("x").getMemorySize());
    assertEquals(1024,
        app.getAppAttemptResourceUsage().getUsed("y").getMemorySize());
    assertEquals(0,
        app.getAppAttemptResourceUsage().getUsed("z").getMemorySize());
    
    // change h1's label to no label
    Set<String> emptyLabels = new HashSet<>();
    Map<NodeId,Set<String>> map = ImmutableMap.of(nm1.getNodeId(),
        emptyLabels);
    mgr.replaceLabelsOnNode(map);
    cs.handle(new NodeLabelsUpdateSchedulerEvent(map));
    Thread.sleep(100);
    // both containers are now accounted under the default partition
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 0, "y");
    checkUsedResource(rm, "a", 0, "z");
    checkUsedResource(rm, "a", 2048);
    checkUsedCapacity(rm, "a", 0, 8000, "x");
    checkUsedCapacity(rm, "a", 0, 8000, "y");
    checkUsedCapacity(rm, "a", 0, 8000, "z");
    // the default partition now holds h1 and h3, hence 2 x 8000
    checkUsedCapacity(rm, "a", 2048, 16000);
    checkUsedResource(rm, "root", 0, "x");
    checkUsedResource(rm, "root", 0, "y");
    checkUsedResource(rm, "root", 0, "z");
    checkUsedResource(rm, "root", 2048);
    checkUserUsedResource(rm, "a", "user", "x", 0);
    checkUserUsedResource(rm, "a", "user", "y", 0);
    checkUserUsedResource(rm, "a", "user", "z", 0);
    checkUserUsedResource(rm, "a", "user", "", 2048);
    assertEquals(0,
        app.getAppAttemptResourceUsage().getUsed("x").getMemorySize());
    assertEquals(0,
        app.getAppAttemptResourceUsage().getUsed("y").getMemorySize());
    assertEquals(0,
        app.getAppAttemptResourceUsage().getUsed("z").getMemorySize());
    assertEquals(2048,
        app.getAppAttemptResourceUsage().getUsed("").getMemorySize());

    // Finish the two containers, we should see used resource becomes 0
    cs.completedContainer(cs.getRMContainer(containerId2),
        ContainerStatus.newInstance(containerId2, ContainerState.COMPLETE, "",
            ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
        RMContainerEventType.KILL);
    cs.completedContainer(cs.getRMContainer(containerId1),
        ContainerStatus.newInstance(containerId1, ContainerState.COMPLETE, "",
            ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
        RMContainerEventType.KILL);
    
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 0, "y");
    checkUsedResource(rm, "a", 0, "z");
    checkUsedResource(rm, "a", 0);
    checkUsedCapacity(rm, "a", 0, 8000, "x");
    checkUsedCapacity(rm, "a", 0, 8000, "y");
    checkUsedCapacity(rm, "a", 0, 8000, "z");
    checkUsedCapacity(rm, "a", 0, 16000);
    checkUsedResource(rm, "root", 0, "x");
    checkUsedResource(rm, "root", 0, "y");
    checkUsedResource(rm, "root", 0, "z");
    checkUsedResource(rm, "root", 0);
    checkUserUsedResource(rm, "a", "user", "x", 0);
    checkUserUsedResource(rm, "a", "user", "y", 0);
    checkUserUsedResource(rm, "a", "user", "z", 0);
    checkUserUsedResource(rm, "a", "user", "", 0);

    // NOTE(review): not reached when an assertion above fails.
    rm.close();
  }

  /**
   * Verifies label checks when moving an application between queues: an app
   * that requested containers on partitions x and y cannot be moved to queue
   * a2 (accessible labels: y only), but can be moved to a3, a4 and b.
   *
   * @throws Exception if the mock cluster cannot be set up or driven
   */
  @Test
  @Timeout(value = 300)
  public void testMoveApplicationWithLabel() throws Exception {
    // register the cluster node labels
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
        ImmutableSet.of("x", "y", "z"));
    // set mapping:
    // h1 -> x
    // h2 -> y
    // h4 -> z
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h4", 0), toSet("z")));
    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithSubQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };
    rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
    rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    // stop the RM even when an assertion below fails
    try {
      MockNM nm1 = rm.registerNode("h1:1234", 4096 * 2);
      MockNM nm2 = rm.registerNode("h2:1234", 4096 * 2);
      MockNM nm3 = rm.registerNode("h3:1234", 4096 * 2);
      rm.registerNode("h4:1234", 4096 * 2);
      // launch an app in queue a1; its AM is launched through h3, which has
      // no label
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("a1")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm, data);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3);
      // one container on partition x (h1) ...
      am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
      ContainerId container1 =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
      rm.waitForState(nm1, container1, RMContainerState.ALLOCATED, 10 * 1000);
      // ... and one on partition y (h2)
      am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y");
      ContainerId container2 =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
      rm.waitForState(nm2, container2, RMContainerState.ALLOCATED, 10 * 1000);
      CapacityScheduler scheduler =
          ((CapacityScheduler) rm.getResourceScheduler());
      // a2 can only access label y, so the move must be rejected
      try {
        scheduler.preValidateMoveApplication(app1.getApplicationId(), "a2");
        scheduler.moveApplication(app1.getApplicationId(), "a2");
        fail("Should throw exception since target queue doesnt have "
            + "required labels");
      } catch (Exception e) {
        assertTrue(e instanceof YarnException,
            "Yarn Exception should be thrown");
        assertEquals("Specified queue=root.a.a2 can't satisfy "
            + "following apps label expressions =[x] accessible "
            + "node labels =[y]", e.getMessage());
      }
      try {
        scheduler.moveApplication(app1.getApplicationId(), "a3");
        scheduler.moveApplication(app1.getApplicationId(), "a4");
        // Check move to queue with accessible label ANY
        scheduler.moveApplication(app1.getApplicationId(), "b");
      } catch (Exception e) {
        // keep the original exception as the cause so the failure is
        // diagnosable
        fail("Should not throw exception since target queue has "
            + "required labels", e);
      }
    } finally {
      rm.stop();
    }
  }

  /**
   * Multi-application, multi-user variant of the partition-update usage test:
   * two apps owned by users u1 and u2 hold containers on h1 and h2, h1 is
   * moved from partition x to z through a scheduler event, and the queue,
   * user and application-attempt usage is checked afterwards.
   */
  @Test
  @Timeout(value = 60)
  public void testComplexResourceUsageWhenNodeUpdatesPartition()
      throws Exception {
    /*
     * This test is similar to testResourceUsageWhenNodeUpdatesPartition, this
     * will include multiple applications, multiple users and multiple
     * containers running on a single node, size of each container is 1G
     *
     * Node 1 (h1, partition x)
     * ------
     * App1-container2
     * App2-container2
     * App2-container3
     *
     * Node 2 (h2, no label)
     * ------
     * App2-container1
     * App1-container1
     * App1-container3
     */
    // register the cluster node labels
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
    
    // set mapping:
    // h1 -> x
    // (h2 gets no label here)
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    MockNM nm1 = rm.registerNode("h1:1234", 80000);
    MockNM nm2 = rm.registerNode("h2:1234", 80000);

    // app1 (user u1): AM launched through h2
    MockRMAppSubmissionData data1 =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("app")
            .withUser("u1")
            .withAcls(null)
            .withQueue("a")
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);

    // c2 on n1, c3 on n2
    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
    ContainerId containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    assertTrue(rm.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));
    am1.allocate("*", GB, 1, new ArrayList<ContainerId>());
    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
    assertTrue(rm.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    
    // app2 (user u2): AM launched through h2
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("app")
            .withUser("u2")
            .withAcls(null)
            .withQueue("a")
            .withUnmanagedAM(false)
            .build();
    RMApp app2 = MockRMAppSubmitter.submit(rm, data);
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);

    // c2/c3 on n1
    am2.allocate("*", GB, 2, new ArrayList<ContainerId>(), "x");
    containerId =
        ContainerId.newContainerId(am2.getApplicationAttemptId(), 3);
    assertTrue(rm.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));
    
    // check used resource:
    // queue-a used x=3G, ""=3G
    checkUsedResource(rm, "a", 3 * GB, "x");
    checkUsedResource(rm, "a", 3 * GB);

    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    FiCaSchedulerApp application1 =
        cs.getApplicationAttempt(am1.getApplicationAttemptId());
    FiCaSchedulerApp application2 =
        cs.getApplicationAttempt(am2.getApplicationAttemptId());

    // change h1's label to z; only the scheduler is notified here, the label
    // manager itself is not updated
    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
        toSet("z"))));
    // everything that ran on h1 moves from x to z; h2 usage stays under ""
    checkUsedResource(rm, "a", 0, "x");
    checkUsedResource(rm, "a", 3 * GB, "z");
    checkUsedResource(rm, "a", 3 * GB);
    checkUsedResource(rm, "root", 0, "x");
    checkUsedResource(rm, "root", 3 * GB, "z");
    checkUsedResource(rm, "root", 3 * GB);
    // u1: one container on h1, two (AM + c3) on h2
    checkUserUsedResource(rm, "a", "u1", "x", 0 * GB);
    checkUserUsedResource(rm, "a", "u1", "z", 1 * GB);
    checkUserUsedResource(rm, "a", "u1", "", 2 * GB);
    // u2: two containers on h1, the AM on h2
    checkUserUsedResource(rm, "a", "u2", "x", 0 * GB);
    checkUserUsedResource(rm, "a", "u2", "z", 2 * GB);
    checkUserUsedResource(rm, "a", "u2", "", 1 * GB);
    assertEquals(0,
        application1.getAppAttemptResourceUsage().getUsed("x").getMemorySize());
    assertEquals(1 * GB,
        application1.getAppAttemptResourceUsage().getUsed("z").getMemorySize());
    assertEquals(2 * GB,
        application1.getAppAttemptResourceUsage().getUsed("").getMemorySize());
    assertEquals(0,
        application2.getAppAttemptResourceUsage().getUsed("x").getMemorySize());
    assertEquals(2 * GB,
        application2.getAppAttemptResourceUsage().getUsed("z").getMemorySize());
    assertEquals(1 * GB,
        application2.getAppAttemptResourceUsage().getUsed("").getMemorySize());

    // NOTE(review): not reached when an assertion above fails.
    rm.close();
  }

  /**
   * Verifies that the AM resource limit of queue "a" follows node h1 as the
   * node moves from the default partition to x and then to y.
   *
   * @throws Exception if the mock cluster cannot be set up or driven
   */
  @Test
  public void testAMResourceLimitNodeUpdatePartition() throws Exception {
    conf.setInt("yarn.scheduler.minimum-allocation-mb", 64);

    // the RM under test hands out the shared label manager
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };
    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    rm.registerNode("h1:1234", 6400);
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
        ImmutableSet.of("x", "y", "z"));

    final String queue = "a";

    // h1 is unlabeled: 0.1 of its 6400 (i.e. 640) is available for AMs in the
    // default partition, nothing in the labeled ones
    checkAMResourceLimit(rm, queue, 640, "");
    checkAMResourceLimit(rm, queue, 0, "x");
    checkAMResourceLimit(rm, queue, 0, "y");
    checkAMResourceLimit(rm, queue, 0, "z");

    // move h1 into partition x
    final NodeId h1 = NodeId.newInstance("h1", 0);
    mgr.replaceLabelsOnNode(ImmutableMap.of(h1, toSet("x")));
    rm.drainEvents();

    checkAMResourceLimit(rm, queue, 640, "x");
    checkAMResourceLimit(rm, queue, 0, "y");
    checkAMResourceLimit(rm, queue, 0, "z");
    checkAMResourceLimit(rm, queue, 0, "");

    // move h1 from partition x to partition y
    mgr.replaceLabelsOnNode(ImmutableMap.of(h1, toSet("y")));
    rm.drainEvents();

    checkAMResourceLimit(rm, queue, 0, "x");
    checkAMResourceLimit(rm, queue, 640, "y");
    checkAMResourceLimit(rm, queue, 0, "z");
    checkAMResourceLimit(rm, queue, 0, "");

    rm.stop();
  }

  /**
   * Verifies that both the total and the AM-specific usage of queue "a", its
   * user and the application attempt follow node h1 when the node's
   * partition changes from x to z and then to no label. The AM itself is
   * submitted with AM label x and launched through h1.
   *
   * @throws Exception if the mock cluster cannot be set up or driven
   */
  @Test
  @Timeout(value = 60)
  public void testAMResourceUsageWhenNodeUpdatesPartition()
      throws Exception {
    // register the cluster node labels
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));

    // set mapping:
    // h1 -> x
    // h2 -> y
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));

    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    // close the RM even when an assertion below fails
    try {
      MockNM nm1 = rm.registerNode("h1:1234", 8000);
      rm.registerNode("h2:1234", 8000);
      rm.registerNode("h3:1234", 8000);

      // launch an app in queue a whose AM is requested on partition x and
      // launched through h1
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("a")
              .withAmLabel("x")
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm, data);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);

      // request a second container on partition x and wait for it on h1
      am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
      ContainerId containerId2 =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
      assertTrue(rm.waitForState(nm1, containerId2,
          RMContainerState.ALLOCATED));

      // check used resource:
      // queue-a used x=2G, of which 1G is the AM
      checkUsedResource(rm, "a", 2048, "x");
      checkAMUsedResource(rm, "a", 1024, "x");

      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
      FiCaSchedulerApp app =
          cs.getApplicationAttempt(am1.getApplicationAttemptId());

      // change h1's label to z
      cs.handle(new NodeLabelsUpdateSchedulerEvent(
          ImmutableMap.of(nm1.getNodeId(), toSet("z"))));

      // Now the resources also should change from x to z. Verify AM and
      // normal used resource are successfully changed.
      checkUsedResource(rm, "a", 0, "x");
      checkUsedResource(rm, "a", 2048, "z");
      checkAMUsedResource(rm, "a", 0, "x");
      checkAMUsedResource(rm, "a", 1024, "z");
      checkUserUsedResource(rm, "a", "user", "x", 0);
      checkUserUsedResource(rm, "a", "user", "z", 2048);
      assertEquals(0,
          app.getAppAttemptResourceUsage().getAMUsed("x").getMemorySize());
      assertEquals(1024,
          app.getAppAttemptResourceUsage().getAMUsed("z").getMemorySize());

      // change h1's label to no label
      Set<String> emptyLabels = new HashSet<>();
      Map<NodeId,Set<String>> map = ImmutableMap.of(nm1.getNodeId(),
          emptyLabels);
      cs.handle(new NodeLabelsUpdateSchedulerEvent(map));
      checkUsedResource(rm, "a", 0, "x");
      checkUsedResource(rm, "a", 0, "z");
      checkUsedResource(rm, "a", 2048);
      checkAMUsedResource(rm, "a", 0, "x");
      checkAMUsedResource(rm, "a", 0, "z");
      checkAMUsedResource(rm, "a", 1024);
      checkUserUsedResource(rm, "a", "user", "x", 0);
      checkUserUsedResource(rm, "a", "user", "z", 0);
      checkUserUsedResource(rm, "a", "user", "", 2048);
      assertEquals(0,
          app.getAppAttemptResourceUsage().getAMUsed("x").getMemorySize());
      assertEquals(0,
          app.getAppAttemptResourceUsage().getAMUsed("z").getMemorySize());
      assertEquals(1024,
          app.getAppAttemptResourceUsage().getAMUsed("").getMemorySize());
    } finally {
      rm.close();
    }
  }

  /**
   * Checks the AM blacklist updates reported for an application attempt in the
   * default partition and for one in partition "x".
   *
   * AM node blacklisting is enabled with a disable threshold of 0.5 and each
   * partition (default, "x", "y") gets three registered nodes. For each of the
   * two attempts the test expects that blacklisting one node is reported as a
   * single blacklist addition, and that blacklisting a second node of the same
   * partition is reported as zero additions and two removals.
   */
  @Test
  @Timeout(value = 30)
  public void testBlacklistAMDisableLabel() throws Exception {
    conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
        true);
    conf.setFloat(
        YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
        0.5f);
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("x"),
        NodeId.newInstance("h3", 0), toSet("x"), NodeId.newInstance("h6", 0),
        toSet("x")));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h4", 0), toSet("y"),
        NodeId.newInstance("h5", 0), toSet("y"), NodeId.newInstance("h7", 0),
        toSet("y")));

    // Inject the shared node label manager into the RM under test.
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };
    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    // Nodes in the default partition: h1, h8, h9
    // Nodes in partition x: h2, h3, h6
    // Nodes in partition y: h4, h5, h7
    MockNM nm1 = rm.registerNode("h1:1234", 2048);
    MockNM nm2 = rm.registerNode("h2:1234", 2048);
    rm.registerNode("h3:1234", 2048);
    rm.registerNode("h4:1234", 2048);
    rm.registerNode("h5:1234", 2048);
    rm.registerNode("h6:1234", 2048);
    rm.registerNode("h7:1234", 2048);
    rm.registerNode("h8:1234", 2048);
    rm.registerNode("h9:1234", 2048);

    // Submit app with AM container launched on default partition i.e. h1.
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("a")
            .withUnmanagedAM(false)
            .build();
    RMApp app = MockRMAppSubmitter.submit(rm, data);
    MockRM.launchAndRegisterAM(app, rm, nm1);
    RMAppAttempt appAttempt = app.getCurrentAppAttempt();
    // Blacklist a first node of the default partition: expected to be
    // reported as one addition.
    appAttempt.getAMBlacklistManager().addNode("h1");
    ResourceBlacklistRequest blacklistUpdates =
        appAttempt.getAMBlacklistManager().getBlacklistUpdates();
    assertEquals(1, blacklistUpdates.getBlacklistAdditions().size());
    assertEquals(0, blacklistUpdates.getBlacklistRemovals().size());
    // Blacklist a second node of the default partition: expected to be
    // reported as no additions and both nodes as removals.
    appAttempt.getAMBlacklistManager().addNode("h8");
    blacklistUpdates = appAttempt.getAMBlacklistManager().getBlacklistUpdates();
    assertEquals(0, blacklistUpdates.getBlacklistAdditions().size());
    assertEquals(2, blacklistUpdates.getBlacklistRemovals().size());

    // Submission in label x
    MockRMAppSubmissionData data1 =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("a")
            .withAmLabel("x")
            .build();
    RMApp applabel = MockRMAppSubmitter.submit(rm, data1);
    MockRM.launchAndRegisterAM(applabel, rm, nm2);
    RMAppAttempt appAttemptlabelx = applabel.getCurrentAppAttempt();
    // Blacklist a first node of partition x.
    appAttemptlabelx.getAMBlacklistManager().addNode("h2");
    ResourceBlacklistRequest blacklistUpdatesOnx =
        appAttemptlabelx.getAMBlacklistManager().getBlacklistUpdates();
    assertEquals(1, blacklistUpdatesOnx.getBlacklistAdditions().size());
    assertEquals(0, blacklistUpdatesOnx.getBlacklistRemovals().size());
    // Blacklist a second node of partition x. The updates must be read from
    // the label-x attempt's own blacklist manager, otherwise the assertions
    // below would only re-check the default-partition attempt.
    appAttemptlabelx.getAMBlacklistManager().addNode("h3");
    blacklistUpdatesOnx =
        appAttemptlabelx.getAMBlacklistManager().getBlacklistUpdates();
    assertEquals(0, blacklistUpdatesOnx.getBlacklistAdditions().size());
    assertEquals(2, blacklistUpdatesOnx.getBlacklistRemovals().size());

    rm.close();
  }

  /**
   * Asserts that the AM limit (in memory) of the given queue for the given
   * label equals {@code memory}, polling for up to 3000 ms before comparing.
   *
   * @param rm the resource manager whose scheduler is inspected
   * @param queuename name of the queue to look up
   * @param memory expected AM limit memory size
   * @param label node label whose AM limit is checked
   * @throws InterruptedException if interrupted while polling
   */
  private void checkAMResourceLimit(MockRM rm, String queuename, int memory,
      String label) throws InterruptedException {
    final long observedLimit =
        waitForResourceUpdate(rm, queuename, memory, label, 3000L);
    assertEquals(memory, observedLimit);
  }

  /**
   * Polls the queue's AM limit for the given label every 100 ms until its
   * memory size equals {@code memory} or {@code timeout} ms have elapsed.
   *
   * @param rm the resource manager whose scheduler is inspected
   * @param queuename name of the queue to look up
   * @param memory AM limit memory size to wait for
   * @param label node label whose AM limit is read
   * @param timeout maximum time to keep polling, in milliseconds
   * @return the last memory size observed (0 if no poll happened)
   * @throws InterruptedException if interrupted while sleeping between polls
   */
  private long waitForResourceUpdate(MockRM rm, String queuename, long memory,
      String label, long timeout) throws InterruptedException {
    final long begin = System.currentTimeMillis();
    long observed = 0;
    while (System.currentTimeMillis() - begin < timeout) {
      CapacityScheduler capacityScheduler =
          (CapacityScheduler) rm.getResourceScheduler();
      CSQueue csQueue = capacityScheduler.getQueue(queuename);
      observed =
          csQueue.getQueueResourceUsage().getAMLimit(label).getMemorySize();
      if (observed != memory) {
        // Not there yet; back off briefly and poll again.
        Thread.sleep(100);
        continue;
      }
      break;
    }
    return observed;
  }

  /**
   * Polls the scheduler's node tracker every 100 ms until the number of nodes
   * in {@code partition} equals {@code expectedNodeCount} or {@code timeout}
   * ms have elapsed.
   *
   * @param rm the resource manager whose scheduler is inspected
   * @param partition node partition (label) to count nodes for
   * @param expectedNodeCount node count to wait for
   * @param timeout maximum time to keep polling, in milliseconds
   * @return the last node count observed (0 if no poll happened)
   * @throws InterruptedException if interrupted while sleeping between polls
   */
  private long waitForNodeLabelSchedulerEventUpdate(MockRM rm, String partition,
      long expectedNodeCount, long timeout) throws InterruptedException {
    final long begin = System.currentTimeMillis();
    long nodeCount = 0;
    while (System.currentTimeMillis() - begin < timeout) {
      CapacityScheduler capacityScheduler =
          (CapacityScheduler) rm.getResourceScheduler();
      nodeCount = capacityScheduler.getNodeTracker()
          .getNodesPerPartition(partition).size();
      if (nodeCount != expectedNodeCount) {
        // Not there yet; back off briefly and poll again.
        Thread.sleep(100);
        continue;
      }
      break;
    }
    return nodeCount;
  }

  /**
   * Checks the per-partition node counts reported by the scheduler's node
   * tracker after node registration, after a node is unregistered and a new
   * one registered, and after the labels of a node are replaced.
   */
  @Test
  public void testNodeCountBasedOnNodeLabelsFromClusterNodeTracker()
      throws Exception {
    // add the cluster node labels
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
        ImmutableSet.of("x", "y", "z"));

    // set mapping:
    // h1 -> x
    // h2 -> x
    final NodeId h1Id = NodeId.newInstance("h1", 1234);
    final NodeId h2Id = NodeId.newInstance("h2", 1234);
    mgr.addLabelsToNode(ImmutableMap.of(h1Id, toSet("x")));
    mgr.addLabelsToNode(ImmutableMap.of(h2Id, toSet("x")));

    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.getRMContext().setNodeLabelManager(mgr);
    rm.start();
    MockNM h1Node = rm.registerNode("h1:1234", 8000);
    rm.registerNode("h2:1234", 8000);
    rm.registerNode("h3:1234", 8000);

    CapacityScheduler scheduler =
        (CapacityScheduler) rm.getResourceScheduler();

    // After registration: h1 and h2 are expected in partition x, h3 in the
    // default partition.
    int nodesInX = scheduler.getNodeTracker().getNodesPerPartition("x").size();
    assertEquals(2, nodesInX);
    int nodesInDefault =
        scheduler.getNodeTracker().getNodesPerPartition("").size();
    assertEquals(1, nodesInDefault);

    rm.unRegisterNode(h1Node);
    rm.registerNode("h4:1234", 8000);

    // After h1 is unregistered and h4 is registered: one node is expected in
    // partition x and two in the default partition.
    nodesInX = scheduler.getNodeTracker().getNodesPerPartition("x").size();
    assertEquals(1, nodesInX);
    nodesInDefault =
        scheduler.getNodeTracker().getNodesPerPartition("").size();
    assertEquals(2, nodesInDefault);

    mgr.replaceLabelsOnNode(ImmutableMap.of(h2Id, toSet("")));

    // Last node with label x is replaced by CLI or REST.
    final long remainingInX =
        waitForNodeLabelSchedulerEventUpdate(rm, "x", 0, 3000L);
    assertEquals(0, remainingInX);
    rm.stop();
  }
}