TestApplicationLimits.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.setQueueHandler;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestApplicationLimits {
  
  // Logger shared by the tests in this class.
  private static final Logger LOG =
      LoggerFactory.getLogger(TestApplicationLimits.class);
  // Multiplier used to express memory sizes in GB when building Resource
  // objects (presumably the underlying unit is MB -- confirm against Resource).
  final static int GB = 1024;

  // Leaf queue under test; setUp() assigns a Mockito spy of a LeafQueue named "a".
  LeafQueue queue;
  // Root of the queue hierarchy parsed in setUp().
  CSQueue root;
  
  // Calculator handed to createCSContext() when building scheduler contexts.
  private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();

  // Assigned in setUp() from TestUtils.getMockRMContext().
  RMContext rmContext = null;
  // Scheduler context created in setUp() via createCSContext().
  private CapacitySchedulerContext csContext;

  
  /**
   * Builds the fixture shared by the tests before each one runs: a scheduler
   * context sized for a 160 GB / 320 vcore cluster, the queue hierarchy parsed
   * from {@link #setupQueueConfiguration}, and a spied leaf queue "a" whose
   * ACL check and application limits are stubbed.
   *
   * @throws IOException declared for the fixture-building calls made here
   */
  @BeforeEach
  public void setUp() throws IOException {
    final CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
    final YarnConfiguration yarnConf = new YarnConfiguration();
    setupQueueConfiguration(schedulerConf);

    rmContext = TestUtils.getMockRMContext();
    final Resource clusterResource = Resources.createResource(10 * 16 * GB, 10 * 32);

    csContext = createCSContext(
        schedulerConf,
        resourceCalculator,
        Resources.createResource(GB, 1),
        Resources.createResource(16 * GB, 32),
        clusterResource);
    when(csContext.getRMContext()).thenReturn(rmContext);
    final CapacitySchedulerQueueContext queueContext =
        new CapacitySchedulerQueueContext(csContext);
    setQueueHandler(csContext);

    // The context hands out a real token secret manager whose key was rolled once.
    final RMContainerTokenSecretManager tokenSecretManager =
        new RMContainerTokenSecretManager(yarnConf);
    tokenSecretManager.rollMasterKey();
    when(csContext.getContainerTokenSecretManager()).thenReturn(tokenSecretManager);

    // Parse the hierarchy, then push the cluster size down from the root.
    final CSQueueStore queueStore = new CSQueueStore();
    root = CapacitySchedulerQueueManager.parseQueue(
        queueContext, schedulerConf, null, "root", queueStore, queueStore, TestUtils.spyHook);
    final ResourceLimits clusterLimits = new ResourceLimits(clusterResource);
    root.updateClusterResource(clusterResource, clusterLimits);

    // The queue under test is a freshly constructed, spied leaf "a" that
    // reports the resource quotas of the "a" queue parsed above.
    queue = spy(new LeafQueue(queueContext, A, root, null));
    final LeafQueue parsedQueueA = (LeafQueue) queueStore.get(A);
    final QueueResourceQuotas parsedQuotas = parsedQueueA.getQueueResourceQuotas();
    doReturn(parsedQuotas).when(queue).getQueueResourceQuotas();

    // ACL checks always succeed.
    doReturn(true).when(queue)
        .hasAccess(any(QueueACL.class), any(UserGroupInformation.class));

    // Default application limits reported by the spy.
    doReturn(100).when(queue).getMaxApplications();
    doReturn(25).when(queue).getMaxApplicationsPerUser();
  }

  // Short names of the queues referenced by the configurations in this class.
  private static final String A = "a";
  private static final String B = "b";
  private static final  String C = "c";
  private static final  String D = "d";
  private static final  String AA1 = "a1";
  private static final  String AA2 = "a2";
  private static final  String AA3 = "a3";
  // Queue paths built from the names above. Declaration order matters: each
  // path is derived from one declared before it.
  private static final QueuePath ROOT_QUEUE_PATH =
      new QueuePath(CapacitySchedulerConfiguration.ROOT);
  // a, b, c and d are created from the root path.
  private static final QueuePath A_QUEUE_PATH = ROOT_QUEUE_PATH.createNewLeaf(A);
  private static final QueuePath B_QUEUE_PATH = ROOT_QUEUE_PATH.createNewLeaf(B);
  private static final QueuePath C_QUEUE_PATH = ROOT_QUEUE_PATH.createNewLeaf(C);
  private static final QueuePath D_QUEUE_PATH = ROOT_QUEUE_PATH.createNewLeaf(D);
  // a1, a2 and a3 are created from the path of "a".
  private static final QueuePath AA1_QUEUE_PATH = A_QUEUE_PATH.createNewLeaf(AA1);
  private static final QueuePath AA2_QUEUE_PATH = A_QUEUE_PATH.createNewLeaf(AA2);
  private static final QueuePath AA3_QUEUE_PATH = A_QUEUE_PATH.createNewLeaf(AA3);
  /**
   * Populates {@code conf} with the two top-level queues used by these tests:
   * "a" with capacity 10 and "b" with capacity 90. Queue "a" additionally
   * gets a user limit of 50 and a user-limit factor of 5.
   *
   * @param conf scheduler configuration to fill in; modified in place
   */
  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
    // Queues directly under root
    final String[] topLevelQueues = {A, B};
    conf.setQueues(ROOT_QUEUE_PATH, topLevelQueues);

    // Capacity split between the two queues
    conf.setCapacity(A_QUEUE_PATH, 10);
    conf.setCapacity(B_QUEUE_PATH, 90);

    // Per-user settings are applied to queue "a" only
    conf.setUserLimit(A_QUEUE_PATH, 50);
    conf.setUserLimitFactor(A_QUEUE_PATH, 5.0f);

    LOG.info("Setup top-level queues a and b");
  }

  /**
   * Creates a Mockito mock of {@link FiCaSchedulerApp} with its identity,
   * user, priority 0, AM resource and AM partition (NO_LABEL) stubbed.
   * {@code compareInputOrderTo} delegates to the real method and
   * {@code isRunnable} returns {@code true}.
   *
   * @param appId id passed to {@code TestUtils.getMockApplicationAttemptId}
   *              together with attempt number 0
   * @param user value returned by {@code getUser()}
   * @param amResource value returned by both {@code getAMResource} overloads
   * @return the stubbed mock application
   */
  private FiCaSchedulerApp getMockApplication(int appId, String user,
    Resource amResource) {
    final FiCaSchedulerApp mockApp = mock(FiCaSchedulerApp.class);
    final ApplicationAttemptId attemptId =
        TestUtils.getMockApplicationAttemptId(appId, 0);
    final ApplicationId applicationId = attemptId.getApplicationId();
    final String noLabel = CommonNodeLabelsManager.NO_LABEL;

    // Identity, owner and priority
    doReturn(applicationId).when(mockApp).getApplicationId();
    doReturn(attemptId).when(mockApp).getApplicationAttemptId();
    doReturn(user).when(mockApp).getUser();
    doReturn(Priority.newInstance(0)).when(mockApp).getPriority();

    // AM resource, both overall and for the unlabeled partition
    doReturn(noLabel).when(mockApp).getAppAMNodePartitionName();
    doReturn(amResource).when(mockApp).getAMResource();
    doReturn(amResource).when(mockApp).getAMResource(noLabel);

    // Ordering uses the real implementation; the app is always runnable
    when(mockApp.compareInputOrderTo(any(FiCaSchedulerApp.class)))
        .thenCallRealMethod();
    when(mockApp.isRunnable()).thenReturn(true);
    return mockApp;
  }
  
  /**
   * Checks how the queue-level and per-user AM resource limits of queue "a"
   * gate application activation on an 80 GB cluster, first with one active
   * user and then with two.
   *
   * @throws Exception if any scheduler call made by the test fails
   */
  @Test
  public void testAMResourceLimit() throws Exception {
    final String user_0 = "user_0";
    final String user_1 = "user_1";
    
    // Expected limits on this 80 GB cluster (asserted below): the queue-level
    // AM limit is 8 GB, i.e. the default 10% of the cluster allowed for AMs.
    // The user AM limit is 4 GB while there is only one active user and
    // drops to 2 GB (the user limit) once a second user is active.
    Resource clusterResource = Resource.newInstance(80 * GB, 40);
    root.updateClusterResource(clusterResource, new ResourceLimits(
        clusterResource));
    // Replace the fixture queue from setUp() with the child of root named "a"
    // from the parsed hierarchy.
    queue = (LeafQueue) root.getChildQueues().stream().filter(
        child -> child.getQueueName().equals(A))
        .findFirst().orElseThrow(NoSuchElementException::new);
    queue.updateClusterResource(clusterResource, new ResourceLimits(
        clusterResource));

    // Stub the users manager so the test controls the number of active users.
    // NOTE(review): stubbing with when(...) requires the parsed queue to be a
    // mock or spy -- presumably TestUtils.spyHook provides that; confirm.
    ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class);
    when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);

    assertEquals(Resource.newInstance(8 * GB, 1),
        queue.calculateAndGetAMResourceLimit());
    assertEquals(Resource.newInstance(4 * GB, 1),
      queue.getUserAMResourceLimit());
    
    // Two apps for user_0, both start
    int APPLICATION_ID = 0;
    FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0, 
      Resource.newInstance(2 * GB, 1));
    queue.submitApplicationAttempt(app_0, user_0);
    assertEquals(1, queue.getNumActiveApplications());
    assertEquals(0, queue.getNumPendingApplications());
    assertEquals(1, queue.getNumActiveApplications(user_0));
    assertEquals(0, queue.getNumPendingApplications(user_0));
    
    // From here on the stubbed users manager reports one active user
    when(activeUsersManager.getNumActiveUsers()).thenReturn(1);

    FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0, 
      Resource.newInstance(2 * GB, 1));
    queue.submitApplicationAttempt(app_1, user_0);
    assertEquals(2, queue.getNumActiveApplications());
    assertEquals(0, queue.getNumPendingApplications());
    assertEquals(2, queue.getNumActiveApplications(user_0));
    assertEquals(0, queue.getNumPendingApplications(user_0));
    
    // AMLimits unchanged
    assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit());
    assertEquals(Resource.newInstance(4 * GB, 1),
      queue.getUserAMResourceLimit());
    
    // One app for user_1, starts
    FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_1, 
      Resource.newInstance(2 * GB, 1));
    queue.submitApplicationAttempt(app_2, user_1);
    assertEquals(3, queue.getNumActiveApplications());
    assertEquals(0, queue.getNumPendingApplications());
    assertEquals(1, queue.getNumActiveApplications(user_1));
    assertEquals(0, queue.getNumPendingApplications(user_1));
    
    // The stubbed users manager now reports two active users
    when(activeUsersManager.getNumActiveUsers()).thenReturn(2);
    
    // Now userAMResourceLimit drops to the queue configured 50% as there is
    // another user active
    assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit());
    assertEquals(Resource.newInstance(2 * GB, 1),
      queue.getUserAMResourceLimit());
    
    // Second user_1 app cannot start
    FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_1, 
      Resource.newInstance(2 * GB, 1));
    queue.submitApplicationAttempt(app_3, user_1);
    assertEquals(3, queue.getNumActiveApplications());
    assertEquals(1, queue.getNumPendingApplications());
    assertEquals(1, queue.getNumActiveApplications(user_1));
    assertEquals(1, queue.getNumPendingApplications(user_1));

    // Finish user_1's active app (app_2); the pending app_3 should then be
    // activated, so the totals stay at 3 active with nothing pending
    queue.finishApplicationAttempt(app_2, A);
    assertEquals(3, queue.getNumActiveApplications());
    assertEquals(0, queue.getNumPendingApplications());
    assertEquals(1, queue.getNumActiveApplications(user_1));
    assertEquals(0, queue.getNumPendingApplications(user_1));
    
  }
  
  /**
   * Verifies the AM resource limits and the maximum-application limits that
   * queue "a" computes from its configuration, across four stages:
   * <ol>
   *   <li>defaults on a 100 x 16 GB cluster (AM limit 160 GB, user AM limit
   *       80 GB);</li>
   *   <li>after growing the cluster to 120 x 16 GB (192 GB / 96 GB);</li>
   *   <li>after setting the per-queue maximum-am-resource-percent to 0.5 on a
   *       100 x 16 GB cluster (800 GB / 400 GB);</li>
   *   <li>after setting the per-queue maximum-applications to 9999.</li>
   * </ol>
   * The queue hierarchy is re-parsed after each configuration change so the
   * new values are picked up.
   *
   * @throws Exception if any scheduler call made by the test fails
   */
  @Test
  public void testLimitsComputation() throws Exception {
    final float epsilon = 1e-5f;

    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    setupQueueConfiguration(csConf);

    // Say cluster has 100 nodes of 16G each
    Resource clusterResource =
        Resources.createResource(100 * 16 * GB, 100 * 16);

    CapacitySchedulerContext context = createCSContext(csConf, resourceCalculator,
        Resources.createResource(GB, 1), Resources.createResource(16 * GB, 16),
        clusterResource);
    CapacitySchedulerQueueManager queueManager = context.getCapacitySchedulerQueueManager();
    CapacitySchedulerQueueContext queueContext = new CapacitySchedulerQueueContext(context);

    // The locals are named rootQueue/leafQueue so that they do not shadow the
    // root/queue fixture fields populated by setUp().
    CSQueueStore queues = new CSQueueStore();
    CSQueue rootQueue =
        CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
            "root", queues, queues, TestUtils.spyHook);
    queueManager.setRootQueue(rootQueue);
    rootQueue.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    LeafQueue leafQueue = (LeafQueue) queues.get(A);

    LOG.info("Queue 'A' - aMResourceLimit={} UserAMResourceLimit={}",
        leafQueue.getAMResourceLimit(), leafQueue.getUserAMResourceLimit());

    Resource amResourceLimit = Resource.newInstance(160 * GB, 1);
    assertThat(leafQueue.calculateAndGetAMResourceLimit()).
        isEqualTo(amResourceLimit);
    assertThat(leafQueue.getUserAMResourceLimit()).isEqualTo(
        Resource.newInstance(80 * GB, 1));

    // Assert in metrics
    assertThat(leafQueue.getMetrics().getAMResourceLimitMB()).isEqualTo(
        amResourceLimit.getMemorySize());
    assertThat(leafQueue.getMetrics().getAMResourceLimitVCores()).isEqualTo(
        amResourceLimit.getVirtualCores());

    assertEquals((int)(clusterResource.getMemorySize() * leafQueue.getAbsoluteCapacity()),
        leafQueue.getMetrics().getAvailableMB());

    // Add some nodes to the cluster & test new limits
    clusterResource = Resources.createResource(120 * 16 * GB);
    rootQueue.updateClusterResource(clusterResource, new ResourceLimits(
        clusterResource));

    assertThat(leafQueue.calculateAndGetAMResourceLimit()).isEqualTo(
        Resource.newInstance(192 * GB, 1));
    assertThat(leafQueue.getUserAMResourceLimit()).isEqualTo(
        Resource.newInstance(96 * GB, 1));

    assertEquals((int)(clusterResource.getMemorySize() * leafQueue.getAbsoluteCapacity()),
        leafQueue.getMetrics().getAvailableMB());

    // should return -1 if per queue setting not set
    assertEquals(
        (int)CapacitySchedulerConfiguration.UNDEFINED,
        csConf.getMaximumApplicationsPerQueue(leafQueue.getQueuePathObject()));
    int expectedMaxApps =
        (int)
        (CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS *
        leafQueue.getAbsoluteCapacity());
    assertEquals(expectedMaxApps, leafQueue.getMaxApplications());

    int expectedMaxAppsPerUser = Math.min(expectedMaxApps,
        (int)(expectedMaxApps * (leafQueue.getUserLimit()/100.0f) *
        leafQueue.getUserLimitFactor()));
    assertEquals(expectedMaxAppsPerUser, leafQueue.getMaxApplicationsPerUser());

    // should default to global setting if per queue setting not set
    assertEquals(CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT,
        csConf.getMaximumApplicationMasterResourcePerQueuePercent(
            leafQueue.getQueuePathObject()), epsilon);

    // Change the per-queue max AM resources percentage.
    csConf.setFloat(PREFIX + leafQueue.getQueuePath()
        + ".maximum-am-resource-percent", 0.5f);
    queueContext.reinitialize();
    // Re-create queues to get new configs.
    queues = new CSQueueStore();
    rootQueue = CapacitySchedulerQueueManager.parseQueue(
        queueContext, csConf, null, "root",
        queues, queues, TestUtils.spyHook);
    clusterResource = Resources.createResource(100 * 16 * GB);
    queueManager.setRootQueue(rootQueue);
    rootQueue.updateClusterResource(clusterResource, new ResourceLimits(
        clusterResource));

    leafQueue = (LeafQueue) queues.get(A);

    assertEquals(0.5f,
        csConf.getMaximumApplicationMasterResourcePerQueuePercent(
          leafQueue.getQueuePathObject()), epsilon);

    assertThat(leafQueue.calculateAndGetAMResourceLimit()).isEqualTo(
        Resource.newInstance(800 * GB, 1));
    assertThat(leafQueue.getUserAMResourceLimit()).isEqualTo(
        Resource.newInstance(400 * GB, 1));

    // Change the per-queue max applications.
    csConf.setInt(PREFIX + leafQueue.getQueuePath() + ".maximum-applications",
        9999);
    queueContext.reinitialize();
    // Re-create queues to get new configs.
    // NOTE(review): unlike the previous re-parse, the new root is not passed
    // to queueManager.setRootQueue() here -- confirm whether that is intended.
    queues = new CSQueueStore();
    rootQueue = CapacitySchedulerQueueManager.parseQueue(
        queueContext, csConf, null, "root",
        queues, queues, TestUtils.spyHook);
    rootQueue.updateClusterResource(clusterResource, new ResourceLimits(
        clusterResource));

    leafQueue = (LeafQueue) queues.get(A);
    assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(
        leafQueue.getQueuePathObject()));
    assertEquals(9999, leafQueue.getMaxApplications());

    expectedMaxAppsPerUser = Math.min(9999, (int)(9999 *
        (leafQueue.getUserLimit()/100.0f) * leafQueue.getUserLimitFactor()));
    assertEquals(expectedMaxAppsPerUser, leafQueue.getMaxApplicationsPerUser());
  }
  
  /**
   * Submits applications from three users to the fixture queue and checks,
   * after every submission or completion, how many applications are active
   * and how many are pending -- both for the queue as a whole and per user.
   * Applications are held back first by the per-user AM limit (8 GB) and
   * then by the queue AM limit (16 GB), both asserted at the start.
   *
   * @throws Exception if any scheduler call made by the test fails
   */
  @Test
  public void testActiveApplicationLimits() throws Exception {
    final String user_0 = "user_0";
    final String user_1 = "user_1";
    final String user_2 = "user_2";

    // Baseline limits of the fixture queue built in setUp()
    assertEquals(Resource.newInstance(16 * GB, 1),
        queue.calculateAndGetAMResourceLimit());
    assertEquals(Resource.newInstance(8 * GB, 1),
      queue.getUserAMResourceLimit());
    
    int APPLICATION_ID = 0;
    // Submit first application
    FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0,
      Resources.createResource(4 * GB, 0));
    queue.submitApplicationAttempt(app_0, user_0);
    assertEquals(1, queue.getNumActiveApplications());
    assertEquals(0, queue.getNumPendingApplications());
    assertEquals(1, queue.getNumActiveApplications(user_0));
    assertEquals(0, queue.getNumPendingApplications(user_0));

    // Submit second application
    FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
      Resources.createResource(4 * GB, 0));
    queue.submitApplicationAttempt(app_1, user_0);
    assertEquals(2, queue.getNumActiveApplications());
    assertEquals(0, queue.getNumPendingApplications());
    assertEquals(2, queue.getNumActiveApplications(user_0));
    assertEquals(0, queue.getNumPendingApplications(user_0));
    
    // Submit third application, should remain pending due to user amlimit
    FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0,
      Resources.createResource(4 * GB, 0));
    queue.submitApplicationAttempt(app_2, user_0);
    assertEquals(2, queue.getNumActiveApplications());
    assertEquals(1, queue.getNumPendingApplications());
    assertEquals(2, queue.getNumActiveApplications(user_0));
    assertEquals(1, queue.getNumPendingApplications(user_0));
    
    // Finish one application, app_2 should be activated
    queue.finishApplicationAttempt(app_0, A);
    assertEquals(2, queue.getNumActiveApplications());
    assertEquals(0, queue.getNumPendingApplications());
    assertEquals(2, queue.getNumActiveApplications(user_0));
    assertEquals(0, queue.getNumPendingApplications(user_0));
    
    // Submit another one for user_0; it stays pending, as app_2 did above
    FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0,
      Resources.createResource(4 * GB, 0));
    queue.submitApplicationAttempt(app_3, user_0);
    assertEquals(2, queue.getNumActiveApplications());
    assertEquals(1, queue.getNumPendingApplications());
    assertEquals(2, queue.getNumActiveApplications(user_0));
    assertEquals(1, queue.getNumPendingApplications(user_0));
    
    // Submit first app for user_1 (8 GB AM); it is activated
    FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1,
      Resources.createResource(8 * GB, 0));
    queue.submitApplicationAttempt(app_4, user_1);
    assertEquals(3, queue.getNumActiveApplications());
    assertEquals(1, queue.getNumPendingApplications());
    assertEquals(2, queue.getNumActiveApplications(user_0));
    assertEquals(1, queue.getNumPendingApplications(user_0));
    assertEquals(1, queue.getNumActiveApplications(user_1));
    assertEquals(0, queue.getNumPendingApplications(user_1));

    // Submit first app for user_2, should block due to queue amlimit
    FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_2,
      Resources.createResource(8 * GB, 0));
    queue.submitApplicationAttempt(app_5, user_2);
    assertEquals(3, queue.getNumActiveApplications());
    assertEquals(2, queue.getNumPendingApplications());
    assertEquals(2, queue.getNumActiveApplications(user_0));
    assertEquals(1, queue.getNumPendingApplications(user_0));
    assertEquals(1, queue.getNumActiveApplications(user_1));
    assertEquals(0, queue.getNumPendingApplications(user_1));
    assertEquals(1, queue.getNumPendingApplications(user_2));

    // Now finish one app of user_1 so app_5 should be activated; user_0's
    // pending app is expected to remain pending
    queue.finishApplicationAttempt(app_4, A);
    assertEquals(3, queue.getNumActiveApplications());
    assertEquals(1, queue.getNumPendingApplications());
    assertEquals(2, queue.getNumActiveApplications(user_0));
    assertEquals(1, queue.getNumPendingApplications(user_0));
    assertEquals(0, queue.getNumActiveApplications(user_1));
    assertEquals(0, queue.getNumPendingApplications(user_1));
    assertEquals(1, queue.getNumActiveApplications(user_2));
    assertEquals(0, queue.getNumPendingApplications(user_2));
    
  }
  
  /**
   * Submits four applications for a single user to the fixture queue, then
   * removes them one by one -- including one that is still pending -- and
   * checks the active/pending bookkeeping and queue membership after every
   * step.
   *
   * @throws Exception if any scheduler call made by the test fails
   */
  @Test
  public void testActiveLimitsWithKilledApps() throws Exception {
    final String soleUser = "user_0";
    int nextAppId = 0;

    // First application: activated immediately
    FiCaSchedulerApp firstApp = getMockApplication(nextAppId++, soleUser,
        Resources.createResource(4 * GB, 0));
    queue.submitApplicationAttempt(firstApp, soleUser);
    assertSoleUserAppCounts(soleUser, 1, 0);
    assertTrue(queue.getApplications().contains(firstApp));

    // Second application: activated as well
    FiCaSchedulerApp secondApp = getMockApplication(nextAppId++, soleUser,
        Resources.createResource(4 * GB, 0));
    queue.submitApplicationAttempt(secondApp, soleUser);
    assertSoleUserAppCounts(soleUser, 2, 0);
    assertTrue(queue.getApplications().contains(secondApp));

    // Third application: should remain pending
    FiCaSchedulerApp thirdApp = getMockApplication(nextAppId++, soleUser,
        Resources.createResource(4 * GB, 0));
    queue.submitApplicationAttempt(thirdApp, soleUser);
    assertSoleUserAppCounts(soleUser, 2, 1);
    assertTrue(queue.getPendingApplications().contains(thirdApp));

    // Fourth application: should remain pending
    FiCaSchedulerApp fourthApp = getMockApplication(nextAppId++, soleUser,
        Resources.createResource(4 * GB, 0));
    queue.submitApplicationAttempt(fourthApp, soleUser);
    assertSoleUserAppCounts(soleUser, 2, 2);
    assertTrue(queue.getPendingApplications().contains(fourthApp));

    // Kill the third application while it is still pending
    queue.finishApplicationAttempt(thirdApp, A);
    assertSoleUserAppCounts(soleUser, 2, 1);
    assertFalse(queue.getPendingApplications().contains(thirdApp));
    assertFalse(queue.getApplications().contains(thirdApp));

    // Finish the first application; the fourth should become active
    queue.finishApplicationAttempt(firstApp, A);
    assertSoleUserAppCounts(soleUser, 2, 0);
    assertTrue(queue.getApplications().contains(fourthApp));
    assertFalse(queue.getPendingApplications().contains(fourthApp));
    assertFalse(queue.getApplications().contains(firstApp));

    // Finish the second application
    queue.finishApplicationAttempt(secondApp, A);
    assertSoleUserAppCounts(soleUser, 1, 0);
    assertFalse(queue.getApplications().contains(secondApp));

    // Finish the fourth application; the queue is empty again
    queue.finishApplicationAttempt(fourthApp, A);
    assertSoleUserAppCounts(soleUser, 0, 0);
    assertFalse(queue.getApplications().contains(fourthApp));
  }

  /**
   * Asserts the active and pending application counts of the fixture queue,
   * first for the queue as a whole and then for {@code user}. Intended for
   * tests with a single submitting user, where both views must agree.
   *
   * @param user user whose per-user counts are checked
   * @param expectedActive expected number of active applications
   * @param expectedPending expected number of pending applications
   */
  private void assertSoleUserAppCounts(String user, int expectedActive,
      int expectedPending) {
    assertEquals(expectedActive, queue.getNumActiveApplications());
    assertEquals(expectedPending, queue.getNumPendingApplications());
    assertEquals(expectedActive, queue.getNumActiveApplications(user));
    assertEquals(expectedPending, queue.getNumPendingApplications(user));
  }

  @Test
  public void testHeadroom() throws Exception {
    CapacitySchedulerConfiguration csConf = 
        new CapacitySchedulerConfiguration();
    csConf.setUserLimit(A_QUEUE_PATH, 25);
    setupQueueConfiguration(csConf);

    // Say cluster has 100 nodes of 16G each
    Resource clusterResource = Resources.createResource(100 * 16 * GB);

    CapacitySchedulerContext context = createCSContext(csConf, resourceCalculator,
        Resources.createResource(GB), Resources.createResource(16*GB), clusterResource);
    CapacitySchedulerQueueManager queueManager = context.getCapacitySchedulerQueueManager();
    CapacitySchedulerQueueContext queueContext = new CapacitySchedulerQueueContext(context);

    CSQueueStore queues = new CSQueueStore();
    CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(queueContext,
        csConf, null, "root", queues, queues, TestUtils.spyHook);
    queueManager.setRootQueue(rootQueue);
    rootQueue.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    ResourceUsage queueCapacities = rootQueue.getQueueResourceUsage();
    when(context.getClusterResourceUsage())
        .thenReturn(queueCapacities);

    // Manipulate queue 'a'
    LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A));
    queue.updateClusterResource(clusterResource, new ResourceLimits(
        clusterResource));
    
    String host_0 = "host_0";
    String rack_0 = "rack_0";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB);

    final String user_0 = "user_0";
    final String user_1 = "user_1";
    
    RecordFactory recordFactory = 
        RecordFactoryProvider.getRecordFactory(null);
    RMContext rmContext = TestUtils.getMockRMContext();
    RMContext spyRMContext = spy(rmContext);
    
    ConcurrentMap<ApplicationId, RMApp> spyApps = 
        spy(new ConcurrentHashMap<ApplicationId, RMApp>());
    RMApp rmApp = mock(RMApp.class);
    ResourceRequest amResourceRequest = mock(ResourceRequest.class);
    Resource amResource = Resources.createResource(0, 0);
    when(amResourceRequest.getCapability()).thenReturn(amResource);
    when(rmApp.getAMResourceRequests()).thenReturn(
        Collections.singletonList(amResourceRequest));
    doReturn(rmApp)
        .when(spyApps).get(ArgumentMatchers.<ApplicationId>any());
    when(spyRMContext.getRMApps()).thenReturn(spyApps);
    RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
    when(rmApp.getRMAppAttempt(any()))
        .thenReturn(rmAppAttempt);
    when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
    doReturn(rmApp)
        .when(spyApps).get(ArgumentMatchers.<ApplicationId>any());
    doReturn(true).when(spyApps)
        .containsKey(ArgumentMatchers.<ApplicationId>any());

    Priority priority_1 = TestUtils.createMockPriority(1);

    // Submit first application with some resource-requests from user_0, 
    // and check headroom
    final ApplicationAttemptId appAttemptId_0_0 = 
        TestUtils.getMockApplicationAttemptId(0, 0); 
    FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(
      appAttemptId_0_0, user_0, queue, 
            queue.getAbstractUsersManager(), spyRMContext);
    queue.submitApplicationAttempt(app_0_0, user_0);

    List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
    app_0_0_requests.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
            true, priority_1, recordFactory));
    app_0_0.updateResourceRequests(app_0_0_requests);

    // Schedule to compute 
    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    Resource expectedHeadroom = Resources.createResource(5*16*GB, 1);
    assertEquals(expectedHeadroom, app_0_0.getHeadroom());

    // Submit second application from user_0, check headroom
    final ApplicationAttemptId appAttemptId_0_1 = 
        TestUtils.getMockApplicationAttemptId(1, 0); 
    FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(
      appAttemptId_0_1, user_0, queue, 
            queue.getAbstractUsersManager(), spyRMContext);
    queue.submitApplicationAttempt(app_0_1, user_0);
    
    List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
    app_0_1_requests.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
            true, priority_1, recordFactory));
    app_0_1.updateResourceRequests(app_0_1_requests);

    // Schedule to compute 
    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
    assertEquals(expectedHeadroom, app_0_0.getHeadroom());
    assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
    
    // Submit first application from user_1, check  for new headroom
    final ApplicationAttemptId appAttemptId_1_0 = 
        TestUtils.getMockApplicationAttemptId(2, 0); 
    FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(
      appAttemptId_1_0, user_1, queue, 
            queue.getAbstractUsersManager(), spyRMContext);
    queue.submitApplicationAttempt(app_1_0, user_1);

    List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
    app_1_0_requests.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
            true, priority_1, recordFactory));
    app_1_0.updateResourceRequests(app_1_0_requests);
    
    // Schedule to compute 
    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
    expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
    assertEquals(expectedHeadroom, app_0_0.getHeadroom());
    assertEquals(expectedHeadroom, app_0_1.getHeadroom());
    assertEquals(expectedHeadroom, app_1_0.getHeadroom());

    // Now reduce cluster size and check for the smaller headroom
    clusterResource = Resources.createResource(90*16*GB);
    rootQueue.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Any change in cluster resource needs to enforce user-limit recomputation.
    // In existing code, LeafQueue#updateClusterResource handled this. However
    // here that method was not used.
    queue.getUsersManager().userLimitNeedsRecompute();
    queue.assignContainers(clusterResource, node_0, new ResourceLimits(
        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
    expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
    assertEquals(expectedHeadroom, app_0_0.getHeadroom());
    assertEquals(expectedHeadroom, app_0_1.getHeadroom());
    assertEquals(expectedHeadroom, app_1_0.getHeadroom());
  }

  /**
   * Builds a capacity-scheduler configuration, layered on top of
   * {@code config}, that declares queues a, b, c, d under root (with a1, a2,
   * a3 under a), their default-partition capacities, their capacities for the
   * node labels "x", "y" and "z", and the application-count limits exercised
   * by the application-limit tests.
   *
   * @param config base configuration to copy settings from
   * @return the populated capacity-scheduler configuration
   */
  private Configuration getConfigurationWithQueueLabels(Configuration config) {
    final CapacitySchedulerConfiguration labeledConf =
        new CapacitySchedulerConfiguration(config);

    // Top-level queues; root is given 100 for each of the three labels.
    final String[] rootChildren = {"a", "b", "c", "d"};
    labeledConf.setQueues(ROOT_QUEUE_PATH, rootChildren);
    for (String label : new String[]{"x", "y", "z"}) {
      labeledConf.setCapacityByLabel(ROOT_QUEUE_PATH, label, 100);
    }

    // Application-count limits: global per-queue setting, an explicit limit
    // for root.a.a1, a small user-limit-factor for root.d and the system-wide
    // maximum.
    labeledConf.setInt(
        CapacitySchedulerConfiguration.QUEUE_GLOBAL_MAX_APPLICATION, 20);
    labeledConf.setInt(
        "yarn.scheduler.capacity.root.a.a1.maximum-applications", 1);
    labeledConf.setFloat(
        "yarn.scheduler.capacity.root.d.user-limit-factor", 0.1f);
    labeledConf.setInt("yarn.scheduler.capacity.maximum-applications", 4);

    // Children of queue "a" and the default-partition capacities.
    final String[] aChildren = {"a1", "a2", "a3"};
    labeledConf.setQueues(A_QUEUE_PATH, aChildren);
    labeledConf.setCapacity(A_QUEUE_PATH, 50);
    labeledConf.setCapacity(B_QUEUE_PATH, 50);
    labeledConf.setCapacity(C_QUEUE_PATH, 0);
    labeledConf.setCapacity(D_QUEUE_PATH, 0);
    labeledConf.setCapacity(AA1_QUEUE_PATH, 50);
    labeledConf.setCapacity(AA2_QUEUE_PATH, 50);
    labeledConf.setCapacity(AA3_QUEUE_PATH, 0);

    // Label "y" across the top-level queues.
    labeledConf.setCapacityByLabel(A_QUEUE_PATH, "y", 25);
    labeledConf.setCapacityByLabel(B_QUEUE_PATH, "y", 50);
    labeledConf.setCapacityByLabel(C_QUEUE_PATH, "y", 25);
    labeledConf.setCapacityByLabel(D_QUEUE_PATH, "y", 0);

    // Label "x" across the top-level queues.
    labeledConf.setCapacityByLabel(A_QUEUE_PATH, "x", 50);
    labeledConf.setCapacityByLabel(B_QUEUE_PATH, "x", 50);

    // Label "z" across the top-level queues.
    labeledConf.setCapacityByLabel(A_QUEUE_PATH, "z", 50);
    labeledConf.setCapacityByLabel(B_QUEUE_PATH, "z", 50);

    // Label "x" inside queue "a".
    labeledConf.setCapacityByLabel(AA1_QUEUE_PATH, "x", 100);
    labeledConf.setCapacityByLabel(AA2_QUEUE_PATH, "x", 0);

    // Label "y" inside queue "a".
    labeledConf.setCapacityByLabel(AA1_QUEUE_PATH, "y", 25);
    labeledConf.setCapacityByLabel(AA2_QUEUE_PATH, "y", 75);

    // Label "z" inside queue "a".
    labeledConf.setCapacityByLabel(AA2_QUEUE_PATH, "z", 75);
    labeledConf.setCapacityByLabel(AA3_QUEUE_PATH, "z", 25);

    return labeledConf;
  }

  /**
   * Collects the given strings into a set built by {@code Sets.newHashSet}.
   *
   * @param elements the strings to place in the set
   * @return a set containing {@code elements}
   */
  private Set<String> toSet(String... elements) {
    return Sets.newHashSet(elements);
  }

  /**
   * Verifies the application-count limits applied at submission time against
   * the labelled queue configuration from
   * {@code getConfigurationWithQueueLabels}:
   * <ul>
   *   <li>an application submitted to queue "c" (zero default-partition
   *       capacity) is still accepted;</li>
   *   <li>a second application in "a1" is rejected by the queue-level
   *       maximum;</li>
   *   <li>an application in "d" is rejected by the per-user maximum;</li>
   *   <li>once enough applications are accepted, a further submission is
   *       rejected by the system-wide maximum.</li>
   * </ul>
   * The {@link MockRM} is stopped in a {@code finally} block so that a failed
   * assertion does not leave a started resource manager behind.
   *
   * @throws Exception if the mock resource manager fails to start, submit,
   *     or kill an application
   */
  @Test
  @Timeout(value = 120)
  public void testApplicationLimitSubmit() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
    mgr.init(conf);
    // register the cluster node labels
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
        ImmutableSet.of("x", "y", "z"));

    // set node -> label mapping:
    // h1 -> x
    // h2 -> y
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
    mgr.addLabelsToNode(
        ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));

    // inject node label manager
    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };
    try {
      rm.getRMContext().setNodeLabelManager(mgr);
      rm.start();
      // Only the registrations matter; the returned node handles are unused.
      rm.registerNode("h1:1234", 4096);
      rm.registerNode("h2:1234", 4096);
      rm.registerNode("h3:1234", 4096);

      // Submit application to queue c where the default partition capacity is
      // zero
      RMApp app1 = MockRMAppSubmitter.submit(rm,
          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("c")
              .withWaitForAppAcceptedState(false)
              .build());
      rm.drainEvents();
      rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
      assertEquals(RMAppState.ACCEPTED, app1.getState());
      rm.killApp(app1.getApplicationId());

      // First application in a1 is accepted
      RMApp app2 = MockRMAppSubmitter.submit(rm,
          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("a1")
              .withWaitForAppAcceptedState(false)
              .build());
      rm.drainEvents();
      rm.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
      assertEquals(RMAppState.ACCEPTED, app2.getState());

      // Second application in a1 is rejected based on the queue level max
      // applications setting
      RMApp app3 = MockRMAppSubmitter.submit(rm,
          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("a1")
              .withWaitForAppAcceptedState(false)
              .build());
      rm.drainEvents();
      rm.waitForState(app3.getApplicationId(), RMAppState.FAILED);
      assertEquals(RMAppState.FAILED, app3.getState());
      assertEquals(
          "org.apache.hadoop.security.AccessControlException: "
              + "Queue root.a.a1 already has 1 applications, cannot accept "
              + "submission of application: " + app3.getApplicationId(),
          app3.getDiagnostics().toString());

      // based on per user max app settings, app should be rejected instantly
      RMApp app13 = MockRMAppSubmitter.submit(rm,
          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("d")
              .withWaitForAppAcceptedState(false)
              .build());
      rm.drainEvents();
      rm.waitForState(app13.getApplicationId(), RMAppState.FAILED);
      assertEquals(RMAppState.FAILED, app13.getState());
      assertEquals(
          "org.apache.hadoop.security.AccessControlException: Queue"
              + " root.d already has 0 applications from user user cannot"
              + " accept submission of application: " + app13.getApplicationId(),
          app13.getDiagnostics().toString());

      // Three applications from user2 in a2 are accepted
      RMApp app11 = MockRMAppSubmitter.submit(rm,
          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
              .withAppName("app")
              .withUser("user2")
              .withAcls(null)
              .withQueue("a2")
              .withWaitForAppAcceptedState(false)
              .build());
      rm.drainEvents();
      rm.waitForState(app11.getApplicationId(), RMAppState.ACCEPTED);
      assertEquals(RMAppState.ACCEPTED, app11.getState());
      RMApp app12 = MockRMAppSubmitter.submit(rm,
          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
              .withAppName("app")
              .withUser("user2")
              .withAcls(null)
              .withQueue("a2")
              .withWaitForAppAcceptedState(false)
              .build());
      rm.drainEvents();
      rm.waitForState(app12.getApplicationId(), RMAppState.ACCEPTED);
      assertEquals(RMAppState.ACCEPTED, app12.getState());
      RMApp app14 = MockRMAppSubmitter.submit(rm,
          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
              .withAppName("app")
              .withUser("user2")
              .withAcls(null)
              .withQueue("a2")
              .withWaitForAppAcceptedState(false)
              .build());
      rm.drainEvents();
      rm.waitForState(app14.getApplicationId(), RMAppState.ACCEPTED);
      assertEquals(RMAppState.ACCEPTED, app14.getState());

      // based on system max limit the next application is rejected
      RMApp app15 = MockRMAppSubmitter.submit(rm,
          MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
              .withAppName("app")
              .withUser("user2")
              .withAcls(null)
              .withQueue("a2")
              .withWaitForAppAcceptedState(false)
              .build());
      rm.drainEvents();
      rm.waitForState(app15.getApplicationId(), RMAppState.FAILED);
      assertEquals(RMAppState.FAILED, app15.getState());
      assertEquals(
          "Maximum system application limit reached,cannot"
              + " accept submission of application: " + app15.getApplicationId(),
          app15.getDiagnostics().toString());

      rm.killApp(app2.getApplicationId());
      rm.killApp(app13.getApplicationId());
      rm.killApp(app14.getApplicationId());
    } finally {
      // Always release the resource manager, even when an assertion fails.
      rm.stop();
    }
  }

  // Checks that the max AM resource limit is still correct when one resource
  // is depleted while the other is not. Uses the DominantResourceCalculator.
  @Test
  public void testAMResourceLimitWithDRCAndFullParent() throws Exception {
    CapacitySchedulerConfiguration schedulerConf =
        new CapacitySchedulerConfiguration();
    setupQueueConfiguration(schedulerConf);
    schedulerConf.setFloat(
        CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
        0.3f);

    // The whole cluster: 100GB of memory and 1000 vCores.
    Resource totalResource = Resources.createResource(100 * GB, 1000);

    Resource minAllocation = Resources.createResource(GB);
    Resource maxAllocation = Resources.createResource(16 * GB);
    CapacitySchedulerContext csContext = createCSContext(schedulerConf,
        new DominantResourceCalculator(), minAllocation, maxAllocation,
        totalResource);
    CapacitySchedulerQueueContext queueContext =
        new CapacitySchedulerQueueContext(csContext);

    // Build the queue hierarchy and tell the root about the cluster size.
    CSQueueStore queueStore = new CSQueueStore();
    CSQueue root = CapacitySchedulerQueueManager.parseQueue(queueContext,
        schedulerConf, null, "root", queueStore, queueStore,
        TestUtils.spyHook);
    root.updateClusterResource(totalResource,
        new ResourceLimits(totalResource));

    // "queueA" is given a 30% capacity guarantee and 30% of that may be used
    // for AMs. 30% of <memory: 100GB, vCores: 1000> is
    // <memory: 30GB, vCores: 300>, the guaranteed capacity of "queueA"; 30% of
    // that is <memory: 9GB, vCores: 90>. The max AM queue limit should never
    // be less than that for any resource.
    LeafQueue parsedQueueA = (LeafQueue) queueStore.get(A);
    LeafQueue queueA = TestLeafQueue.stubLeafQueue(parsedQueueA);
    queueA.setCapacity(30.0f);
    queueA.setUserLimitFactor(10f);
    queueA.setMaxAMResourcePerQueuePercent(0.3f);
    // Let "queueA" see the total cluster resource.
    queueA.updateClusterResource(totalResource,
        new ResourceLimits(totalResource));

    // Guaranteed capacity of "queueA" (<memory: 30GB, vCores: 300>).
    float guaranteedFraction = queueA.getCapacity() / 100;
    Resource guaranteed = Resources.multiply(totalResource, guaranteedFraction);

    // The limit below is what is actually available to "queueA". It simulates
    // a second queue ("queueB"), with a max capacity of 100%, having
    // "borrowed" nearly all of "queueA"'s resources by going well over its own
    // guarantee: "queueB" has used 99GB of memory and 505 vCores, which makes
    // vCores dominant when the available resources are calculated.
    when(queueA.getEffectiveCapacity(any())).thenReturn(guaranteed);
    Resource available = Resource.newInstance(1024, 495);
    ResourceLimits availableLimits =
        new ResourceLimits(available, Resources.none());
    queueA.updateClusterResource(totalResource, availableLimits);

    Resource expectedAmLimit = Resources.multiply(guaranteed,
        queueA.getMaxAMResourcePerQueuePercent());
    Resource amLimit = queueA.calculateAndGetAMResourceLimit();

    long expectedMemory = expectedAmLimit.getMemorySize();
    long computedMemory = amLimit.getMemorySize();
    assertTrue(computedMemory >= expectedMemory,
        "AM memory limit is less than expected: Expected: " +
        expectedMemory + "; Computed: "
        + computedMemory);

    int expectedVCores = expectedAmLimit.getVirtualCores();
    int computedVCores = amLimit.getVirtualCores();
    assertTrue(computedVCores >= expectedVCores,
        "AM vCore limit is less than expected: Expected: " +
        expectedVCores + "; Computed: "
        + computedVCores);
  }

  /**
   * Creates a mocked {@link CapacitySchedulerContext} whose getters return the
   * supplied configuration, resource calculator, allocation bounds and cluster
   * resource, together with a fresh queue manager, a fresh
   * {@link PreemptionManager} and this test's {@code rmContext}.
   *
   * @param csConf capacity-scheduler configuration returned by
   *     {@code getConfiguration()}
   * @param rc resource calculator returned by {@code getResourceCalculator()}
   * @param minResource value returned by {@code getMinimumResourceCapability()}
   * @param maxResource value returned by {@code getMaximumResourceCapability()}
   * @param clusterResource value returned by {@code getClusterResource()}
   * @return the stubbed scheduler context
   */
  private CapacitySchedulerContext createCSContext(CapacitySchedulerConfiguration csConf,
      ResourceCalculator rc, Resource minResource, Resource maxResource, Resource clusterResource) {
    YarnConfiguration conf = new YarnConfiguration();

    CapacitySchedulerContext context = mock(CapacitySchedulerContext.class);
    when(context.getConfiguration()).thenReturn(csConf);
    when(context.getConf()).thenReturn(conf);
    when(context.getMinimumResourceCapability()).
        thenReturn(minResource);
    when(context.getMaximumResourceCapability()).
        thenReturn(maxResource);
    when(context.getResourceCalculator()).
        thenReturn(rc);
    CapacitySchedulerQueueManager queueManager = new CapacitySchedulerQueueManager(conf,
        rmContext.getNodeLabelManager(), null);
    when(context.getCapacitySchedulerQueueManager()).thenReturn(queueManager);

    when(context.getRMContext()).thenReturn(rmContext);
    // Stub the preemption manager exactly once so that a single instance is
    // created and handed out by the mock.
    when(context.getPreemptionManager()).thenReturn(new PreemptionManager());
    setQueueHandler(context);

    // Total cluster resources.
    when(context.getClusterResource()).thenReturn(clusterResource);

    return context;
  }

}