TestContainerAllocation.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.SecurityUtilTestHelper;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAX_ASSIGN_PER_HEARTBEAT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestContainerAllocation {

  // Logger used by the tests for progress messages.
  private static final Logger LOG = LoggerFactory
      .getLogger(TestContainerAllocation.class);
  // Queue path strings, each built by appending a suffix to
  // CapacitySchedulerConfiguration.ROOT; c1 and c2 are built on top of C_PATH.
  public static final String DEFAULT_PATH = CapacitySchedulerConfiguration.ROOT + ".default";
  public static final String A_PATH = CapacitySchedulerConfiguration.ROOT + ".a";
  public static final String B_PATH = CapacitySchedulerConfiguration.ROOT + ".b";
  public static final String C_PATH = CapacitySchedulerConfiguration.ROOT + ".c";
  public static final String C1_PATH = C_PATH + ".c1";
  public static final String C2_PATH = C_PATH + ".c2";
  // QueuePath objects wrapping the path strings declared above.
  private static final QueuePath ROOT = new QueuePath(CapacitySchedulerConfiguration.ROOT);
  private static final QueuePath DEFAULT = new QueuePath(DEFAULT_PATH);
  private static final QueuePath A = new QueuePath(A_PATH);
  private static final QueuePath B = new QueuePath(B_PATH);
  private static final QueuePath C = new QueuePath(C_PATH);
  private static final QueuePath C1 = new QueuePath(C1_PATH);
  private static final QueuePath C2 = new QueuePath(C2_PATH);
  // Multiplier used to express memory sizes in the tests (e.g. 8 * GB).
  // NOTE(review): presumably "megabytes per gigabyte" - confirm against the
  // memory unit expected by MockRM.registerNode.
  private final int GB = 1024;

  // Configuration that setUp() recreates before every test.
  private YarnConfiguration conf;
  
  // Node labels manager created in setUp(); several tests inject it into the
  // RM context before starting the MockRM.
  RMNodeLabelsManager mgr;

  /**
   * Prepares the per-test fixtures: a configuration that selects
   * {@link CapacityScheduler} as the RM scheduler, and a
   * {@link NullRMNodeLabelsManager} initialized with that configuration.
   */
  @BeforeEach
  public void setUp() throws Exception {
    final YarnConfiguration schedulerConf = new YarnConfiguration();
    this.conf = schedulerConf;
    schedulerConf.setClass(
        YarnConfiguration.RM_SCHEDULER,
        CapacityScheduler.class,
        ResourceScheduler.class);

    final RMNodeLabelsManager labelsManager = new NullRMNodeLabelsManager();
    this.mgr = labelsManager;
    labelsManager.init(schedulerConf);
  }

  /**
   * Requests one 2 * GB container and checks that nothing is handed out after
   * the scheduling heartbeat of nm1 (registered with 2 * GB), while exactly one
   * container is handed out after the heartbeat of nm2 (registered with
   * 3 * GB).
   */
  @Test
  @Timeout(value = 60)
  public void testExcessReservationThanNodeManagerCapacity() throws Exception {
    @SuppressWarnings("resource")
    MockRM rm = new MockRM(conf);
    rm.start();
    // Stop the RM even when an assertion fails, so a failing run cannot leak
    // a live RM into the tests that follow.
    try {
      // Register two nodes
      MockNM nm1 = rm.registerNode("127.0.0.1:1234", 2 * GB, 4);
      MockNM nm2 = rm.registerNode("127.0.0.1:2234", 3 * GB, 4);

      nm1.nodeHeartbeat(true);
      nm2.nodeHeartbeat(true);

      // Poll (at most 20 times) until both nodes are visible in the RM context.
      int waitCount = 20;
      int size;
      while ((size = rm.getRMContext().getRMNodes().size()) != 2
          && waitCount-- > 0) {
        LOG.info("Waiting for node managers to register : {}", size);
        Thread.sleep(100);
      }
      assertEquals(2, rm.getRMContext().getRMNodes().size());

      // Submit an application
      RMApp app1 = MockRMAppSubmitter.submitWithMemory(128, rm);

      // kick the scheduling
      nm1.nodeHeartbeat(true);
      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
      am1.registerAppAttempt();

      LOG.info("sending container requests ");
      am1.addRequests(new String[] {"*"}, 2 * GB, 1, 1);
      AllocateResponse alloc1Response = am1.schedule(); // send the request

      // kick the scheduler
      nm1.nodeHeartbeat(true);
      int waitCounter = 20;
      LOG.info("heartbeating nm1");
      while (alloc1Response.getAllocatedContainers().size() < 1
          && waitCounter-- > 0) {
        LOG.info("Waiting for containers to be created for app 1...");
        Thread.sleep(500);
        alloc1Response = am1.schedule();
      }
      LOG.info("received container : {}",
          alloc1Response.getAllocatedContainers().size());

      // No container should be allocated.
      // Internally it should not have been reserved either.
      assertEquals(0, alloc1Response.getAllocatedContainers().size());

      LOG.info("heartbeating nm2");
      waitCounter = 20;
      nm2.nodeHeartbeat(true);
      while (alloc1Response.getAllocatedContainers().size() < 1
          && waitCounter-- > 0) {
        LOG.info("Waiting for containers to be created for app 1...");
        Thread.sleep(500);
        alloc1Response = am1.schedule();
      }
      LOG.info("received container : {}",
          alloc1Response.getAllocatedContainers().size());
      assertEquals(1, alloc1Response.getAllocatedContainers().size());
    } finally {
      rm.stop();
    }
  }

  /**
   * Tests that container tokens are generated when the containers are
   * acquired by the AM, not when the containers are allocated.
   */
  @Test
  public void testContainerTokenGeneratedOnPullRequest() throws Exception {
    MockRM rm1 = new MockRM(conf);
    rm1.start();
    // Stop the RM even when an assertion fails so it is not leaked.
    try {
      MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
      RMApp app1 = MockRMAppSubmitter.submitWithMemory(200, rm1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
      // request a container.
      am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
      ContainerId containerId2 =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
      rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);

      RMContainer container =
          rm1.getResourceScheduler().getRMContainer(containerId2);
      // no container token is generated while the container is only ALLOCATED.
      assertEquals(containerId2, container.getContainerId());
      assertNull(container.getContainer().getContainerToken());

      // acquire the container.
      List<Container> containers =
          am1.allocate(new ArrayList<ResourceRequest>(),
              new ArrayList<ContainerId>()).getAllocatedContainers();
      assertEquals(containerId2, containers.get(0).getId());
      // container token is generated.
      assertNotNull(containers.get(0).getContainerToken());
    } finally {
      rm1.stop();
    }
  }

  /**
   * Registers a node under the host name "unknownhost" and checks that an
   * allocated container cannot be pulled by the AM while the token service is
   * switched to use IPs, but can be pulled once that switch is turned off
   * again.
   */
  @Test
  public void testNormalContainerAllocationWhenDNSUnavailable() throws Exception{
    MockRM rm1 = new MockRM(conf);
    rm1.start();
    // Stop the RM even when an assertion fails so it is not leaked.
    try {
      MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
      RMApp app1 = MockRMAppSubmitter.submitWithMemory(200, rm1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      // request a container.
      am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
      ContainerId containerId2 =
          ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
      rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);

      // acquire the container.
      SecurityUtilTestHelper.setTokenServiceUseIp(true);
      List<Container> containers;
      try {
        containers =
            am1.allocate(new ArrayList<ResourceRequest>(),
                new ArrayList<ContainerId>()).getAllocatedContainers();
        // not able to fetch the container;
        assertEquals(0, containers.size());
      } finally {
        // always restore the global switch so other tests are unaffected.
        SecurityUtilTestHelper.setTokenServiceUseIp(false);
      }
      containers =
          am1.allocate(new ArrayList<ResourceRequest>(),
              new ArrayList<ContainerId>()).getAllocatedContainers();
      // should be able to fetch the container;
      assertEquals(1, containers.size());
    } finally {
      rm1.stop();
    }
  }

  /**
   * Tests whether the LogAggregationContext supplied at application
   * submission is passed into container tokens correctly, both when it is
   * null and when every field is populated.
   */
  @Test
  public void testLogAggregationContextPassedIntoContainerToken()
      throws Exception {
    MockRM rm1 = new MockRM(conf);
    rm1.start();
    // Stop the RM even when an assertion fails so it is not leaked.
    try {
      MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
      MockNM nm2 = rm1.registerNode("127.0.0.1:2345", 8000);
      // LogAggregationContext is set as null
      assertNull(getLogAggregationContextFromContainerToken(rm1, nm1, null));

      // create a not-null LogAggregationContext
      LogAggregationContext logAggregationContext =
          LogAggregationContext.newInstance(
              "includePattern", "excludePattern",
              "rolledLogsIncludePattern",
              "rolledLogsExcludePattern",
              "policyClass",
              "policyParameters");
      LogAggregationContext returned =
          getLogAggregationContextFromContainerToken(rm1, nm2,
              logAggregationContext);
      // every field must survive the round trip through the container token.
      assertEquals("includePattern", returned.getIncludePattern());
      assertEquals("excludePattern", returned.getExcludePattern());
      assertEquals("rolledLogsIncludePattern",
          returned.getRolledLogsIncludePattern());
      assertEquals("rolledLogsExcludePattern",
          returned.getRolledLogsExcludePattern());
      assertEquals("policyClass",
          returned.getLogAggregationPolicyClassName());
      assertEquals("policyParameters",
          returned.getLogAggregationPolicyParameters());
    } finally {
      rm1.stop();
    }
  }

  /**
   * Submits an application carrying the given LogAggregationContext, launches
   * its AM on {@code nm1}, requests and acquires one container, and returns
   * the LogAggregationContext decoded from that container's token.
   *
   * @param rm1 the running RM to submit the application to
   * @param nm1 the node used to launch the AM and wait for the container
   * @param logAggregationContext the context to submit with; may be null
   * @return the context read back from the acquired container's token
   */
  private LogAggregationContext getLogAggregationContextFromContainerToken(
      MockRM rm1, MockNM nm1, LogAggregationContext logAggregationContext)
      throws Exception {
    MockRMAppSubmissionData submission = MockRMAppSubmissionData.Builder
        .createWithMemory(200, rm1)
        .withLogAggregationContext(logAggregationContext)
        .build();
    RMApp submittedApp = MockRMAppSubmitter.submit(rm1, submission);
    MockAM appMaster = MockRM.launchAndRegisterAM(submittedApp, rm1, nm1);
    nm1.nodeHeartbeat(true);

    // Ask for one container and wait until it reaches ALLOCATED.
    appMaster.allocate("127.0.0.1", 512, 1, new ArrayList<ContainerId>());
    ContainerId expectedId =
        ContainerId.newContainerId(appMaster.getApplicationAttemptId(), 2);
    rm1.waitForState(nm1, expectedId, RMContainerState.ALLOCATED);

    // Pull the container; this is the point at which its token must exist.
    AllocateResponse pullResponse = appMaster.allocate(
        new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
    List<Container> acquired = pullResponse.getAllocatedContainers();
    Container firstContainer = acquired.get(0);
    assertEquals(expectedId, firstContainer.getId());
    assertNotNull(firstContainer.getContainerToken());

    ContainerTokenIdentifier identifier =
        BuilderUtils.newContainerTokenIdentifier(
            firstContainer.getContainerToken());
    return identifier.getLogAggregationContext();
  }

  /** Counts how many times the overridden createContainerToken was invoked. */
  private volatile int numRetries = 0;

  /**
   * RMSecretManagerService whose container token secret manager increments
   * {@link #numRetries} on every createContainerToken call and otherwise
   * delegates to the stock implementation.
   */
  private class TestRMSecretManagerService extends RMSecretManagerService {

    public TestRMSecretManagerService(Configuration configuration,
        RMContextImpl context) {
      super(configuration, context);
    }

    @Override
    protected RMContainerTokenSecretManager createContainerTokenSecretManager(
        Configuration configuration) {
      RMContainerTokenSecretManager countingManager =
          new RMContainerTokenSecretManager(configuration) {

            @Override
            public Token createContainerToken(ContainerId id, int version,
                NodeId node, String submitter, Resource resource,
                Priority requestPriority, long creationTime,
                LogAggregationContext logContext, String labelExpression,
                ContainerType type, ExecutionType execType, long requestId,
                Set<String> tags) {
              // Record the attempt before delegating to the real manager.
              numRetries++;
              return super.createContainerToken(id, version, node, submitter,
                  resource, requestPriority, creationTime, logContext,
                  labelExpression, type, execType, requestId, tags);
            }
          };
      return countingManager;
    }
  }

  /**
   * Tests that fetching the AM container is retried if the AM container is
   * not fetchable because DNS is unavailable, causing container token/NMToken
   * creation failure. While the token service is switched to use IPs the
   * attempt must stay SCHEDULED; once the switch is turned off the AM can be
   * launched and registered.
   */
  @Test
  @Timeout(value = 30)
  public void testAMContainerAllocationWhenDNSUnavailable() throws Exception {
    MockRM rm1 = new MockRM(conf) {
      @Override
      protected RMSecretManagerService createRMSecretManagerService() {
        return new TestRMSecretManagerService(conf, rmContext);
      }
    };
    rm1.start();
    // Stop the RM even when an assertion fails so it is not leaked.
    try {
      MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
      RMApp app1;
      try {
        SecurityUtilTestHelper.setTokenServiceUseIp(true);
        app1 = MockRMAppSubmitter.submitWithMemory(200, rm1);
        RMAppAttempt attempt = app1.getCurrentAppAttempt();
        nm1.nodeHeartbeat(true);

        // fetching am container will fail; keep heartbeating until the token
        // creation has been attempted more than 5 times.
        while (numRetries <= 5) {
          nm1.nodeHeartbeat(true);
          Thread.sleep(1000);
          assertEquals(RMAppAttemptState.SCHEDULED,
              attempt.getAppAttemptState());
          LOG.info("Waiting for am container to be allocated.");
        }
      } finally {
        // always restore the global switch so other tests are unaffected.
        SecurityUtilTestHelper.setTokenServiceUseIp(false);
      }
      MockRM.launchAndRegisterAM(app1, rm1, nm1);
    } finally {
      rm1.stop();
    }
  }
  
  /**
   * Test case: two applications (app1/app2) are submitted to the default
   * queue and both AMs (1G each) are launched on nm1, an 8G node. Each app
   * then asks for one 4G container. After two heartbeats of nm1, app1 holds a
   * 4G container and app2's request is reserved on nm1, which has 2G left.
   *
   * App2 then cancels its ask; after the next heartbeat of nm1 the
   * reservation and the reserved resource accounting must be cancelled too.
   */
  @Test
  @Timeout(value = 60)
  public void testExcessReservationWillBeUnreserved() throws Exception {
    // inject node label manager
    MockRM rm1 = new MockRM();

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    // Close the RM even when an assertion fails so it is not leaked.
    try {
      MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
      // A second node is registered; the test only drives scheduling on nm1.
      rm1.registerNode("h2:1234", 8 * GB);

      // launch an app to queue, AM container should be launched in nm1
      MockRMAppSubmissionData data1 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      // launch another app to queue, AM container should be launched in nm1
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

      am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
      am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());

      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
      LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");

      // Do node heartbeats 2 times
      // First time will allocate container for app1, second time will reserve
      // container for app2
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));

      FiCaSchedulerApp schedulerApp1 =
          cs.getApplicationAttempt(am1.getApplicationAttemptId());
      FiCaSchedulerApp schedulerApp2 =
          cs.getApplicationAttempt(am2.getApplicationAttemptId());

      // Check that a 4G container is allocated for app1 (AM + 1), and that
      // app2 only has its AM plus a reservation
      assertEquals(2, schedulerApp1.getLiveContainers().size());
      assertEquals(1, schedulerApp2.getLiveContainers().size());
      assertFalse(schedulerApp2.getReservedContainers().isEmpty());

      // NM1 has available resource = 2G (8G - 2 * 1G - 4G)
      assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
          .getUnallocatedResource().getMemorySize());
      assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
      // Usage of queue = 4G + 2 * 1G + 4G (reserved)
      assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
          .getUsed().getMemorySize());
      assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
          .getReserved().getMemorySize());
      assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
          .getMemorySize());

      // Cancel asks of app2 and re-kick RM
      am2.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));

      // App2's reservation will be cancelled
      assertEquals(0, schedulerApp2.getReservedContainers().size());
      assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
          .getUnallocatedResource().getMemorySize());
      assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
      assertEquals(6 * GB, cs.getRootQueue().getQueueResourceUsage()
          .getUsed().getMemorySize());
      assertEquals(0, cs.getRootQueue().getQueueResourceUsage()
          .getReserved().getMemorySize());
      assertEquals(0, leafQueue.getQueueResourceUsage().getReserved()
          .getMemorySize());
    } finally {
      rm1.close();
    }
  }

  /**
   * Test case: two applications (app1/app2) are submitted to the default
   * queue and both AMs (1G each) are launched on nm1, an 8G node. Each app
   * then asks for one 4G container. After two heartbeats of nm1, app1 holds a
   * 4G container and app2's request is reserved on nm1, which has 2G left.
   *
   * App1's non-AM container is then marked killable and app1 cancels its ask;
   * after the next heartbeat of nm1, app2's reserved container is expected to
   * be allocated and no reservation to remain.
   */
  @Test
  @Timeout(value = 60)
  public void testAllocationForReservedContainer() throws Exception {
    // inject node label manager
    MockRM rm1 = new MockRM();

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    // Close the RM even when an assertion fails so it is not leaked.
    try {
      MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
      // A second node is registered; the test only drives scheduling on nm1.
      rm1.registerNode("h2:1234", 8 * GB);

      // launch an app to queue, AM container should be launched in nm1
      MockRMAppSubmissionData data1 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      // launch another app to queue, AM container should be launched in nm1
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

      am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
      am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());

      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
      LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");

      // Do node heartbeats 2 times
      // First time will allocate container for app1, second time will reserve
      // container for app2
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));

      FiCaSchedulerApp schedulerApp1 =
          cs.getApplicationAttempt(am1.getApplicationAttemptId());
      FiCaSchedulerApp schedulerApp2 =
          cs.getApplicationAttempt(am2.getApplicationAttemptId());

      // Check that a 4G container is allocated for app1 (AM + 1), and that
      // app2 only has its AM plus a reservation
      assertEquals(2, schedulerApp1.getLiveContainers().size());
      assertEquals(1, schedulerApp2.getLiveContainers().size());
      assertFalse(schedulerApp2.getReservedContainers().isEmpty());

      // NM1 has available resource = 2G (8G - 2 * 1G - 4G)
      assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
          .getUnallocatedResource().getMemorySize());
      assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
      // Usage of queue = 4G + 2 * 1G + 4G (reserved)
      assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
          .getUsed().getMemorySize());
      assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
          .getReserved().getMemorySize());
      assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
          .getMemorySize());

      // Mark every non-AM container of app1 as killable
      for (RMContainer container : schedulerApp1.getLiveContainers()) {
        if (container.isAMContainer()) {
          continue;
        }
        cs.markContainerForKillable(container);
      }
      // Cancel asks of app1 and re-kick RM
      am1.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));

      // Check 4G container cancelled for app1, and one container allocated for
      // app2
      assertEquals(1, schedulerApp1.getLiveContainers().size());
      assertEquals(2, schedulerApp2.getLiveContainers().size());
      assertEquals(0, schedulerApp2.getReservedContainers().size());

      // NM1 has available resource = 2G (8G - 2 * 1G - 4G)
      assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
          .getUnallocatedResource().getMemorySize());
      assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
      // Usage of queue = 4G + 2 * 1G
      assertEquals(6 * GB, cs.getRootQueue().getQueueResourceUsage()
          .getUsed().getMemorySize());
      assertEquals(0, cs.getRootQueue().getQueueResourceUsage()
          .getReserved().getMemorySize());
      assertEquals(0, leafQueue.getQueueResourceUsage().getReserved()
          .getMemorySize());
    } finally {
      rm1.close();
    }
  }

  /**
   * Test case: two applications (app1/app2) are submitted to the default
   * queue and both AMs (1G each) are launched on nm1, an 8G node. Each app
   * then asks for one 4G container. After two heartbeats of nm1, app1 holds a
   * 4G container and app2's request is reserved on nm1, which has 2G left.
   *
   * nm1 is then removed from the scheduler. All live and reserved containers
   * of both apps must be gone, and the used and reserved resources of the
   * queues must drop back to 0.
   */
  @Test
  @Timeout(value = 60)
  public void testReservedContainerMetricsOnDecommisionedNode() throws Exception {
    // inject node label manager
    MockRM rm1 = new MockRM();

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    // Close the RM even when an assertion fails so it is not leaked.
    try {
      MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
      // A second node is registered; the test only drives scheduling on nm1.
      rm1.registerNode("h2:1234", 8 * GB);

      // launch an app to queue, AM container should be launched in nm1
      MockRMAppSubmissionData data1 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      // launch another app to queue, AM container should be launched in nm1
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

      am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
      am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());

      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
      LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");

      // Do node heartbeats 2 times
      // First time will allocate container for app1, second time will reserve
      // container for app2
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));

      FiCaSchedulerApp schedulerApp1 =
          cs.getApplicationAttempt(am1.getApplicationAttemptId());
      FiCaSchedulerApp schedulerApp2 =
          cs.getApplicationAttempt(am2.getApplicationAttemptId());

      // Check that a 4G container is allocated for app1 (AM + 1), and that
      // app2 only has its AM plus a reservation
      assertEquals(2, schedulerApp1.getLiveContainers().size());
      assertEquals(1, schedulerApp2.getLiveContainers().size());
      assertFalse(schedulerApp2.getReservedContainers().isEmpty());

      // NM1 has available resource = 2G (8G - 2 * 1G - 4G)
      assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
          .getUnallocatedResource().getMemorySize());
      assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
      // Usage of queue = 4G + 2 * 1G + 4G (reserved)
      assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
          .getUsed().getMemorySize());
      assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
          .getReserved().getMemorySize());
      assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
          .getMemorySize());

      // Remove the node
      cs.handle(new NodeRemovedSchedulerEvent(rmNode1));

      // Check all containers cancelled for app1 and app2
      assertEquals(0, schedulerApp1.getLiveContainers().size());
      assertEquals(0, schedulerApp2.getLiveContainers().size());
      assertEquals(0, schedulerApp2.getReservedContainers().size());

      // Usage and Reserved capacity of queue is 0
      assertEquals(0, cs.getRootQueue().getQueueResourceUsage()
          .getUsed().getMemorySize());
      assertEquals(0, cs.getRootQueue().getQueueResourceUsage()
          .getReserved().getMemorySize());
      assertEquals(0, leafQueue.getQueueResourceUsage().getReserved()
          .getMemorySize());
    } finally {
      rm1.close();
    }
  }

  /**
   * Requests five 1G off-switch ("*") containers and checks that a single
   * node heartbeat allocates one of them under the initial configuration, and
   * three more after the scheduler is reinitialized with
   * OFFSWITCH_PER_HEARTBEAT_LIMIT set to 3.
   */
  @Test
  @Timeout(value = 60)
  public void testAssignMultipleOffswitchContainers() throws Exception {
    MockRM rm1 = new MockRM();

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    // Close the RM even when an assertion fails so it is not leaked.
    try {
      MockNM nm1 = rm1.registerNode("h1:1234", 80 * GB);

      // launch an app to queue, AM container should be launched in nm1
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());

      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());

      // Do node heartbeats once
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));

      FiCaSchedulerApp schedulerApp1 =
          cs.getApplicationAttempt(am1.getApplicationAttemptId());

      // App1 will get one container allocated (plus AM container)
      assertEquals(2, schedulerApp1.getLiveContainers().size());

      // Set assign multiple off-switch containers to 3
      CapacitySchedulerConfiguration newCSConf = new CapacitySchedulerConfiguration();
      newCSConf.setInt(
          CapacitySchedulerConfiguration.OFFSWITCH_PER_HEARTBEAT_LIMIT, 3);

      cs.reinitialize(newCSConf, rm1.getRMContext());

      // Do node heartbeats once
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));

      // App1 will get 3 new containers allocated (plus 2 previously allocated
      // containers)
      assertEquals(5, schedulerApp1.getLiveContainers().size());
    } finally {
      rm1.close();
    }
  }

  /**
   * With the maximum capacity of queue A set to 10, app1 (queue A) asks for
   * two 4G containers. Two heartbeats of nm1 (10G) are expected to yield one
   * allocated and one reserved container; a following heartbeat of nm2 (90G)
   * is expected to allocate the second container there and drop the
   * reservation on nm1.
   */
  @Test
  @Timeout(value = 60)
  public void testContinuousReservationLookingWhenUsedEqualsMax() throws Exception {
    CapacitySchedulerConfiguration newConf =
        (CapacitySchedulerConfiguration) TestUtils
            .getConfigurationWithMultipleQueues(conf);
    // Set maximum capacity of A to 10
    newConf.setMaximumCapacity(A, 10);
    MockRM rm1 = new MockRM(newConf);

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    // Close the RM even when an assertion fails so it is not leaked.
    try {
      MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
      MockNM nm2 = rm1.registerNode("h2:1234", 90 * GB);

      // launch an app to queue A, AM container should be launched in nm1
      MockRMAppSubmissionData data1 =
          MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("a")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      // launch 2nd app to queue B, AM container should be launched in nm1
      // Now usage of nm1 is 3G (2G + 1G)
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("b")
              .withUnmanagedAM(false)
              .build();
      RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

      am1.allocate("*", 4 * GB, 2, null);

      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
      RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());

      // Do node heartbeats twice, we expect one container allocated on nm1 and
      // one container reserved on nm1.
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));

      FiCaSchedulerApp schedulerApp1 =
          cs.getApplicationAttempt(am1.getApplicationAttemptId());

      // App1 will have 2 live containers (AM container plus one allocated)
      // and one reserved container
      assertEquals(2, schedulerApp1.getLiveContainers().size());
      assertEquals(1, schedulerApp1.getReservedContainers().size());

      // Do node heartbeats on nm2, we expect one container allocated on nm2 and
      // one unreserved on nm1
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
      assertEquals(3, schedulerApp1.getLiveContainers().size());
      assertEquals(0, schedulerApp1.getReservedContainers().size());
    } finally {
      rm1.close();
    }
  }

  /**
   * Verifies the used / reserved / pending accounting of two applications from
   * different users in the "default" queue, and the result of
   * {@code LeafQueue#getTotalPendingResourcesConsideringUserLimit} both with
   * and without deducting reserved resources.
   */
  @Test
  public void testPendingResourcesConsideringUserLimit() throws Exception {
    // Use a user-limit-factor of 0.5 and a maximum AM resource percent of 1.0
    // (partition "") for the default queue
    CapacitySchedulerConfiguration newConf = new CapacitySchedulerConfiguration(
        conf);
    newConf.setUserLimitFactor(DEFAULT, 0.5f);
    newConf.setMaximumAMResourcePercentPerPartition(DEFAULT, "", 1.0f);
    MockRM rm1 = new MockRM(newConf);

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    // Close the RM in finally so that it is closed even when an assertion fails.
    try {
      MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
      // The second node never heartbeats in this test; it is only registered.
      rm1.registerNode("h2:1234", 8 * GB);

      // launch an app to queue default, AM container should be launched in nm1
      MockRMAppSubmissionData data1 =
          MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
              .withAppName("app")
              .withUser("u1")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      // launch 2nd app to queue default, AM container should be launched in nm1
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(4 * GB, rm1)
              .withAppName("app")
              .withUser("u2")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

      // am1 asks 1 * 3G container
      am1.allocate("*", 3 * GB, 1, null);

      // am2 asks 4 * 5G container
      am2.allocate("*", 5 * GB, 4, null);

      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());

      // Do one node heartbeat on nm1, we expect one container reserved on nm1
      // for app1
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));

      FiCaSchedulerApp schedulerApp1 =
          cs.getApplicationAttempt(am1.getApplicationAttemptId());

      // App1 will get 1 container reserved
      assertEquals(1, schedulerApp1.getReservedContainers().size());

      /*
       * Note that the behavior of appAttemptResourceUsage is different from
       * queue's
       * For queue, used = actual-used + reserved
       * For app, used = actual-used.
       *
       * TODO (wangda): Need to make behaviors of queue/app's resource usage
       * consistent
       */
      assertEquals(2 * GB,
          schedulerApp1.getAppAttemptResourceUsage().getUsed().getMemorySize());
      assertEquals(3 * GB,
          schedulerApp1.getAppAttemptResourceUsage().getReserved()
              .getMemorySize());
      assertEquals(3 * GB,
          schedulerApp1.getAppAttemptResourceUsage().getPending()
              .getMemorySize());

      FiCaSchedulerApp schedulerApp2 =
          cs.getApplicationAttempt(am2.getApplicationAttemptId());
      assertEquals(4 * GB,
          schedulerApp2.getAppAttemptResourceUsage().getUsed().getMemorySize());
      assertEquals(0 * GB,
          schedulerApp2.getAppAttemptResourceUsage().getReserved()
              .getMemorySize());
      assertEquals(5 * 4 * GB,
          schedulerApp2.getAppAttemptResourceUsage().getPending()
              .getMemorySize());

      LeafQueue lq = (LeafQueue) cs.getQueue("default");

      // UL = 8GB, so head room of u1 = 8GB - 2GB (AM) - 3GB (Reserved) = 3GB
      //                           u2 = 8GB - 4GB = 4GB
      // When not deduct reserved, total-pending = 3G (u1) + 4G (u2) = 7G
      //          deduct reserved, total-pending = 0G (u1) + 4G = 4G
      assertEquals(7 * GB, lq.getTotalPendingResourcesConsideringUserLimit(
          Resources.createResource(20 * GB), "", false).getMemorySize());
      assertEquals(4 * GB, lq.getTotalPendingResourcesConsideringUserLimit(
          Resources.createResource(20 * GB), "", true).getMemorySize());
    } finally {
      rm1.close();
    }
  }


  /**
   * Verifies the order in which queues a, b and c receive containers under the
   * priority-utilization queue ordering policy, with queue priorities a=3,
   * b=2, c=1 and a maximum capacity of 20 for queue a.
   *
   * <p>One app (2GB AM) runs in each queue on a single 100GB node and each
   * asks for ten 5GB containers; the live container counts are checked after
   * successive batches of node heartbeats.
   */
  @Test
  @Timeout(value = 60)
  public void testQueuePriorityOrdering() throws Exception {
    CapacitySchedulerConfiguration newConf =
        (CapacitySchedulerConfiguration) TestUtils
            .getConfigurationWithMultipleQueues(conf);

    // Set ordering policy
    newConf.setQueueOrderingPolicy(ROOT,
        CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);

    // Set maximum capacity of A to 20
    newConf.setMaximumCapacity(A, 20);
    newConf.setQueuePriority(C, 1);
    newConf.setQueuePriority(B, 2);
    newConf.setQueuePriority(A, 3);

    MockRM rm1 = new MockRM(newConf);

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    // Close the RM in finally so that it is closed even when an assertion fails.
    try {
      MockNM nm1 = rm1.registerNode("h1:1234", 100 * GB);

      // launch an app to queue A, AM container should be launched in nm1
      MockRMAppSubmissionData data2 =
          MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("a")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data2);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      // launch an app to queue B, AM container should be launched in nm1
      MockRMAppSubmissionData data1 =
          MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("b")
              .withUnmanagedAM(false)
              .build();
      RMApp app2 = MockRMAppSubmitter.submit(rm1, data1);
      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

      // launch an app to queue C, AM container should be launched in nm1
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("c")
              .withUnmanagedAM(false)
              .build();
      RMApp app3 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);

      // Each application asks 10 * 5GB containers
      am1.allocate("*", 5 * GB, 10, null);
      am2.allocate("*", 5 * GB, 10, null);
      am3.allocate("*", 5 * GB, 10, null);

      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());

      FiCaSchedulerApp schedulerApp1 =
          cs.getApplicationAttempt(am1.getApplicationAttemptId());
      FiCaSchedulerApp schedulerApp2 =
          cs.getApplicationAttempt(am2.getApplicationAttemptId());
      FiCaSchedulerApp schedulerApp3 =
          cs.getApplicationAttempt(am3.getApplicationAttemptId());

      // container will be allocated to am1
      // App1 now has 2 live containers (1 allocated plus AM container)
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      assertEquals(2, schedulerApp1.getLiveContainers().size());
      assertEquals(1, schedulerApp2.getLiveContainers().size());
      assertEquals(1, schedulerApp3.getLiveContainers().size());

      // container will be allocated to am1 again,
      // App1 now has 3 live containers (2 allocated plus AM container)
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      assertEquals(3, schedulerApp1.getLiveContainers().size());
      assertEquals(1, schedulerApp2.getLiveContainers().size());
      assertEquals(1, schedulerApp3.getLiveContainers().size());

      // (Now usages of queues: a=12G (satisfied), b=2G, c=2G)

      // container will be allocated to am2 (since app1 reaches its guaranteed
      // capacity)
      // App2 now has 2 live containers (1 allocated plus AM container)
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      assertEquals(3, schedulerApp1.getLiveContainers().size());
      assertEquals(2, schedulerApp2.getLiveContainers().size());
      assertEquals(1, schedulerApp3.getLiveContainers().size());

      // Do this 3 times
      // containers will be allocated to am2 (since app1 reaches its guaranteed
      // capacity)
      // App2 ends up with 5 live containers (4 allocated plus AM container)
      for (int i = 0; i < 3; i++) {
        cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      }
      assertEquals(3, schedulerApp1.getLiveContainers().size());
      assertEquals(5, schedulerApp2.getLiveContainers().size());
      assertEquals(1, schedulerApp3.getLiveContainers().size());

      // (Now usages of queues: a=12G (satisfied), b=22G (satisfied), c=2G))

      // Do this 10 times, all 10 requested containers go to am3
      for (int i = 0; i < 10; i++) {
        cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      }
      assertEquals(3, schedulerApp1.getLiveContainers().size());
      assertEquals(5, schedulerApp2.getLiveContainers().size());
      assertEquals(11, schedulerApp3.getLiveContainers().size());

      // (Now usages of queues: a=12G (satisfied), b=22G (satisfied),
      // c=52G (satisfied and no pending))

      // Do this 20 times, we can only allocate 2 more containers, 1 to A and
      // 1 to B: afterwards 96G of the 100G node are in use, which leaves less
      // than one 5G container
      for (int i = 0; i < 20; i++) {
        cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      }
      assertEquals(4, schedulerApp1.getLiveContainers().size());
      assertEquals(6, schedulerApp2.getLiveContainers().size());
      assertEquals(11, schedulerApp3.getLiveContainers().size());

      // (Now usages of queues: a=17G (satisfied), b=27G (satisfied), c=52G))
    } finally {
      rm1.close();
    }
  }



  /**
   * Verifies how many containers two users of queue c receive from a single
   * node heartbeat when the per-heartbeat assignment limit is set to -1 and
   * the user limit of queue c is 50.
   */
  @Test
  @Timeout(value = 60)
  public void testUserLimitAllocationMultipleContainers() throws Exception {
    CapacitySchedulerConfiguration newConf =
        (CapacitySchedulerConfiguration) TestUtils
            .getConfigurationWithMultipleQueues(conf);
    // make sure an unlimited number of containers can be assigned,
    // overriding the default of 100 after YARN-8896
    newConf.set(MAX_ASSIGN_PER_HEARTBEAT, "-1");
    newConf.setUserLimit(C, 50);
    MockRM rm1 = new MockRM(newConf);

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    // Close the RM in finally so that it is closed even when an assertion fails.
    try {
      MockNM nm1 = rm1.registerNode("h1:1234", 1000 * GB);

      // launch app from 1st user to queue C, AM container should be launched
      // in nm1
      MockRMAppSubmissionData data1 =
          MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
              .withAppName("app")
              .withUser("user1")
              .withAcls(null)
              .withQueue("c")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      // launch app from 2nd user to queue C, AM container should be launched
      // in nm1
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
              .withAppName("app")
              .withUser("user2")
              .withAcls(null)
              .withQueue("c")
              .withUnmanagedAM(false)
              .build();
      RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

      // App1 asks 1000 * 5GB containers (any host, node h1 and default rack)
      am1.allocate("*", 5 * GB, 1000, null);
      am1.allocate("h1", 5 * GB, 1000, null);
      am1.allocate(NetworkTopology.DEFAULT_RACK, 5 * GB, 1000, null);

      // App2 asks 1000 * 5GB containers (any host, node h1 and default rack)
      am2.allocate("*", 5 * GB, 1000, null);
      am2.allocate("h1", 5 * GB, 1000, null);
      am2.allocate(NetworkTopology.DEFAULT_RACK, 5 * GB, 1000, null);

      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());

      FiCaSchedulerApp schedulerApp1 =
          cs.getApplicationAttempt(am1.getApplicationAttemptId());
      FiCaSchedulerApp schedulerApp2 =
          cs.getApplicationAttempt(am2.getApplicationAttemptId());

      // A single heartbeat allocates containers to both apps: afterwards App1
      // has 101 and App2 has 100 live containers (AM containers included)
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      assertEquals(101, schedulerApp1.getLiveContainers().size());
      assertEquals(100, schedulerApp2.getLiveContainers().size());
    } finally {
      rm1.close();
    }
  }

  /**
   * Verifies the active-user counters of the "default" queue when four users
   * have submitted one app each but only the AMs of the first two apps are
   * launched by the test.
   *
   * <p>Expects 4 active users, 2 active users with only pending apps and 2
   * pending apps in the queue metrics.
   */
  @Test
  public void testActiveUsersWithOnlyPendingApps() throws Exception {

    CapacitySchedulerConfiguration newConf =
        new CapacitySchedulerConfiguration(conf);
    newConf.setMaximumAMResourcePercentPerPartition(DEFAULT, "", 0.2f);
    MockRM rm1 = new MockRM(newConf);

    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    // Close the RM in finally so that it is closed even when an assertion fails.
    try {
      MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);

      // u1 and u2: apps are submitted and their AMs are launched on nm1
      MockRMAppSubmissionData data1 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("u1")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      MockRMAppSubmissionData data2 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("u2")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      RMApp app2 = MockRMAppSubmitter.submit(rm1, data2);
      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

      // u3 and u4: apps are only submitted, the test never launches their AMs
      MockRMAppSubmissionData data3 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("u3")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      MockRMAppSubmitter.submit(rm1, data3);

      MockRMAppSubmissionData data4 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("u4")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      MockRMAppSubmitter.submit(rm1, data4);

      // The two running applications ask 50 * 1GB containers each
      am1.allocate("*", 1 * GB, 50, null);
      am2.allocate("*", 1 * GB, 50, null);

      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());

      // Ten heartbeats, one second apart.
      // NOTE(review): the sleep presumably lets asynchronous state settle
      // between heartbeats - confirm before shortening it.
      for (int i = 0; i < 10; i++) {
        cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
        Thread.sleep(1000);
      }
      LeafQueue lq = (LeafQueue) cs.getQueue("default");
      UsersManager um = (UsersManager) lq.getAbstractUsersManager();

      assertEquals(4, um.getNumActiveUsers());
      assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
      assertEquals(2, lq.getMetrics().getAppsPending());
    } finally {
      rm1.close();
    }
  }

  /**
   * Verifies that a container reserved on nm1 is unreserved and allocated on
   * nm2 when the cluster declares an additional resource type ("resource1")
   * and the dominant resource calculator is in use.
   */
  @Test
  @Timeout(value = 60)
  public void testUnreserveWhenClusterResourceHasEmptyResourceType()
      throws Exception {
    /*
     * Test case:
     * Create a cluster with two nodes whose node resource both are
     * <8GB, 8core, 0>, create queue "a" whose max-resource is <8GB, 8 core, 0>,
     * submit app1 to queue "a" whose am use <1GB, 1 core, 0> and launch on nm1,
     * submit app2 to queue "b" whose am use <1GB, 1 core, 0> and launch on nm1,
     * app1 asks two <7GB, 1core> containers and nm1 do 1 heartbeat,
     * then scheduler reserves one container on nm1.
     *
     * After nm2 do next node heartbeat, scheduler should unreserve the reserved
     * container on nm1 then allocate a container on nm2.
     */
    CustomResourceTypesConfigurationProvider.
        initResourceTypes("resource1");
    CapacitySchedulerConfiguration newConf =
        (CapacitySchedulerConfiguration) TestUtils
            .getConfigurationWithMultipleQueues(conf);
    newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
        DominantResourceCalculator.class, ResourceCalculator.class);
    newConf
        .setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
    // Set maximum capacity of queue "a" to 50
    newConf.setMaximumCapacity(A, 50);
    MockRM rm1 = new MockRM(newConf);

    RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
    nodeLabelsManager.init(newConf);
    rm1.getRMContext().setNodeLabelManager(nodeLabelsManager);
    rm1.start();
    // Close the RM in finally so that it is closed even when an assertion fails.
    try {
      MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
      MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);

      // launch an app to queue "a", AM container should be launched on nm1
      MockRMAppSubmissionData data1 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("a")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      // launch another app to queue "b", AM container should be launched on nm1
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("b")
              .withUnmanagedAM(false)
              .build();
      RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
      MockRM.launchAndRegisterAM(app2, rm1, nm1);

      // app1 asks for 2 * 7G containers
      am1.allocate("*", 7 * GB, 2, new ArrayList<ContainerId>());

      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
      RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
      FiCaSchedulerApp schedulerApp1 =
          cs.getApplicationAttempt(am1.getApplicationAttemptId());

      // Do nm1 heartbeats 1 times, will reserve a container on nm1 for app1
      // (only the AM container is live at this point)
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      assertEquals(1, schedulerApp1.getLiveContainers().size());
      assertEquals(1, schedulerApp1.getReservedContainers().size());

      // Do nm2 heartbeats 1 times, will unreserve a container on nm1
      // and allocate a container on nm2 for app1
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
      assertEquals(2, schedulerApp1.getLiveContainers().size());
      assertEquals(0, schedulerApp1.getReservedContainers().size());
    } finally {
      rm1.close();
    }
  }

  /**
   * Verifies that an application in queue "b" still receives a container from
   * a node heartbeat while the request of an application in queue "c1" stays
   * pending.
   */
  @Test
  @Timeout(value = 60)
  public void testAllocationCannotBeBlockedWhenFormerQueueReachedItsLimit()
      throws Exception {
    /*
     * Queue structure:
     * <pre>
     *             Root
     *            /  |  \
     *           a   b   c
     *          10   20  70
     *                   |  \
     *                  c1   c2
     *           10(max=10)  90
     * </pre>
     * Test case:
     * Create a cluster with two nodes whose node resource both are
     * <10GB, 10core>, create queues as above, among them max-capacity of "c1"
     * is 10 and others are all 100, so that max-capacity of queue "c1" is
     * <2GB, 2core>,
     * submit app1 to queue "c1" and launch am1(resource=<1GB, 1 core>) on nm1,
     * submit app2 to queue "b" and launch am2(resource=<1GB, 1 core>) on nm1,
     * app1 and app2 both ask one <2GB, 1core> containers
     *
     * Now queue "c" has lower capacity percentage than queue "b", the
     * allocation sequence will be "a" -> "c" -> "b", queue "c1" has reached
     * queue limit so that requests of app1 should be pending
     *
     * After nm1 do 1 heartbeat, scheduler should allocate one container for
     * app2 on nm1.
     */
    CapacitySchedulerConfiguration newConf =
        (CapacitySchedulerConfiguration) TestUtils
            .getConfigurationWithMultipleQueues(conf);
    newConf.setQueues(C,
        new String[] { "c1", "c2" });
    newConf.setCapacity(C1, 10);
    newConf
        .setMaximumCapacity(C1, 10);
    newConf.setCapacity(C2, 90);
    newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
        DominantResourceCalculator.class, ResourceCalculator.class);

    MockRM rm1 = new MockRM(newConf);

    RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
    nodeLabelsManager.init(newConf);
    rm1.getRMContext().setNodeLabelManager(nodeLabelsManager);
    rm1.start();
    // Close the RM in finally so that it is closed even when an assertion fails.
    try {
      MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
      // The second node never heartbeats in this test; it is only registered.
      rm1.registerNode("h2:1234", 10 * GB);

      // launch an app to queue "c1", AM container should be launched on nm1
      MockRMAppSubmissionData data1 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("c1")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

      // launch another app to queue "b", AM container should be launched on nm1
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("b")
              .withUnmanagedAM(false)
              .build();
      RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

      // Both apps ask for 1 * 2G container
      am1.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
      am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());

      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
      FiCaSchedulerApp schedulerApp1 =
          cs.getApplicationAttempt(am1.getApplicationAttemptId());
      FiCaSchedulerApp schedulerApp2 =
          cs.getApplicationAttempt(am2.getApplicationAttemptId());

      // Do nm1 heartbeats 1 times, will allocate a container on nm1 for app2
      // while app1 keeps only its AM container
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      rm1.drainEvents();
      assertEquals(1, schedulerApp1.getLiveContainers().size());
      assertEquals(2, schedulerApp2.getLiveContainers().size());
    } finally {
      rm1.close();
    }
  }

  /**
   * Verifies that allocate requests above the currently effective maximum
   * allocation are rejected with {@link InvalidResourceRequestException}.
   *
   * <p>Queue "a" is configured with a maximum allocation of 4096 MB, so 5GB is
   * rejected and 4GB accepted. After forced use of the configured maximum is
   * switched off on the node tracker and a 3GB node is registered, 4GB is
   * rejected and 3GB accepted.
   */
  @Test
  @Timeout(value = 60)
  public void testContainerRejectionWhenAskBeyondDynamicMax()
      throws Exception {
    CapacitySchedulerConfiguration newConf =
        (CapacitySchedulerConfiguration) TestUtils
            .getConfigurationWithMultipleQueues(conf);
    newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
        DominantResourceCalculator.class, ResourceCalculator.class);
    newConf.set(QueuePrefixes.getQueuePrefix(A)
        + MAXIMUM_ALLOCATION_MB, "4096");

    MockRM rm1 = new MockRM(newConf);
    rm1.start();
    // Close the RM in finally so that it is closed even when an assertion fails.
    try {
      // Asking for more than the maximum allocation configured for queue "a"
      // (4096 MB) must be rejected, although the only node has just 2GB.
      boolean submitFailed = false;
      MockNM nm1 = rm1.registerNode("h1:1234", 2 * GB, 1);
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("a")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
      try {
        am1.allocate("*", 5 * GB, 1, null);
      } catch (InvalidResourceRequestException e) {
        // Expected: 5GB exceeds the 4096 MB queue maximum.
        submitFailed = true;
      }
      assertTrue(submitFailed);

      // Ask 4GB succeeded.
      am1.allocate("*", 4 * GB, 1, null);

      // Add a new node, now the cluster maximum should be refreshed to 3GB.
      CapacityScheduler cs = (CapacityScheduler)rm1.getResourceScheduler();
      cs.getNodeTracker().setForceConfiguredMaxAllocation(false);
      rm1.registerNode("h2:1234", 3 * GB, 1);

      // Now ask 4 GB will fail
      submitFailed = false;
      try {
        am1.allocate("*", 4 * GB, 1, null);
      } catch (InvalidResourceRequestException e) {
        // Expected: 4GB exceeds the refreshed 3GB maximum.
        submitFailed = true;
      }
      assertTrue(submitFailed);

      // But ask 3 GB succeeded.
      am1.allocate("*", 3 * GB, 1, null);
    } finally {
      rm1.close();
    }
  }
}