TestCapacityScheduler.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocMb;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocVcores;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocMb;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocVcores;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupAdditionalQueues;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupBlockedQueueConfiguration;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupOtherBlockedQueueConfiguration;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A3;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2_PATH;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B_PATH;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.DEFAULT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.appHelper;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkApplicationResourceUsage;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkNodeResourceUsage;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkPendingResource;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.checkPendingResourceGreaterThanZero;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createMockRMContext;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createResourceManager;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.nodeUpdate;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.registerNode;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.setUpMove;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.toSet;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.waitforNMRegistered;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertContainerKilled;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertMemory;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertNoPreemption;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertPreemption;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertTime;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.updateNodeResource;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.waitMemory;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

import org.apache.hadoop.util.Sets;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.TestGroupsCaching;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.CSQueueMetricsForCustomResources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.
    ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.function.Supplier;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TestCapacityScheduler {
  /** Logger used by the test cases of this class. */
  private static final Logger LOG = LoggerFactory.getLogger(TestCapacityScheduler.class);

  /** Default-constructed {@link ContainerUpdates} instance kept as a shared constant. */
  private static final ContainerUpdates NULL_UPDATE_REQUESTS = new ContainerUpdates();

  // NOTE(review): presumably the tolerance for floating point assertions - confirm
  // against the tests that reference it.
  private static final double DELTA = 0.0001;

  /** Assigned in {@link #setUp()} and handed to stopResourceManager in {@link #tearDown()}. */
  private ResourceManager resourceManager;

  /** RM context obtained from createMockRMContext() in {@link #setUp()}. */
  private RMContext mockContext;

  /**
   * Assigns the two shared fixtures before each test: the resource manager
   * returned by {@code createResourceManager()} and the RM context returned by
   * {@code createMockRMContext()}.
   *
   * @throws Exception if either helper fails
   */
  @BeforeEach
  public void setUp() throws Exception {
    this.resourceManager = createResourceManager();
    this.mockContext = createMockRMContext();
  }

  /**
   * Passes the shared resource manager to {@code stopResourceManager} after
   * each test.
   *
   * @throws Exception if stopping the resource manager fails
   */
  @AfterEach
  public void tearDown() throws Exception {
    stopResourceManager(this.resourceManager);
  }

  /**
   * Checks that the scheduler refuses a configuration whose minimum allocation
   * is larger than its maximum allocation: once for memory through
   * {@code init} and once for vcores through {@code reinitialize}.
   *
   * @throws Exception if the scheduler fails in a way the test does not expect
   */
  @Test
  @Timeout(value = 30)
  public void testConfValidation() throws Exception {
    final String wrongException = "The thrown exception is not the expected one.";

    CapacityScheduler cs = new CapacityScheduler();
    cs.setRMContext(resourceManager.getRMContext());

    // Scenario 1: memory minimum (2048) above memory maximum (1024).
    Configuration memoryConf = new YarnConfiguration();
    setMinAllocMb(memoryConf, 2048);
    setMaxAllocMb(memoryConf, 1024);
    try {
      cs.init(memoryConf);
      fail("Exception is expected because the min memory allocation is"
          + " larger than the max memory allocation.");
    } catch (YarnRuntimeException expected) {
      String message = expected.getMessage();
      assertTrue(message.startsWith("Invalid resource scheduler memory"),
          wrongException);
    }

    // Scenario 2: vcores minimum (2) above vcores maximum (1).
    Configuration vcoresConf = new YarnConfiguration();
    setMinAllocVcores(vcoresConf, 2);
    setMaxAllocVcores(vcoresConf, 1);
    try {
      cs.reinitialize(vcoresConf, mockContext);
      fail("Exception is expected because the min vcores allocation is"
          + " larger than the max vcores allocation.");
    } catch (YarnRuntimeException expected) {
      String message = expected.getMessage();
      assertTrue(message.startsWith("Invalid resource scheduler vcores"),
          wrongException);
    }
  }

  /**
   * End-to-end allocation scenario on the shared resource manager.
   *
   * <p>Two nodes are registered (4G and 2G), two applications are submitted
   * (the {@code Application} constructor receives "user_0"/"a1" and
   * "user_1"/"b2" - presumably user and queue name) and tasks are added in two
   * rounds. After each round of node heartbeats the test asserts the memory
   * used per application and per node.
   *
   * @throws Exception if any of the helpers fails
   */
  @Test
  public void testCapacityScheduler() throws Exception {

    LOG.info("--- START: testCapacityScheduler ---");

    NodeStatus mockNodeStatus = createMockNodeStatus();

    // Register node1 (4G, 1 vcore)
    String host_0 = "host_0";
    NodeManager nm_0 =
        registerNode(resourceManager, host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
            Resources.createResource(4 * GB, 1), mockNodeStatus);

    // Register node2 (2G, 1 vcore)
    String host_1 = "host_1";
    NodeManager nm_1 =
        registerNode(resourceManager, host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
            Resources.createResource(2 * GB, 1), mockNodeStatus);

    // ResourceRequest priorities
    Priority priority_0 = Priority.newInstance(0);
    Priority priority_1 = Priority.newInstance(1);

    // Submit an application
    Application application_0 = new Application("user_0", "a1", resourceManager);
    application_0.submit();

    application_0.addNodeManager(host_0, 1234, nm_0);
    application_0.addNodeManager(host_1, 1234, nm_1);

    // application_0: 1G requests at priority_1, 2G requests at priority_0
    Resource capability_0_0 = Resources.createResource(1 * GB, 1);
    application_0.addResourceRequestSpec(priority_1, capability_0_0);

    Resource capability_0_1 = Resources.createResource(2 * GB, 1);
    application_0.addResourceRequestSpec(priority_0, capability_0_1);

    Task task_0_0 = new Task(application_0, priority_1,
        new String[] {host_0, host_1});
    application_0.addTask(task_0_0);

    // Submit another application
    Application application_1 = new Application("user_1", "b2", resourceManager);
    application_1.submit();

    application_1.addNodeManager(host_0, 1234, nm_0);
    application_1.addNodeManager(host_1, 1234, nm_1);

    // application_1: 3G requests at priority_1, 2G requests at priority_0
    Resource capability_1_0 = Resources.createResource(3 * GB, 1);
    application_1.addResourceRequestSpec(priority_1, capability_1_0);

    Resource capability_1_1 = Resources.createResource(2 * GB, 1);
    application_1.addResourceRequestSpec(priority_0, capability_1_1);

    Task task_1_0 = new Task(application_1, priority_1,
        new String[] {host_0, host_1});
    application_1.addTask(task_1_0);

    // Send resource requests to the scheduler
    application_0.schedule();
    application_1.schedule();

    // Send a heartbeat to kick the tires on the Scheduler
    LOG.info("Kick!");

    // task_0_0 and task_1_0 allocated, used=4G
    nodeUpdate(resourceManager, nm_0);

    // nothing allocated
    nodeUpdate(resourceManager, nm_1);

    // Get allocations from the scheduler
    application_0.schedule();     // task_0_0
    checkApplicationResourceUsage(1 * GB, application_0);

    application_1.schedule();     // task_1_0
    checkApplicationResourceUsage(3 * GB, application_1);

    checkNodeResourceUsage(4*GB, nm_0);  // task_0_0 (1G) and task_1_0 (3G)
    checkNodeResourceUsage(0*GB, nm_1);  // no tasks, 2G available

    LOG.info("Adding new tasks...");

    // Second round: one 2G priority_0 task per application
    Task task_1_1 = new Task(application_1, priority_0,
        new String[] {ResourceRequest.ANY});
    application_1.addTask(task_1_1);

    application_1.schedule();

    Task task_0_1 = new Task(application_0, priority_0,
        new String[] {host_0, host_1});
    application_0.addTask(task_0_1);

    application_0.schedule();

    // Send a heartbeat to kick the tires on the Scheduler
    LOG.info("Sending hb from " + nm_0.getHostName());
    // nothing new, used=4G
    nodeUpdate(resourceManager, nm_0);

    LOG.info("Sending hb from " + nm_1.getHostName());
    // one 2G container is placed on nm_1, used=2G; the assertions below expect
    // it to belong to application_1 (task_1_1), while task_0_1 stays pending
    nodeUpdate(resourceManager, nm_1);

    // Get allocations from the scheduler
    LOG.info("Trying to allocate...");
    application_0.schedule();
    // application_0 is unchanged: still only task_0_0 (1G)
    checkApplicationResourceUsage(1 * GB, application_0);

    application_1.schedule();
    // application_1: task_1_0 (3G) + task_1_1 (2G)
    checkApplicationResourceUsage(5 * GB, application_1);

    nodeUpdate(resourceManager, nm_0);
    nodeUpdate(resourceManager, nm_1);

    // Both nodes are expected to be fully used after the extra heartbeats
    checkNodeResourceUsage(4*GB, nm_0);
    checkNodeResourceUsage(2*GB, nm_1);

    LOG.info("--- END: testCapacityScheduler ---");
  }

  /**
   * Verifies that with {@code ASSIGN_MULTIPLE_ENABLED} set to {@code false} a
   * node heartbeat results in one container allocation at a time.
   *
   * <p>A dedicated resource manager is created for this test. It is stopped in
   * a {@code finally} block so that it is released even when an assertion
   * fails part-way through.
   *
   * @throws Exception if any of the helpers fails
   */
  @Test
  public void testNotAssignMultiple() throws Exception {
    LOG.info("--- START: testNotAssignMultiple ---");
    ResourceManager rm = new ResourceManager() {
      @Override
      protected RMNodeLabelsManager createNodeLabelManager() {
        RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
        mgr.init(getConfig());
        return mgr;
      }
    };
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    csConf.setBoolean(
        CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, false);
    setupQueueConfiguration(csConf);
    YarnConfiguration conf = new YarnConfiguration(csConf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    rm.init(conf);
    try {
      rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
      rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
      ((AsyncDispatcher) rm.getRMContext().getDispatcher()).start();

      NodeStatus mockNodeStatus = createMockNodeStatus();

      // Register node1 (10G, 10 vcores)
      String host0 = "host_0";
      NodeManager nm0 =
          registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
              Resources.createResource(10 * GB, 10), mockNodeStatus);

      // ResourceRequest priorities
      Priority priority0 = Priority.newInstance(0);
      Priority priority1 = Priority.newInstance(1);

      // Submit an application: 1G task at priority0, 2G task at priority1
      Application application0 = new Application("user_0", "a1", rm);
      application0.submit();
      application0.addNodeManager(host0, 1234, nm0);

      Resource capability00 = Resources.createResource(1 * GB, 1);
      application0.addResourceRequestSpec(priority0, capability00);

      Resource capability01 = Resources.createResource(2 * GB, 1);
      application0.addResourceRequestSpec(priority1, capability01);

      Task task00 =
          new Task(application0, priority0, new String[] {host0});
      Task task01 =
          new Task(application0, priority1, new String[] {host0});
      application0.addTask(task00);
      application0.addTask(task01);

      // Submit another application: 3G task at priority0, 4G task at priority1
      Application application1 = new Application("user_1", "b2", rm);
      application1.submit();
      application1.addNodeManager(host0, 1234, nm0);

      Resource capability10 = Resources.createResource(3 * GB, 1);
      application1.addResourceRequestSpec(priority0, capability10);

      Resource capability11 = Resources.createResource(4 * GB, 1);
      application1.addResourceRequestSpec(priority1, capability11);

      Task task10 = new Task(application1, priority0, new String[] {host0});
      Task task11 = new Task(application1, priority1, new String[] {host0});
      application1.addTask(task10);
      application1.addTask(task11);

      // Send resource requests to the scheduler
      application0.schedule();

      application1.schedule();

      // Send a heartbeat to kick the tires on the Scheduler
      LOG.info("Kick!");

      // First heartbeat: a single container is expected, used=3G
      nodeUpdate(rm, nm0);

      // Get allocations from the scheduler
      application0.schedule();
      application1.schedule();
      // 1 Task per heart beat should be scheduled: only the 3G container of
      // application1 is accounted for, application0 has nothing yet
      checkNodeResourceUsage(3 * GB, nm0);
      checkApplicationResourceUsage(0 * GB, application0);
      checkApplicationResourceUsage(3 * GB, application1);

      // Another heartbeat: one more container, this time 1G for application0
      nodeUpdate(rm, nm0);
      application0.schedule();
      checkApplicationResourceUsage(1 * GB, application0);
      application1.schedule();
      checkApplicationResourceUsage(3 * GB, application1);
      checkNodeResourceUsage(4 * GB, nm0);
      LOG.info("--- END: testNotAssignMultiple ---");
    } finally {
      // Always release the locally created RM, even if an assertion failed.
      rm.stop();
    }
  }

  /**
   * Verifies that with {@code ASSIGN_MULTIPLE_ENABLED} set to {@code true} and
   * {@code MAX_ASSIGN_PER_HEARTBEAT} set to 2, a single node heartbeat results
   * in two container allocations.
   *
   * <p>A dedicated resource manager is created for this test. It is stopped in
   * a {@code finally} block so that it is released even when an assertion
   * fails part-way through.
   *
   * @throws Exception if any of the helpers fails
   */
  @Test
  public void testAssignMultiple() throws Exception {
    LOG.info("--- START: testAssignMultiple ---");
    ResourceManager rm = new ResourceManager() {
      @Override
      protected RMNodeLabelsManager createNodeLabelManager() {
        RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
        mgr.init(getConfig());
        return mgr;
      }
    };
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    csConf.setBoolean(
        CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, true);
    // Each heartbeat will assign 2 containers at most
    csConf.setInt(CapacitySchedulerConfiguration.MAX_ASSIGN_PER_HEARTBEAT, 2);
    setupQueueConfiguration(csConf);
    YarnConfiguration conf = new YarnConfiguration(csConf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    rm.init(conf);
    try {
      rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
      rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
      ((AsyncDispatcher) rm.getRMContext().getDispatcher()).start();

      NodeStatus mockNodeStatus = createMockNodeStatus();

      // Register node1 (10G, 10 vcores)
      String host0 = "host_0";
      NodeManager nm0 =
          registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
              Resources.createResource(10 * GB, 10), mockNodeStatus);

      // ResourceRequest priorities
      Priority priority0 = Priority.newInstance(0);
      Priority priority1 = Priority.newInstance(1);

      // Submit an application: 1G task at priority0, 2G task at priority1
      Application application0 = new Application("user_0", "a1", rm);
      application0.submit();
      application0.addNodeManager(host0, 1234, nm0);

      Resource capability00 = Resources.createResource(1 * GB, 1);
      application0.addResourceRequestSpec(priority0, capability00);

      Resource capability01 = Resources.createResource(2 * GB, 1);
      application0.addResourceRequestSpec(priority1, capability01);

      Task task00 = new Task(application0, priority0, new String[] {host0});
      Task task01 = new Task(application0, priority1, new String[] {host0});
      application0.addTask(task00);
      application0.addTask(task01);

      // Submit another application: 3G task at priority0, 4G task at priority1
      Application application1 = new Application("user_1", "b2", rm);
      application1.submit();
      application1.addNodeManager(host0, 1234, nm0);

      Resource capability10 = Resources.createResource(3 * GB, 1);
      application1.addResourceRequestSpec(priority0, capability10);

      Resource capability11 = Resources.createResource(4 * GB, 1);
      application1.addResourceRequestSpec(priority1, capability11);

      Task task10 =
          new Task(application1, priority0, new String[] {host0});
      Task task11 =
          new Task(application1, priority1, new String[] {host0});
      application1.addTask(task10);
      application1.addTask(task11);

      // Send resource requests to the scheduler
      application0.schedule();

      application1.schedule();

      // Send a heartbeat to kick the tires on the Scheduler
      LOG.info("Kick!");

      // First heartbeat: two containers are expected, used=4G
      nodeUpdate(rm, nm0);

      // Get allocations from the scheduler
      application0.schedule();
      application1.schedule();
      // 2 tasks per heart beat should be scheduled: 1G for application0 and
      // 3G for application1
      checkNodeResourceUsage(4 * GB, nm0);
      checkApplicationResourceUsage(1 * GB, application0);
      checkApplicationResourceUsage(3 * GB, application1);

      // Another heartbeat: the remaining 2G and 4G tasks, node fully used
      nodeUpdate(rm, nm0);
      application0.schedule();
      checkApplicationResourceUsage(3 * GB, application0);
      application1.schedule();
      checkApplicationResourceUsage(7 * GB, application1);
      checkNodeResourceUsage(10 * GB, nm0);
      LOG.info("--- END: testAssignMultiple ---");
    } finally {
      // Always release the locally created RM, even if an assertion failed.
      rm.stop();
    }
  }

  /**
   * Checks the maximum capacity reported for queue {@code A}: it equals
   * {@code MAXIMUM_CAPACITY_VALUE} on a new configuration, reflects an explicit
   * value of 50, and equals {@code MAXIMUM_CAPACITY_VALUE} again after -1 is set.
   */
  @Test
  public void testMaximumCapacitySetup() {
    final float tolerance = 0.0000001f;
    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();

    // Nothing has been configured for the queue yet.
    assertEquals(
        CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,
        csConf.getNonLabeledQueueMaximumCapacity(A),
        tolerance);

    // An explicit value is returned as-is.
    csConf.setMaximumCapacity(A, 50.0f);
    assertEquals(50.0f, csConf.getNonLabeledQueueMaximumCapacity(A), tolerance);

    // Setting -1 makes the getter report MAXIMUM_CAPACITY_VALUE again.
    csConf.setMaximumCapacity(A, -1);
    assertEquals(
        CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,
        csConf.getNonLabeledQueueMaximumCapacity(A),
        tolerance);
  }

  /**
   * Checks per-queue versus scheduler-wide maximum allocation.
   *
   * <p>Queue {@code A1} is configured with a maximum allocation of 1024 MB and
   * 1 vcore. The test asserts that looking up "a1" returns those values, while
   * the lookups with an empty name, {@code null} and no argument all return the
   * YARN default maximum allocation.
   *
   * <p>The scheduler is stopped in a {@code finally} block so that it does not
   * stay running when an assertion fails.
   */
  @Test
  public void testQueueMaximumAllocations() {
    CapacityScheduler scheduler = new CapacityScheduler();
    scheduler.setConf(new YarnConfiguration());
    scheduler.setRMContext(resourceManager.getRMContext());
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();

    setupQueueConfiguration(conf);
    setMaxAllocMb(conf, A1, 1024);
    setMaxAllocVcores(conf, A1, 1);

    scheduler.init(conf);
    try {
      scheduler.start();

      Resource maxAllocationForQueue =
          scheduler.getMaximumResourceCapability("a1");
      Resource maxAllocation1 = scheduler.getMaximumResourceCapability("");
      Resource maxAllocation2 = scheduler.getMaximumResourceCapability(null);
      Resource maxAllocation3 = scheduler.getMaximumResourceCapability();

      // All three ways of asking without a concrete queue must agree ...
      assertEquals(maxAllocation1, maxAllocation2);
      assertEquals(maxAllocation1, maxAllocation3);
      // ... and must match the YARN defaults.
      assertEquals(
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
          maxAllocation1.getMemorySize());
      assertEquals(
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
          maxAllocation1.getVirtualCores());

      // The queue-specific lookup reflects the values configured for A1.
      assertEquals(1024, maxAllocationForQueue.getMemorySize());
      assertEquals(1, maxAllocationForQueue.getVirtualCores());
    } finally {
      // Always stop the locally created scheduler, even if an assertion failed.
      scheduler.stop();
    }
  }

  /**
   * Checks that capacities given in absolute-resource form
   * ({@code [memory=...,vcores=...]}) are reported back as the configured
   * minimum resource, both for the default capacity of the child queue and for
   * the label-specific capacities of the root and child queues.
   *
   * <p>The scheduler is stopped in a {@code finally} block so that it does not
   * stay running when an assertion fails.
   */
  @Test
  public void testParseQueueWithAbsoluteResource() {
    String childQueue = "testQueue";
    String labelName = "testLabel";
    QueuePath childQueuePath = new QueuePath("root." + childQueue);

    CapacityScheduler cs = new CapacityScheduler();
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(resourceManager.getRMContext());
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();

    conf.setQueues(ROOT, new String[] {childQueue});
    conf.setCapacity(childQueuePath, "[memory=20480,vcores=200]");
    conf.setAccessibleNodeLabels(childQueuePath,
        Sets.newHashSet(labelName));
    conf.setCapacityByLabel(ROOT, labelName, "[memory=10240,vcores=100]");
    conf.setCapacityByLabel(childQueuePath, labelName,
        "[memory=4096,vcores=10]");

    cs.init(conf);
    try {
      cs.start();

      // Root queue, label-specific capacity.
      Resource rootQueueLabelCapacity =
          cs.getQueue("root").getQueueResourceQuotas()
              .getConfiguredMinResource(labelName);
      assertEquals(10240, rootQueueLabelCapacity.getMemorySize());
      assertEquals(100, rootQueueLabelCapacity.getVirtualCores());

      // Child queue, capacity configured without a label.
      QueueResourceQuotas childQueueQuotas =
          cs.getQueue(childQueue).getQueueResourceQuotas();
      Resource childQueueCapacity = childQueueQuotas.getConfiguredMinResource();
      assertEquals(20480, childQueueCapacity.getMemorySize());
      assertEquals(200, childQueueCapacity.getVirtualCores());

      // Child queue, label-specific capacity.
      Resource childQueueLabelCapacity =
          childQueueQuotas.getConfiguredMinResource(labelName);
      assertEquals(4096, childQueueLabelCapacity.getMemorySize());
      assertEquals(10, childQueueLabelCapacity.getVirtualCores());
    } finally {
      // Always stop the locally created scheduler, even if an assertion failed.
      cs.stop();
    }
  }

  /**
   * Checks the queue info reported for queue "a" (name, path and number of
   * child queues) and that no queue name occurs more than once in the user
   * ACL info returned by the scheduler.
   */
  @Test
  public void testCapacitySchedulerInfo() throws Exception {
    ResourceScheduler scheduler = resourceManager.getResourceScheduler();

    QueueInfo infoForA = scheduler.getQueueInfo("a", true, true);
    assertEquals("a", infoForA.getQueueName(), "Queue Name should be a");
    assertEquals("root.a", infoForA.getQueuePath(),
        "Queue Path should be root.a");
    assertEquals(2, infoForA.getChildQueues().size(),
        "Child Queues size should be 2");

    List<QueueUserACLInfo> aclInfos = scheduler.getQueueUserAclInfo();
    assertNotNull(aclInfos);
    for (QueueUserACLInfo aclInfo : aclInfos) {
      // Each queue name must be listed exactly once.
      int occurrences = getQueueCount(aclInfos, aclInfo.getQueueName());
      assertEquals(1, occurrences);
    }
  }

  /**
   * Counts how many entries of the given ACL info list carry the given queue
   * name.
   *
   * @param queueInformation ACL info entries to scan
   * @param queueName queue name to look for
   * @return number of entries whose queue name equals {@code queueName}
   */
  private int getQueueCount(List<QueueUserACLInfo> queueInformation, String queueName) {
    long matches = queueInformation.stream()
        .filter(aclInfo -> queueName.equals(aclInfo.getQueueName()))
        .count();
    return (int) matches;
  }

  /**
   * Confirms that an allocate call (resource request) alone, without any
   * container assignment, triggers a change in application ordering where
   * appropriate. The "default" queue is switched to a
   * {@link FairOrderingPolicy} with size-based weight for this purpose.
   */
  @Test
  public void testAllocateReorder() throws Exception {
    Configuration conf = new Configuration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    MockRM rm = new MockRM(conf);
    rm.start();
    try {
      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

      LeafQueue q = (LeafQueue) cs.getQueue("default");
      assertNotNull(q);

      FairOrderingPolicy fop = new FairOrderingPolicy();
      fop.setSizeBasedWeight(true);
      q.setOrderingPolicy(fop);

      String host = "127.0.0.1";
      RMNode node =
          MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
      cs.handle(new NodeAddedSchedulerEvent(node));

      ApplicationAttemptId appAttemptId1 =
          appHelper(rm, cs, 100, 1, "default", "user");
      ApplicationAttemptId appAttemptId2 =
          appHelper(rm, cs, 100, 2, "default", "user");

      RecordFactory recordFactory =
          RecordFactoryProvider.getRecordFactory(null);

      Priority priority = TestUtils.createMockPriority(1);
      ResourceRequest r1 = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);

      // This will allocate for app1
      cs.allocate(appAttemptId1,
          Collections.<ResourceRequest>singletonList(r1), null,
          Collections.<ContainerId>emptyList(),
          null, null, NULL_UPDATE_REQUESTS);

      // And this will result in container assignment for app1
      CapacityScheduler.schedule(cs);

      // Verify that app1 is still first in assignment order.
      // This happens because app2 has no demand/a magnitude of NaN, which
      // results in app1 and app2 being equal in the fairness comparison and
      // falling back to fifo (start) ordering
      assertEquals(appAttemptId1.getApplicationId().toString(),
          q.getOrderingPolicy().getAssignmentIterator(
              IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId());

      // Now, allocate for app2 (this would be the first/AM allocation)
      ResourceRequest r2 = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
      cs.allocate(appAttemptId2,
          Collections.<ResourceRequest>singletonList(r2), null,
          Collections.<ContainerId>emptyList(),
          null, null, NULL_UPDATE_REQUESTS);

      // In this case we do not perform container assignment because we want
      // to verify re-ordering based on the allocation alone

      // Now, the first app for assignment is app2
      assertEquals(appAttemptId2.getApplicationId().toString(),
          q.getOrderingPolicy().getAssignmentIterator(
              IteratorSelector.EMPTY_ITERATOR_SELECTOR).next().getId());
    } finally {
      // Stop the RM even when an assertion above fails.
      rm.stop();
    }
  }

  /**
   * Exercises node resource over-commitment on a single 4 GB node: the node's
   * resource is shrunk below what is currently used, grown again, and shrunk
   * once more while the reported used/available memory, the preemption
   * messages sent to the AM and the eventual container kill are checked.
   *
   * <p>The last argument of {@code updateNodeResource} is presumably the
   * over-commit timeout in milliseconds (helper defined outside this method);
   * in this test -1 is paired with "no preemption expected" and 2 * 1000 with
   * "preemption expected".
   */
  @Test
  public void testResourceOverCommit() throws Exception {
    Configuration conf = new Configuration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    MockRM rm = new MockRM(conf);
    rm.start();
    ResourceScheduler scheduler = rm.getResourceScheduler();

    MockNM nm = rm.registerNode("127.0.0.1:1234", 4 * GB);
    NodeId nmId = nm.getNodeId();
    RMApp app = MockRMAppSubmitter.submitWithMemory(2048, rm);
    // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm
    nm.nodeHeartbeat(true);
    RMAppAttempt attempt1 = app.getCurrentAppAttempt();
    MockAM am = rm.sendAMLaunched(attempt1.getAppAttemptId());
    am.registerAppAttempt();
    assertMemory(scheduler, nmId, 2 * GB, 2 * GB);

    // add request for 1 container of 2 GB
    am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 1);
    AllocateResponse alloc1Response = am.schedule(); // send the request

    // kick the scheduler, another 2 GB given to the requested container,
    // resource remaining 0
    nm.nodeHeartbeat(true);
    // poll the AM until the container shows up (no upper bound on the wait)
    while (alloc1Response.getAllocatedContainers().isEmpty()) {
      LOG.info("Waiting for containers to be created for app 1...");
      Thread.sleep(100);
      alloc1Response = am.schedule();
    }

    List<Container> allocated1 = alloc1Response.getAllocatedContainers();
    assertEquals(1, allocated1.size());
    Container c1 = allocated1.get(0);
    assertEquals(2 * GB, c1.getResource().getMemorySize());
    assertEquals(nmId, c1.getNodeId());

    // check node report, 4 GB used and 0 GB available
    assertMemory(scheduler, nmId, 4 * GB, 0);
    nm.nodeHeartbeat(true);
    assertEquals(4 * GB, nm.getCapability().getMemorySize());

    // update node resource to 2 GB, so resource is over-consumed
    updateNodeResource(rm, nmId, 2 * GB, 2, -1);
    // the used resource should still be 4 GB, with negative available resource
    waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000);
    // check that we did not get a preemption request
    assertNoPreemption(am.schedule().getPreemptionMessage());

    // check that the NM got the updated resources
    nm.nodeHeartbeat(true);
    assertEquals(2 * GB, nm.getCapability().getMemorySize());

    // check container can complete successfully with resource over-commitment
    ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
        c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
    nm.containerStatus(containerStatus);

    LOG.info("Waiting for containers to be finished for app 1...");
    GenericTestUtils.waitFor(
        () -> attempt1.getJustFinishedContainers().size() == 1, 100, 2000);
    assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
    // only the 2 GB AM container is left on the 2 GB node
    assertMemory(scheduler, nmId, 2 * GB, 0);

    // verify no NPE is triggered in schedule after resource is updated
    am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1);
    AllocateResponse allocResponse2 = am.schedule();
    assertTrue(allocResponse2.getAllocatedContainers().isEmpty(),
        "Shouldn't have enough resource to allocate containers");
    // try 10 times as scheduling is an async process
    for (int i = 0; i < 10; i++) {
      Thread.sleep(100);
      allocResponse2 = am.schedule();
      assertTrue(allocResponse2.getAllocatedContainers().isEmpty(),
          "Shouldn't have enough resource to allocate containers");
    }

    // increase the resources again to 5 GB to schedule the 3GB container
    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
    waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000);

    // kick the scheduling and check it took effect
    nm.nodeHeartbeat(true);
    // poll the AM until the container shows up (no upper bound on the wait)
    while (allocResponse2.getAllocatedContainers().isEmpty()) {
      LOG.info("Waiting for containers to be created for app 1...");
      Thread.sleep(100);
      allocResponse2 = am.schedule();
    }
    assertEquals(1, allocResponse2.getAllocatedContainers().size());
    Container c2 = allocResponse2.getAllocatedContainers().get(0);
    assertEquals(3 * GB, c2.getResource().getMemorySize());
    assertEquals(nmId, c2.getNodeId());
    assertMemory(scheduler, nmId, 5 * GB, 0);

    // reduce the resources and trigger a preempt request to the AM for c2
    updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
    waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);

    PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
    assertPreemption(c2.getId(), preemptMsg);

    // increasing the resources again, should stop killing the containers
    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
    waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000);
    // sleep past the earlier 2 second value, then confirm nothing was killed
    Thread.sleep(3 * 1000);
    assertMemory(scheduler, nmId, 5 * GB, 0);

    // reduce the resources again to trigger a preempt request to the AM for c2
    long t0 = Time.now();
    updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
    waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);

    preemptMsg = am.schedule().getPreemptionMessage();
    assertPreemption(c2.getId(), preemptMsg);

    // wait until the scheduler kills the container
    GenericTestUtils.waitFor(() -> {
      try {
        nm.nodeHeartbeat(true); // trigger preemption in the NM
      } catch (Exception e) {
        // a failed heartbeat is only logged; the wait simply polls again
        LOG.error("Cannot heartbeat", e);
      }
      SchedulerNodeReport report = scheduler.getNodeReport(nmId);
      return report.getAvailableResource().getMemorySize() > 0;
    }, 200, 5 * 1000);
    // with c2 (3 GB) gone, 2 GB stay used and 1 GB is free on the 3 GB node
    assertMemory(scheduler, nmId, 2 * GB, 1 * GB);

    List<ContainerStatus> completedContainers =
        am.schedule().getCompletedContainersStatuses();
    assertEquals(1, completedContainers.size());
    ContainerStatus c2status = completedContainers.get(0);
    assertContainerKilled(c2.getId(), c2status);

    // NOTE(review): assertTime is defined outside this method; presumably it
    // checks the elapsed time against the 2000 ms value used above - confirm.
    assertTime(2000, Time.now() - t0);

    rm.stop();
  }

  /**
   * Registers 100 nodes with the CapacityScheduler and then drives the
   * scheduling loop directly, once per node. The test has no assertions; it
   * passes when none of these calls throws.
   */
  @Test
  public void testAsyncScheduling() throws Exception {
    Configuration conf = new Configuration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    MockRM rm = new MockRM(conf);
    rm.start();
    try {
      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

      final int nodeCount = 100;

      // Register nodes
      for (int i = 0; i < nodeCount; ++i) {
        String host = "192.168.1." + i;
        RMNode node =
            MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
        cs.handle(new NodeAddedSchedulerEvent(node));
      }

      // Now directly exercise the scheduling loop
      for (int i = 0; i < nodeCount; ++i) {
        CapacityScheduler.schedule(cs);
      }
    } finally {
      // Stop the RM even when registration or scheduling throws.
      rm.stop();
    }
  }

  /**
   * Polls every 500 ms until the preemption metrics of the application and of
   * its current attempt match the expected values. There is no upper bound on
   * the wait; callers rely on their own test timeout.
   *
   * @param app application whose metrics are polled
   * @param preempted expected total preempted resource of the application
   * @param numAMPreempted expected number of preempted AM containers
   * @param numTaskPreempted expected number of preempted non-AM containers
   * @param currentAttemptPreempted expected preempted resource of the current
   *          attempt
   * @param currentAttemptAMPreempted expected "is preempted" flag of the
   *          current attempt
   * @param numLatestAttemptTaskPreempted expected number of preempted non-AM
   *          containers of the current attempt
   * @throws InterruptedException if interrupted while sleeping between polls
   */
  private void waitForAppPreemptionInfo(RMApp app, Resource preempted,
      int numAMPreempted, int numTaskPreempted,
      Resource currentAttemptPreempted, boolean currentAttemptAMPreempted,
      int numLatestAttemptTaskPreempted) throws InterruptedException {
    while (true) {
      RMAppMetrics appPM = app.getRMAppMetrics();
      // Take the attempt metrics once per poll so that all attempt-level
      // checks below look at the same attempt.
      RMAppAttemptMetrics attemptPM =
          app.getCurrentAppAttempt().getRMAppAttemptMetrics();

      boolean appMatches =
          appPM.getResourcePreempted().equals(preempted)
              && appPM.getNumAMContainersPreempted() == numAMPreempted
              && appPM.getNumNonAMContainersPreempted() == numTaskPreempted;
      boolean attemptMatches =
          attemptPM.getResourcePreempted().equals(currentAttemptPreempted)
              && attemptPM.getIsPreempted() == currentAttemptAMPreempted
              && attemptPM.getNumNonAMContainersPreempted()
                  == numLatestAttemptTaskPreempted;

      if (appMatches && attemptMatches) {
        return;
      }
      Thread.sleep(500);
    }
  }

  /**
   * Polls every 500 ms until the application's current attempt has an id
   * different from {@code previousAttemptId}. There is no upper bound on the
   * wait; callers rely on their own test timeout.
   *
   * @param app application to watch
   * @param previousAttemptId id of the attempt that is expected to be replaced
   * @throws InterruptedException if interrupted while sleeping between polls
   */
  private void waitForNewAttemptCreated(RMApp app,
      ApplicationAttemptId previousAttemptId) throws InterruptedException {
    // Compare attempt ids: comparing the attempt object itself with an
    // ApplicationAttemptId is never equal, so the loop would not wait at all.
    while (app.getCurrentAppAttempt().getAppAttemptId()
        .equals(previousAttemptId)) {
      Thread.sleep(500);
    }
  }

  /**
   * Verifies that an AM allocate call which releases a container completes
   * while another thread holds the monitor of the CapacityScheduler instance.
   * A second thread enters {@code synchronized (cs)} and parks on a barrier;
   * the allocate call is issued from the test thread in between the two
   * barrier rendezvous points.
   */
  @Test
  @Timeout(value = 30)
  public void testAllocateDoesNotBlockOnSchedulerLock() throws Exception {
    final YarnConfiguration conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    MyContainerManager containerManager = new MyContainerManager();
    final MockRMWithAMS rm =
        new MockRMWithAMS(conf, containerManager);
    rm.start();

    MockNM nm1 = rm.registerNode("localhost:1234", 5120);

    Map<ApplicationAccessType, String> acls = new HashMap<>(2);
    acls.put(ApplicationAccessType.VIEW_APP, "*");
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
            .withAppName("appname")
            .withUser("appuser")
            .withAcls(acls)
            .build();
    RMApp app = MockRMAppSubmitter.submit(rm, data);

    nm1.nodeHeartbeat(true);

    RMAppAttempt attempt = app.getCurrentAppAttempt();
    ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
    // Wait up to 10 seconds, in 100 ms steps, for the attempt to be launched.
    int msecToWait = 10000;
    int msecToSleep = 100;
    while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED
        && msecToWait > 0) {
      LOG.info("Waiting for AppAttempt to reach LAUNCHED state. "
          + "Current state is " + attempt.getAppAttemptState());
      Thread.sleep(msecToSleep);
      msecToWait -= msecToSleep;
    }
    assertEquals(RMAppAttemptState.LAUNCHED,
        attempt.getAppAttemptState());

    // Create a client to the RM.
    final YarnRPC rpc = YarnRPC.create(conf);

    UserGroupInformation currentUser =
        UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
    Credentials credentials = containerManager.getContainerCredentials();
    final InetSocketAddress rmBindAddress =
        rm.getApplicationMasterService().getBindAddress();
    Token<? extends TokenIdentifier> amRMToken =
        MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress,
          credentials.getAllTokens());
    currentUser.addToken(amRMToken);
    // Kept as an anonymous class: doAs is also called with an anonymous
    // PrivilegedAction in the original, and a lambda could be ambiguous if
    // doAs is overloaded.
    ApplicationMasterProtocol client =
        currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
          @Override
          public ApplicationMasterProtocol run() {
            return (ApplicationMasterProtocol) rpc.getProxy(
              ApplicationMasterProtocol.class, rmBindAddress, conf);
          }
        });

    RegisterApplicationMasterRequest request =
        RegisterApplicationMasterRequest.newInstance("localhost", 12345, "");
    client.registerApplicationMaster(request);

    // Allocate a container
    List<ResourceRequest> asks = Collections.singletonList(
        ResourceRequest.newInstance(
            Priority.newInstance(1), "*", Resources.createResource(2 * GB), 1));
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(0, 0.0f, asks, null, null);
    client.allocate(allocateRequest);

    // Make sure the container is allocated in RM
    nm1.nodeHeartbeat(true);
    ContainerId containerId2 =
        ContainerId.newContainerId(applicationAttemptId, 2);
    assertTrue(rm.waitForState(nm1, containerId2,
        RMContainerState.ALLOCATED));

    // Acquire the container
    allocateRequest = AllocateRequest.newInstance(1, 0.0f, null, null, null);
    client.allocate(allocateRequest);

    // Launch the container
    final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    RMContainer rmContainer = cs.getRMContainer(containerId2);
    rmContainer.handle(
        new RMContainerEvent(containerId2, RMContainerEventType.LAUNCHED));

    // grab the scheduler lock from another thread
    // and verify an allocate call in this thread doesn't block on it
    final CyclicBarrier barrier = new CyclicBarrier(2);
    Thread otherThread = new Thread(() -> {
      synchronized (cs) {
        try {
          // first rendezvous: tell the test thread that the lock is held
          barrier.await();
          // second rendezvous: keep the lock until allocate has returned
          barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
          LOG.error("Lock-holding thread failed while waiting on barrier", e);
        }
      }
    });
    otherThread.start();
    barrier.await();
    List<ContainerId> release = Collections.singletonList(containerId2);
    allocateRequest =
        AllocateRequest.newInstance(2, 0.0f, null, release, null);
    client.allocate(allocateRequest);
    barrier.await();
    otherThread.join();

    rm.stop();
  }

  /**
   * Verifies the preemption metrics kept per application and per attempt:
   * task containers and the AM container of the first attempt are marked
   * killable, then task containers of the second attempt, and after each step
   * the expected counters and preempted resources are awaited.
   */
  @Test
  @Timeout(value = 120)
  public void testPreemptionInfo() throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    final int containerMemory = 1024;

    // start RM
    MockRM rm1 = new MockRM(conf);
    rm1.start();
    try {
      // get scheduler
      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();

      // start NM
      MockNM nm1 =
          new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
      nm1.registerNode();

      // create app and launch the AM
      RMApp app0 = MockRMAppSubmitter.submitWithMemory(containerMemory, rm1);
      MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
      am0.registerAppAttempt();

      // get scheduler app
      FiCaSchedulerApp schedulerAppAttempt =
          cs.getSchedulerApplications().get(app0.getApplicationId())
              .getCurrentAppAttempt();

      // allocate some containers and launch them
      List<Container> allocatedContainers =
          am0.allocateAndWaitForContainers(3, containerMemory, nm1);

      // kill the 3 containers
      for (Container c : allocatedContainers) {
        cs.markContainerForKillable(
            schedulerAppAttempt.getRMContainer(c.getId()));
      }

      // check values
      waitForAppPreemptionInfo(app0,
          Resource.newInstance(containerMemory * 3, 3), 0, 3,
          Resource.newInstance(containerMemory * 3, 3), false, 3);

      // kill app0-attempt0 AM container
      cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0
          .getCurrentAppAttempt().getMasterContainer().getId()));

      // wait for app0 failed
      waitForNewAttemptCreated(app0, am0.getApplicationAttemptId());

      // check values
      waitForAppPreemptionInfo(app0,
          Resource.newInstance(containerMemory * 4, 4), 1, 3,
          Resource.newInstance(0, 0), false, 0);

      // launch app0-attempt1
      MockAM am1 = MockRM.launchAM(app0, rm1, nm1);
      am1.registerAppAttempt();

      schedulerAppAttempt =
          cs.getSchedulerApplications().get(app0.getApplicationId())
              .getCurrentAppAttempt();

      // allocate some containers and launch them
      allocatedContainers =
          am1.allocateAndWaitForContainers(3, containerMemory, nm1);
      for (Container c : allocatedContainers) {
        cs.markContainerForKillable(
            schedulerAppAttempt.getRMContainer(c.getId()));
      }

      // check values
      waitForAppPreemptionInfo(app0,
          Resource.newInstance(containerMemory * 7, 7), 1, 6,
          Resource.newInstance(containerMemory * 3, 3), false, 3);
    } finally {
      // Stop the RM even when a step above fails or the test times out.
      rm1.stop();
    }
  }

  /**
   * Verifies that the resource requests belonging to a container are added
   * back to the application's outstanding asks once that container is marked
   * killable, and that a replacement container is then allocated.
   */
  @Test
  @Timeout(value = 300)
  public void testRecoverRequestAfterPreemption() throws Exception {
    Configuration conf = new Configuration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    MockRM rm1 = new MockRM(conf);
    rm1.start();
    try {
      MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
      RMApp app1 = MockRMAppSubmitter.submitWithMemory(1024, rm1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();

      // request a container.
      am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
      ContainerId containerId1 = ContainerId.newContainerId(
          am1.getApplicationAttemptId(), 2);
      rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED);

      RMContainer rmContainer = cs.getRMContainer(containerId1);
      List<ResourceRequest> requests =
          rmContainer.getContainerRequest().getResourceRequests();
      FiCaSchedulerApp app = cs.getApplicationAttempt(am1
          .getApplicationAttemptId());

      FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode());
      for (ResourceRequest request : requests) {
        // Skip the OffRack and RackLocal resource requests.
        if (request.getResourceName().equals(node.getRackName())
            || request.getResourceName().equals(ResourceRequest.ANY)) {
          continue;
        }

        // Already the node local resource request is cleared from RM after
        // allocation.
        assertEquals(0,
            app.getOutstandingAsksCount(SchedulerRequestKey.create(request),
                request.getResourceName()));
      }

      // Mark the container killable to preempt it
      cs.markContainerForKillable(rmContainer);

      assertEquals(3, requests.size());
      for (ResourceRequest request : requests) {
        // Resource request must have added back in RM after preempt event
        // handling.
        assertEquals(1,
            app.getOutstandingAsksCount(SchedulerRequestKey.create(request),
                request.getResourceName()));
      }

      // New container will be allocated and will move to ALLOCATED state
      ContainerId containerId2 = ContainerId.newContainerId(
          am1.getApplicationAttemptId(), 3);
      rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);

      // allocate container
      List<Container> containers =
          am1.allocate(new ArrayList<ResourceRequest>(),
              new ArrayList<ContainerId>()).getAllocatedContainers();

      // Now with updated ResourceRequest, a container is allocated for AM.
      // assertEquals reports the actual size when the check fails.
      assertEquals(1, containers.size());
    } finally {
      // Stop the RM even when an assertion above fails.
      rm1.stop();
    }
  }

  /**
   * Verifies how the "preemption disabled" setting is resolved for queue
   * root.b.b2: the default when monitors are enabled, inheritance from root,
   * inheritance from the parent queue, and an explicit setting on the queue
   * itself overriding the parent.
   */
  @Test
  public void testPreemptionDisabled() throws Exception {
    CapacityScheduler cs = new CapacityScheduler();
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
    RMContextImpl rmContext =  new RMContextImpl(null, null, null, null, null,
        null, new RMContainerTokenSecretManager(conf),
        new NMTokenSecretManagerInRM(conf),
        new ClientToAMTokenSecretManagerInRM(), null);
    setupQueueConfiguration(conf);
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(resourceManager.getRMContext());
    cs.init(conf);
    cs.start();
    try {
      cs.reinitialize(conf, rmContext);

      CSQueue rootQueue = cs.getRootQueue();
      CSQueue queueB = findQueue(rootQueue, B_PATH);
      CSQueue queueB2 = findQueue(queueB, B2_PATH);

      // When preemption turned on for the whole system
      // (yarn.resourcemanager.scheduler.monitor.enable=true), and with no
      // other preemption properties set, queue root.b.b2 should be
      // preemptable.
      assertFalse(queueB2.getPreemptionDisabled(),
          "queue " + B2 + " should default to preemptable");

      // Disable preemption at the root queue level.
      // The preemption property should be inherited from root all the
      // way down so that root.b.b2 should NOT be preemptable.
      conf.setPreemptionDisabled(rootQueue.getQueuePathObject(), true);
      cs.reinitialize(conf, rmContext);
      assertTrue(queueB2.getPreemptionDisabled(),
          "queue " + B2 + " should have inherited non-preemptability from root");

      // Enable preemption for root (grandparent) but disable for root.b
      // (parent). root.b.b2 should inherit property from parent and NOT be
      // preemptable
      conf.setPreemptionDisabled(rootQueue.getQueuePathObject(), false);
      conf.setPreemptionDisabled(queueB.getQueuePathObject(), true);
      cs.reinitialize(conf, rmContext);
      assertTrue(queueB2.getPreemptionDisabled(),
          "queue " + B2 + " should have inherited non-preemptability from parent");

      // When preemption is turned on for root.b.b2, it should be preemptable
      // even though preemption is disabled on root.b (parent).
      conf.setPreemptionDisabled(queueB2.getQueuePathObject(), false);
      cs.reinitialize(conf, rmContext);
      assertFalse(queueB2.getPreemptionDisabled(),
          "queue " + B2 + " should have been preemptable");
    } finally {
      // Stop the started scheduler even when an assertion above fails.
      cs.stop();
    }
  }

  /**
   * Requests {@code nContainer} containers one at a time and, after each
   * request, asserts that the container with the next expected id reaches the
   * ALLOCATED state.
   *
   * @param am application master issuing the requests
   * @param mem memory value passed to each request
   * @param nContainer number of containers to request
   * @param startContainerId container id expected for the first request;
   *          later requests expect consecutive ids
   * @param rm resource manager used to wait for the container state
   * @param nm node manager passed to the state wait
   * @throws Exception if a request or the state wait fails
   */
  private void waitContainerAllocated(MockAM am, int mem, int nContainer,
      int startContainerId, MockRM rm, MockNM nm) throws Exception {
    final int endContainerId = startContainerId + nContainer;
    int nextId = startContainerId;
    while (nextId < endContainerId) {
      am.allocate("*", mem, 1, new ArrayList<ContainerId>());
      ContainerId expectedId =
          ContainerId.newContainerId(am.getApplicationAttemptId(), nextId);
      boolean allocated =
          rm.waitForState(nm, expectedId, RMContainerState.ALLOCATED);
      assertTrue(allocated);
      nextId++;
    }
  }

  /**
   * Verifies that scheduler keys and their resource requests are dropped from
   * an application's scheduling info once they are satisfied or cancelled.
   * Four 3 GB asks with distinct priorities and allocation request ids are
   * sent against four 4 GB nodes; after each node heartbeat or cancel, the
   * number of remaining scheduler keys and resource requests is checked.
   *
   * <p>The test relies on fixed sleeps after heartbeats to let scheduling
   * take effect.
   */
  @Test
  public void testSchedulerKeyGarbageCollection() throws Exception {
    YarnConfiguration conf =
        new YarnConfiguration(new CapacitySchedulerConfiguration());
    conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);

    MockRM rm = new MockRM(conf);
    rm.start();

    // NOTE(review): this map is filled but never read in this method.
    HashMap<NodeId, MockNM> nodes = new HashMap<>();
    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
    nodes.put(nm1.getNodeId(), nm1);
    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
    nodes.put(nm2.getNodeId(), nm2);
    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
    nodes.put(nm3.getNodeId(), nm3);
    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
    nodes.put(nm4.getNodeId(), nm4);
    nm1.registerNode();
    nm2.registerNode();
    nm3.registerNode();
    nm4.registerNode();

    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("default")
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm, data);
    ApplicationAttemptId attemptId =
        app1.getCurrentAppAttempt().getAppAttemptId();
    // The AM is launched through nm2.
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
    ResourceScheduler scheduler = rm.getResourceScheduler();

    // All nodes 1 - 4 will be applicable for scheduling.
    nm1.nodeHeartbeat(true);
    nm2.nodeHeartbeat(true);
    nm3.nodeHeartbeat(true);
    nm4.nodeHeartbeat(true);

    Thread.sleep(1000);

    // Four asks of 3 GB each, with priorities 1-4 and allocation request
    // ids 1-4.
    AllocateResponse allocateResponse = am1.allocate(
        Arrays.asList(
            newResourceRequest(1, 1, ResourceRequest.ANY,
                Resources.createResource(3 * GB), 1, true,
                ExecutionType.GUARANTEED),
            newResourceRequest(2, 2, ResourceRequest.ANY,
                Resources.createResource(3 * GB), 1, true,
                ExecutionType.GUARANTEED),
            newResourceRequest(3, 3, ResourceRequest.ANY,
                Resources.createResource(3 * GB), 1, true,
                ExecutionType.GUARANTEED),
            newResourceRequest(4, 4, ResourceRequest.ANY,
                Resources.createResource(3 * GB), 1, true,
                ExecutionType.GUARANTEED)
        ),
        null);
    List<Container> allocatedContainers = allocateResponse
        .getAllocatedContainers();
    assertEquals(0, allocatedContainers.size());

    // NOTE(review): this collection is asserted again further down without
    // being re-fetched, so the test relies on it reflecting later changes in
    // the scheduling info - presumably a live view; confirm.
    Collection<SchedulerRequestKey> schedulerKeys =
        ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
            .getAppSchedulingInfo().getSchedulerKeys();
    assertEquals(4, schedulerKeys.size());

    // Get a Node to HB... at which point 1 container should be
    // allocated
    nm1.nodeHeartbeat(true);
    Thread.sleep(200);
    allocateResponse =  am1.allocate(new ArrayList<>(), new ArrayList<>());
    allocatedContainers = allocateResponse.getAllocatedContainers();
    assertEquals(1, allocatedContainers.size());

    // Verify 1 outstanding schedulerKey is removed
    assertEquals(3, schedulerKeys.size());

    List <ResourceRequest> resReqs =
        ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
            .getAppSchedulingInfo().getAllResourceRequests();

    // Verify 1 outstanding schedulerKey is removed from the
    // rrMap as well
    assertEquals(3, resReqs.size());

    // Verify One more container Allocation on node nm2
    // And ensure the outstanding schedulerKeys go down..
    nm2.nodeHeartbeat(true);
    Thread.sleep(200);

    // Update the allocateReq to send 0 numContainer req.
    // For the satisfied container...
    allocateResponse =  am1.allocate(Arrays.asList(
        newResourceRequest(1,
            allocatedContainers.get(0).getAllocationRequestId(),
            ResourceRequest.ANY,
            Resources.createResource(3 * GB), 0, true,
            ExecutionType.GUARANTEED)
        ),
        new ArrayList<>());
    allocatedContainers = allocateResponse.getAllocatedContainers();
    assertEquals(1, allocatedContainers.size());

    // Verify 1 outstanding schedulerKey is removed
    assertEquals(2, schedulerKeys.size());

    resReqs = ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
        .getAppSchedulingInfo().getAllResourceRequests();
    // Verify the map size is not increased due to 0 req
    assertEquals(2, resReqs.size());

    // Now Verify that the AM can cancel 1 Ask:
    // the ask is cancelled by re-sending it with 0 containers.
    SchedulerRequestKey sk = schedulerKeys.iterator().next();
    am1.allocate(
        Arrays.asList(
            newResourceRequest(sk.getPriority().getPriority(),
                sk.getAllocationRequestId(),
                ResourceRequest.ANY, Resources.createResource(3 * GB), 0, true,
                ExecutionType.GUARANTEED)
        ),
        null);

    schedulerKeys =
        ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
            .getAppSchedulingInfo().getSchedulerKeys();

    Thread.sleep(200);

    // Verify 1 outstanding schedulerKey is removed because of the
    // cancel ask
    assertEquals(1, schedulerKeys.size());

    // Now verify that after the next node heartbeat, we allocate
    // the last schedulerKey
    nm3.nodeHeartbeat(true);
    Thread.sleep(200);
    allocateResponse =  am1.allocate(new ArrayList<>(), new ArrayList<>());
    allocatedContainers = allocateResponse.getAllocatedContainers();
    assertEquals(1, allocatedContainers.size());

    // Verify no more outstanding schedulerKeys..
    assertEquals(0, schedulerKeys.size());
    resReqs =
        ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
            .getAppSchedulingInfo().getAllResourceRequests();
    assertEquals(0, resReqs.size());
    rm.stop();
  }

  /**
   * Builds a {@link ResourceRequest} from the given values and stamps it with
   * the given allocation request id. The label expression is passed as
   * {@code null} and the execution type request is created with its boolean
   * flag set to {@code true}.
   *
   * @param priority numeric priority of the request
   * @param allocReqId allocation request id to set on the request
   * @param rName resource name of the request
   * @param resource resource asked for per container
   * @param numContainers number of containers asked for
   * @param relaxLoc relax-locality flag of the request
   * @param eType execution type of the request
   * @return the newly created request
   */
  private static ResourceRequest newResourceRequest(int priority,
      long allocReqId, String rName, Resource resource, int numContainers,
      boolean relaxLoc, ExecutionType eType) {
    Priority requestPriority = Priority.newInstance(priority);
    ExecutionTypeRequest execTypeRequest =
        ExecutionTypeRequest.newInstance(eType, true);
    ResourceRequest request = ResourceRequest.newInstance(
        requestPriority, rName, resource, numContainers,
        relaxLoc, null, execTypeRequest);
    request.setAllocationRequestId(allocReqId);
    return request;
  }

  /**
   * Checks that the available resources reported to an application in queue
   * b1 shrink as b1 itself, its sibling b2 and queue a1 (under the other
   * parent) consume memory on a single 100 GB node.
   */
  @Test
  public void testHierarchyQueuesCurrentLimits() throws Exception {
    /*
     * Queue tree:
     *          Root
     *        /     \
     *       A       B
     *      / \    / | \
     *     A1 A2  B1 B2 B3
     */
    YarnConfiguration yarnConf = new YarnConfiguration(
        setupQueueConfiguration(new CapacitySchedulerConfiguration()));
    yarnConf.setBoolean(
        CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
    MockRM mockRm = new MockRM(yarnConf);
    mockRm.start();
    MockNM node = new MockNM("127.0.0.1:1234", 100 * GB,
        mockRm.getResourceTrackerService());
    node.registerNode();

    // First application runs in b1 with a 1 GB AM.
    RMApp b1App = MockRMAppSubmitter.submit(mockRm,
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, mockRm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("b1")
            .withUnmanagedAM(false)
            .build());
    MockAM b1Am = MockRM.launchAndRegisterAM(b1App, mockRm, node);

    // One additional 1 GB container for the b1 application.
    waitContainerAllocated(b1Am, 1 * GB, 1, 2, mockRm, node);

    // Maximum resource of b1 is 100 * 0.895 * 0.792 = 71 GB.
    // b1 already uses 2 GB (AM + one container), leaving 71 - 2 = 69 GB.
    assertEquals(69 * GB,
        b1Am.doHeartbeat().getAvailableResources().getMemorySize());

    // Second application runs in the sibling queue b2.
    RMApp b2App = MockRMAppSubmitter.submit(mockRm,
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, mockRm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("b2")
            .withUnmanagedAM(false)
            .build());
    MockAM b2Am = MockRM.launchAndRegisterAM(b2App, mockRm, node);

    // Five containers of 8 GB each for the b2 application (40 GB in total).
    waitContainerAllocated(b2Am, 8 * GB, 5, 2, mockRm, node);

    // One more 1 GB container in b1.
    waitContainerAllocated(b1Am, 1 * GB, 1, 3, mockRm, node);

    // Total is 100 GB:
    //   B2 uses 41 GB (5 * 8 GB containers and 1 AM container)
    //   B1 uses  3 GB (2 * 1 GB containers and 1 AM container)
    // Available is 100 - 41 - 3 = 56 GB.
    assertEquals(56 * GB,
        b1Am.doHeartbeat().getAvailableResources().getMemorySize());

    // Third application goes to a1, which lives under the other parent; the
    // headroom of the b1 application must reflect that usage as well.
    RMApp a1App = MockRMAppSubmitter.submit(mockRm,
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, mockRm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("a1")
            .withUnmanagedAM(false)
            .build());
    MockAM a1Am = MockRM.launchAndRegisterAM(a1App, mockRm, node);

    // Three containers of 8 GB each for the a1 application (24 GB in total).
    waitContainerAllocated(a1Am, 8 * GB, 3, 2, mockRm, node);

    // One more 1 GB container in b1.
    waitContainerAllocated(b1Am, 1 * GB, 1, 4, mockRm, node);

    // Total is 100 GB:
    //   B2 uses 41 GB (5 * 8 GB containers and 1 AM container)
    //   B1 uses  4 GB (3 * 1 GB containers and 1 AM container)
    //   A1 uses 25 GB (3 * 8 GB containers and 1 AM container)
    // Available is 100 - 41 - 4 - 25 = 30 GB.
    assertEquals(30 * GB,
        b1Am.doHeartbeat().getAvailableResources().getMemorySize());
    mockRm.stop();
  }

  /**
   * Verifies that the maximum capacity of a parent queue caps the combined
   * usage of its children: with queue a limited to 50% of a 24 GB node, a
   * further 2 GB container request in a2 must not be satisfied once a1 and a2
   * together already use 11 GB.
   *
   * @throws Exception if the mock cluster cannot be set up or driven
   */
  @Test
  public void testParentQueueMaxCapsAreRespected() throws Exception {
    /*
     * Queue tree:
     *          Root
     *        /     \
     *       A       B
     *      / \
     *     A1 A2
     */
    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
    csConf.setQueues(ROOT, new String[] {"a", "b"});
    csConf.setCapacity(A, 50);
    csConf.setMaximumCapacity(A, 50);
    csConf.setCapacity(B, 50);

    // Define 2nd-level queues
    csConf.setQueues(A, new String[] {"a1", "a2"});
    csConf.setCapacity(A1, 50);
    csConf.setUserLimitFactor(A1, 100.0f);
    csConf.setCapacity(A2, 50);
    csConf.setUserLimitFactor(A2, 100.0f);
    csConf.setCapacity(B1, B1_CAPACITY);
    csConf.setUserLimitFactor(B1, 100.0f);

    YarnConfiguration conf = new YarnConfiguration(csConf);
    conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);

    MockRM rm1 = new MockRM(conf);
    rm1.start();
    // The RM must be stopped on every exit path; previously the trailing
    // stop() call was unreachable (early return on success, fail() otherwise).
    try {
      MockNM nm1 = new MockNM("127.0.0.1:1234", 24 * GB,
          rm1.getResourceTrackerService());
      nm1.registerNode();

      // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB
      MockRMAppSubmissionData data1 =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("a1")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
      waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1);

      // Launch app2 in a2 with a 2GB AM; this should succeed
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("a2")
              .withUnmanagedAM(false)
              .build();
      RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
      MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

      // Try to allocate a container, a's usage=11G/max=12
      // a1's usage=9G/max=12
      // a2's usage=2G/max=12
      // In this case, if a2 asked 2G, should fail.
      boolean allocatedBeyondParentMax = true;
      try {
        waitContainerAllocated(am2, 2 * GB, 1, 2, rm1, nm1);
      } catch (AssertionError expected) {
        // Expected: the wait helper asserts when no container shows up.
        allocatedBeyondParentMax = false;
      }
      if (allocatedBeyondParentMax) {
        fail("Shouldn't successfully allocate containers for am2, "
            + "queue-a's max capacity will be violated if container allocated");
      }
    } finally {
      rm1.stop();
    }
  }

  /**
   * Verifies that pending-resource figures checked through
   * {@code checkPendingResource} stay consistent across the queue hierarchy
   * (leaf, parent, root) as applications add, replace and satisfy resource
   * requests, both for the default partition (label argument {@code null})
   * and for label "x", and that they drop to zero once an attempt is removed.
   */
  @Test
  public void testQueueHierarchyPendingResourceUpdate() throws Exception {
    Configuration conf =
        TestUtils.getConfigurationWithQueueLabels(new Configuration(false));
    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);

    // Labels x and y exist in the cluster; only host h1 carries label x.
    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
    mgr.init(conf);
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));

    // Make the RM use the pre-populated label manager created above.
    MockRM rm = new MockRM(conf) {
      protected RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.start();
    MockNM nm1 = // label = x
        new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
    nm1.registerNode();

    MockNM nm2 = // label = ""
        new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
    nm2.registerNode();

    // Launch app1 in queue=a1
    MockRMAppSubmissionData data1 =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("a1")
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);

    // Launch app2 in queue=b1
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("b1")
            .withUnmanagedAM(false)
            .build();
    RMApp app2 = MockRMAppSubmitter.submit(rm, data);
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);

    // am1 asks for 8 * 1GB container for no label
    am1.allocate(Arrays.asList(ResourceRequest.newInstance(
        Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)),
        null);

    // Only queue a1 has outstanding asks, so a1, a and root all report 8 GB.
    checkPendingResource(rm, "a1", 8 * GB, null);
    checkPendingResource(rm, "a", 8 * GB, null);
    checkPendingResource(rm, "root", 8 * GB, null);

    // am2 asks for 8 * 1GB container for no label
    am2.allocate(Arrays.asList(ResourceRequest.newInstance(
        Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)),
        null);

    checkPendingResource(rm, "a1", 8 * GB, null);
    checkPendingResource(rm, "a", 8 * GB, null);
    checkPendingResource(rm, "b1", 8 * GB, null);
    checkPendingResource(rm, "b", 8 * GB, null);
    // root = a + b
    checkPendingResource(rm, "root", 16 * GB, null);

    // am2 asks for 8 * 1GB container in another priority for no label
    am2.allocate(Arrays.asList(ResourceRequest.newInstance(
        Priority.newInstance(2), "*", Resources.createResource(1 * GB), 8)),
        null);

    // The priority=2 ask adds to b1's priority=1 ask: 8 GB + 8 GB = 16 GB.
    checkPendingResource(rm, "a1", 8 * GB, null);
    checkPendingResource(rm, "a", 8 * GB, null);
    checkPendingResource(rm, "b1", 16 * GB, null);
    checkPendingResource(rm, "b", 16 * GB, null);
    // root = a + b
    checkPendingResource(rm, "root", 24 * GB, null);

    // am1 asks 4 GB resource instead of 8 * GB for priority=1
    am1.allocate(Arrays.asList(ResourceRequest.newInstance(
        Priority.newInstance(1), "*", Resources.createResource(4 * GB), 1)),
        null);

    // The new priority=1 ask replaces the earlier one: a1 drops from 8 GB to
    // 4 GB.
    checkPendingResource(rm, "a1", 4 * GB, null);
    checkPendingResource(rm, "a", 4 * GB, null);
    checkPendingResource(rm, "b1", 16 * GB, null);
    checkPendingResource(rm, "b", 16 * GB, null);
    // root = a + b
    checkPendingResource(rm, "root", 20 * GB, null);

    // am1 asks 8 * GB resource which label=x
    am1.allocate(Arrays.asList(ResourceRequest.newInstance(
        Priority.newInstance(2), "*", Resources.createResource(8 * GB), 1,
        true, "x")), null);

    // Pending for label x is checked separately from the default partition.
    checkPendingResource(rm, "a1", 4 * GB, null);
    checkPendingResource(rm, "a", 4 * GB, null);
    checkPendingResource(rm, "a1", 8 * GB, "x");
    checkPendingResource(rm, "a", 8 * GB, "x");
    checkPendingResource(rm, "b1", 16 * GB, null);
    checkPendingResource(rm, "b", 16 * GB, null);
    // root = a + b
    checkPendingResource(rm, "root", 20 * GB, null);
    checkPendingResource(rm, "root", 8 * GB, "x");

    // some containers allocated for am1, pending resource should decrease
    // Container 2 is awaited on nm1 (label x) and container 3 on nm2 (no
    // label).
    ContainerId containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    assertTrue(rm.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));
    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
    assertTrue(rm.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));

    checkPendingResource(rm, "a1", 0 * GB, null);
    checkPendingResource(rm, "a", 0 * GB, null);
    checkPendingResource(rm, "a1", 0 * GB, "x");
    checkPendingResource(rm, "a", 0 * GB, "x");
    // some containers could be allocated for am2 when we allocating containers
    // for am1, just check if pending resource of b1/b/root > 0
    checkPendingResourceGreaterThanZero(rm, "b1", null);
    checkPendingResourceGreaterThanZero(rm, "b", null);
    // root = a + b
    checkPendingResourceGreaterThanZero(rm, "root", null);
    checkPendingResource(rm, "root", 0 * GB, "x");

    // complete am2, pending resource should be 0 now
    AppAttemptRemovedSchedulerEvent appRemovedEvent =
        new AppAttemptRemovedSchedulerEvent(
          am2.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false);
    rm.getResourceScheduler().handle(appRemovedEvent);

    checkPendingResource(rm, "a1", 0 * GB, null);
    checkPendingResource(rm, "a", 0 * GB, null);
    checkPendingResource(rm, "a1", 0 * GB, "x");
    checkPendingResource(rm, "a", 0 * GB, "x");
    checkPendingResource(rm, "b1", 0 * GB, null);
    checkPendingResource(rm, "b", 0 * GB, null);
    checkPendingResource(rm, "root", 0 * GB, null);
    checkPendingResource(rm, "root", 0 * GB, "x");
    rm.stop();
  }

  /**
   * Verifies the AM-used resource recorded for a user of a LeafQueue when the
   * AM ResourceRequest is smaller than the scheduler's minimum allocation:
   * both the application's AM request and the user's AM usage are expected to
   * equal the minimum allocation.
   */
  @Test
  @Timeout(value = 30)
  public void testAMUsedResource() throws Exception {
    MockRM mockRm = setUpMove();
    mockRm.registerNode("127.0.0.1:1234", 4 * GB);

    // Read the configured minimum allocation and make sure the AM ask is
    // really below it, otherwise the test would prove nothing.
    final int minimumAllocationMb = mockRm.getConfig().getInt(
        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
    final int requestedAmMb = 50;
    assertTrue(requestedAmMb < minimumAllocationMb,
        "AM memory is greater than or equal to minAllocation");
    final Resource expectedAmResource =
        Resource.newInstance(minimumAllocationMb, 1);

    final String leafQueueName = "a1";
    RMApp submittedApp = MockRMAppSubmitter.submit(mockRm,
        MockRMAppSubmissionData.Builder.createWithMemory(requestedAmMb, mockRm)
            .withAppName("app-1")
            .withUser("user_0")
            .withAcls(null)
            .withQueue(leafQueueName)
            .withUnmanagedAM(false)
            .build());

    // The AM request stored on the application matches the minimum.
    assertEquals(expectedAmResource,
        submittedApp.getAMResourceRequests().get(0).getCapability(),
        "RMApp does not containes minimum allocation");

    // The per-user AM usage tracked by the leaf queue matches it too.
    CapacityScheduler capScheduler =
        (CapacityScheduler) mockRm.getRMContext().getScheduler();
    LeafQueue leafQueue = (LeafQueue) capScheduler.getQueue(leafQueueName);
    assertEquals(expectedAmResource,
        leafQueue.getUser("user_0").getResourceUsage().getAMUsed(),
        "Minimum Resource for AM is incorrect");
    mockRm.stop();
  }

  /**
   * Verifies that the headroom returned to the ApplicationMaster by
   * {@code CapacityScheduler.allocate} is also recorded in
   * {@link RMAppAttemptMetrics}: zero while the cluster has no nodes, and the
   * full node resource once a single node has been added.
   *
   * @throws Exception if the mock RM cannot be started
   */
  @Test
  public void testApplicationHeadRoom() throws Exception {
    Configuration conf = new Configuration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    MockRM rm = new MockRM(conf);
    rm.start();
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

    ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
    ApplicationAttemptId appAttemptId =
        BuilderUtils.newApplicationAttemptId(appId, 1);

    // Real metrics object behind mocked app/attempt, so that the headroom
    // written by the scheduler can be read back below.
    RMAppAttemptMetrics attemptMetric =
        new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
    RMAppImpl app = mock(RMAppImpl.class);
    when(app.getApplicationId()).thenReturn(appId);
    RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
    Container container = mock(Container.class);
    when(attempt.getMasterContainer()).thenReturn(container);
    ApplicationSubmissionContext submissionContext = mock(
        ApplicationSubmissionContext.class);
    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
    when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
    when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
    when(app.getCurrentAppAttempt()).thenReturn(attempt);

    rm.getRMContext().getRMApps().put(appId, app);

    // Register the application and its attempt with the scheduler directly.
    SchedulerEvent addAppEvent =
        new AppAddedSchedulerEvent(appId, "default", "user");
    cs.handle(addAppEvent);
    SchedulerEvent addAttemptEvent =
        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
    cs.handle(addAttemptEvent);

    Allocation allocate =
        cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
            null, Collections.<ContainerId> emptyList(), null, null,
            NULL_UPDATE_REQUESTS);

    // Check the scheduler's answer; the mocked attempt can never be null, so
    // asserting on it would verify nothing.
    assertNotNull(allocate);

    // No node registered yet: headroom is empty in both places.
    assertEquals(Resource.newInstance(0, 0), allocate.getResourceLimit());
    assertEquals(Resource.newInstance(0, 0),
        attemptMetric.getApplicationAttemptHeadroom());

    // Add a node to cluster
    Resource newResource = Resource.newInstance(4 * GB, 1);
    RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
    cs.handle(new NodeAddedSchedulerEvent(node));

    allocate =
        cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
            null, Collections.<ContainerId> emptyList(), null, null,
            NULL_UPDATE_REQUESTS);

    // All resources should be sent as headroom
    assertEquals(newResource, allocate.getResourceLimit());
    assertEquals(newResource,
        attemptMetric.getApplicationAttemptHeadroom());

    rm.stop();
  }


  /**
   * Checks the headroom of two applications of different users in the
   * "default" queue when the DominantResourceCalculator is configured and the
   * queue's user limit factor is 0.8.
   */
  @Test
  public void testHeadRoomCalculationWithDRC() throws Exception {
    // Cluster: 20 GB memory and 20 vcores in total (two 10 GB / 10 vcore
    // nodes). The queue both apps run in has user limit factor 0.8.
    // App 1 gets 10 GB and 1 vcore; its expected headroom is
    //   20 GB * 0.8 - 10 GB = 6 GB memory and 20 * 0.8 - 1 = 15 vcores.
    // App 2 gets 1 GB and 1 vcore; its expected headroom is
    //   20 GB - 10 GB - 1 GB = 9 GB memory and 20 * 0.8 - 1 = 15 vcores.

    CapacitySchedulerConfiguration drcConf =
        new CapacitySchedulerConfiguration();
    drcConf.setResourceComparator(DominantResourceCalculator.class);

    YarnConfiguration yarnConf = new YarnConfiguration(drcConf);
    yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);

    MockRM mockRm = new MockRM(yarnConf);
    mockRm.start();

    CapacityScheduler capScheduler =
        (CapacityScheduler) mockRm.getResourceScheduler();
    LeafQueue defaultQueue = (LeafQueue) capScheduler.getQueue("default");
    defaultQueue.setUserLimitFactor(0.8f);

    ApplicationAttemptId firstAttempt =
        appHelper(mockRm, capScheduler, 100, 1, "default", "user1");
    ApplicationAttemptId secondAttempt =
        appHelper(mockRm, capScheduler, 100, 2, "default", "user2");

    // Add two nodes so that the cluster has 20 GB and 20 vcores.
    RMNode firstNode = MockNodes.newNodeInfo(
        0, Resource.newInstance(10 * GB, 10), 1, "127.0.0.1");
    capScheduler.handle(new NodeAddedSchedulerEvent(firstNode));

    RMNode secondNode = MockNodes.newNodeInfo(
        0, Resource.newInstance(10 * GB, 10), 1, "127.0.0.2");
    capScheduler.handle(new NodeAddedSchedulerEvent(secondNode));

    FiCaSchedulerApp firstApp = capScheduler.getSchedulerApplications()
        .get(firstAttempt.getApplicationId())
        .getCurrentAppAttempt();
    FiCaSchedulerApp secondApp = capScheduler.getSchedulerApplications()
        .get(secondAttempt.getApplicationId())
        .getCurrentAppAttempt();

    Priority requestPriority = TestUtils.createMockPriority(1);
    RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);

    // App 1 asks for one container of 10 GB and 1 vcore; a node update on
    // each node lets the scheduler place it.
    firstApp.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 10 * GB, 1, true,
            requestPriority, factory)));
    capScheduler.handle(new NodeUpdateSchedulerEvent(firstNode));
    capScheduler.handle(new NodeUpdateSchedulerEvent(secondNode));
    assertEquals(6 * GB, firstApp.getHeadroom().getMemorySize());
    assertEquals(15, firstApp.getHeadroom().getVirtualCores());

    // App 2 asks for one container of 1 GB and 1 vcore.
    secondApp.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true,
            requestPriority, factory)));
    capScheduler.handle(new NodeUpdateSchedulerEvent(firstNode));
    capScheduler.handle(new NodeUpdateSchedulerEvent(secondNode));
    assertEquals(9 * GB, secondApp.getHeadroom().getMemorySize());
    assertEquals(15, secondApp.getHeadroom().getVirtualCores());
    mockRm.stop();
  }

  /**
   * Runs {@code verifyAMLimitForLeafQueue} once with the
   * DefaultResourceCalculator and once with the DominantResourceCalculator,
   * reusing the same configuration object for both runs.
   */
  @Test
  @Timeout(value = 60)
  public void testAMLimitUsage() throws Exception {
    CapacitySchedulerConfiguration config =
        new CapacitySchedulerConfiguration();

    // Order matters: default calculator first, dominant calculator second.
    Class<?>[] calculators = {
        DefaultResourceCalculator.class,
        DominantResourceCalculator.class
    };
    for (Class<?> calculator : calculators) {
      config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
          calculator.getName());
      verifyAMLimitForLeafQueue(config);
    }
  }

  /**
   * Looks up the current scheduler-side attempt of an application.
   *
   * @param rm resource manager whose scheduler is cast to CapacityScheduler
   * @param appId id of the application to look up
   * @return the current attempt registered for {@code appId} in the scheduler
   */
  private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
      ApplicationId appId) {
    return ((CapacityScheduler) rm.getResourceScheduler())
        .getSchedulerApplications()
        .get(appId)
        .getCurrentAppAttempt();
  }

  /**
   * Verifies that the pending resource of the application attempt and of the
   * queues a1, a and root follows the container increase requests sent by the
   * AM, including when a later batch of requests changes earlier ones.
   */
  @Test
  public void testPendingResourceUpdatedAccordingToIncreaseRequestChanges()
      throws Exception {
    Configuration labelConf =
        TestUtils.getConfigurationWithQueueLabels(new Configuration(false));
    labelConf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);

    final RMNodeLabelsManager labelManager = new NullRMNodeLabelsManager();
    labelManager.init(labelConf);

    // Hand the pre-initialised label manager to the RM.
    MockRM mockRm = new MockRM(labelConf) {
      protected RMNodeLabelsManager createNodeLabelManager() {
        return labelManager;
      }
    };
    mockRm.start();

    // Single node without any label.
    MockNM node =
        new MockNM("h1:1234", 200 * GB, mockRm.getResourceTrackerService());
    node.registerNode();

    // Application with a 1 GB AM in queue a1.
    RMApp submittedApp = MockRMAppSubmitter.submit(mockRm,
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, mockRm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("a1")
            .withUnmanagedAM(false)
            .build());
    MockAM appMaster = MockRM.launchAndRegisterAM(submittedApp, mockRm, node);
    ApplicationAttemptId attemptId = appMaster.getApplicationAttemptId();

    // Ask for two more containers of 2 GB each.
    appMaster.allocate(
        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
            "*", Resources.createResource(2 * GB), 2)),
        null);
    ContainerId amContainer = ContainerId.newContainerId(attemptId, 1);
    ContainerId secondContainer = ContainerId.newContainerId(attemptId, 2);
    ContainerId thirdContainer = ContainerId.newContainerId(attemptId, 3);
    assertTrue(mockRm.waitForState(node, thirdContainer,
        RMContainerState.ALLOCATED));

    // Acquire the containers and mark all three as launched.
    appMaster.allocate(null, null);
    for (ContainerId launched
        : Arrays.asList(amContainer, secondContainer, thirdContainer)) {
      sentRMContainerLaunched(mockRm, launched);
    }

    // Round 1: grow the AM container from 1 GB to 3 GB -> 2 GB pending.
    UpdateContainerRequest amTo3Gb = UpdateContainerRequest.newInstance(
        0, amContainer, ContainerUpdateType.INCREASE_RESOURCE,
        Resources.createResource(3 * GB), null);
    appMaster.sendContainerResizingRequest(Arrays.asList(amTo3Gb));

    FiCaSchedulerApp schedulerApp =
        getFiCaSchedulerApp(mockRm, submittedApp.getApplicationId());

    assertEquals(2 * GB,
        schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
    checkPendingResource(mockRm, "a1", 2 * GB, null);
    checkPendingResource(mockRm, "a", 2 * GB, null);
    checkPendingResource(mockRm, "root", 2 * GB, null);

    // Round 2: container 2 (2G -> 3G) and container 3 (2G -> 5G); together
    // with round 1 that is 2 + 1 + 3 = 6 GB pending.
    UpdateContainerRequest secondTo3Gb = UpdateContainerRequest.newInstance(
        0, secondContainer, ContainerUpdateType.INCREASE_RESOURCE,
        Resources.createResource(3 * GB), null);
    UpdateContainerRequest thirdTo5Gb = UpdateContainerRequest.newInstance(
        0, thirdContainer, ContainerUpdateType.INCREASE_RESOURCE,
        Resources.createResource(5 * GB), null);
    appMaster.sendContainerResizingRequest(
        Arrays.asList(secondTo3Gb, thirdTo5Gb));

    assertEquals(6 * GB,
        schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
    checkPendingResource(mockRm, "a1", 6 * GB, null);
    checkPendingResource(mockRm, "a", 6 * GB, null);
    checkPendingResource(mockRm, "root", 6 * GB, null);

    // Round 3: container 1 (1G -> 3G), container 2 (2G -> 4G) and
    // container 3 (2G -> 2G); the new targets give 2 + 2 + 0 = 4 GB pending.
    UpdateContainerRequest amTo3GbAgain = UpdateContainerRequest.newInstance(
        0, amContainer, ContainerUpdateType.INCREASE_RESOURCE,
        Resources.createResource(3 * GB), null);
    UpdateContainerRequest secondTo4Gb = UpdateContainerRequest.newInstance(
        0, secondContainer, ContainerUpdateType.INCREASE_RESOURCE,
        Resources.createResource(4 * GB), null);
    UpdateContainerRequest thirdTo2Gb = UpdateContainerRequest.newInstance(
        0, thirdContainer, ContainerUpdateType.INCREASE_RESOURCE,
        Resources.createResource(2 * GB), null);
    appMaster.sendContainerResizingRequest(
        Arrays.asList(amTo3GbAgain, secondTo4Gb, thirdTo2Gb));

    assertEquals(4 * GB,
        schedulerApp.getAppAttemptResourceUsage().getPending().getMemorySize());
    checkPendingResource(mockRm, "a1", 4 * GB, null);
    checkPendingResource(mockRm, "a", 4 * GB, null);
    checkPendingResource(mockRm, "root", 4 * GB, null);
    mockRm.stop();
  }

  /**
   * Submits two applications to queue a1 whose AM resources each exceed the
   * queue's AM resource limit and checks that one application is active and
   * one is pending, both for the queue and for the submitting user.
   *
   * @param config capacity scheduler configuration handed to setUpMove
   * @throws Exception if the RM cannot be set up or waiting for the node
   *     capacity times out
   */
  private void verifyAMLimitForLeafQueue(CapacitySchedulerConfiguration config)
      throws Exception {
    final MockRM mockRm = setUpMove(config);
    final int nodeMemoryMb = 4 * GB;
    mockRm.registerNode("127.0.0.1:1234", nodeMemoryMb);

    final String leafName = "a1";
    final String submitter = "user_0";
    final ResourceScheduler scheduler = mockRm.getRMContext().getScheduler();
    final LeafQueue leaf =
        (LeafQueue) ((CapacityScheduler) scheduler).getQueue(leafName);
    final Resource amLimit = leaf.getAMResourceLimit();

    // Both AM asks are deliberately larger than the queue's AM limit.
    final Resource firstAmAsk = Resource.newInstance(
        amLimit.getMemorySize() + 1024, amLimit.getVirtualCores() + 1);
    final Resource secondAmAsk = Resource.newInstance(
        amLimit.getMemorySize() + 2048, amLimit.getVirtualCores() + 1);

    // Wait for the scheduler to be updated with new node capacity
    GenericTestUtils.waitFor(
        () -> scheduler.getMaximumResourceCapability().getMemorySize()
            == nodeMemoryMb,
        100, 60 * 1000);

    MockRMAppSubmitter.submit(mockRm,
        MockRMAppSubmissionData.Builder.createWithResource(firstAmAsk, mockRm)
            .withResource(firstAmAsk)
            .withAppName("app-1")
            .withUser(submitter)
            .withAcls(null)
            .withQueue(leafName)
            .build());

    MockRMAppSubmitter.submit(mockRm,
        MockRMAppSubmissionData.Builder.createWithResource(secondAmAsk, mockRm)
            .withResource(secondAmAsk)
            .withAppName("app-2")
            .withUser(submitter)
            .withAcls(null)
            .withQueue(leafName)
            .build());

    // With the AM limit exceeded, one application is activated and the other
    // one stays pending.
    assertEquals(1, leaf.getNumPendingApplications(),
        "PendingApplications should be 1");
    assertEquals(1, leaf.getNumActiveApplications(),
        "Active applications should be 1");

    assertEquals(1, leaf.getUser(submitter).getPendingApplications(),
        "User PendingApplications should be 1");
    assertEquals(1, leaf.getUser(submitter).getActiveApplications(),
        "User Active applications should be 1");
    mockRm.stop();
  }

  /**
   * Delivers a LAUNCHED event to the RMContainer the scheduler holds for the
   * given id, failing the test when the scheduler has no such container.
   *
   * @param rm resource manager whose scheduler is cast to CapacityScheduler
   * @param containerId id of the container to mark as launched
   */
  private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
    RMContainer target = ((CapacityScheduler) rm.getResourceScheduler())
        .getRMContainer(containerId);
    if (target == null) {
      fail("Cannot find RMContainer");
      // fail() always throws; the return only makes the control flow explicit.
      return;
    }
    target.handle(
        new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
  }

  /**
   * Drives applications in queues x1, x2 (checked through parent p1) and y1
   * (checked through parent p2) on two 8 GB nodes. It checks that a 2 GB
   * request in x2 that cannot be placed while p1 holds 7 GB does not prevent
   * y1 from filling p2, and that it is satisfied once a container of x1 is
   * released.
   */
  @Test
  public void testCSReservationWithRootUnblocked() throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    csConf.setResourceComparator(DominantResourceCalculator.class);
    setupOtherBlockedQueueConfiguration(csConf);
    csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    MockRM mockRm = new MockRM(csConf);
    mockRm.start();
    CapacityScheduler capScheduler =
        (CapacityScheduler) mockRm.getResourceScheduler();
    ParentQueue parentP1 = (ParentQueue) capScheduler.getQueue("p1");

    assertNotNull(parentP1);

    // Two nodes with 8 GB / 8 vcores each.
    RMNode firstNode = MockNodes.newNodeInfo(
        0, Resource.newInstance(8 * GB, 8), 1, "127.0.0.1");
    RMNode secondNode = MockNodes.newNodeInfo(
        0, Resource.newInstance(8 * GB, 8), 2, "test");
    capScheduler.handle(new NodeAddedSchedulerEvent(firstNode));
    capScheduler.handle(new NodeAddedSchedulerEvent(secondNode));

    ApplicationAttemptId x1Attempt =
        appHelper(mockRm, capScheduler, 100, 1, "x1", "userX1");
    ApplicationAttemptId x2Attempt =
        appHelper(mockRm, capScheduler, 100, 2, "x2", "userX2");
    ApplicationAttemptId y1Attempt =
        appHelper(mockRm, capScheduler, 100, 3, "y1", "userY1");
    RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
    Priority requestPriority = TestUtils.createMockPriority(1);

    // y1: four rounds of one 1 GB container each.
    for (int round = 0; round < 4; round++) {
      ResourceRequest ask = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 1 * GB, 1, true, requestPriority, factory);
      capScheduler.allocate(y1Attempt, Collections.singletonList(ask), null,
          Collections.<ContainerId>emptyList(), null, null,
          NULL_UPDATE_REQUESTS);
      CapacityScheduler.schedule(capScheduler);
    }
    assertEquals(4 * GB,
        capScheduler.getQueue("y1").getUsedResources().getMemorySize(),
        "Y1 Used Resource should be 4 GB");
    assertEquals(4 * GB,
        capScheduler.getQueue("p2").getUsedResources().getMemorySize(),
        "P2 Used Resource should be 4 GB");

    // x1: seven rounds of one 1 GB container each.
    for (int round = 0; round < 7; round++) {
      ResourceRequest ask = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 1 * GB, 1, true, requestPriority, factory);
      capScheduler.allocate(x1Attempt, Collections.singletonList(ask), null,
          Collections.<ContainerId>emptyList(), null, null,
          NULL_UPDATE_REQUESTS);
      CapacityScheduler.schedule(capScheduler);
    }
    assertEquals(7 * GB,
        capScheduler.getQueue("x1").getUsedResources().getMemorySize(),
        "X1 Used Resource should be 7 GB");
    assertEquals(7 * GB,
        capScheduler.getQueue("p1").getUsedResources().getMemorySize(),
        "P1 Used Resource should be 7 GB");

    // x2: a single 2 GB ask that is not expected to be placed yet.
    ResourceRequest x2Ask = TestUtils.createResourceRequest(
        ResourceRequest.ANY, 2 * GB, 1, true, requestPriority, factory);
    capScheduler.allocate(x2Attempt, Collections.singletonList(x2Ask), null,
        Collections.<ContainerId>emptyList(), null, null,
        NULL_UPDATE_REQUESTS);
    CapacityScheduler.schedule(capScheduler);
    assertEquals(0,
        capScheduler.getQueue("x2").getUsedResources().getMemorySize(),
        "X2 Used Resource should be 0");
    assertEquals(7 * GB,
        capScheduler.getQueue("p1").getUsedResources().getMemorySize(),
        "P1 Used Resource should be 7 GB");

    // Another 1 GB ask from x1: this assignment should fail.
    ResourceRequest extraX1Ask = TestUtils.createResourceRequest(
        ResourceRequest.ANY, 1 * GB, 1, true, requestPriority, factory);
    capScheduler.allocate(x1Attempt, Collections.singletonList(extraX1Ask),
        null, Collections.<ContainerId>emptyList(), null, null,
        NULL_UPDATE_REQUESTS);
    CapacityScheduler.schedule(capScheduler);
    assertEquals(7 * GB,
        capScheduler.getQueue("x1").getUsedResources().getMemorySize(),
        "X1 Used Resource should be 7 GB");
    assertEquals(7 * GB,
        capScheduler.getQueue("p1").getUsedResources().getMemorySize(),
        "P1 Used Resource should be 7 GB");

    // y1 again: these four 1 GB asks should get through.
    for (int round = 0; round < 4; round++) {
      ResourceRequest ask = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 1 * GB, 1, true, requestPriority, factory);
      capScheduler.allocate(y1Attempt, Collections.singletonList(ask), null,
          Collections.<ContainerId>emptyList(), null, null,
          NULL_UPDATE_REQUESTS);
      CapacityScheduler.schedule(capScheduler);
    }
    assertEquals(8 * GB,
        capScheduler.getQueue("p2").getUsedResources().getMemorySize(),
        "P2 Used Resource should be 8 GB");

    // Free a container from X1
    ContainerId releasedFromX1 = ContainerId.newContainerId(x1Attempt, 2);
    capScheduler.handle(new ContainerExpiredSchedulerEvent(releasedFromX1));

    // Schedule the pending request
    CapacityScheduler.schedule(capScheduler);
    assertEquals(2 * GB,
        capScheduler.getQueue("x2").getUsedResources().getMemorySize(),
        "X2 Used Resource should be 2 GB");
    assertEquals(8 * GB,
        capScheduler.getQueue("p1").getUsedResources().getMemorySize(),
        "P1 Used Resource should be 8 GB");
    assertEquals(8 * GB,
        capScheduler.getQueue("p2").getUsedResources().getMemorySize(),
        "P2 Used Resource should be 8 GB");
    assertEquals(16 * GB,
        capScheduler.getRootQueue().getUsedResources().getMemorySize(),
        "Root Used Resource should be 16 GB");
    mockRm.stop();
  }

  /**
   * Runs queues "a" and "b" on two 8 GB nodes: "a" obtains one 2 GB container
   * and "b" thirteen 1 GB containers. The test asserts that one further
   * request from each queue leaves both usages unchanged, and that once two
   * containers of "b" expire the usage becomes 4 GB for "a", 12 GB for "b"
   * and 16 GB for the root queue.
   *
   * @throws Exception if the mock ResourceManager cannot be driven
   */
  @Test
  public void testCSQueueBlocked() throws Exception {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    setupBlockedQueueConfiguration(conf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    MockRM rm = new MockRM(conf);
    rm.start();
    // Stop the RM even when an assertion fails so it cannot leak into
    // the tests that run afterwards.
    try {
      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
      LeafQueue q = (LeafQueue) cs.getQueue("a");
      assertNotNull(q);

      String host = "127.0.0.1";
      String host1 = "test";
      RMNode node =
          MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 1, host);
      RMNode node1 =
          MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 2, host1);
      cs.handle(new NodeAddedSchedulerEvent(node));
      cs.handle(new NodeAddedSchedulerEvent(node1));

      // One application per queue
      ApplicationAttemptId appAttemptId1 =
          appHelper(rm, cs, 100, 1, "a", "user1");
      ApplicationAttemptId appAttemptId2 =
          appHelper(rm, cs, 100, 2, "b", "user2");

      RecordFactory recordFactory =
          RecordFactoryProvider.getRecordFactory(null);
      Priority priority = TestUtils.createMockPriority(1);

      // This will allocate for app1
      ResourceRequest r1 = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
      cs.allocate(appAttemptId1,
          Collections.<ResourceRequest>singletonList(r1), null,
          Collections.<ContainerId>emptyList(), null, null,
          NULL_UPDATE_REQUESTS);
      CapacityScheduler.schedule(cs);

      // Thirteen 1 GB requests for app2, scheduled one at a time
      for (int i = 0; i < 13; i++) {
        ResourceRequest oneGbRequest = TestUtils.createResourceRequest(
            ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
        cs.allocate(appAttemptId2,
            Collections.<ResourceRequest>singletonList(oneGbRequest), null,
            Collections.<ContainerId>emptyList(), null, null,
            NULL_UPDATE_REQUESTS);
        CapacityScheduler.schedule(cs);
      }
      assertEquals(2 * GB, cs.getQueue("a").getUsedResources().getMemorySize(),
          "A Used Resource should be 2 GB");
      assertEquals(13 * GB, cs.getQueue("b").getUsedResources().getMemorySize(),
          "B Used Resource should be 13 GB");

      // One more request from each queue; neither is expected to be served
      r1 = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
      ResourceRequest r2 = TestUtils.createResourceRequest(
          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
      cs.allocate(appAttemptId1,
          Collections.<ResourceRequest>singletonList(r1), null,
          Collections.<ContainerId>emptyList(), null, null,
          NULL_UPDATE_REQUESTS);
      CapacityScheduler.schedule(cs);

      cs.allocate(appAttemptId2,
          Collections.<ResourceRequest>singletonList(r2), null,
          Collections.<ContainerId>emptyList(), null, null,
          NULL_UPDATE_REQUESTS);
      CapacityScheduler.schedule(cs);

      // Check blocked Resource
      assertEquals(2 * GB, cs.getQueue("a").getUsedResources().getMemorySize(),
          "A Used Resource should be 2 GB");
      assertEquals(13 * GB, cs.getQueue("b").getUsedResources().getMemorySize(),
          "B Used Resource should be 13 GB");

      // Expire two containers of app2 (queue "b")
      ContainerId containerId1 = ContainerId.newContainerId(appAttemptId2, 10);
      ContainerId containerId2 = ContainerId.newContainerId(appAttemptId2, 11);

      cs.handle(new ContainerExpiredSchedulerEvent(containerId1));
      rm.drainEvents();
      CapacityScheduler.schedule(cs);

      cs.handle(new ContainerExpiredSchedulerEvent(containerId2));
      CapacityScheduler.schedule(cs);
      rm.drainEvents();

      assertEquals(4 * GB, cs.getQueue("a").getUsedResources().getMemorySize(),
          "A Used Resource should be 4 GB");
      assertEquals(12 * GB, cs.getQueue("b").getUsedResources().getMemorySize(),
          "B Used Resource should be 12 GB");
      assertEquals(16 * GB, cs.getRootQueue().getUsedResources().getMemorySize(),
          "Used Resource on Root should be 16 GB");
    } finally {
      rm.stop();
    }
  }

  /**
   * Submits one application to queue "a" on a single node "h1", asks for
   * containers at node, rack and off-switch level, triggers two node updates
   * and asserts that the attempt's locality statistics form the diagonal
   * matrix {{1, 0, 0}, {0, 1, 0}, {0, 0, 1}}.
   *
   * @throws Exception if the mock ResourceManager cannot be driven
   */
  @Test
  public void testAppAttemptLocalityStatistics() throws Exception {
    Configuration conf =
        TestUtils.getConfigurationWithMultipleQueues(new Configuration(false));
    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);

    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
    mgr.init(conf);

    MockRM rm = new MockRM(conf) {
      @Override
      protected RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    rm.start();
    // Stop the RM even when an assertion fails so it cannot leak into
    // the tests that run afterwards.
    try {
      MockNM nm1 =
          new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
      nm1.registerNode();

      // Launch app1 in queue=a
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("a")
              .withUnmanagedAM(false)
              .build();
      RMApp app1 = MockRMAppSubmitter.submit(rm, data);

      // Launching the AM presumably accounts for the off-switch entry
      // asserted below (one off-switch request, one off-switch allocation).
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);

      // am1 asks for 1 GB resource on h1/default-rack/offswitch
      ResourceRequest anyRequest = ResourceRequest.newInstance(
          Priority.newInstance(1), "*", Resources.createResource(1 * GB), 2);
      ResourceRequest rackRequest = ResourceRequest.newInstance(
          Priority.newInstance(1), "/default-rack",
          Resources.createResource(1 * GB), 2);
      ResourceRequest nodeRequest = ResourceRequest.newInstance(
          Priority.newInstance(1), "h1", Resources.createResource(1 * GB), 1);
      am1.allocate(Arrays.asList(anyRequest, rackRequest, nodeRequest), null);

      CapacityScheduler cs =
          (CapacityScheduler) rm.getRMContext().getScheduler();

      // First heartbeat: expected to yield the node-local allocation
      cs.nodeUpdate(rm.getRMContext().getRMNodes().get(nm1.getNodeId()));

      // Second heartbeat: expected to yield the rack-local allocation
      cs.nodeUpdate(rm.getRMContext().getRMNodes().get(nm1.getNodeId()));

      RMAppAttemptMetrics attemptMetrics = rm.getRMContext().getRMApps()
          .get(app1.getApplicationId()).getCurrentAppAttempt()
          .getRMAppAttemptMetrics();

      // We should get one node-local allocation, one rack-local allocation
      // And one off-switch allocation
      assertArrayEquals(new int[][]{{1, 0, 0}, {0, 1, 0}, {0, 0, 1}},
          attemptMetrics.getLocalityStatistics());
    } finally {
      rm.stop();
    }
  }


  /**
   * Registers four 10 GB nodes with a scheduler that uses the
   * DominantResourceCalculator and a 512 MB / 1 vcore minimum allocation,
   * then asserts that the AM resource limit of the "default" queue is
   * 4096 MB and 4 vcores.
   *
   * @throws Exception if the mock ResourceManager cannot be driven
   */
  @Test
  @Timeout(value = 30)
  public void testAMLimitDouble() throws Exception {
    CapacitySchedulerConfiguration config =
        new CapacitySchedulerConfiguration();
    config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
        DominantResourceCalculator.class.getName());
    CapacitySchedulerConfiguration conf =
        new CapacitySchedulerConfiguration(config);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.setInt("yarn.scheduler.minimum-allocation-mb", 512);
    conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1);
    MockRM rm = new MockRM(conf);
    rm.start();
    // Stop the RM even when an assertion fails so it cannot leak into
    // the tests that run afterwards.
    try {
      // Four nodes on ports 1234..1237, 10 GB each
      for (int port = 1234; port <= 1237; port++) {
        rm.registerNode("127.0.0.1:" + port, 10 * GB);
      }
      ResourceScheduler scheduler = rm.getRMContext().getScheduler();
      waitforNMRegistered(scheduler, 4, 5);

      LeafQueue defaultQueue =
          (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
      Resource amResourceLimit = defaultQueue.getAMResourceLimit();
      assertEquals(4096, amResourceLimit.getMemorySize());
      assertEquals(4, amResourceLimit.getVirtualCores());
    } finally {
      rm.stop();
    }
  }


  /**
   * Maps user "a1" to group "agroup" through static group overrides,
   * configures the queue mapping "g:agroup:%user", submits an application as
   * "a1" to the "default" queue and asserts that it is listed in queue "a1".
   *
   * @throws Exception if the mock ResourceManager cannot be driven
   */
  @Test
  public void testQueueMappingWithCurrentUserQueueMappingForaGroup() throws
      Exception {

    CapacitySchedulerConfiguration config =
        new CapacitySchedulerConfiguration();
    config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    setupQueueConfiguration(config);

    config.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
        TestGroupsCaching.FakeunPrivilegedGroupMapping.class,
        ShellBasedUnixGroupsMapping.class);
    // Static override in "user=group" form
    config.set(CommonConfigurationKeys.HADOOP_USER_GROUP_STATIC_OVERRIDES,
        "a1=agroup");
    Groups.getUserToGroupsMappingServiceWithLoadedConfiguration(config);

    config.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
        "g:agroup:%user");

    MockRM rm = new MockRM(config);
    rm.start();
    // Stop the RM even when an assertion fails so it cannot leak into
    // the tests that run afterwards.
    try {
      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
      // NOTE(review): kept from the original; presumably redundant after
      // rm.start() - confirm before removing.
      cs.start();

      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
              .withAppName("appname")
              .withUser("a1")
              .withAcls(null)
              .withQueue("default")
              .withUnmanagedAM(false)
              .build();
      MockRMAppSubmitter.submit(rm, data);

      List<ApplicationAttemptId> appsInA1 = cs.getAppsInQueue("a1");
      assertEquals(1, appsInA1.size());
    } finally {
      rm.stop();
    }
  }

  /**
   * Walks through several combinations of the per-queue maximum and default
   * application lifetime and checks, for the "default" queue, which lifetime
   * the scheduler resolves for requested values of 100, 9, -1 and 0. A
   * default lifetime above the maximum is expected to make initialization
   * fail.
   */
  @Test
  @Timeout(value = 30)
  public void testcheckAndGetApplicationLifetime() throws Exception {
    final String queue = "default";
    long maxLifetime;
    long defaultLifetime;
    CapacityScheduler scheduler;

    // max = 10, default = 5: both positive
    maxLifetime = 10;
    defaultLifetime = 5;
    scheduler = setUpCSQueue(maxLifetime, defaultLifetime);
    assertEquals(maxLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, 100, null));
    assertEquals(9, scheduler.checkAndGetApplicationLifetime(queue, 9, null));
    assertEquals(defaultLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, -1, null));
    assertEquals(defaultLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, 0, null));
    assertEquals(maxLifetime, scheduler.getMaximumApplicationLifetime(queue));

    // max = -1, default = -1: neither limit is configured
    maxLifetime = -1;
    defaultLifetime = -1;
    scheduler = setUpCSQueue(maxLifetime, defaultLifetime);
    assertEquals(100,
        scheduler.checkAndGetApplicationLifetime(queue, 100, null));
    assertEquals(defaultLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, -1, null));
    assertEquals(defaultLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, 0, null));
    assertEquals(maxLifetime, scheduler.getMaximumApplicationLifetime(queue));

    // max = 10, default = 10: default equals the maximum
    maxLifetime = 10;
    defaultLifetime = 10;
    scheduler = setUpCSQueue(maxLifetime, defaultLifetime);
    assertEquals(maxLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, 100, null));
    assertEquals(defaultLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, -1, null));
    assertEquals(defaultLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, 0, null));
    assertEquals(maxLifetime, scheduler.getMaximumApplicationLifetime(queue));

    // max = 0, default = 0
    maxLifetime = 0;
    defaultLifetime = 0;
    scheduler = setUpCSQueue(maxLifetime, defaultLifetime);
    assertEquals(100,
        scheduler.checkAndGetApplicationLifetime(queue, 100, null));
    assertEquals(defaultLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, -1, null));
    assertEquals(defaultLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, 0, null));

    // max = 10, default = -1: the maximum is used when no lifetime is asked
    maxLifetime = 10;
    defaultLifetime = -1;
    scheduler = setUpCSQueue(maxLifetime, defaultLifetime);
    assertEquals(maxLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, 100, null));
    assertEquals(maxLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, -1, null));
    assertEquals(maxLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, 0, null));

    // max = 5, default = 10: initialization must be rejected
    maxLifetime = 5;
    defaultLifetime = 10;
    try {
      setUpCSQueue(maxLifetime, defaultLifetime);
      fail("Expected to fails since maxLifetime < defaultLifetime.");
    } catch (ServiceStateException sse) {
      // The message of interest sits two causes below the service exception
      Throwable rootCause = sse.getCause().getCause();
      String message = rootCause.getMessage();
      assertTrue(message.contains("can't exceed maximum lifetime"));
    }

    // max = -1, default = 10: only the default is configured
    maxLifetime = -1;
    defaultLifetime = 10;
    scheduler = setUpCSQueue(maxLifetime, defaultLifetime);
    assertEquals(100,
        scheduler.checkAndGetApplicationLifetime(queue, 100, null));
    assertEquals(defaultLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, -1, null));
    assertEquals(defaultLifetime,
        scheduler.checkAndGetApplicationLifetime(queue, 0, null));
  }

  /**
   * Creates a CapacityScheduler whose only queue under root is "default"
   * (capacity 100) with the given lifetime settings, and initializes it
   * against a mock RMContext. The scheduler is initialized but not started.
   *
   * @param maxLifetime maximum application lifetime to set on the queue
   * @param defaultLifetime default application lifetime to set on the queue
   * @return the initialized scheduler
   */
  private CapacityScheduler setUpCSQueue(long maxLifetime,
      long defaultLifetime) {
    CapacitySchedulerConfiguration queueConf =
        new CapacitySchedulerConfiguration();
    queueConf.setQueues(ROOT, new String[] {"default"});
    queueConf.setCapacity(DEFAULT, 100);
    queueConf.setMaximumLifetimePerQueue(DEFAULT, maxLifetime);
    queueConf.setDefaultLifetimePerQueue(DEFAULT, defaultLifetime);
    YarnConfiguration yarnConf = new YarnConfiguration(queueConf);

    CapacityScheduler scheduler = new CapacityScheduler();
    RMContext mockContext = TestUtils.getMockRMContext();
    scheduler.setConf(yarnConf);
    scheduler.setRMContext(mockContext);
    scheduler.init(yarnConf);
    return scheduler;
  }

  /**
   * Reproduces the sequence referenced by YARN-6629: a resource request is
   * cleared (asked again with zero containers) between the creation of an
   * allocation proposal and its application. The scheduler is spied so that
   * {@code tryCommit} first clears the request and then applies the proposal
   * on the real application attempt; the attempt must still hold exactly one
   * live container afterwards.
   *
   * @throws Exception if the mock ResourceManager cannot be driven
   */
  @Test
  @Timeout(value = 60)
  public void testClearRequestsBeforeApplyTheProposal()
      throws Exception {
    // init RM & NMs & Nodes
    final MockRM rm = new MockRM(new CapacitySchedulerConfiguration());
    rm.start();
    // Stop the RM even when an assertion fails so it cannot leak into
    // the tests that run afterwards.
    try {
      final MockNM nm = rm.registerNode("h1:1234", 200 * GB);

      // submit app
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
              .withAppName("app")
              .withUser("user")
              .build();
      final RMApp app = MockRMAppSubmitter.submit(rm, data);
      MockRM.launchAndRegisterAM(app, rm, nm);

      // spy capacity scheduler to handle CapacityScheduler#apply
      final Priority priority = Priority.newInstance(1);
      final CapacityScheduler cs =
          (CapacityScheduler) rm.getResourceScheduler();
      final CapacityScheduler spyCs = spy(cs);
      doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Exception {
          // clear resource request before applying the proposal for
          // container_2
          spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(),
              Arrays.asList(ResourceRequest.newInstance(priority, "*",
                  Resources.createResource(1 * GB), 0)), null,
              Collections.<ContainerId>emptyList(), null, null,
              NULL_UPDATE_REQUESTS);
          // trigger real apply which can raise NPE before YARN-6629
          try {
            FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
                app.getCurrentAppAttempt().getAppAttemptId());
            schedulerApp.apply((Resource) invocation.getArguments()[0],
                (ResourceCommitRequest) invocation.getArguments()[1],
                (Boolean) invocation.getArguments()[2]);
            // the proposal of removed request should be rejected
            assertEquals(1, schedulerApp.getLiveContainers().size());
          } catch (Throwable e) {
            // Keep the original failure as the cause so the report shows
            // what actually went wrong.
            fail("Applying the proposal of a cleared request failed", e);
          }
          return null;
        }
      }).when(spyCs).tryCommit(any(Resource.class),
          any(ResourceCommitRequest.class), anyBoolean());

      // rm allocates container_2 to reproduce the process that can raise NPE
      spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(),
          Arrays.asList(ResourceRequest.newInstance(priority, "*",
              Resources.createResource(1 * GB), 1)), null,
          Collections.<ContainerId>emptyList(), null, null,
          NULL_UPDATE_REQUESTS);
      spyCs.handle(new NodeUpdateSchedulerEvent(
          spyCs.getNode(nm.getNodeId()).getRMNode()));
    } finally {
      rm.stop();
    }
  }

  /**
   * Testcase for YARN-8528: checks that the shared ContainerAllocation
   * constants hold their matching AllocationState both before and after a
   * scheduling pass in which a 5 GB request cannot be placed on the 4 GB
   * node that heartbeats.
   *
   * @throws Exception if the mock ResourceManager cannot be driven
   */
  @Test
  public void testContainerAllocationLocalitySkipped() throws Exception {
    assertSkippedAllocationConstantsIntact();

    // init RM & NMs & Nodes
    final MockRM rm = new MockRM(new CapacitySchedulerConfiguration());
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    rm.start();
    // Stop the RM even when an assertion fails so it cannot leak into
    // the tests that run afterwards.
    try {
      final MockNM nm1 = rm.registerNode("h1:1234", 4 * GB);
      // maximum-allocation-mb = 6GB
      rm.registerNode("h2:1234", 6 * GB);

      // submit app and request resource
      // container2 is larger than nm1 total resource, will trigger locality
      // skip
      MockRMAppSubmissionData data =
          MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
              .withAppName("app")
              .withUser("user")
              .build();
      final RMApp app = MockRMAppSubmitter.submit(rm, data);
      final MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
      am.addRequests(new String[] {"*"}, 5 * GB, 1, 1, 2);
      am.schedule();

      // container1 (am) should be acquired, container2 should not
      RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
      cs.handle(new NodeUpdateSchedulerEvent(node1));
      ContainerId cid =
          ContainerId.newContainerId(am.getApplicationAttemptId(), 1L);
      assertThat(cs.getRMContainer(cid).getState()).
          isEqualTo(RMContainerState.ACQUIRED);
      cid = ContainerId.newContainerId(am.getApplicationAttemptId(), 2L);
      assertNull(cs.getRMContainer(cid));

      assertSkippedAllocationConstantsIntact();
    } finally {
      rm.stop();
    }
  }

  /**
   * Asserts that each "skipped" ContainerAllocation constant reports the
   * AllocationState of the same name.
   */
  private static void assertSkippedAllocationConstantsIntact() {
    assertEquals(AllocationState.APP_SKIPPED,
        ContainerAllocation.APP_SKIPPED.getAllocationState());
    assertEquals(AllocationState.LOCALITY_SKIPPED,
        ContainerAllocation.LOCALITY_SKIPPED.getAllocationState());
    assertEquals(AllocationState.PRIORITY_SKIPPED,
        ContainerAllocation.PRIORITY_SKIPPED.getAllocationState());
    assertEquals(AllocationState.QUEUE_SKIPPED,
        ContainerAllocation.QUEUE_SKIPPED.getAllocationState());
  }

  /**
   * Tests that reinitializing the scheduler with additional queues does not
   * leave stale QueueMetrics behind: after the reinit, the metrics of the
   * sibling queues "root.a.a1" and "root.a.a3", as well as the metrics
   * registered under "default.root.a.a3", must all share the very same
   * parent metrics instance.
   *
   * @throws Exception if the mock ResourceManager cannot be driven
   */
  @Test
  public void testCSQueueMetricsDoesNotLeakOnReinit() throws Exception {
    // Initialize resource map
    Map<String, ResourceInformation> riMap = new HashMap<>();

    // Initialize mandatory resources
    ResourceInformation memory =
        ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(),
            ResourceInformation.MEMORY_MB.getUnits(),
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
    ResourceInformation vcores =
        ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
            ResourceInformation.VCORES.getUnits(),
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
    riMap.put(ResourceInformation.MEMORY_URI, memory);
    riMap.put(ResourceInformation.VCORES_URI, vcores);

    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);

    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    csConf.setResourceComparator(DominantResourceCalculator.class);

    setupQueueConfiguration(csConf);

    YarnConfiguration conf = new YarnConfiguration(csConf);

    // Don't reset resource types since we have already configured resource
    // types
    conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);

    MockRM rm = new MockRM(conf);
    rm.start();
    // Stop the RM even when an assertion fails so it cannot leak into
    // the tests that run afterwards.
    try {
      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

      // Reinitialize with a fresh configuration holding the additional queues
      csConf = new CapacitySchedulerConfiguration();
      setupAdditionalQueues(csConf);
      cs.reinitialize(csConf, cs.getRMContext());
      QueueMetrics a3DefaultPartitionMetrics =
          QueueMetrics.getQueueMetrics().get("default.root.a.a3");

      assertSame(
          QueueMetrics.getQueueMetrics().get("root.a.a1").getParentQueue(),
          QueueMetrics.getQueueMetrics().get("root.a.a3").getParentQueue(),
          "Different ParentQueue of siblings is a sign of a memory leak");

      assertSame(
          QueueMetrics.getQueueMetrics().get("root.a.a1").getParentQueue(),
          a3DefaultPartitionMetrics.getParentQueue(),
          "Different ParentQueue of partition metrics is a sign of a memory leak");
    } finally {
      rm.stop();
    }
  }

  /**
   * Checks the guaranteed and maximum capacity metrics of several queues,
   * both for memory and for the custom resource CUSTOM_RES_1, in three
   * stages: with two nodes registered, after one node is removed, and after
   * the scheduler is reinitialized with two more child queues under "a".
   *
   * @throws Exception if the mock ResourceManager cannot be driven
   */
  @Test
  public void testCSQueueMetrics() throws Exception {

    // Initialize resource map
    Map<String, ResourceInformation> riMap = new HashMap<>();

    // Initialize mandatory resources
    ResourceInformation memory =
        ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(),
            ResourceInformation.MEMORY_MB.getUnits(),
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
    ResourceInformation vcores =
        ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
            ResourceInformation.VCORES.getUnits(),
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
    riMap.put(ResourceInformation.MEMORY_URI, memory);
    riMap.put(ResourceInformation.VCORES_URI, vcores);
    riMap.put(TestQueueMetricsForCustomResources.CUSTOM_RES_1,
        ResourceInformation.newInstance(
            TestQueueMetricsForCustomResources.CUSTOM_RES_1, "", 1, 10));

    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);

    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    csConf.setResourceComparator(DominantResourceCalculator.class);

    csConf.set(YarnConfiguration.RESOURCE_TYPES,
        TestQueueMetricsForCustomResources.CUSTOM_RES_1);

    setupQueueConfiguration(csConf);

    YarnConfiguration conf = new YarnConfiguration(csConf);

    // Don't reset resource types since we have already configured resource
    // types
    conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);

    MockRM rm = new MockRM(conf);
    rm.start();
    // Stop the RM even when an assertion fails so it cannot leak into
    // the tests that run afterwards.
    try {
      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

      // Two nodes: 50 GB / 50 vcores each, with 1000 and 2000 units of the
      // custom resource respectively
      RMNode n1 = MockNodes.newNodeInfo(0,
          MockNodes.newResource(50 * GB, 50,
              ImmutableMap.<String, String> builder()
                  .put(TestQueueMetricsForCustomResources.CUSTOM_RES_1,
                      String.valueOf(1000))
                  .build()),
          1, "n1");
      RMNode n2 = MockNodes.newNodeInfo(0,
          MockNodes.newResource(50 * GB, 50,
              ImmutableMap.<String, String> builder()
                  .put(TestQueueMetricsForCustomResources.CUSTOM_RES_1,
                      String.valueOf(2000))
                  .build()),
          2, "n2");
      cs.handle(new NodeAddedSchedulerEvent(n1));
      cs.handle(new NodeAddedSchedulerEvent(n2));

      // Stage 1: both nodes registered
      assertEquals(94, customResGuaranteedOf(cs, "a1"));
      assertEquals(3000, customResMaxOf(cs, "a1"));

      assertEquals(10240, csQueueMetricsOf(cs, "a").getGuaranteedMB());
      assertEquals(71680, csQueueMetricsOf(cs, "b1").getGuaranteedMB());
      assertEquals(102400, csQueueMetricsOf(cs, "a").getMaxCapacityMB());
      assertEquals(102400, csQueueMetricsOf(cs, "b1").getMaxCapacityMB());
      assertEquals(314, customResGuaranteedOf(cs, "a"));
      assertEquals(3000, customResMaxOf(cs, "a"));
      assertEquals(2126, customResGuaranteedOf(cs, "b1"));
      assertEquals(3000, customResMaxOf(cs, "b1"));

      // Stage 2: remove a node, metrics should be updated
      cs.handle(new NodeRemovedSchedulerEvent(n2));
      assertEquals(5120, csQueueMetricsOf(cs, "a").getGuaranteedMB());
      assertEquals(35840, csQueueMetricsOf(cs, "b1").getGuaranteedMB());
      assertEquals(51200, csQueueMetricsOf(cs, "a").getMaxCapacityMB());
      assertEquals(51200, csQueueMetricsOf(cs, "b1").getMaxCapacityMB());

      assertEquals(104, customResGuaranteedOf(cs, "a"));
      assertEquals(1000, customResMaxOf(cs, "a"));
      assertEquals(708, customResGuaranteedOf(cs, "b1"));
      assertEquals(1000, customResMaxOf(cs, "b1"));

      assertEquals(A_CAPACITY / 100,
          csQueueMetricsOf(cs, "a").getGuaranteedCapacity(), DELTA);
      assertEquals(A_CAPACITY / 100,
          csQueueMetricsOf(cs, "a").getGuaranteedAbsoluteCapacity(), DELTA);
      assertEquals(B1_CAPACITY / 100,
          csQueueMetricsOf(cs, "b1").getGuaranteedCapacity(), DELTA);
      assertEquals((B_CAPACITY / 100) * (B1_CAPACITY / 100),
          csQueueMetricsOf(cs, "b1").getGuaranteedAbsoluteCapacity(), DELTA);
      assertEquals(1,
          csQueueMetricsOf(cs, "a").getMaxCapacity(), DELTA);
      assertEquals(1,
          csQueueMetricsOf(cs, "a").getMaxAbsoluteCapacity(), DELTA);
      assertEquals(1,
          csQueueMetricsOf(cs, "b1").getMaxCapacity(), DELTA);
      assertEquals(1,
          csQueueMetricsOf(cs, "b1").getMaxAbsoluteCapacity(), DELTA);

      // Stage 3: add child queues to a, and reinitialize. Metrics should be
      // updated
      csConf.setQueues(A,
          new String[] {"a1", "a2", "a3"});
      csConf.setCapacity(A2, 29.5f);
      csConf.setCapacity(A3, 40.5f);
      csConf.setMaximumCapacity(A3,
          50.0f);

      cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null,
          null, new RMContainerTokenSecretManager(csConf),
          new NMTokenSecretManagerInRM(csConf),
          new ClientToAMTokenSecretManagerInRM(), null));

      assertEquals(1024, csQueueMetricsOf(cs, "a2").getGuaranteedMB());
      assertEquals(2048, csQueueMetricsOf(cs, "a3").getGuaranteedMB());
      assertEquals(51200, csQueueMetricsOf(cs, "a2").getMaxCapacityMB());
      assertEquals(25600, csQueueMetricsOf(cs, "a3").getMaxCapacityMB());

      assertEquals(30, customResGuaranteedOf(cs, "a2"));
      assertEquals(1000, customResMaxOf(cs, "a2"));
      assertEquals(42, customResGuaranteedOf(cs, "a3"));
      assertEquals(500, customResMaxOf(cs, "a3"));
    } finally {
      rm.stop();
    }
  }

  /** Returns the metrics of the named queue, cast to CSQueueMetrics. */
  private static CSQueueMetrics csQueueMetricsOf(CapacityScheduler cs,
      String queueName) {
    return (CSQueueMetrics) cs.getQueue(queueName).getMetrics();
  }

  /** Returns the guaranteed capacity of CUSTOM_RES_1 for the named queue. */
  private static long customResGuaranteedOf(CapacityScheduler cs,
      String queueName) {
    Map<String, Long> guaranteed =
        ((CSQueueMetricsForCustomResources) csQueueMetricsOf(cs, queueName)
            .getQueueMetricsForCustomResources()).getGuaranteedCapacity();
    return guaranteed
        .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue();
  }

  /** Returns the maximum capacity of CUSTOM_RES_1 for the named queue. */
  private static long customResMaxOf(CapacityScheduler cs,
      String queueName) {
    Map<String, Long> max =
        ((CSQueueMetricsForCustomResources) csQueueMetricsOf(cs, queueName)
            .getQueueMetricsForCustomResources()).getMaxCapacity();
    return max
        .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue();
  }

  @Test
  public void testReservedContainerLeakWhenMoveApplication() throws Exception {
    CapacitySchedulerConfiguration csConf
        = new CapacitySchedulerConfiguration();
    csConf.setQueues(ROOT,
        new String[] {"a", "b"});
    csConf.setCapacity(A, 50);
    csConf.setMaximumCapacity(A, 100);
    csConf.setUserLimitFactor(A, 100);
    csConf.setCapacity(B, 50);
    csConf.setMaximumCapacity(B, 100);
    csConf.setUserLimitFactor(B, 100);

    YarnConfiguration conf=new YarnConfiguration(csConf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    RMNodeLabelsManager mgr=new NullRMNodeLabelsManager();
    mgr.init(conf);
    MockRM rm1 = new MockRM(csConf);
    CapacityScheduler scheduler=(CapacityScheduler) rm1.getResourceScheduler();
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8 * GB);
    MockNM nm2 = rm1.registerNode("127.0.0.2:1234", 8 * GB);
    /*
     * simulation
     * app1: (1 AM,1 running container)
     * app2: (1 AM,1 reserved container)
     */
    // launch an app to queue, AM container should be launched in nm1
    MockRMAppSubmissionData submissionData =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
            .withAppName("app_1")
            .withUser("user_1")
            .withAcls(null)
            .withQueue("a")
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm1, submissionData);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // launch another app to queue, AM container should be launched in nm1
    submissionData =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
            .withAppName("app_2")
            .withUser("user_1")
            .withAcls(null)
            .withQueue("a")
            .build();
    RMApp app2 = MockRMAppSubmitter.submit(rm1, submissionData);
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

    am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
    // this containerRequest should be reserved
    am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());

    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    // Do node heartbeats 2 times
    // First time will allocate container for app1, second time will reserve
    // container for app2
    scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));
    scheduler.handle(new NodeUpdateSchedulerEvent(rmNode1));

    FiCaSchedulerApp schedulerApp1 =
        scheduler.getApplicationAttempt(am1.getApplicationAttemptId());
    FiCaSchedulerApp schedulerApp2 =
        scheduler.getApplicationAttempt(am2.getApplicationAttemptId());
    // APP1:  1 AM, 1 allocatedContainer
    assertEquals(2, schedulerApp1.getLiveContainers().size());
    // APP2:  1 AM,1 reservedContainer
    assertEquals(1, schedulerApp2.getLiveContainers().size());
    assertEquals(1, schedulerApp2.getReservedContainers().size());
    //before,move app2 which has one reservedContainer
    LeafQueue srcQueue = (LeafQueue) scheduler.getQueue("a");
    LeafQueue desQueue = (LeafQueue) scheduler.getQueue("b");
    assertEquals(4, srcQueue.getNumContainers());
    assertEquals(10*GB, srcQueue.getUsedResources().getMemorySize());
    assertEquals(0, desQueue.getNumContainers());
    assertEquals(0, desQueue.getUsedResources().getMemorySize());
    //app1 ResourceUsage (0 reserved)
    assertEquals(5*GB,
        schedulerApp1
            .getAppAttemptResourceUsage().getAllUsed().getMemorySize());
    assertEquals(0,
        schedulerApp1.getCurrentReservation().getMemorySize());
    //app2  ResourceUsage (4GB reserved)
    assertEquals(1*GB,
        schedulerApp2
            .getAppAttemptResourceUsage().getAllUsed().getMemorySize());
    assertEquals(4*GB,
        schedulerApp2.getCurrentReservation().getMemorySize());
    //move app2 which has one reservedContainer
    scheduler.moveApplication(app2.getApplicationId(), "b");
    // keep this order
    // if killing app1 first,the reservedContainer of app2 will be allocated
    rm1.killApp(app2.getApplicationId());
    rm1.killApp(app1.getApplicationId());
    //after,moved app2 which has one reservedContainer
    assertEquals(0, srcQueue.getNumContainers());
    assertEquals(0, desQueue.getNumContainers());
    assertEquals(0, srcQueue.getUsedResources().getMemorySize());
    assertEquals(0, desQueue.getUsedResources().getMemorySize());
    rm1.close();
  }

  /**
   * (YARN-11191) This test ensures that no deadlock happens while the
   * refreshQueues is called on the preemptionManager (refresh thread) and the
   * AbstractCSQueue.getTotalKillableResource is called from the schedule thread.
   *
   * <p>Three threads are started against queue {@code a}:
   * <ul>
   *   <li>SCHEDULE takes the queue's read-lock, sleeps 5 seconds, then calls
   *       {@code preemptionManager.getKillableContainers} while still holding
   *       the queue's read-lock;</li>
   *   <li>COMPLETE sleeps 1 second, then locks and immediately unlocks the
   *       queue's write-lock;</li>
   *   <li>REFRESH sleeps 2 seconds, then calls
   *       {@code preemptionManager.refreshQueues}.</li>
   * </ul>
   * The test passes once all three threads have terminated.
   *
   * @throws Exception if the resource manager cannot be set up or the test
   *     thread is interrupted while joining; the {@code @Timeout} expiring
   *     means the threads deadlocked
   */
  @Test
  @Timeout(value = 20)
  public void testRefreshQueueWithOpenPreemption() throws Exception {
    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
    csConf.setQueues(new QueuePath(CapacitySchedulerConfiguration.ROOT), new String[]{"a"});
    QueuePath a = new QueuePath("root.a");
    csConf.setCapacity(a, 100);
    csConf.setQueues(a, new String[]{"b"});
    QueuePath b = new QueuePath("root.a.b");
    csConf.setCapacity(b, 100);

    YarnConfiguration conf = new YarnConfiguration(csConf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
    mgr.init(conf);
    try (MockRM rm = new MockRM(csConf)) {
      CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
      PreemptionManager preemptionManager = scheduler.getPreemptionManager();
      rm.getRMContext().setNodeLabelManager(mgr);
      rm.start();

      AbstractParentQueue queue = (AbstractParentQueue) scheduler.getQueue("a");

      // The scheduler thread holds the queue's read-lock for 5 seconds
      // then the preemption's read-lock is used
      Thread schedulerThread = new Thread(() -> {
        queue.readLock.lock();
        try {
          sleepPreservingInterrupt(5 * 1000);
          preemptionManager.getKillableContainers("a",
              queue.getDefaultNodeLabelExpression());
        } finally {
          // Always release the read-lock: if the call above throws, a leaked
          // lock would block the other two threads forever and the failure
          // would be misreported as a deadlock.
          queue.readLock.unlock();
        }
      }, "SCHEDULE");

      // The complete thread locks/unlocks the queue's write-lock after 1 second
      Thread completeThread = new Thread(() -> {
        sleepPreservingInterrupt(1000);
        queue.writeLock.lock();
        queue.writeLock.unlock();
      }, "COMPLETE");

      // The refresh thread holds the preemption's write-lock after 2 seconds
      // while it calls the getChildQueues(ByTryLock) that
      // locks(tryLocks) the queue's read-lock
      Thread refreshThread = new Thread(() -> {
        sleepPreservingInterrupt(2 * 1000);
        preemptionManager.refreshQueues(queue.getParent(), queue);
      }, "REFRESH");

      schedulerThread.start();
      completeThread.start();
      refreshThread.start();

      // If any of these joins never returns, the @Timeout fails the test.
      schedulerThread.join();
      completeThread.join();
      refreshThread.join();
    }
  }

  /**
   * Sleeps the calling thread for the given time. If the sleep is
   * interrupted, the stack trace is printed and the thread's interrupt status
   * is restored before returning, so the caller continues immediately.
   *
   * @param millis time to sleep, in milliseconds
   */
  private static void sleepPreservingInterrupt(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      e.printStackTrace();
      // Do not swallow the interrupt: keep the flag visible to later code.
      Thread.currentThread().interrupt();
    }
  }
}