/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
    .NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .capacity.CSQueueUtils.EPSILON;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .capacity.CapacitySchedulerConfiguration.DOT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .capacity.CapacitySchedulerConfiguration.ROOT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.util.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey;

public class TestLeafQueue {
  private final RecordFactory recordFactory = 
      RecordFactoryProvider.getRecordFactory(null);
  private static final Logger LOG =
      LoggerFactory.getLogger(TestLeafQueue.class);

  RMContext rmContext;
  RMContext spyRMContext;
  ResourceRequest amResourceRequest;
  CapacityScheduler cs;
  CapacitySchedulerConfiguration csConf;
  CapacitySchedulerContext csContext;
  CapacitySchedulerQueueContext queueContext;
  private RMApp rmApp;
  
  CSQueue root;
  private CSQueueStore queues;
  
  final static int GB = 1024;
  final static String DEFAULT_RACK = "/default";

  private final static String LABEL = "test";

  private final ResourceCalculator resourceCalculator =
      new DefaultResourceCalculator();
  
  private final ResourceCalculator dominantResourceCalculator =
      new DominantResourceCalculator();

  @BeforeEach
  public void setUp() throws Exception {
    setUpInternal(resourceCalculator, false);
  }

  private void setUpWithDominantResourceCalculator() throws Exception {
    setUpInternal(dominantResourceCalculator, false);
  }

  private void setUpWithNodeLabels() throws Exception {
    setUpInternal(resourceCalculator, true);
  }

  private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels)
      throws Exception {
    CapacityScheduler spyCs = new CapacityScheduler();
    queues = new CSQueueStore();
    cs = spy(spyCs);

    //All stub calls on the spy object of the 'cs' field should happen
    //before cs.start() is invoked. See YARN-10672 for more details.
    when(cs.getNumClusterNodes()).thenReturn(3);

    rmContext = TestUtils.getMockRMContext();
    spyRMContext = spy(rmContext);

    ConcurrentMap<ApplicationId, RMApp> spyApps = 
        spy(new ConcurrentHashMap<ApplicationId, RMApp>());
    rmApp = mock(RMApp.class);
    when(rmApp.getRMAppAttempt(any())).thenReturn(null);
    amResourceRequest = mock(ResourceRequest.class);
    when(amResourceRequest.getCapability()).thenReturn(
      Resources.createResource(0, 0));
    when(rmApp.getAMResourceRequests()).thenReturn(
        Collections.singletonList(amResourceRequest));
    Mockito.doReturn(rmApp)
        .when(spyApps).get(ArgumentMatchers.<ApplicationId>any());
    when(spyRMContext.getRMApps()).thenReturn(spyApps);
    
    csConf = 
        new CapacitySchedulerConfiguration();
    csConf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
    csConf.setBoolean(CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES,
        false);
    csConf.setResourceComparator(rC.getClass());
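    // Use a timestamped name for the per-test root child queue so each test
    // run works against a unique set of configuration keys.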
    final String newRoot = "root" + System.currentTimeMillis();
    setupQueueConfiguration(csConf, newRoot, withNodeLabels);
    YarnConfiguration conf = new YarnConfiguration();
    cs.setConf(conf);
    when(spyRMContext.getYarnConfiguration()).thenReturn(conf);

    csContext = mock(CapacitySchedulerContext.class);
    when(csContext.getConfiguration()).thenReturn(csConf);
    when(csContext.getConf()).thenReturn(conf);
    when(csContext.getMinimumResourceCapability()).
        thenReturn(Resources.createResource(GB, 1));
    when(csContext.getMaximumResourceCapability()).
        thenReturn(Resources.createResource(16*GB, 32));
    when(csContext.getClusterResource()).
        thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
    when(csContext.getResourceCalculator()).thenReturn(rC);
    when(csContext.getRMContext()).thenReturn(rmContext);
    RMContainerTokenSecretManager containerTokenSecretManager =
        new RMContainerTokenSecretManager(conf);
    containerTokenSecretManager.rollMasterKey();
    when(csContext.getContainerTokenSecretManager()).thenReturn(
        containerTokenSecretManager);
    CapacitySchedulerQueueManager queueManager =
        new CapacitySchedulerQueueManager(csConf, rmContext.getNodeLabelManager(), null);
    when(csContext.getCapacitySchedulerQueueManager()).thenReturn(queueManager);

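    // Make the node labels configured above known to the queue manager
    // before the queue hierarchy is parsed.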
    queueManager.reinitConfiguredNodeLabels(csConf);

    queueContext = new CapacitySchedulerQueueContext(csContext);

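    // Parse the queue hierarchy from the configuration; TestUtils.spyHook
    // wraps each queue in a Mockito spy, which is what later allows
    // stubLeafQueue() to stub out parent-queue behavior.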
    root = 
        CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
            ROOT.getFullPath(),
            queues, queues, 
            TestUtils.spyHook);
    queueManager.setRootQueue(root);
    root.updateClusterResource(Resources.createResource(100 * 16 * GB, 100 * 32),
        new ResourceLimits(Resources.createResource(100 * 16 * GB, 100 * 32)));

    ResourceUsage queueResUsage = root.getQueueResourceUsage();
    when(csContext.getClusterResourceUsage())
        .thenReturn(queueResUsage);

    cs.setRMContext(spyRMContext);
    cs.setResourceCalculator(rC);
    cs.init(csConf);

    when(spyRMContext.getScheduler()).thenReturn(cs);
    when(spyRMContext.getYarnConfiguration())
        .thenReturn(new YarnConfiguration());
    cs.start();
  }


  private static final String A = "a";
  private static final String B = "b";
  private static final String C = "c";
  private static final String C1 = "c1";
  private static final String D = "d";
  private static final String E = "e";
  private static final QueuePath ROOT = new QueuePath(CapacitySchedulerConfiguration.ROOT);
  private static final QueuePath A_QUEUE_PATH = ROOT.createNewLeaf(A);
  private void setupQueueConfiguration(
      CapacitySchedulerConfiguration conf, 
      final String newRootName, boolean withNodeLabels) {
    
    // Define top-level queues
    conf.setQueues(ROOT, new String[] {newRootName});
    conf.setMaximumCapacity(ROOT, 100);
    conf.setAcl(ROOT,
      QueueACL.SUBMIT_APPLICATIONS, " ");
    if (withNodeLabels) {
      conf.setCapacityByLabel(ROOT, LABEL, 100);
      conf.setMaximumCapacityByLabel(ROOT, LABEL, 100);
    }
    
    final String newRootPath = CapacitySchedulerConfiguration.ROOT + "." + newRootName;
    final QueuePath newRoot = new QueuePath(newRootPath);
    conf.setQueues(newRoot, new String[] {A, B, C, D, E});
    conf.setCapacity(newRoot, 100);
    conf.setMaximumCapacity(newRoot, 100);
    conf.setAcl(newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
    if (withNodeLabels) {
      conf.setAccessibleNodeLabels(newRoot, Collections.singleton(LABEL));
      conf.setCapacityByLabel(newRoot, LABEL, 100);
      conf.setMaximumCapacityByLabel(newRoot, LABEL, 100);
    }

    final QueuePath a = new QueuePath(newRootPath, A);
    conf.setCapacity(a, 8.5f);
    conf.setMaximumCapacity(a, 20);
    conf.setAcl(a, QueueACL.SUBMIT_APPLICATIONS, "*");
    if (withNodeLabels) {
      conf.setAccessibleNodeLabels(a, Collections.singleton(LABEL));
      conf.setCapacityByLabel(a, LABEL, 100);
      conf.setMaximumCapacityByLabel(a, LABEL, 100);
    }

    final QueuePath b = new QueuePath(newRootPath, B);
    conf.setCapacity(b, 80);
    conf.setMaximumCapacity(b, 99);
    conf.setAcl(b, QueueACL.SUBMIT_APPLICATIONS, "*");

    final String cPath = newRootPath + "." + C;
    final QueuePath c = new QueuePath(cPath);
    conf.setCapacity(c, 1.5f);
    conf.setMaximumCapacity(c, 10);
    conf.setAcl(c, QueueACL.SUBMIT_APPLICATIONS, " ");
    
    conf.setQueues(c, new String[] {C1});

    final QueuePath c1 = new QueuePath(cPath, C1);
    conf.setCapacity(c1, 100);

    final QueuePath d = new QueuePath(newRootPath, D);
    conf.setCapacity(d, 9);
    conf.setMaximumCapacity(d, 11);
    conf.setAcl(d, QueueACL.SUBMIT_APPLICATIONS, "user_d");

    final QueuePath e = new QueuePath(newRootPath, E);
    conf.setCapacity(e, 1);
    conf.setMaximumCapacity(e, 1);
    conf.setAcl(e, QueueACL.SUBMIT_APPLICATIONS, "user_e");

  }

  static LeafQueue stubLeafQueue(LeafQueue queue) {
    // Mock some methods for ease in these unit tests
    
    // 1. Stub out LeafQueue.parent.completedContainer
    CSQueue parent = queue.getParent();
    doNothing().when(parent).completedContainer(
        any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), 
        any(RMContainer.class), any(ContainerStatus.class), 
        any(RMContainerEventType.class), any(CSQueue.class), anyBoolean());

    // Stub out parent queue's accept and apply.
    doReturn(true).when(parent).accept(any(Resource.class),
        any(ResourceCommitRequest.class));
    doNothing().when(parent).apply(any(Resource.class),
        any(ResourceCommitRequest.class));
    
    return queue;
  }
  
  @Test
  public void testInitializeQueue() throws Exception {
    final float epsilon = 1e-5f;
    //can add a more sturdy test with 3-layer queues
    //once MAPREDUCE-3410 is resolved
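    // Expected values mirror setupQueueConfiguration: a = 8.5%/20%,
    // b = 80%/99%, c = 1.5%/10% (capacity / maximum-capacity).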
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
    assertEquals(0.085, a.getCapacity(), epsilon);
    assertEquals(0.085, a.getAbsoluteCapacity(), epsilon);
    assertEquals(0.2, a.getMaximumCapacity(), epsilon);
    assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
    
    LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
    assertEquals(0.80, b.getCapacity(), epsilon);
    assertEquals(0.80, b.getAbsoluteCapacity(), epsilon);
    assertEquals(0.99, b.getMaximumCapacity(), epsilon);
    assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
    
    ParentQueue c = (ParentQueue)queues.get(C);
    assertEquals(0.015, c.getCapacity(), epsilon);
    assertEquals(0.015, c.getAbsoluteCapacity(), epsilon);
    assertEquals(0.1, c.getMaximumCapacity(), epsilon);
    assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);

    // Verify the value for getAMResourceLimit for queues with < .1 maxcap
    Resource clusterResource = Resource.newInstance(50 * GB, 50);

    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
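    // The AM resource limit is 10% (the default max-am-resource-percent) of
    // the queue's maximum share, normalized up to the 1GB minimum allocation:
    // 50GB * 20% * 10% = 1GB for 'a', 50GB * 99% * 10% ~= 5GB for 'b'.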
    assertEquals(Resource.newInstance(1 * GB, 1),
        a.calculateAndGetAMResourceLimit());

    assertEquals(Resource.newInstance(5 * GB, 1),
        b.calculateAndGetAMResourceLimit());
  }
 
  @Test
  public void testSingleQueueOneUserMetrics() throws Exception {

    // Manipulate queue 'b' (held in the local variable 'a')
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(B));

    // Users
    final String user_0 = "user_0";

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = 
        TestUtils.getMockApplicationAttemptId(0, 0); 
    FiCaSchedulerApp app_0 = 
        new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 = 
        TestUtils.getMockApplicationAttemptId(1, 0); 
    FiCaSchedulerApp app_1 = 
        new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_1, user_0);  // same user

    
    // Setup some nodes
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
        8*GB);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0);

    final int numNodes = 1;
    Resource clusterResource = 
        Resources.createResource(numNodes * (8*GB), numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Setup resource-requests
    Priority priority = TestUtils.createMockPriority(1);
    app_0.updateResourceRequests(Collections.singletonList(
            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
                priority, recordFactory)));

    // Start testing...
    
    // Only 1 container
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
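    // availableMB should be the queue's share of the node's memory
    // (capacity * 8GB) minus the 1GB just allocated.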
    assertEquals(
        (int)(node_0.getTotalResource().getMemorySize() * a.getCapacity()) - (1*GB),
        a.getMetrics().getAvailableMB());
  }

  @Test
  public void testUserQueueAcl() throws Exception {

    // Manipulate queue 'd'
    LeafQueue d = stubLeafQueue((LeafQueue) queues.get(D));

    // Users
    final String user_d = "user_d";

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = TestUtils
        .getMockApplicationAttemptId(0, 1);
    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_d, d, null,
        spyRMContext);
    d.submitApplicationAttempt(app_0, user_d);

    // Attempt the same application again
    final ApplicationAttemptId appAttemptId_1 = TestUtils
        .getMockApplicationAttemptId(0, 2);
    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_d, d, null,
        spyRMContext);
    d.submitApplicationAttempt(app_1, user_d); // same user
  }

  @Test
  public void testPolicyConfiguration() throws Exception {
    
    CapacitySchedulerConfiguration testConf = 
        new CapacitySchedulerConfiguration();
    
    String tproot = ROOT + "." +
      "testPolicyRoot" + System.currentTimeMillis();

    OrderingPolicy<FiCaSchedulerApp> comPol =
        testConf.<FiCaSchedulerApp>getAppOrderingPolicy(new QueuePath(tproot));

    // With no ordering policy configured, the default should be FIFO.
    assertTrue(comPol instanceof FifoOrderingPolicy);
  }

  @Test
  public void testAppAttemptMetrics() throws Exception {
    CSMaxRunningAppsEnforcer enforcer = mock(CSMaxRunningAppsEnforcer.class);
    cs.setMaxRunningAppsEnforcer(enforcer);
    ApplicationSubmissionContext applicationSubmissionContext =
        mock(ApplicationSubmissionContext.class);
    when(applicationSubmissionContext.getUnmanagedAM()).thenReturn(false);
    when(rmApp.getApplicationSubmissionContext())
        .thenReturn(applicationSubmissionContext);
    when(rmApp.getCurrentAppAttempt()).thenReturn(mock(RMAppAttempt.class));
    // Manipulate queue 'b' (held in the local variable 'a')
    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));

    // Users
    final String user_0 = "user_0";

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = TestUtils
        .getMockApplicationAttemptId(0, 1);

    AppAddedSchedulerEvent addAppEvent =
        new AppAddedSchedulerEvent(appAttemptId_0.getApplicationId(),
          a.getQueuePath(), user_0);
    cs.handle(addAppEvent);
    AppAttemptAddedSchedulerEvent addAttemptEvent = 
        new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
    cs.handle(addAttemptEvent);

    AppAttemptRemovedSchedulerEvent event = new AppAttemptRemovedSchedulerEvent(
        appAttemptId_0, RMAppAttemptState.FAILED, false);
    cs.handle(event);
    
    assertEquals(0, a.getMetrics().getAppsPending());
    assertEquals(0, a.getMetrics().getAppsFailed());

    // Attempt the same application again
    final ApplicationAttemptId appAttemptId_1 = TestUtils
        .getMockApplicationAttemptId(0, 2);
    FiCaSchedulerApp app1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
        null, spyRMContext);
    app1.getAppSchedulingInfo().setUnmanagedAM(false);
    app1.setAMResource(Resource.newInstance(100, 1));
    a.submitApplicationAttempt(app1, user_0); // same user

    assertEquals(1, a.getMetrics().getAppsSubmitted());
    assertEquals(1, a.getMetrics().getAppsPending());
    assertEquals(1, a.getUser(user_0).getActiveApplications());
    assertEquals(app1.getAMResource().getMemorySize(), a.getMetrics()
        .getUsedAMResourceMB());
    assertEquals(app1.getAMResource().getVirtualCores(), a.getMetrics()
        .getUsedAMResourceVCores());
    
    event = new AppAttemptRemovedSchedulerEvent(appAttemptId_0,
        RMAppAttemptState.FINISHED, false);
    cs.handle(event);
    AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent(
        appAttemptId_0.getApplicationId(), RMAppState.FINISHED);
    cs.handle(rEvent);
    
    assertEquals(1, a.getMetrics().getAppsSubmitted());
    assertEquals(0, a.getMetrics().getAppsPending());
    assertEquals(0, a.getMetrics().getAppsFailed());
    assertEquals(1, a.getMetrics().getAppsCompleted());

    QueueMetrics userMetrics = a.getMetrics().getUserMetrics(user_0);
    assertEquals(1, userMetrics.getAppsSubmitted());
  }

  @Test
  public void testUnmanagedAppAttemptMetrics() throws Exception {
    CSMaxRunningAppsEnforcer enforcer = mock(CSMaxRunningAppsEnforcer.class);
    cs.setMaxRunningAppsEnforcer(enforcer);
    // Manipulate queue 'b' (held in the local variable 'a')
    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));

    // Users
    final String user0 = "user_0";

    // Submit applications
    final ApplicationAttemptId appAttemptId0 = TestUtils
        .getMockApplicationAttemptId(0, 1);

    ApplicationSubmissionContext applicationSubmissionContext =
        ApplicationSubmissionContext.newInstance(
            appAttemptId0.getApplicationId(), "test", a.getQueuePath(),
            Priority.newInstance(0), null, true, true,
            2, null, "test");

    AppAddedSchedulerEvent addAppEvent =
        new AppAddedSchedulerEvent(user0, applicationSubmissionContext, false,
            null);
    cs.handle(addAppEvent);
    AppAttemptAddedSchedulerEvent addAttemptEvent =
        new AppAttemptAddedSchedulerEvent(appAttemptId0, false);
    cs.handle(addAttemptEvent);

    AppAttemptRemovedSchedulerEvent event = new AppAttemptRemovedSchedulerEvent(
        appAttemptId0, RMAppAttemptState.FAILED, false);
    cs.handle(event);

    assertEquals(0, a.getMetrics().getUnmanagedAppsPending());
    assertEquals(0, a.getMetrics().getUnmanagedAppsFailed());

    // Attempt the same application again
    final ApplicationAttemptId appAttemptId1 = TestUtils
        .getMockApplicationAttemptId(0, 2);
    FiCaSchedulerApp app1 = new FiCaSchedulerApp(appAttemptId1, user0, a,
        null, spyRMContext);

    app1.setAMResource(Resource.newInstance(100, 1));
    a.submitApplicationAttempt(app1, user0); // same user

    assertEquals(1, a.getMetrics().getUnmanagedAppsSubmitted());
    assertEquals(1, a.getMetrics().getUnmanagedAppsPending());
    assertEquals(1, a.getUser(user0).getActiveApplications());
    assertEquals(app1.getAMResource().getMemorySize(), a.getMetrics()
        .getUsedAMResourceMB());
    assertEquals(app1.getAMResource().getVirtualCores(), a.getMetrics()
        .getUsedAMResourceVCores());

    event = new AppAttemptRemovedSchedulerEvent(appAttemptId0,
        RMAppAttemptState.FINISHED, false);
    cs.handle(event);
    AppRemovedSchedulerEvent rEvent = new AppRemovedSchedulerEvent(
        appAttemptId0.getApplicationId(), RMAppState.FINISHED);
    cs.handle(rEvent);

    assertEquals(1, a.getMetrics().getUnmanagedAppsSubmitted());
    assertEquals(0, a.getMetrics().getUnmanagedAppsPending());
    assertEquals(0, a.getMetrics().getUnmanagedAppsFailed());
    assertEquals(1, a.getMetrics().getUnmanagedAppsCompleted());

    QueueMetrics userMetrics = a.getMetrics().getUserMetrics(user0);
    assertEquals(1, userMetrics.getUnmanagedAppsSubmitted());
  }

  @Test
  public void testFairConfiguration() throws Exception {

    CapacitySchedulerConfiguration testConf =
        new CapacitySchedulerConfiguration();

    String tprootPath = ROOT + "." +
      "testPolicyRoot" + System.currentTimeMillis();
    QueuePath tproot = new QueuePath(tprootPath);

    OrderingPolicy<FiCaSchedulerApp> schedOrder =
      testConf.<FiCaSchedulerApp>getAppOrderingPolicy(tproot);

    //override default to fair
    String policyType = CapacitySchedulerConfiguration.PREFIX + tproot +
      "." + CapacitySchedulerConfiguration.ORDERING_POLICY;

    testConf.set(policyType,
      CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY);
    schedOrder =
      testConf.<FiCaSchedulerApp>getAppOrderingPolicy(tproot);
    FairOrderingPolicy<FiCaSchedulerApp> fop =
        (FairOrderingPolicy<FiCaSchedulerApp>) schedOrder;
    assertFalse(fop.getSizeBasedWeight());

    //Now with sizeBasedWeight
    String sbwConfig = CapacitySchedulerConfiguration.PREFIX + tproot +
      "." + CapacitySchedulerConfiguration.ORDERING_POLICY + "." +
      FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT;
    testConf.set(sbwConfig, "true");
    schedOrder =
      testConf.<FiCaSchedulerApp>getAppOrderingPolicy(tproot);
    fop = (FairOrderingPolicy<FiCaSchedulerApp>) schedOrder;
    assertTrue(fop.getSizeBasedWeight());

  }

  @Test
  public void testSingleQueueWithOneUser() throws Exception {

    // Manipulate queue 'a'
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
    //unset maxCapacity
    a.setMaxCapacity(1.0f);

    // Users
    final String user_0 = "user_0";

    // Active Users Manager
    AbstractUsersManager activeUserManager = a.getAbstractUsersManager();

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = 
        TestUtils.getMockApplicationAttemptId(0, 0); 
    FiCaSchedulerApp app_0 = 
        new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
            activeUserManager, spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 = 
        TestUtils.getMockApplicationAttemptId(1, 0); 
    FiCaSchedulerApp app_1 = 
        new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
            activeUserManager, spyRMContext);
    a.submitApplicationAttempt(app_1, user_0);  // same user

    
    // Setup some nodes
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
        8*GB);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0);

    final int numNodes = 1;
    Resource clusterResource = 
        Resources.createResource(numNodes * (8*GB), numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Setup resource-requests
    Priority priority = TestUtils.createMockPriority(1);
    app_0.updateResourceRequests(Collections.singletonList(
            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
                priority, recordFactory)));

    app_1.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
            priority, recordFactory)));

    // Start testing...
    
    // Only 1 container
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(1*GB, a.getUsedResources().getMemorySize());
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, a.getMetrics().getReservedMB());
    assertEquals(1*GB, a.getMetrics().getAllocatedMB());
    assertEquals(0*GB, a.getMetrics().getAvailableMB());

    // Also the 2nd container -> minCapacity = 1024 since (.1 * 8G) < minAlloc;
    // a user can also get one container more than the user-limit
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(2*GB, a.getUsedResources().getMemorySize());
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, a.getMetrics().getReservedMB());
    assertEquals(2*GB, a.getMetrics().getAllocatedMB());
    
    // Can't allocate 3rd due to user-limit
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(2*GB, a.getUsedResources().getMemorySize());
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, a.getMetrics().getReservedMB());
    assertEquals(2*GB, a.getMetrics().getAllocatedMB());
    
    // Bump up user-limit-factor, now allocate should work
    a.setUserLimitFactor(10);
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(3*GB, a.getUsedResources().getMemorySize());
    assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, a.getMetrics().getReservedMB());
    assertEquals(3*GB, a.getMetrics().getAllocatedMB());

    // One more should work, for app_1, due to user-limit-factor
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(4*GB, a.getUsedResources().getMemorySize());
    assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, a.getMetrics().getReservedMB());
    assertEquals(4*GB, a.getMetrics().getAllocatedMB());

    // Test max-capacity
    // Now - no more allocs since we are at max-cap
    a.setMaxCapacity(0.5f);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(4*GB, a.getUsedResources().getMemorySize());
    assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, a.getMetrics().getReservedMB());
    assertEquals(4*GB, a.getMetrics().getAllocatedMB());
    
    // Release each container from app_0
    for (RMContainer rmContainer : app_0.getLiveContainers()) {
      a.completedContainer(clusterResource, app_0, node_0, rmContainer,
          ContainerStatus.newInstance(rmContainer.getContainerId(),
              ContainerState.COMPLETE, "",
              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
          RMContainerEventType.KILL, null, true);
    }
    assertEquals(1*GB, a.getUsedResources().getMemorySize());
    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, a.getMetrics().getReservedMB());
    assertEquals(1*GB, a.getMetrics().getAllocatedMB());
    
    // Release each container from app_1
    for (RMContainer rmContainer : app_1.getLiveContainers()) {
      a.completedContainer(clusterResource, app_1, node_0, rmContainer,
          ContainerStatus.newInstance(rmContainer.getContainerId(),
              ContainerState.COMPLETE, "",
              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
          RMContainerEventType.KILL, null, true);
    }

    assertEquals(0*GB, a.getUsedResources().getMemorySize());
    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, a.getMetrics().getReservedMB());
    assertEquals(0*GB, a.getMetrics().getAllocatedMB());
    assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()),
        a.getMetrics().getAvailableMB());
  }

  @Test
  public void testHeadroomCheckWithDRF() throws Exception {
    CSAssignment assignment;
    setUpWithDominantResourceCalculator();
    // Mock the queue
    LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B));
    // Users
    final String user0 = "user_0";

    // Submit applications
    final ApplicationAttemptId appAttemptId0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app0 =
        new FiCaSchedulerApp(appAttemptId0, user0, b,
            b.getAbstractUsersManager(), spyRMContext);
    b.submitApplicationAttempt(app0, user0);
    // Setup some nodes
    String host0 = "127.0.0.1";
    FiCaSchedulerNode node0 =
        TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 100 * GB, 100);

    int numNodes = 1;
    Resource clusterResource =
        Resources.createResource(numNodes * (100 * GB), numNodes * 100);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Increase the user-limit-factor to let user_0 fully use the queue's
    // maximum resources, which are 0.99 * [100 * GB, 100].
    b.setUserLimitFactor(10.0f);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
        ImmutableMap.of(app0.getApplicationAttemptId(), app0);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node0.getNodeID(), node0);

    Priority priority = TestUtils.createMockPriority(1);
    app0.updateResourceRequests(Collections.singletonList(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 90 * GB, 10, 1, true,
            priority, recordFactory, NO_LABEL)));
    assignment = b.assignContainers(clusterResource, node0, new ResourceLimits(
        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, b, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);

    app0.updateResourceRequests(Collections.singletonList(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 10 * GB, 10, 1, true,
            priority, recordFactory, NO_LABEL)));
    assignment = b.assignContainers(clusterResource, node0, new ResourceLimits(
        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    // This assignment should have no containers assigned, because the used
    // memory (90 + 10) GB would exceed the 99 GB maximum
    verifyNoContainerAllocated(assignment);
  }

  @Test
  public void testDRFUsageRatioRounding() throws Exception {
    CSAssignment assign;
    setUpWithDominantResourceCalculator();
    // Mock the queue
    LeafQueue b = stubLeafQueue((LeafQueue) queues.get(E));

    // Users
    final String user0 = "user_0";

    // Submit applications
    final ApplicationAttemptId appAttemptId0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app0 =
        new FiCaSchedulerApp(appAttemptId0, user0, b,
            b.getAbstractUsersManager(), spyRMContext);
    b.submitApplicationAttempt(app0, user0);

    // Setup some nodes
    String host0 = "127.0.0.1";
    FiCaSchedulerNode node0 =
        TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 80 * GB, 100);

    // Make cluster relatively large so usageRatios are small
    int numNodes = 1000;
    Resource clusterResource =
        Resources.createResource(numNodes * (80 * GB), numNodes * 100);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Set user-limit. Need a small queue within a large cluster.
    b.setUserLimit(50);
    b.setUserLimitFactor(1000000);
    b.setMaxCapacity(1.0f);
    b.setAbsoluteCapacity(0.00001f);

    // The first allocation is larger than the second but is still
    // vcore-dominant, so the usage ratio will be based on vcores. If
    // consumedRatio doesn't round in our favor, the new limit calculation
    // will actually be less than what is currently consumed and we will
    // fail to allocate.
    Priority priority = TestUtils.createMockPriority(1);
    app0.updateResourceRequests(Collections.singletonList(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 20 * GB, 29, 1, true,
            priority, recordFactory, NO_LABEL)));
    assign = b.assignContainers(clusterResource, node0, new ResourceLimits(
        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    app0.updateResourceRequests(Collections.singletonList(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 10 * GB, 29, 2, true,
            priority, recordFactory, NO_LABEL)));
    assign = b.assignContainers(clusterResource, node0, new ResourceLimits(
        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    assertTrue(assign.getResource().getMemorySize() > 0,
        "Still within limits, should assign");
  }

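  // Commits a proposed assignment through the shared test helper so that
  // queue, node and application bookkeeping all reflect the allocation.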
  private void applyCSAssignment(Resource clusterResource, CSAssignment assign,
      LeafQueue q, final Map<NodeId, FiCaSchedulerNode> nodes,
      final Map<ApplicationAttemptId, FiCaSchedulerApp> apps)
      throws IOException {
    TestUtils.applyResourceCommitRequest(clusterResource, assign, nodes, apps, csConf);
  }

  @Test
  public void testDRFUserLimits() throws Exception {
    setUpWithDominantResourceCalculator();

    // Mock the queue
    LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B));
    // unset maxCapacity
    b.setMaxCapacity(1.0f);

    // Users
    final String user0 = "user_0";
    final String user1 = "user_1";

    // Submit applications
    final ApplicationAttemptId appAttemptId0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app0 =
        new FiCaSchedulerApp(appAttemptId0, user0, b,
            b.getAbstractUsersManager(), spyRMContext);
    b.submitApplicationAttempt(app0, user0);

    final ApplicationAttemptId appAttemptId2 =
        TestUtils.getMockApplicationAttemptId(2, 0);
    FiCaSchedulerApp app2 =
        new FiCaSchedulerApp(appAttemptId2, user1, b,
            b.getAbstractUsersManager(), spyRMContext);
    b.submitApplicationAttempt(app2, user1);

    // Setup some nodes
    String host0 = "127.0.0.1";
    FiCaSchedulerNode node0 =
        TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 8 * GB, 100);
    String host1 = "127.0.0.2";
    FiCaSchedulerNode node1 =
        TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8 * GB, 100);

    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node0.getNodeID(),
        node0, node1.getNodeID(), node1);
    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app0.getApplicationAttemptId(), app0, app2.getApplicationAttemptId(),
        app2);

    int numNodes = 2;
    Resource clusterResource =
        Resources.createResource(numNodes * (8 * GB), numNodes * 100);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    when(csContext.getClusterResource()).thenReturn(clusterResource);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Setup resource-requests so that one application is memory dominant
    // and other application is vcores dominant
    Priority priority = TestUtils.createMockPriority(1);
    app0.updateResourceRequests(Collections.singletonList(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 40, 10, true,
            priority, recordFactory, NO_LABEL)));

    app2.updateResourceRequests(Collections.singletonList(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 10, 10, true,
            priority, recordFactory, NO_LABEL)));

    /**
     * Start testing...
     */

    // Set user-limit
    b.setUserLimit(50);
    b.setUserLimitFactor(2);
    User queueUser0 = b.getUser(user0);
    User queueUser1 = b.getUser(user1);

    assertEquals(2, b.getAbstractUsersManager().getNumActiveUsers(),
        "There should be 2 active users!");
    // Fill both Nodes as far as we can
    CSAssignment assign;
    do {
      assign =
          b.assignContainers(clusterResource, node0, new ResourceLimits(
              clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
      LOG.info(assign.toString());
      applyCSAssignment(clusterResource, assign, b, nodes, apps);
    } while (assign.getResource().getMemorySize() > 0 &&
        assign.getAssignmentInformation().getNumReservations() == 0);
    do {
      assign =
          b.assignContainers(clusterResource, node1, new ResourceLimits(
              clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
      applyCSAssignment(clusterResource, assign, b, nodes, apps);
    } while (assign.getResource().getMemorySize() > 0 &&
        assign.getAssignmentInformation().getNumReservations() == 0);

    assertTrue(queueUser0.getUsed().getMemorySize() > 0,
        "Verify user_0 got resources");
    assertTrue(queueUser1.getUsed().getMemorySize() > 0,
        "Verify user_1 got resources");
    assertTrue(b.getAbsoluteUsedCapacity() > 0.95,
        "Expected AbsoluteUsedCapacity > 0.95, got: " + b.getAbsoluteUsedCapacity());

    // Verify consumedRatio is based on dominant resources
    float expectedRatio =
        queueUser0.getUsed().getVirtualCores()
            / (numNodes * 100.0f)
            + queueUser1.getUsed().getMemorySize()
            / (numNodes * 8.0f * GB);
    assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001);
    // Add another node and make sure consumedRatio is adjusted
    // accordingly.
    numNodes = 3;
    clusterResource =
        Resources.createResource(numNodes * (8 * GB), numNodes * 100);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource, new ResourceLimits(
        clusterResource));
    expectedRatio =
        queueUser0.getUsed().getVirtualCores()
            / (numNodes * 100.0f)
            + queueUser1.getUsed().getMemorySize()
            / (numNodes * 8.0f * GB);
    assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001);
  }

  @Test
  public void testUserLimitCache() throws Exception {
    // Parameters
    final int numNodes = 4;
    final int nodeSize = 100;
    final int numAllocationThreads = 2;
    final int numUsers = 40;
    final int containerSize = 1 * GB;
    final int numContainersPerApp = 10;
    final int runTime = 5000; // in ms

    Random random = new Random();

    // Setup nodes
    FiCaSchedulerNode[] nodes = new FiCaSchedulerNode[numNodes];
    Map<NodeId, FiCaSchedulerNode> nodesMap = new HashMap<>(nodes.length);
    for (int i = 0; i < numNodes; i++) {
      String host = "127.0.0." + i;
      FiCaSchedulerNode node = TestUtils.getMockNode(host, DEFAULT_RACK, 0,
          nodeSize * GB, nodeSize);
      nodes[i] = node;
      nodesMap.put(node.getNodeID(), node);
    }

    Resource clusterResource =
        Resources.createResource(numNodes * (nodeSize * GB),
            numNodes * nodeSize);

    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    when(csContext.getClusterResource()).thenReturn(clusterResource);

    // working with just one queue
    csConf.setQueues(ROOT, new String[]{A});
    csConf.setCapacity(A_QUEUE_PATH, 100);
    csConf.setMaximumCapacity(A_QUEUE_PATH, 100);
    queueContext.reinitialize();

    // reinitialize queues
    CSQueueStore newQueues = new CSQueueStore();
    CSQueue newRoot =
        CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
            CapacitySchedulerConfiguration.ROOT,
            newQueues, queues,
            TestUtils.spyHook);
    queues = newQueues;
    root.reinitialize(newRoot, csContext.getClusterResource());
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Mock the queue
    LeafQueue leafQueue = stubLeafQueue((LeafQueue) queues.get(A));

    // Set user limit factor so some users are at their limit and the
    // user limit cache has more than just a few entries. Note that
    // 10 / nodeSize is integer division, so the factor is effectively 0.
    leafQueue.setUserLimitFactor(10 / nodeSize);

    // Flag to let allocation threads know to stop
    AtomicBoolean stopThreads = new AtomicBoolean(false);
    AtomicBoolean errorInThreads = new AtomicBoolean(false);

    // Set up allocation threads
    Thread[] threads = new Thread[numAllocationThreads];
    for (int i = 0; i < numAllocationThreads; i++) {
      threads[i] = new Thread(new Runnable() {
        @Override
        public void run() {
          try {
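            // Track whether this thread ever produced a real assignment; an
            // all-null run means allocation was never actually exercised.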
            boolean alwaysNull = true;
            while (!stopThreads.get()) {
              CSAssignment assignment = leafQueue.assignContainers(
                  clusterResource,
                  nodes[random.nextInt(numNodes)],
                  new ResourceLimits(clusterResource),
                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
              applyCSAssignment(clusterResource, assignment, leafQueue,
                  nodesMap, leafQueue.applicationAttemptMap);

              if (assignment != CSAssignment.NULL_ASSIGNMENT) {
                alwaysNull = false;
              }
              Thread.sleep(500);
            }

            // One more assignment but not committing so that the
            // user limits cache is updated to the latest version
            CSAssignment assignment = leafQueue.assignContainers(
                clusterResource,
                nodes[random.nextInt(numNodes)],
                new ResourceLimits(clusterResource),
                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);

            if (alwaysNull && assignment == CSAssignment.NULL_ASSIGNMENT) {
              LOG.error("Thread only got null assignments");
              errorInThreads.set(true);
            }
          } catch (Exception e) {
            LOG.error("Thread exiting because of exception", e);
            errorInThreads.set(true);
          }
        }
      }, "Scheduling Thread " + i);
    }

    // Set up users and some apps
    final String[] users = new String[numUsers];
    for (int i = 0; i < users.length; i++) {
      users[i] = "user_" + i;
    }
    List<ApplicationAttemptId> applicationAttemptIds =
        new ArrayList<>(10);
    List<FiCaSchedulerApp> apps = new ArrayList<>(10);
    Priority priority = TestUtils.createMockPriority(1);

    // Start up 10 apps to begin with
    int appId;
    for (appId = 0; appId < 10; appId++) {
      String user = users[random.nextInt(users.length)];
      ApplicationAttemptId applicationAttemptId =
          TestUtils.getMockApplicationAttemptId(appId, 0);
      FiCaSchedulerApp app = new FiCaSchedulerApp(applicationAttemptId,
          user,
          leafQueue, leafQueue.getUsersManager(), spyRMContext);

      leafQueue.submitApplicationAttempt(app, user);
      app.updateResourceRequests(Collections.singletonList(
          TestUtils.createResourceRequest(ResourceRequest.ANY, containerSize,
              numContainersPerApp, true, priority, recordFactory)));

      applicationAttemptIds.add(applicationAttemptId);
      apps.add(app);
    }

    // Start threads
    for (int i = 0; i < numAllocationThreads; i++) {
      threads[i].start();
    }

    final long startTime = Time.monotonicNow();
    while (true) {
      // Start a new app about half the iterations and stop a random app the
      // rest of the iterations
      boolean startOrStopApp = random.nextBoolean();
      if (startOrStopApp || (apps.size() == 1)) {
        // start a new app
        String user = users[random.nextInt(users.length)];
        ApplicationAttemptId applicationAttemptId =
            TestUtils.getMockApplicationAttemptId(appId, 0);
        FiCaSchedulerApp app = new FiCaSchedulerApp(applicationAttemptId,
            user,
            leafQueue, leafQueue.getUsersManager(), spyRMContext);

        leafQueue.submitApplicationAttempt(app, user);
        app.updateResourceRequests(Collections.singletonList(
            TestUtils.createResourceRequest(ResourceRequest.ANY, containerSize,
                numContainersPerApp, true, priority, recordFactory)));

        applicationAttemptIds.add(applicationAttemptId);
        apps.add(app);

        appId++;
      } else {
        // stop a random app
        int i = random.nextInt(apps.size());
        FiCaSchedulerApp app = apps.get(i);
        leafQueue.finishApplication(app.getApplicationId(), app.getUser());
        leafQueue.releaseResource(clusterResource, app,
            app.getCurrentConsumption(), "", null);
        apps.remove(i);
        applicationAttemptIds.remove(i);
      }

      if (errorInThreads.get() || (Time.monotonicNow() - startTime) > runTime) {
        break;
      }
    }

    // signal allocation threads to stop
    stopThreads.set(true);

    // wait for allocation threads to be done
    for (int i = 0; i < numAllocationThreads; i++) {
      threads[i].join();
    }

    // check if there was an error in the allocation threads
    assertFalse(errorInThreads.get());

    // check there is only one partition in the user limits cache
    assertEquals(1, leafQueue.userLimitsCache.size());

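    // The cache is keyed by partition, then by scheduling mode, then by user.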
    Map<SchedulingMode, ConcurrentMap<String, LeafQueue.CachedUserLimit>>
        uLCByPartition = leafQueue.userLimitsCache.get(nodes[0].getPartition());

    // check there is only one scheduling mode
    assertEquals(1, uLCByPartition.size());

    ConcurrentMap<String, LeafQueue.CachedUserLimit> uLCBySchedulingMode =
        uLCByPartition.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);

    // check entries in the user limits cache
    for (Map.Entry<String, LeafQueue.CachedUserLimit> entry :
        uLCBySchedulingMode.entrySet()) {
      String user = entry.getKey();
      Resource userLimit = entry.getValue().userLimit;

      Resource expectedUL = leafQueue.getResourceLimitForActiveUsers(user,
          clusterResource, nodes[0].getPartition(),
          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);

      assertEquals(expectedUL, userLimit);
    }

    // check the current version in the user limits cache
    assertEquals(leafQueue.getUsersManager().getLatestVersionOfUsersState(),
        leafQueue.currentUserLimitCacheVersion);
    assertTrue(leafQueue.currentUserLimitCacheVersion > 0);
  }

  @Test
  public void testUserLimitCacheActiveUsersChanged() throws Exception {
    // Setup some nodes
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 =
        TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 6*GB);
    String host_1 = "127.0.0.2";
    FiCaSchedulerNode node_1 =
        TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 6*GB);
    String host_2 = "127.0.0.3";
    FiCaSchedulerNode node_2 =
        TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, 6*GB);
    String host_3 = "127.0.0.4";
    FiCaSchedulerNode node_3 =
        TestUtils.getMockNode(host_3, DEFAULT_RACK, 0, 6*GB);

    Map<NodeId, FiCaSchedulerNode> nodes =
        ImmutableMap.of(
            node_0.getNodeID(), node_0,
            node_1.getNodeID(), node_1,
            node_2.getNodeID(), node_2,
            node_3.getNodeID(), node_3
        );

    final int numNodes = 4;
    Resource clusterResource =
        Resources.createResource(numNodes * (6*GB), numNodes);

    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    when(csContext.getClusterResource()).thenReturn(clusterResource);

    // working with just one queue
    csConf.setQueues(ROOT, new String[] {A});
    csConf.setCapacity(A_QUEUE_PATH, 100);
    csConf.setMaximumCapacity(A_QUEUE_PATH, 100);
    queueContext.reinitialize();

    // reinitialize queues
    CSQueueStore newQueues = new CSQueueStore();
    CSQueue newRoot =
        CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
            CapacitySchedulerConfiguration.ROOT,
            newQueues, queues,
            TestUtils.spyHook);
    queues = newQueues;
    root.reinitialize(newRoot, csContext.getClusterResource());
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Mock the queue
    LeafQueue leafQueue = stubLeafQueue((LeafQueue)queues.get(A));

    // initial check
    assertEquals(0, leafQueue.userLimitsCache.size());
    assertEquals(0,
        leafQueue.getUsersManager().preComputedAllUserLimit.size());
    assertEquals(0,
        leafQueue.getUsersManager().preComputedActiveUserLimit.size());

    // 4 users
    final String user_0 = "user_0";
    final String user_1 = "user_1";
    final String user_2 = "user_2";
    final String user_3 = "user_3";

    // Set user-limit
    leafQueue.setUserLimit(0);
    leafQueue.setUserLimitFactor(1.0f);

    Priority priority = TestUtils.createMockPriority(1);

    // Fill queue because user limit is calculated as (used / #active users).
    final ApplicationAttemptId appAttemptId_9 =
        TestUtils.getMockApplicationAttemptId(9, 0);
    FiCaSchedulerApp app_9 =
        new FiCaSchedulerApp(appAttemptId_9, user_0, leafQueue,
            leafQueue.getUsersManager(), spyRMContext);
    leafQueue.submitApplicationAttempt(app_9, user_0);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
        ImmutableMap.of(app_9.getApplicationAttemptId(), app_9);

    app_9.updateResourceRequests(Arrays.asList(
        TestUtils.createResourceRequest(host_0, 1*GB, 5, true,
            priority, recordFactory),
        TestUtils.createResourceRequest(DEFAULT_RACK, 1*GB, 5, true,
            priority, recordFactory),
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 5, true,
            priority, recordFactory)));
    assertEquals(1, leafQueue.getUsersManager().getNumActiveUsers());

    CSAssignment assignment;
    for (int i = 0; i < 5; i++) {
      assignment = leafQueue.assignContainers(clusterResource, node_0,
          new ResourceLimits(clusterResource),
          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
      applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
    }
    app_9.updateResourceRequests(Arrays.asList(
        TestUtils.createResourceRequest(host_1, 1*GB, 5, true,
            priority, recordFactory),
        TestUtils.createResourceRequest(DEFAULT_RACK, 1*GB, 5, true,
            priority, recordFactory),
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 5, true,
            priority, recordFactory)));
    for (int i = 0; i < 5; i++) {
      assignment = leafQueue.assignContainers(clusterResource, node_1,
          new ResourceLimits(clusterResource),
          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
      applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
    }
    // A total of 10GB has been allocated
    assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
    assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
    // There is one user, who should have been cached by the assignContainers
    // call
    assertEquals(1, leafQueue.userLimitsCache
        .get(RMNodeLabelsManager.NO_LABEL)
        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
        .size());
    // But the cache is stale because an allocation was made
    assertNotEquals(leafQueue.currentUserLimitCacheVersion,
        leafQueue.getUsersManager().getLatestVersionOfUsersState());
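    // UsersManager bumps its users-state version whenever allocations or the
    // active-user set change, and a version mismatch is what marks the leaf
    // queue's cached limits as stale.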
    // No calls have been made yet to fill up the all-user limit in UsersManager
    assertEquals(0,
        leafQueue.getUsersManager().preComputedAllUserLimit.size());
    // But the user limit cache in leafQueue got filled up using the active
    // user limit in UsersManager
    assertEquals(1,
        leafQueue.getUsersManager().preComputedActiveUserLimit.size());

    // submit 3 more applications for now
    final ApplicationAttemptId appAttemptId_0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app_0 =
        new FiCaSchedulerApp(appAttemptId_0, user_0, leafQueue,
            leafQueue.getUsersManager(), spyRMContext);
    leafQueue.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 =
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app_1 =
        new FiCaSchedulerApp(appAttemptId_1, user_1, leafQueue,
            leafQueue.getUsersManager(), spyRMContext);
    leafQueue.submitApplicationAttempt(app_1, user_1);

    final ApplicationAttemptId appAttemptId_2 =
        TestUtils.getMockApplicationAttemptId(2, 0);
    FiCaSchedulerApp app_2 =
        new FiCaSchedulerApp(appAttemptId_2, user_2, leafQueue,
            leafQueue.getUsersManager(), spyRMContext);
    leafQueue.submitApplicationAttempt(app_2, user_2);

    apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0,
        app_1.getApplicationAttemptId(), app_1,
        app_2.getApplicationAttemptId(), app_2
    );

    // requests from first three users (all of which will be locality delayed)
    app_0.updateResourceRequests(Arrays.asList(
        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
            priority, recordFactory),
        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
            priority, recordFactory),
        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
            priority, recordFactory)));

    app_1.updateResourceRequests(Arrays.asList(
        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
            priority, recordFactory),
        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
            priority, recordFactory),
        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
            priority, recordFactory)));

    app_2.updateResourceRequests(Arrays.asList(
        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
            priority, recordFactory),
        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
            priority, recordFactory),
        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
            priority, recordFactory)));

    // There are 3 active users right now
    assertEquals(3, leafQueue.getUsersManager().getNumActiveUsers());

    // fill up user limit cache
    assignment = leafQueue.assignContainers(clusterResource, node_1,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
    // A total of 10GB has been allocated
    assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
    // There are three users who should have been cached
    assertEquals(3, leafQueue.userLimitsCache
        .get(RMNodeLabelsManager.NO_LABEL)
        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
        .size());
    // There are three users, so each has a limit of 12/3 = 4GB
    assertEquals(4*GB, leafQueue.userLimitsCache
        .get(RMNodeLabelsManager.NO_LABEL)
        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
        .get(user_0).userLimit.getMemorySize());
    assertEquals(4*GB, leafQueue.userLimitsCache
        .get(RMNodeLabelsManager.NO_LABEL)
        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
        .get(user_1).userLimit.getMemorySize());
    assertEquals(4*GB, leafQueue.userLimitsCache
        .get(RMNodeLabelsManager.NO_LABEL)
        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
        .get(user_2).userLimit.getMemorySize());
    // And the cache is NOT stale because no allocation was made
    assertEquals(leafQueue.currentUserLimitCacheVersion,
        leafQueue.getUsersManager().getLatestVersionOfUsersState());
    // No calls have been made yet to fill up the all-user limit in UsersManager
    assertEquals(0,
        leafQueue.getUsersManager().preComputedAllUserLimit.size());
    // But the user limit cache in leafQueue got filled up using the active
    // user limit in UsersManager, with a 4GB limit (since there are three
    // active users, 12/3 = 4GB each)
    assertEquals(1, leafQueue.getUsersManager()
        .preComputedActiveUserLimit.size());
    assertEquals(1, leafQueue.getUsersManager()
        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL).size());
    assertEquals(4*GB, leafQueue.getUsersManager()
        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL)
        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());

    // submit the 4th application
    final ApplicationAttemptId appAttemptId_3 =
        TestUtils.getMockApplicationAttemptId(3, 0);
    FiCaSchedulerApp app_3 =
        new FiCaSchedulerApp(appAttemptId_3, user_3, leafQueue,
            leafQueue.getUsersManager(), spyRMContext);
    leafQueue.submitApplicationAttempt(app_3, user_3);

    apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0,
        app_1.getApplicationAttemptId(), app_1,
        app_2.getApplicationAttemptId(), app_2,
        app_3.getApplicationAttemptId(), app_3
    );

    app_3.updateResourceRequests(Arrays.asList(
        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
            priority, recordFactory),
        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
            priority, recordFactory),
        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
            priority, recordFactory)));

    // 4 active users now
    assertEquals(4, leafQueue.getUsersManager().getNumActiveUsers());
    // Check that the user limits cache has become stale
    assertNotEquals(leafQueue.currentUserLimitCacheVersion,
        leafQueue.getUsersManager().getLatestVersionOfUsersState());
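    // Merely activating a new user bumped the users-state version, so the
    // cached limits are invalid even though no container was allocated.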

    // Even though there are no allocations, the user limit cache is repopulated
    assignment = leafQueue.assignContainers(clusterResource, node_1,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
    // A total of 10GB has been allocated
    assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
    assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
    // There are four users who should have been cached
    assertEquals(4, leafQueue.userLimitsCache
        .get(RMNodeLabelsManager.NO_LABEL)
        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
        .size());
    // There are four users, so each has a limit of 12/4 = 3GB
    assertEquals(3*GB, leafQueue.userLimitsCache
        .get(RMNodeLabelsManager.NO_LABEL)
        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
        .get(user_0).userLimit.getMemorySize());
    assertEquals(3*GB, leafQueue.userLimitsCache
        .get(RMNodeLabelsManager.NO_LABEL)
        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
        .get(user_1).userLimit.getMemorySize());
    assertEquals(3*GB, leafQueue.userLimitsCache
        .get(RMNodeLabelsManager.NO_LABEL)
        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
        .get(user_2).userLimit.getMemorySize());
    assertEquals(3*GB, leafQueue.userLimitsCache
        .get(RMNodeLabelsManager.NO_LABEL)
        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
        .get(user_3).userLimit.getMemorySize());
    // And the cache is NOT stale because no allocation was made
    assertEquals(leafQueue.currentUserLimitCacheVersion,
        leafQueue.getUsersManager().getLatestVersionOfUsersState());
    // No calls have been made yet to fill up the all-user limit in UsersManager
    assertEquals(0,
        leafQueue.getUsersManager().preComputedAllUserLimit.size());
    // But the user limit cache in leafQueue got filled up using the active
    // user limit in UsersManager, with a 3GB limit (since there are four
    // active users, 12/4 = 3GB each)
    assertEquals(1, leafQueue.getUsersManager()
        .preComputedActiveUserLimit.size());
    assertEquals(1, leafQueue.getUsersManager()
        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL).size());
    assertEquals(3*GB, leafQueue.getUsersManager()
        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL)
        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());
  }

  @Test
  public void testDisabledUserLimitFactor() throws Exception {
    // Mock the queue
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
    //unset maxCapacity
    a.setMaxCapacity(1.0f);

    when(csContext.getClusterResource())
        .thenReturn(Resources.createResource(16 * GB, 32));

    // Users
    final String user0 = "user0";
    final String user1 = "user1";

    // Submit applications
    final ApplicationAttemptId appAttemptId0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app0 =
        new FiCaSchedulerApp(appAttemptId0, user0, a,
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app0, user0);

    final ApplicationAttemptId appAttemptId1 =
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app1 =
        new FiCaSchedulerApp(appAttemptId1, user1, a,
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app1, user1); // different user

    // Setup some nodes
    String host0 = "127.0.0.1";
    FiCaSchedulerNode node0 =
        TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 8*GB);
    String host1 = "127.0.0.2";
    FiCaSchedulerNode node1 =
        TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8*GB);

    final int numNodes = 2;
    Resource clusterResource =
        Resources.createResource(numNodes * (8*GB), numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Setup resource-requests
    Priority priority = TestUtils.createMockPriority(1);
    app0.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 2, true,
            priority, recordFactory)));

    app1.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
            priority, recordFactory)));

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(),
        app1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node0.getNodeID(),
        node0, node1.getNodeID(), node1);

    /**
     * Start testing ...
     */
    a.setUserLimitFactor(1);
    a.setUserLimit(50);

    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // There're two active users
    assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());

    // 1 container to user0
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(3*GB, a.getUsedResources().getMemorySize());
    assertEquals(3*GB, app0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app1.getCurrentConsumption().getMemorySize());

    // Allocate one container to app1. Even though app0 was
    // submitted earlier, it cannot get this container assigned since user0
    // has already exceeded the user-limit.
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(4*GB, a.getUsedResources().getMemorySize());
    assertEquals(3*GB, app0.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app1.getCurrentConsumption().getMemorySize());

    // Set user-limit-factor to -1 to disable it;
    // users are then no longer limited
    a.setUserLimitFactor(-1);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node1,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(7*GB, a.getUsedResources().getMemorySize());
    assertEquals(6*GB, app0.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app1.getCurrentConsumption().getMemorySize());
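    // With the factor disabled, user0 grew from 3GB to 6GB, past the 50%
    // minimum-user-limit share that capped it before.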

  }

  @Test
  public void testUserLimits() throws Exception {
    // Mock the queue
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
    //unset maxCapacity
    a.setMaxCapacity(1.0f);

    when(csContext.getClusterResource())
        .thenReturn(Resources.createResource(16 * GB, 32));

    // Users
    final String user_0 = "user_0";
    final String user_1 = "user_1";

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = 
        TestUtils.getMockApplicationAttemptId(0, 0); 
    FiCaSchedulerApp app_0 = 
        new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 = 
        TestUtils.getMockApplicationAttemptId(1, 0); 
    FiCaSchedulerApp app_1 = 
        new FiCaSchedulerApp(appAttemptId_1, user_1, a, 
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_1, user_1); // different user

    // Setup some nodes
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
    String host_1 = "127.0.0.2";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
    
    final int numNodes = 2;
    Resource clusterResource = 
        Resources.createResource(numNodes * (8*GB), numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
 
    // Setup resource-requests
    Priority priority = TestUtils.createMockPriority(1);
    app_0.updateResourceRequests(Collections.singletonList(
            TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 2, true,
                priority, recordFactory)));

    app_1.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
            priority, recordFactory)));

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0, node_1.getNodeID(), node_1);

    /**
     * Start testing...
     */

    // Set user-limit
    a.setUserLimit(50);
    a.setUserLimitFactor(2);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
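    // minimum-user-limit-percent 50 guarantees each of the two active users
    // about half of the queue's resources before further containers go to
    // the other; a user-limit-factor of 2 lets a single user reach up to
    // twice the queue's configured capacity.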
    
    // There're two active users
    assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());

    // 1 container to user_0
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(3*GB, a.getUsedResources().getMemorySize());
    assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());

    // Allocate one container to app_1. Even though app_0 was
    // submitted earlier, it cannot get this container assigned since user_0
    // has already exceeded the user-limit.
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(4*GB, a.getUsedResources().getMemorySize());
    assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());

    // Allocate one container to app_0. Before allocating this container,
    // user-limit = floor((5 + 1) / 2) = 3G, and app_0's used resource (3G) <=
    // user-limit.
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_1,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(7*GB, a.getUsedResources().getMemorySize());
    assertEquals(6*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());

    // app_0 doesn't have outstanding requests, so there's only one active
    // user.
    assertEquals(1, a.getAbstractUsersManager().getNumActiveUsers(),
        "There should only be 1 active user!");
  }

  @Test
  public void testDecimalUserLimits() throws Exception {
    // Mock the queue
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
    //unset maxCapacity
    a.setMaxCapacity(1.0f);

    when(csContext.getClusterResource())
        .thenReturn(Resources.createResource(16 * GB, 32));

    // Users
    final String user0 = "user_0";
    final String user1 = "user_1";

    // Submit applications
    final ApplicationAttemptId appAttemptId0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app0 =
        new FiCaSchedulerApp(appAttemptId0, user0, a,
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app0, user0);

    final ApplicationAttemptId appAttemptId1 =
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app1 =
        new FiCaSchedulerApp(appAttemptId1, user1, a,
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app1, user1); // different user

    // Setup some nodes
    String host0 = "127.0.0.1";
    FiCaSchedulerNode node0 =
        TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 8*GB);
    String host1 = "127.0.0.2";
    FiCaSchedulerNode node1 =
        TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8*GB);

    final int numNodes = 2;
    Resource clusterResource =
        Resources.createResource(numNodes * (8*GB), numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Setup resource-requests
    Priority priority = TestUtils.createMockPriority(1);
    app0.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 2, true,
            priority, recordFactory)));

    app1.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
            priority, recordFactory)));

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(),
        app1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node0.getNodeID(),
        node0, node1.getNodeID(), node1);

    /**
     * Start testing...
     */

    // Set user-limit
    a.setUserLimit(50.1f);
    a.setUserLimitFactor(2);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
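    // Decimal minimum-user-limit-percent values are accepted; 50.1 gives
    // user_0 a slightly larger share than an even 50/50 split, which the
    // allocations below depend on.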

    // There're two active users
    assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());

    // 1 container to user_0
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(3*GB, a.getUsedResources().getMemorySize());
    assertEquals(3*GB, app0.getCurrentConsumption().getMemorySize());
    assertEquals(0, app1.getCurrentConsumption().getMemorySize());

    // Allocate another container. Since the user limit of 50.1% hasn't been
    // reached yet, app_0 will get another container.
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(6*GB, a.getUsedResources().getMemorySize());
    assertEquals(6*GB, app0.getCurrentConsumption().getMemorySize());
    assertEquals(0, app1.getCurrentConsumption().getMemorySize());

    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node1,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(7*GB, a.getUsedResources().getMemorySize());
    assertEquals(6*GB, app0.getCurrentConsumption().getMemorySize());
    assertEquals(GB, app1.getCurrentConsumption().getMemorySize());

    // app_0 doesn't have outstanding requests, so there's only one active
    // user.
    assertEquals(1, a.getAbstractUsersManager().getNumActiveUsers(),
        "There should only be 1 active user!");
  }

  @Test
  public void testUserSpecificUserLimits() throws Exception {
    // Mock the queue
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
    // Set minimum-user-limit-percent for queue "a" in the configs.
    csConf.setUserLimit(a.getQueuePathObject(), 50);
    // Set weight for "user_0" to be 1.5f for the a queue in the configs.
    csConf.setFloat("yarn.scheduler.capacity." + a.getQueuePath()
        + ".user-settings.user_0." + CapacitySchedulerConfiguration.USER_WEIGHT,
        1.5f);
    // Set weight for "firstname.lastname" to be 0.7f for the "a" queue
    // in the configs. Notice the user contains a dot. This is to test
    // that weights are accepted for a username that contains dots.
    csConf.setFloat("yarn.scheduler.capacity." + a.getQueuePath()
        + ".user-settings.firstname.lastname."
        + CapacitySchedulerConfiguration.USER_WEIGHT,
        0.7f);
    csConf.reinitializeConfigurationProperties();
    queueContext.reinitialize();

    when(csContext.getClusterResource())
        .thenReturn(Resources.createResource(16 * GB, 32));
    // Verify that configs were updated and parsed correctly.
    assertEquals(UserWeights.DEFAULT_WEIGHT, a.getUserWeights().getByUser("user_0"), 0.0f);
    a.reinitialize(a, csContext.getClusterResource());
    assertEquals(1.5f, a.getUserWeights().getByUser("user_0"), 0.0f);
    assertEquals(0.7f, a.getUserWeights().getByUser("firstname.lastname"), 0.0f);

    // set maxCapacity
    a.setMaxCapacity(1.0f);

    // Set minimum user-limit-percent
    a.setUserLimit(50);
    a.setUserLimitFactor(2);

    // Users
    final String user_0 = "user_0";
    final String user_1 = "user_1";

    // Set user_0's weight to 1.5 in the a queue's object.
    a.getUsersManager().getUserAndAddIfAbsent(user_0).setWeight(1.5f);

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app_0 =
        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 =
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app_1 =
        new FiCaSchedulerApp(appAttemptId_1, user_1, a,
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_1, user_1); // different user

    // Setup some nodes
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
    String host_1 = "127.0.0.2";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);

    final int numNodes = 2;
    Resource clusterResource =
        Resources.createResource(numNodes * (8*GB), numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Setup resource-requests
    // app_0 asks for 3 4-GB containers
    Priority priority = TestUtils.createMockPriority(1);
    app_0.updateResourceRequests(Collections.singletonList(
            TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 3, true,
                priority, recordFactory)));

    // app_1 asks for 2 1-GB containers
    app_1.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
            priority, recordFactory)));

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0, node_1.getNodeID(), node_1);

    /**
     * Start testing...
     */

    // There're two active users
    assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());

    // 1 container to user_0. Since the queue starts out empty, the user limit
    // would normally be calculated to be the minimum container size (1024MB).
    // However, in this case user_0 has a weight of 1.5, so the UL is 2048MB
    // because 1024MB * 1.5, rounded up to container size, is 2048MB.
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(4*GB, a.getUsedResources().getMemorySize());
    assertEquals(4*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());

    // At this point the queue-wide user limit is 3072MB, but since user_0 has
    // a weight of 1.5, its user limit is 5120MB. So, even though user_0
    // already has 4096MB, it is under its user limit and gets another
    // container.
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(8*GB, a.getUsedResources().getMemorySize());
    assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());

    // Queue-wide user limit at this point is 4096MB and user_0's user limit
    // is 6144MB. user_0 has 8192MB.
    // Now that user_0 is above its user limit, the next container should go
    // to user_1.
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_1,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(9*GB, a.getUsedResources().getMemorySize());
    assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
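
    // app_0 asked for three 4GB containers and got two, so 4GB is still
    // pending; app_1 asked for two 1GB containers and got one, leaving 1GB
    // pending.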

    assertEquals(4 * GB,
        app_0.getTotalPendingRequestsPerPartition().get("").getMemorySize());

    assertEquals(1 * GB,
        app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize());
  }

  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Test
  public void testComputeUserLimitAndSetHeadroom() throws IOException {
    LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B));
    qb.setMaxCapacity(1.0f);
    // Users
    final String user_0 = "user_0";
    final String user_1 = "user_1";

    //create nodes
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
    String host_1 = "127.0.0.2";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);

    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0, node_1.getNodeID(), node_1);

    final int numNodes = 2;
    Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    CapacitySchedulerQueueManager mockCapacitySchedulerQueueManager
        = mock(CapacitySchedulerQueueManager.class);
    QueueStateManager mockQueueStateManager = mock(QueueStateManager.class);
    when(mockCapacitySchedulerQueueManager.getQueueStateManager()).thenReturn(
        mockQueueStateManager);
    when(csContext.getCapacitySchedulerQueueManager()).thenReturn(
        mockCapacitySchedulerQueueManager);
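    // The calls below reach the scheduler's queue manager and its queue state
    // manager; stubbing them lets the headroom computation run without a full
    // scheduler setup.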

    //our test plan contains four cases
    //1. single user dominates the queue, we test the headroom
    //2. two users, but user_0 is assigned 100% of the queue resource,
    //   submit user_1's application, check headroom correctness
    //3. two users, each is assigned 50% of the queue resource,
    //   each user submits one application and check their headrooms
    //4. similar to 3, but user_0 has no quota left while there are
    //   free resources left, check headroom

    //test case 1
    qb.setUserLimit(100);
    qb.setUserLimitFactor(1);

    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
    final ApplicationAttemptId appAttemptId_0 =
              TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app_0 =
        new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
            qb.getAbstractUsersManager(), spyRMContext);
    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = new HashMap<>();
    apps.put(app_0.getApplicationAttemptId(), app_0);
    qb.submitApplicationAttempt(app_0, user_0);
    Priority u0Priority = TestUtils.createMockPriority(1);
    SchedulerRequestKey u0SchedKey = toSchedulerKey(u0Priority);
    app_0.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
            u0Priority, recordFactory)));

    assertEquals(1, qb.getAbstractUsersManager().getNumActiveUsers(),
        "There should only be 1 active user!");
    //get headroom
    applyCSAssignment(clusterResource,
        qb.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
    qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);

    //maxqueue 16G, userlimit 13G - 4G used = 9G
    assertEquals(9*GB, app_0.getHeadroom().getMemorySize());

    //test case 2
    final ApplicationAttemptId appAttemptId_2 =
        TestUtils.getMockApplicationAttemptId(2, 0);
    FiCaSchedulerApp app_2 =
        new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
            qb.getAbstractUsersManager(), spyRMContext);
    apps.put(app_2.getApplicationAttemptId(), app_2);
    Priority u1Priority = TestUtils.createMockPriority(2);
    SchedulerRequestKey u1SchedKey = toSchedulerKey(u1Priority);
    app_2.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
            u1Priority, recordFactory)));
    qb.submitApplicationAttempt(app_2, user_1);
    applyCSAssignment(clusterResource,
        qb.assignContainers(clusterResource, node_1,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
    qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);

    assertEquals(8*GB, qb.getUsedResources().getMemorySize());
    assertEquals(4*GB, app_0.getCurrentConsumption().getMemorySize());
    //maxqueue 16G, userlimit 13G - 4G used = 9G BUT
    //maxqueue 16G - used 8G (4G each app/user) = 8G max headroom (the new logic)
    assertEquals(8*GB, app_0.getHeadroom().getMemorySize());
    assertEquals(4*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(8*GB, app_2.getHeadroom().getMemorySize());
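    // Headroom is capped by the queue's remaining capacity as well as by the
    // user limit: effectively min(userlimit - userUsed, maxqueue - queueUsed).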

    //test case 3
    qb.finishApplication(app_0.getApplicationId(), user_0);
    qb.finishApplication(app_2.getApplicationId(), user_1);
    qb.releaseResource(clusterResource, app_0, Resource.newInstance(4*GB, 1),
        "", null);
    qb.releaseResource(clusterResource, app_2, Resource.newInstance(4*GB, 1),
        "", null);

    qb.setUserLimit(50);
    qb.setUserLimitFactor(1);
    
    final ApplicationAttemptId appAttemptId_1 =
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app_1 =
        new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
            qb.getAbstractUsersManager(), spyRMContext);
    apps.put(app_1.getApplicationAttemptId(), app_1);
    final ApplicationAttemptId appAttemptId_3 =
        TestUtils.getMockApplicationAttemptId(3, 0);
    FiCaSchedulerApp app_3 =
        new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
            qb.getAbstractUsersManager(), spyRMContext);
    apps.put(app_3.getApplicationAttemptId(), app_3);
    app_1.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
            u0Priority, recordFactory)));
    app_3.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
             u1Priority, recordFactory)));
    qb.submitApplicationAttempt(app_1, user_0);
    qb.submitApplicationAttempt(app_3, user_1);
    applyCSAssignment(clusterResource,
        qb.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
    applyCSAssignment(clusterResource,
        qb.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
    qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
    assertEquals(4*GB, qb.getUsedResources().getMemorySize());
    //maxqueue 16G, userlimit 7G, used (by each user) 2G, headroom 5G (both)
    assertEquals(5*GB, app_3.getHeadroom().getMemorySize());
    assertEquals(5*GB, app_1.getHeadroom().getMemorySize());
    //test case 4
    final ApplicationAttemptId appAttemptId_4 =
              TestUtils.getMockApplicationAttemptId(4, 0);
    FiCaSchedulerApp app_4 =
              new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
                      qb.getAbstractUsersManager(), spyRMContext);
    apps.put(app_4.getApplicationAttemptId(), app_4);
    qb.submitApplicationAttempt(app_4, user_0);
    app_4.updateResourceRequests(Collections.singletonList(
              TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
                      u0Priority, recordFactory)));
    applyCSAssignment(clusterResource,
        qb.assignContainers(clusterResource, node_1,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps);
    qb.computeUserLimitAndSetHeadroom(app_4, clusterResource,
        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
    qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
    
    
    //app3 is user1, active from last test case
    //maxqueue 16G, userlimit 13G, used 2G, would be headroom 10G BUT
    //10G in use, so max possible headroom is 6G (new logic)
    assertEquals(6*GB, app_3.getHeadroom().getMemorySize());
    //testcase3 still active - 2+2+6=10
    assertEquals(10*GB, qb.getUsedResources().getMemorySize());
    //app4 is user 0
    //maxqueue 16G, user_0 has used 8G (6G from app_4 in this test case plus
    //2G from app_1 in test case 3), leaving 1G headroom under its user limit
    assertEquals(1*GB, app_4.getHeadroom().getMemorySize());
  }

  @Test
  public void testHeadroomWithMaxCap() throws Exception {
    // Mock the queue
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
    //unset maxCapacity
    a.setMaxCapacity(1.0f);
    
    // Users
    final String user_0 = "user_0";
    final String user_1 = "user_1";

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = 
        TestUtils.getMockApplicationAttemptId(0, 0); 
    FiCaSchedulerApp app_0 = 
        new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 =
        TestUtils.getMockApplicationAttemptId(1, 0); 
    FiCaSchedulerApp app_1 = 
        new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_1, user_0);  // same user

    final ApplicationAttemptId appAttemptId_2 = 
        TestUtils.getMockApplicationAttemptId(2, 0); 
    FiCaSchedulerApp app_2 = 
        new FiCaSchedulerApp(appAttemptId_2, user_1, a, 
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_2, user_1);

    // Setup some nodes
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
    String host_1 = "127.0.0.2";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1, app_2.getApplicationAttemptId(), app_2);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0, node_1.getNodeID(), node_1);
    
    final int numNodes = 2;
    Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);

    // Setup resource-requests
    Priority priority = TestUtils.createMockPriority(1);
    app_0.updateResourceRequests(Collections.singletonList(
            TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
                priority, recordFactory)));

    app_1.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
            priority, recordFactory)));

    /**
     * Start testing...
     */
    
    // Set user-limit
    a.setUserLimit(50);
    a.setUserLimitFactor(2);

    ParentQueue root = (ParentQueue) queues
        .get(CapacitySchedulerConfiguration.ROOT);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Now, only user_0 should be active since he is the only one with
    // outstanding requests
    assertEquals(1, a.getAbstractUsersManager().getNumActiveUsers(),
        "There should only be 1 active user!");

    // 1 container to user_0
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(2*GB, a.getUsedResources().getMemorySize());
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    // TODO: fix headroom in a future patch
    assertEquals(1*GB, app_0.getHeadroom().getMemorySize());
    // User limit = 2G, 2G in use
    assertEquals(1*GB, app_1.getHeadroom().getMemorySize());
    // the application is not yet active

    // Again one to user_0 since he hasn't exceeded user limit yet
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(3*GB, a.getUsedResources().getMemorySize());
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_0.getHeadroom().getMemorySize());
    assertEquals(0*GB, app_1.getHeadroom().getMemorySize());

    // Submit requests for app_1 and set max-cap
    a.setMaxCapacity(.1f);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
    app_2.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
            priority, recordFactory)));
    assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
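    // With max-capacity at 10% of the cluster, the queue's 3GB in use already
    // exceeds its limit, so no new containers (and zero headroom) are
    // expected below.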

    // No more to user_0 since he is already over user-limit
    // and no more containers to queue since it's already at max-cap
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_1,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(3*GB, a.getUsedResources().getMemorySize());
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_0.getHeadroom().getMemorySize());
    assertEquals(0*GB, app_1.getHeadroom().getMemorySize());
    
    // Check headroom for app_2 
    app_1.updateResourceRequests(Collections.singletonList(     // unset
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
            priority, recordFactory)));
    assertEquals(1, a.getAbstractUsersManager().getNumActiveUsers());
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_1,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(0*GB, app_2.getHeadroom().getMemorySize());   // hit queue max-cap
  }

  @Test
  public void testUserHeadroomMultiApp() throws Exception {
    // Mock the queue
    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
    // unset maxCapacity
    a.setMaxCapacity(1.0f);

    // Users
    final String user_0 = "user_0";
    final String user_1 = "user_1";

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = TestUtils
        .getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
        a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 = TestUtils
        .getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
        a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_1, user_0); // same user

    final ApplicationAttemptId appAttemptId_2 = TestUtils
        .getMockApplicationAttemptId(2, 0);
    FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_1, a,
        a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_2, user_1);

    // Setup some nodes
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
        16 * GB);
    String host_1 = "127.0.0.2";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
        16 * GB);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1, app_2.getApplicationAttemptId(), app_2);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0, node_1.getNodeID(), node_1);

    final int numNodes = 2;
    Resource clusterResource = Resources.createResource(numNodes * (16 * GB),
        1);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    Priority priority = TestUtils.createMockPriority(1);

    app_0.updateResourceRequests(
        Collections.singletonList(TestUtils.createResourceRequest(
            ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory)));

    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
        a, nodes, apps);
    assertEquals(1 * GB, a.getUsedResources().getMemorySize());
    assertEquals(1 * GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0 * GB, app_1.getCurrentConsumption().getMemorySize());
    // Now, headroom is the same for all apps of a given user + queue combo,
    // and a change to any app's headroom is reflected for all of the user's
    // apps once those apps are active (i.e. have themselves calculated
    // headroom for allocation at least once)
    assertEquals(2 * GB, app_0.getHeadroom().getMemorySize());
    assertEquals(2 * GB, app_1.getHeadroom().getMemorySize());// not yet active
    assertEquals(3 * GB, app_2.getHeadroom().getMemorySize());// not yet active

    app_1.updateResourceRequests(
        Collections.singletonList(TestUtils.createResourceRequest(
            ResourceRequest.ANY, 1 * GB, 2, true, priority, recordFactory)));

    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
        a, nodes, apps);
    assertEquals(2 * GB, a.getUsedResources().getMemorySize());
    assertEquals(1 * GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(1 * GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(1 * GB, app_0.getHeadroom().getMemorySize());
    assertEquals(1 * GB, app_1.getHeadroom().getMemorySize());// now active
    assertEquals(3 * GB, app_2.getHeadroom().getMemorySize());// not yet active

    // Complete container and verify that headroom is updated, for both apps
    // for the user
    RMContainer rmContainer = app_0.getLiveContainers().iterator().next();
    a.completedContainer(clusterResource, app_0, node_0, rmContainer,
        ContainerStatus.newInstance(rmContainer.getContainerId(),
            ContainerState.COMPLETE, "",
            ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
        RMContainerEventType.KILL, null, true);

    assertEquals(2 * GB, app_0.getHeadroom().getMemorySize());
    assertEquals(2 * GB, app_1.getHeadroom().getMemorySize());
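    // Releasing user_0's 1GB container restored 2GB of headroom, and the
    // change is visible through both of user_0's apps at once.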
  }

  @Test
  public void testSingleQueueWithMultipleUsers() throws Exception {
    
    // Mock the queue
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
    //unset maxCapacity
    a.setMaxCapacity(1.0f);
    
    // Users
    final String user_0 = "user_0";
    final String user_1 = "user_1";
    final String user_2 = "user_2";

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = 
        TestUtils.getMockApplicationAttemptId(0, 0); 
    FiCaSchedulerApp app_0 = 
        new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 = 
        TestUtils.getMockApplicationAttemptId(1, 0); 
    FiCaSchedulerApp app_1 = 
        new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_1, user_0);  // same user

    final ApplicationAttemptId appAttemptId_2 = 
        TestUtils.getMockApplicationAttemptId(2, 0); 
    FiCaSchedulerApp app_2 = 
        new FiCaSchedulerApp(appAttemptId_2, user_1, a, 
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_2, user_1);

    final ApplicationAttemptId appAttemptId_3 = 
        TestUtils.getMockApplicationAttemptId(3, 0); 
    FiCaSchedulerApp app_3 = 
        new FiCaSchedulerApp(appAttemptId_3, user_2, a, 
            a.getAbstractUsersManager(), spyRMContext);
    a.submitApplicationAttempt(app_3, user_2);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1, app_2.getApplicationAttemptId(), app_2,
        app_3.getApplicationAttemptId(), app_3);
    
    // Setup some nodes
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0);
    
    final int numNodes = 1;
    Resource clusterResource = 
        Resources.createResource(numNodes * (8*GB), numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    when(csContext.getClusterResource()).thenReturn(clusterResource);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Setup resource-requests
    Priority priority = TestUtils.createMockPriority(1);
    app_0.updateResourceRequests(Collections.singletonList(
            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
                priority, recordFactory)));

    app_1.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
            priority, recordFactory)));

    /** 
     * Start testing... 
     */
    
    // Only 1 container
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(1*GB, a.getUsedResources().getMemorySize());
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());

    // Also a 2nd -> the limit floors at minCapacity = 1024MB since
    // (.1 * 8G) < minAlloc; also, you can get one container more than the
    // user-limit
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(2*GB, a.getUsedResources().getMemorySize());
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    
    // Can't allocate 3rd due to user-limit
    a.setUserLimit(25);
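    // With minimum-user-limit-percent 25, user_0's 2GB in use already covers
    // its share, so this assignment attempt should allocate nothing.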
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(2*GB, a.getUsedResources().getMemorySize());
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    
    // Submit resource requests for other apps now to 'activate' them
    
    app_2.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 1, true,
            priority, recordFactory)));

    app_3.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
            priority, recordFactory)));

    // Now allocations should go to app_2 since
    // user_0 is at its limit in spite of the high user-limit-factor
    a.setUserLimitFactor(10);
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(5*GB, a.getUsedResources().getMemorySize());
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(3*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());

    // Now allocations should go to app_0 since
    // user_0 is at its user-limit, not above it
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(6*GB, a.getUsedResources().getMemorySize());
    assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(3*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
    
    // Test max-capacity
    // Now - no more allocs since we are at max-cap
    a.setMaxCapacity(0.5f);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(6*GB, a.getUsedResources().getMemorySize());
    assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(3*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
    
    // Revert max-capacity and user-limit-factor
    // Now, allocations should go to app_3 since it's under its user-limit
    a.setMaxCapacity(1.0f);
    a.setUserLimitFactor(1);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(7*GB, a.getUsedResources().getMemorySize());
    assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(3*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app_3.getCurrentConsumption().getMemorySize());

    // Now we should assign to app_3 again since user_2 is still under its
    // user-limit
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(8*GB, a.getUsedResources().getMemorySize());
    assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(3*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(2*GB, app_3.getCurrentConsumption().getMemorySize());

    // 8. Release each container from app_0
    for (RMContainer rmContainer : app_0.getLiveContainers()) {
      a.completedContainer(clusterResource, app_0, node_0, rmContainer,
          ContainerStatus.newInstance(rmContainer.getContainerId(),
              ContainerState.COMPLETE, "",
              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
          RMContainerEventType.KILL, null, true);
    }
    assertEquals(5*GB, a.getUsedResources().getMemorySize());
    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(3*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(2*GB, app_3.getCurrentConsumption().getMemorySize());
    
    // 9. Release each container from app_2
    for (RMContainer rmContainer : app_2.getLiveContainers()) {
      a.completedContainer(clusterResource, app_2, node_0, rmContainer,
          ContainerStatus.newInstance(rmContainer.getContainerId(),
              ContainerState.COMPLETE, "",
              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
          RMContainerEventType.KILL, null, true);
    }
    assertEquals(2*GB, a.getUsedResources().getMemorySize());
    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(2*GB, app_3.getCurrentConsumption().getMemorySize());

    // 10. Release each container from app_3
    for (RMContainer rmContainer : app_3.getLiveContainers()) {
      a.completedContainer(clusterResource, app_3, node_0, rmContainer,
          ContainerStatus.newInstance(rmContainer.getContainerId(),
              ContainerState.COMPLETE, "",
              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
          RMContainerEventType.KILL, null, true);
    }
    assertEquals(0*GB, a.getUsedResources().getMemorySize());
    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
  }
  
  @Test
  public void testReservation() throws Exception {

    // Manipulate queue 'a'
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
    // unset maxCapacity
    a.setMaxCapacity(1.0f);

    // Users
    final String user_0 = "user_0";
    final String user_1 = "user_1";

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = 
        TestUtils.getMockApplicationAttemptId(0, 0); 
    FiCaSchedulerApp app_0 = 
        new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 = 
        TestUtils.getMockApplicationAttemptId(1, 0); 
    FiCaSchedulerApp app_1 = 
        new FiCaSchedulerApp(appAttemptId_1, user_1, a, 
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_1, user_1);  

    // Setup some nodes
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0);
    
    final int numNodes = 2;
    Resource clusterResource = 
        Resources.createResource(numNodes * (4*GB), numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
    
    // Setup resource-requests
    Priority priority = TestUtils.createMockPriority(1);
    app_0.updateResourceRequests(Collections.singletonList(
            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
                priority, recordFactory)));

    app_1.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
            priority, recordFactory)));
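    // Note: app_1's single 4GB ask needs a whole node; once app_0 occupies
    // part of node_0, the scheduler can only reserve, not allocate, for it.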

    // Start testing...
    
    // Only 1 container
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(1*GB, a.getUsedResources().getMemorySize());
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, a.getMetrics().getReservedMB());
    assertEquals(1*GB, a.getMetrics().getAllocatedMB());
    assertEquals(0*GB, a.getMetrics().getAvailableMB());

    // 2nd container also assigned: minCapacity = 1024 since (.1 * 8G) <
    // minAlloc, and a user may get one container beyond its user-limit
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(2*GB, a.getUsedResources().getMemorySize());
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, a.getMetrics().getReservedMB());
    assertEquals(2*GB, a.getMetrics().getAllocatedMB());
    
    // Now, reservation should kick in for app_1
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(6*GB, a.getUsedResources().getMemorySize());
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(4*GB, app_1.getCurrentReservation().getMemorySize());
    assertEquals(2*GB, node_0.getAllocatedResource().getMemorySize());
    assertEquals(4*GB, a.getMetrics().getReservedMB());
    assertEquals(2*GB, a.getMetrics().getAllocatedMB());
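    // Note: queue usage (6GB) = 2GB allocated + 4GB reserved; the
    // reservation counts against the queue even though node_0 physically
    // holds only 2GB.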
    
    // Now free 1 container from app_0, i.e. 1GB
    RMContainer rmContainer = app_0.getLiveContainers().iterator().next();
    a.completedContainer(clusterResource, app_0, node_0, rmContainer,
        ContainerStatus.newInstance(rmContainer.getContainerId(),
            ContainerState.COMPLETE, "",
            ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
        RMContainerEventType.KILL, null, true);
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(5*GB, a.getUsedResources().getMemorySize());
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(4*GB, app_1.getCurrentReservation().getMemorySize());
    assertEquals(1*GB, node_0.getAllocatedResource().getMemorySize());
    assertEquals(4*GB, a.getMetrics().getReservedMB());
    assertEquals(1*GB, a.getMetrics().getAllocatedMB());

    // Now finish another container from app_0 and fulfill the reservation
    rmContainer = app_0.getLiveContainers().iterator().next();
    a.completedContainer(clusterResource, app_0, node_0, rmContainer,
        ContainerStatus.newInstance(rmContainer.getContainerId(),
            ContainerState.COMPLETE, "",
            ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
        RMContainerEventType.KILL, null, true);
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(4*GB, a.getUsedResources().getMemorySize());
    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(4*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentReservation().getMemorySize());
    assertEquals(4*GB, node_0.getAllocatedResource().getMemorySize());
    assertEquals(0*GB, a.getMetrics().getReservedMB());
    assertEquals(4*GB, a.getMetrics().getAllocatedMB());
  }

  @Test
  public void testReservationExchange() throws Exception {

    // Manipulate queue 'a'
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
    // unset maxCapacity
    a.setMaxCapacity(1.0f);
    a.setUserLimitFactor(10);

    // Users
    final String user_0 = "user_0";
    final String user_1 = "user_1";

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = 
        TestUtils.getMockApplicationAttemptId(0, 0); 
    FiCaSchedulerApp app_0 = 
        new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 = 
        TestUtils.getMockApplicationAttemptId(1, 0); 
    FiCaSchedulerApp app_1 = 
        new FiCaSchedulerApp(appAttemptId_1, user_1, a, 
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_1, user_1);  

    // Setup some nodes
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
    
    String host_1 = "127.0.0.2";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
    
    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
    when(csContext.getClusterResource()).thenReturn(Resource.newInstance(8, 1));

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0, node_1.getNodeID(), node_1);
    
    final int numNodes = 3;
    Resource clusterResource = 
        Resources.createResource(numNodes * (4*GB), numNodes * 16);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    when(csContext.getMaximumResourceCapability()).thenReturn(
        Resources.createResource(4*GB, 16));
    when(a.getMaximumAllocation()).thenReturn(
        Resources.createResource(4*GB, 16));
    when(a.getMinimumAllocationFactor()).thenReturn(0.25f); // 1G / 4G 
    // Setup resource-requests
    Priority priority = TestUtils.createMockPriority(1);
    app_0.updateResourceRequests(Collections.singletonList(
            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
                priority, recordFactory)));

    app_1.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
            priority, recordFactory)));

    // Start testing...
    
    // Only 1 container
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(1*GB, a.getUsedResources().getMemorySize());
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());

    // 2nd container also assigned; a user may get one container beyond its
    // user-limit
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(2*GB, a.getUsedResources().getMemorySize());
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    
    // Now, reservation should kick in for app_1
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(6*GB, a.getUsedResources().getMemorySize());
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(4*GB, app_1.getCurrentReservation().getMemorySize());
    assertEquals(2*GB, node_0.getAllocatedResource().getMemorySize());
    
    // Now free 1 container from app_0, i.e. 1GB, and re-reserve it
    RMContainer rmContainer = app_0.getLiveContainers().iterator().next();
    a.completedContainer(clusterResource, app_0, node_0, rmContainer,
        ContainerStatus.newInstance(rmContainer.getContainerId(),
            ContainerState.COMPLETE, "",
            ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
        RMContainerEventType.KILL, null, true);
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(5*GB, a.getUsedResources().getMemorySize());
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(4*GB, app_1.getCurrentReservation().getMemorySize());
    assertEquals(1*GB, node_0.getAllocatedResource().getMemorySize());
    assertEquals(1, app_1.getReReservations(toSchedulerKey(priority)));
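    // The 4GB reservation still cannot fit on node_0 (only 3GB free), so it
    // is re-reserved and the re-reservation counter increments.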

    // Re-reserve
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(5*GB, a.getUsedResources().getMemorySize());
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(4*GB, app_1.getCurrentReservation().getMemorySize());
    assertEquals(1*GB, node_0.getAllocatedResource().getMemorySize());
    assertEquals(2, app_1.getReReservations(toSchedulerKey(priority)));
    
    // Try to schedule on node_1 now, should *move* the reservation
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_1,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(9*GB, a.getUsedResources().getMemorySize());
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(4*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(4*GB, app_1.getCurrentReservation().getMemorySize());
    assertEquals(4*GB, node_1.getAllocatedResource().getMemorySize());
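    // 9GB = app_0's 1GB + app_1's 4GB allocation on node_1 + the 4GB still
    // reserved on node_0.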
    // Doesn't change yet... only when reservation is cancelled or a different
    // container is reserved
    assertEquals(2, app_1.getReReservations(toSchedulerKey(priority)));
    
    // Now finish another container from app_0 and see the reservation cancelled
    rmContainer = app_0.getLiveContainers().iterator().next();
    a.completedContainer(clusterResource, app_0, node_0, rmContainer,
        ContainerStatus.newInstance(rmContainer.getContainerId(),
            ContainerState.COMPLETE, "",
            ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
        RMContainerEventType.KILL, null, true);
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(4*GB, a.getUsedResources().getMemorySize());
    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(4*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentReservation().getMemorySize());
    assertEquals(0*GB, node_0.getAllocatedResource().getMemorySize());
  }
  
  private void verifyContainerAllocated(CSAssignment assignment, NodeType nodeType) {
    assertTrue(Resources.greaterThan(resourceCalculator, null,
        assignment.getResource(), Resources.none()));
    assertTrue(assignment.getAssignmentInformation().getNumAllocations() > 0);
    assertEquals(nodeType, assignment.getType());
  }

  private void verifyNoContainerAllocated(CSAssignment assignment) {
    assertTrue(Resources.equals(assignment.getResource(),
        Resources.none()));
    assertEquals(0,
        assignment.getAssignmentInformation().getNumAllocations());
  }

  @Test
  public void testLocalityScheduling() throws Exception {

    // Manipulate queue 'b'
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(B));

    // User
    String user_0 = "user_0";
    
    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = 
        TestUtils.getMockApplicationAttemptId(0, 0); 
    FiCaSchedulerApp app_0 =
        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);

    // Setup some nodes and racks
    String host_0 = "127.0.0.1";
    String rack_0 = "rack_0";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
    
    String host_1 = "127.0.0.2";
    String rack_1 = "rack_1";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
    
    String host_2 = "127.0.0.3";
    String rack_2 = "rack_2";
    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);

    String host_3 = "127.0.0.4"; // on rack_1
    FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2,
        node_3.getNodeID(), node_3);

    final int numNodes = 3;
    Resource clusterResource = 
        Resources.createResource(numNodes * (8*GB), numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
    
    // Setup resource-requests and submit
    Priority priority = TestUtils.createMockPriority(1);
    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_0, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_0, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_1, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra
            true, priority, recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);

    // Start testing...
    CSAssignment assignment = null;

    SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
    // Start with off switch, shouldn't allocate due to delay scheduling
    assignment = a.assignContainers(clusterResource, node_2,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyNoContainerAllocated(assignment);
    assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL

    // Another off switch, shouldn't allocate due to delay scheduling
    assignment = a.assignContainers(clusterResource, node_2,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyNoContainerAllocated(assignment);
    assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
    
    // Another off switch, shouldn't allocate due to delay scheduling
    assignment = a.assignContainers(clusterResource, node_2,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyNoContainerAllocated(assignment);
    assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
    
    // Another off switch, now we should allocate 
    // since missedOpportunities=3 and reqdContainers=3
    assignment = a.assignContainers(clusterResource, node_2,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
    // should NOT reset
    assertEquals(4, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey));
    
    // NODE_LOCAL - node_0
    assignment = a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
    // should reset
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
    
    // NODE_LOCAL - node_1
    assignment = a.assignContainers(clusterResource, node_1,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
    // should reset
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
    assertEquals(NodeType.NODE_LOCAL, assignment.getType());
    
    // Add 1 more request to check for RACK_LOCAL
    app_0_requests_0.clear();
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_1, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_1, 1*GB, 3, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 4, // one extra
            true, priority, recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);
    assertEquals(4, app_0.getOutstandingAsksCount(schedulerKey));
    
    // Rack-delay
    doReturn(true).when(a).getRackLocalityFullReset();
    doReturn(1).when(a).getNodeLocalityDelay();
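    // With node-locality-delay = 1, one missed opportunity must accumulate
    // before rack-local placement is allowed; with full-reset enabled, the
    // count drops back to 0 after each rack-local allocation.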
    
    // Shouldn't assign RACK_LOCAL yet
    assignment = a.assignContainers(clusterResource, node_3,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(4, app_0.getOutstandingAsksCount(schedulerKey));

    // Should assign RACK_LOCAL now
    assignment = a.assignContainers(clusterResource, node_3,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
    // should reset
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
    
    // Shouldn't assign RACK_LOCAL yet, since schedulingOpportunities was
    // just reset.
    assignment = a.assignContainers(clusterResource, node_3,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));

    // Next time we schedule RACK_LOCAL, don't reset
    doReturn(false).when(a).getRackLocalityFullReset();

    // Should assign RACK_LOCAL now
    assignment = a.assignContainers(clusterResource, node_3,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
    // should NOT reset
    assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey));

    // Another RACK_LOCAL since schedulingOpportunities not reset
    assignment = a.assignContainers(clusterResource, node_3,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
    // should NOT reset
    assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
    
    // Add a request larger than cluster size to verify
    // OFF_SWITCH delay is capped by cluster size
    app_0.resetSchedulingOpportunities(schedulerKey);
    app_0_requests_0.clear();
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_0, 1*GB, 100,
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_0, 1*GB, 100,
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 100,
            true, priority, recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);

    // Start with off-switch. There are 3 nodes in the cluster, so the
    // first 3 attempts shouldn't allocate
    for (int i = 0; i < numNodes; i++) {
      assignment =
          a.assignContainers(clusterResource, node_2, new ResourceLimits(
              clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
      applyCSAssignment(clusterResource, assignment, a, nodes, apps);
      verifyNoContainerAllocated(assignment);
      assertEquals(i+1, app_0.getSchedulingOpportunities(schedulerKey));
    }
    // delay should be capped at numNodes so next one should allocate
    assignment = a.assignContainers(clusterResource, node_2,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
    assertEquals(numNodes+1, app_0.getSchedulingOpportunities(schedulerKey));
  }

  @Test
  public void testRackLocalityDelayScheduling() throws Exception {

    // Change parameter values for node locality and rack locality delay.
    csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 2);
    csConf.setInt(
        CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 1);
    queueContext.reinitialize();
    CSQueueStore newQueues = new CSQueueStore();
    CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(queueContext,
        csConf, null, ROOT.getFullPath(), newQueues, queues,
        TestUtils.spyHook);
    csContext.getCapacitySchedulerQueueManager().setRootQueue(newRoot);
    root.reinitialize(newRoot, cs.getClusterResource());

    // Manipulate queue 'b'
    LeafQueue a = stubLeafQueue((LeafQueue) newQueues.get(B));

    // Check locality parameters.
    assertEquals(2, a.getNodeLocalityDelay());
    assertEquals(1, a.getRackLocalityAdditionalDelay());
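    // With these settings, rack-local needs 2 missed opportunities and
    // off-switch needs 2 + 1 = 3 missed opportunities.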

    // User
    String user1 = "user_1";

    // Submit applications
    final ApplicationAttemptId appAttemptId1 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app1 = new FiCaSchedulerApp(appAttemptId1, user1, a,
        mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app1, user1);

    // Setup some nodes and racks
    String host1 = "127.0.0.1";
    String host2 = "127.0.0.2";
    String host3 = "127.0.0.3";
    String host4 = "127.0.0.4";
    String rack1 = "rack_1";
    String rack2 = "rack_2";
    String rack3 = "rack_3";
    FiCaSchedulerNode node2 = TestUtils.getMockNode(host3, rack2, 0, 8 * GB);
    FiCaSchedulerNode node3 = TestUtils.getMockNode(host4, rack3, 0, 8 * GB);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
        ImmutableMap.of(app1.getApplicationAttemptId(), app1);
    Map<NodeId, FiCaSchedulerNode> nodes =
        ImmutableMap.of(node2.getNodeID(), node2, node3.getNodeID(), node3);

    final int numNodes = 5;
    Resource clusterResource =
        Resources.createResource(numNodes * (8 * GB), numNodes * 16);
    when(spyRMContext.getScheduler().getNumClusterNodes()).thenReturn(numNodes);
    newRoot.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Setup resource-requests and submit
    Priority priority = TestUtils.createMockPriority(1);
    List<ResourceRequest> app1Requests1 = new ArrayList<ResourceRequest>();
    app1Requests1.add(TestUtils.createResourceRequest(host1, 1 * GB, 1,
        true, priority, recordFactory));
    app1Requests1.add(TestUtils.createResourceRequest(rack1, 1 * GB, 1,
        true, priority, recordFactory));
    app1Requests1.add(TestUtils.createResourceRequest(host2, 1 * GB, 1,
        true, priority, recordFactory));
    app1Requests1.add(TestUtils.createResourceRequest(rack2, 1 * GB, 1,
        true, priority, recordFactory));
    // Add one extra container in the ANY request.
    app1Requests1.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
        1 * GB, 3, true, priority, recordFactory));
    app1.updateResourceRequests(app1Requests1);

    // Start testing...
    CSAssignment assignment = null;

    SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
    assertEquals(3, app1.getOutstandingAsksCount(schedulerKey));

    // No rack-local yet.
    assignment = a.assignContainers(clusterResource, node2,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyNoContainerAllocated(assignment);
    assertEquals(1, app1.getSchedulingOpportunities(schedulerKey));
    assertEquals(3, app1.getOutstandingAsksCount(schedulerKey));
    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL

    // Still no rack-local.
    assignment = a.assignContainers(clusterResource, node2,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    assertEquals(2, app1.getSchedulingOpportunities(schedulerKey));
    assertEquals(3, app1.getOutstandingAsksCount(schedulerKey));
    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL

    // Rack local now.
    assignment = a.assignContainers(clusterResource, node2,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    assertEquals(0, app1.getSchedulingOpportunities(schedulerKey));
    assertEquals(2, app1.getOutstandingAsksCount(schedulerKey));
    assertEquals(NodeType.RACK_LOCAL, assignment.getType());

    // No off-switch until 3 missed opportunities.
    assignment = a.assignContainers(clusterResource, node3,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    assignment = a.assignContainers(clusterResource, node3,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    assignment = a.assignContainers(clusterResource, node3,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    assertEquals(3, app1.getSchedulingOpportunities(schedulerKey));
    assertEquals(2, app1.getOutstandingAsksCount(schedulerKey));
    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL

    // Now off-switch should succeed.
    assignment = a.assignContainers(clusterResource, node3,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    assertEquals(4, app1.getSchedulingOpportunities(schedulerKey));
    assertEquals(1, app1.getOutstandingAsksCount(schedulerKey));
    assertEquals(NodeType.OFF_SWITCH, assignment.getType());

    // Check capping by number of cluster nodes.
    doReturn(10).when(a).getRackLocalityAdditionalDelay();
    // Off-switch will happen at 6 missed opportunities now, since cluster size
    // is 5.
    assignment = a.assignContainers(clusterResource, node3,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    assertEquals(5, app1.getSchedulingOpportunities(schedulerKey));
    assertEquals(1, app1.getOutstandingAsksCount(schedulerKey));
    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
    assignment = a.assignContainers(clusterResource, node3,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    assertEquals(6, app1.getSchedulingOpportunities(schedulerKey));
    assertEquals(0, app1.getOutstandingAsksCount(schedulerKey));
    assertEquals(NodeType.OFF_SWITCH, assignment.getType());
  }

  @Test
  public void testApplicationPriorityScheduling() throws Exception {
    // Manipulate queue 'a'
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));

    // User
    String user_0 = "user_0";
    
    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = 
        TestUtils.getMockApplicationAttemptId(0, 0); 
    FiCaSchedulerApp app_0 =
        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);
    
    // Setup some nodes and racks
    String host_0 = "127.0.0.1";
    String rack_0 = "rack_0";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
    
    String host_1 = "127.0.0.2";
    String rack_1 = "rack_1";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
    
    String host_2 = "127.0.0.3";
    String rack_2 = "rack_2";
    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);

    final int numNodes = 3;
    Resource clusterResource = 
        Resources.createResource(numNodes * (8*GB), 1);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
    
    // Setup resource-requests and submit
    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
    
    // P1
    Priority priority_1 = TestUtils.createMockPriority(1);
    SchedulerRequestKey schedulerKey1 = toSchedulerKey(priority_1);
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_0, 1*GB, 1, 
            true, priority_1, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_0, 1*GB, 1, 
            true, priority_1, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_1, 1*GB, 1, 
            true, priority_1, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
            true, priority_1, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
            true, priority_1, recordFactory));
    
    // P2
    Priority priority_2 = TestUtils.createMockPriority(2);
    SchedulerRequestKey schedulerKey2 = toSchedulerKey(priority_2);
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_2, 2*GB, 1, 
            true, priority_2, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_2, 2*GB, 1, 
            true, priority_2, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
            true, priority_2, recordFactory));
    
    app_0.updateResourceRequests(app_0_requests_0);
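    // The scheduler serves priorities in order: P2 is not considered until
    // P1's outstanding asks are allocated or skipped by delay scheduling.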

    // Start testing...
    
    // Start with off-switch: shouldn't allocate P1 due to delay scheduling,
    // and thus no P2 either!
    CSAssignment assignment = a.assignContainers(clusterResource, node_2,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyNoContainerAllocated(assignment);
    assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey1));
    assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey1));
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey2));
    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey2));

    // Another off-switch: still shouldn't allocate P1 due to delay
    // scheduling, and thus no P2 either!
    assignment = a.assignContainers(clusterResource, node_2,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyNoContainerAllocated(assignment);
    assertEquals(2, app_0.getSchedulingOpportunities(schedulerKey1));
    assertEquals(2, app_0.getOutstandingAsksCount(schedulerKey1));
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey2));
    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey2));

    // Another off-switch: P1 has now missed enough opportunities, so an
    // OFF_SWITCH container is allocated for it
    assignment = a.assignContainers(clusterResource, node_2,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
    assertEquals(3, app_0.getSchedulingOpportunities(schedulerKey1));
    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey1));
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey2));
    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey2));

    // Now, NODE_LOCAL for P1
    assignment = a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey1));
    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey1));
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey2));
    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey2));

    // Now, OFF_SWITCH for P2
    assignment = a.assignContainers(clusterResource, node_1,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey1));
    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey1));
    assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey2));
    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey2));

  }
  
  @Test
  public void testSchedulingConstraints() throws Exception {

    // Manipulate queue 'a'
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));

    // User
    String user_0 = "user_0";
    
    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = 
        TestUtils.getMockApplicationAttemptId(0, 0); 
    FiCaSchedulerApp app_0 = 
        new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);
    
    // Setup some nodes and racks
    String host_0_0 = "127.0.0.1";
    String rack_0 = "rack_0";
    FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
    String host_0_1 = "127.0.0.2";
    FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
    String host_1_0 = "127.0.0.3";
    String rack_1 = "rack_1";
    FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0_0.getNodeID(),
        node_0_0, node_0_1.getNodeID(), node_0_1, node_1_0.getNodeID(),
        node_1_0);
    
    final int numNodes = 3;
    Resource clusterResource = Resources.createResource(
        numNodes * (8*GB), numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Setup resource-requests and submit
    Priority priority = TestUtils.createMockPriority(1);
    SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_0_0, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_0_1, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_0, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_1_0, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
            true, priority, recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);

    // Start testing...
    
    // Add one request
    app_0_requests_0.clear();
    app_0_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only 1
            true, priority, recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);
    
    // NODE_LOCAL - node_0_0
    CSAssignment assignment = a.assignContainers(clusterResource, node_0_0,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
    // should reset
    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));

    // No allocation on node_1_0 even though it's node/rack local since
    // required(ANY) == 0
    assignment = a.assignContainers(clusterResource, node_1_0,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyNoContainerAllocated(assignment);
    // Still zero
    // since #req=0
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
    
    // Add one request
    app_0_requests_0.clear();
    app_0_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
            true, priority, recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);

    // No allocation on node_0_1 even though it's node/rack local since
    // required(rack_0) == 0
    assignment = a.assignContainers(clusterResource, node_0_1,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyNoContainerAllocated(assignment);
    assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));
    
    // NODE_LOCAL - node_1_0
    assignment = a.assignContainers(clusterResource, node_1_0,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
    // should reset
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));
  }

  @Test
  @Timeout(value = 30)
  public void testActivateApplicationAfterQueueRefresh() throws Exception {

    // Manipulate queue 'e'
    LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));

    // Users
    final String user_e = "user_e";
    
    when(amResourceRequest.getCapability()).thenReturn(
      Resources.createResource(1 * GB, 0));

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app_0 =
        new FiCaSchedulerApp(appAttemptId_0, user_e, e,
            mock(ActiveUsersManager.class), spyRMContext);
    e.submitApplicationAttempt(app_0, user_e);

    final ApplicationAttemptId appAttemptId_1 =
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app_1 =
        new FiCaSchedulerApp(appAttemptId_1, user_e, e,
            mock(ActiveUsersManager.class), spyRMContext);
    e.submitApplicationAttempt(app_1, user_e);  // same user

    final ApplicationAttemptId appAttemptId_2 =
        TestUtils.getMockApplicationAttemptId(2, 0);
    FiCaSchedulerApp app_2 =
        new FiCaSchedulerApp(appAttemptId_2, user_e, e,
            mock(ActiveUsersManager.class), spyRMContext);
    e.submitApplicationAttempt(app_2, user_e);  // same user

    // before reinitialization
    assertEquals(2, e.getNumActiveApplications());
    assertEquals(1, e.getNumPendingApplications());

    csConf.setDouble(
        CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
        CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT
            * 2);
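    // Doubling max-am-resource-percent makes room for a third AM, so the
    // pending application should activate once limits are recomputed.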
    queueContext.reinitialize();
    CSQueueStore newQueues = new CSQueueStore();
    CSQueue newRoot =
        CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
            ROOT.getFullPath(),
            newQueues, queues,
            TestUtils.spyHook);
    queues = newQueues;
    // This will not update active apps
    root.reinitialize(newRoot, csContext.getClusterResource());
    // Cause this to update active apps
    root.updateClusterResource(csContext.getClusterResource(),
        new ResourceLimits(csContext.getClusterResource()));

    // after reinitialization
    assertEquals(3, e.getNumActiveApplications());
    assertEquals(0, e.getNumPendingApplications());
  }
  
  @Test
  @Timeout(value = 30)
  public void testLocalityDelaysAfterQueueRefresh() throws Exception {

    // Manipulate queue 'e'
    LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));

    // before reinitialization
    assertEquals(40, e.getNodeLocalityDelay());
    assertEquals(-1, e.getRackLocalityAdditionalDelay());

    csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 60);
    csConf.setInt(
        CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 600);
    queueContext.reinitialize();
    CSQueueStore newQueues = new CSQueueStore();
    CSQueue newRoot =
        CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
            ROOT.getFullPath(),
            newQueues, queues,
            TestUtils.spyHook);
    csContext.getCapacitySchedulerQueueManager().setRootQueue(newRoot);
    root.reinitialize(newRoot, cs.getClusterResource());

    // after reinitialization
    assertEquals(60, e.getNodeLocalityDelay());
    assertEquals(600, e.getRackLocalityAdditionalDelay());
  }

  @Test
  @Timeout(value = 30)
  public void testActivateApplicationByUpdatingClusterResource()
      throws Exception {

    // Manipulate queue 'e'
    LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));

    // Users
    final String user_e = "user_e";
    
    when(amResourceRequest.getCapability()).thenReturn(
      Resources.createResource(1 * GB, 0));

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app_0 =
        new FiCaSchedulerApp(appAttemptId_0, user_e, e,
            mock(ActiveUsersManager.class), spyRMContext);
    e.submitApplicationAttempt(app_0, user_e);

    final ApplicationAttemptId appAttemptId_1 =
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app_1 =
        new FiCaSchedulerApp(appAttemptId_1, user_e, e,
            mock(ActiveUsersManager.class), spyRMContext);
    e.submitApplicationAttempt(app_1, user_e);  // same user

    final ApplicationAttemptId appAttemptId_2 =
        TestUtils.getMockApplicationAttemptId(2, 0);
    FiCaSchedulerApp app_2 =
        new FiCaSchedulerApp(appAttemptId_2, user_e, e,
            mock(ActiveUsersManager.class), spyRMContext);
    e.submitApplicationAttempt(app_2, user_e);  // same user

    // before updating cluster resource
    assertEquals(2, e.getNumActiveApplications());
    assertEquals(1, e.getNumPendingApplications());

    Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32); 
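    // A larger cluster raises the queue's AM resource limit, which should
    // activate the pending application.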
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // after updating cluster resource
    assertEquals(3, e.getNumActiveApplications());
    assertEquals(0, e.getNumPendingApplications());
  }

  public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) {
    for (QueueUserACLInfo aclInfo : aclInfos) {
      if (aclInfo.getUserAcls().contains(acl)) {
        return true;
      }
    }    
    return false;
  }

  @Test
  public void testInheritedQueueAcls() throws IOException {
    UserGroupInformation user = UserGroupInformation.getCurrentUser();

    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
    LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
    ParentQueue c = (ParentQueue)queues.get(C);
    LeafQueue c1 = stubLeafQueue((LeafQueue)queues.get(C1));

    assertFalse(root.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
    assertTrue(a.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
    assertTrue(b.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
    assertFalse(c.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
    assertFalse(c1.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));

    assertTrue(hasQueueACL(
          a.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
    assertTrue(hasQueueACL(
          b.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
    assertFalse(hasQueueACL(
          c.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
    assertFalse(hasQueueACL(
          c1.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));

  }

  @Test
  public void testLocalityConstraints() throws Exception {

    // Manipulate queue 'a'
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));

    // User
    String user_0 = "user_0";
    
    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = 
        TestUtils.getMockApplicationAttemptId(0, 0); 
    FiCaSchedulerApp app_0 =
        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 = 
        TestUtils.getMockApplicationAttemptId(1, 0); 
    FiCaSchedulerApp app_1 =
        new FiCaSchedulerApp(appAttemptId_1, user_0, a,
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_1, user_0);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1);

    // Setup some nodes and racks
    String host_0_0 = "127.0.0.1";
    String rack_0 = "rack_0";
    String host_0_1 = "127.0.0.2";
    FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);

    String host_1_0 = "127.0.0.3";
    String rack_1 = "rack_1";
    FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
    String host_1_1 = "127.0.0.4";
    FiCaSchedulerNode node_1_1 = TestUtils.getMockNode(host_1_1, rack_1, 0, 8*GB);

    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0_1.getNodeID(),
        node_0_1, node_1_0.getNodeID(), node_1_0, node_1_1.getNodeID(),
        node_1_1);
    
    final int numNodes = 4;
    Resource clusterResource = Resources.createResource(
        numNodes * (8*GB), numNodes * 1);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);

    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Setup resource-requests
    // resourceName: <priority, memory, #containers, relaxLocality>
    // host_0_0: < 1, 1GB, 1, true >
    // host_0_1: < null >
    // rack_0:   < null >                     <----
    // host_1_0: < 1, 1GB, 1, true >
    // host_1_1: < null >
    // rack_1:   < 1, 1GB, 1, false >         <----
    // ANY:      < 1, 1GB, 1, false >         <----
    // Availability:
    // host_0_0: 8G
    // host_0_1: 8G
    // host_1_0: 8G
    // host_1_1: 8G
    // Blacklist: <host_0_0>
    Priority priority = TestUtils.createMockPriority(1);
    SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_0_0, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_1_0, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
            false, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
            false, priority, recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);
    app_0.updateBlacklist(Collections.singletonList(host_0_0), null);
    app_0_requests_0.clear();
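    // With relaxLocality=false on rack_1 and ANY, the request is pinned to
    // the explicitly named hosts; host_0_0 is blacklisted, which leaves
    // host_1_0 as the only node eligible for allocation.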

    //
    // Start testing...
    //
    
    // node_0_1  
    // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
    CSAssignment assignment =
        a.assignContainers(clusterResource, node_0_1, new ResourceLimits(
            clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyNoContainerAllocated(assignment);
    // should be 0
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
    
    // resourceName: <priority, memory, #containers, relaxLocality>
    // host_0_0: < 1, 1GB, 1, true >
    // host_0_1: < null >
    // rack_0:   < null >                     <----
    // host_1_0: < 1, 1GB, 1, true >
    // host_1_1: < null >
    // rack_1:   < 1, 1GB, 1, false >         <----
    // ANY:      < 1, 1GB, 1, false >         <----
    // Availability:
    // host_0_0: 8G
    // host_0_1: 8G
    // host_1_0: 8G
    // host_1_1: 8G
    // Blacklist: <host_0_0>

    // node_1_1  
    // Shouldn't allocate since RR(rack_1) = relax: false
    assignment = a.assignContainers(clusterResource, node_1_1, 
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyNoContainerAllocated(assignment);
    // should be 0
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
    
    // Allow rack-locality for rack_1, but blacklist node_1_1
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
            true, priority, recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);
    app_0.updateBlacklist(Collections.singletonList(host_1_1), null);
    app_0_requests_0.clear();

    // resourceName: <priority, memory, #containers, relaxLocality>
    // host_0_0: < 1, 1GB, 1, true >
    // host_0_1: < null >
    // rack_0:   < null >                     
    // host_1_0: < 1, 1GB, 1, true >
    // host_1_1: < null >
    // rack_1:   < 1, 1GB, 1, true >         
    // ANY:      < 1, 1GB, 1, false >         
    // Availability:
    // host_0_0: 8G
    // host_0_1: 8G
    // host_1_0: 8G
    // host_1_1: 8G
    // Blacklist: < host_0_0 , host_1_1 >       <----

    // node_1_1  
    // Shouldn't allocate since node_1_1 is blacklisted
    assignment = a.assignContainers(clusterResource, node_1_1, 
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyNoContainerAllocated(assignment);
    // should be 0
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));

    // Now, remove node_1_1 from blacklist, but add rack_1 to blacklist
    app_0.updateResourceRequests(app_0_requests_0);
    app_0.updateBlacklist(
        Collections.singletonList(rack_1), Collections.singletonList(host_1_1));
    app_0_requests_0.clear();

    // resourceName: <priority, memory, #containers, relaxLocality>
    // host_0_0: < 1, 1GB, 1, true >
    // host_0_1: < null >
    // rack_0:   < null >                     
    // host_1_0: < 1, 1GB, 1, true >
    // host_1_1: < null >
    // rack_1:   < 1, 1GB, 1, true >         
    // ANY:      < 1, 1GB, 1, false >         
    // Availability:
    // host_0_0: 8G
    // host_0_1: 8G
    // host_1_0: 8G
    // host_1_1: 8G
    // Blacklist: < host_0_0 , rack_1 >       <----

    // node_1_1  
    // Shouldn't allocate since rack_1 is blacklisted
    assignment = a.assignContainers(clusterResource, node_1_1,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyNoContainerAllocated(assignment);
    // should be 0
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
    
    // Now remove rack_1 from blacklist
    app_0.updateResourceRequests(app_0_requests_0);
    app_0.updateBlacklist(null, Collections.singletonList(rack_1));
    app_0_requests_0.clear();
    
    // resourceName: <priority, memory, #containers, relaxLocality>
    // host_0_0: < 1, 1GB, 1, true >
    // host_0_1: < null >
    // rack_0:   < null >                     
    // host_1_0: < 1, 1GB, 1, true >
    // host_1_1: < null >
    // rack_1:   < 1, 1GB, 1, true >         
    // ANY:      < 1, 1GB, 1, false >         
    // Availability:
    // host_0_0: 8G
    // host_0_1: 8G
    // host_1_0: 8G
    // host_1_1: 8G
    // Blacklist: < host_0_0 >       <----

    // Even though RR(rack_1) is now relax: true, no container is allocated
    // on node_1_1 here; the ask remains outstanding
    assignment = a.assignContainers(clusterResource, node_1_1, 
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyNoContainerAllocated(assignment);
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(1, app_0.getOutstandingAsksCount(schedulerKey));

    // Now sanity-check node_local
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
            false, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
            false, priority, recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);
    app_0_requests_0.clear();
    
    // resourceName: <priority, memory, #containers, relaxLocality>
    // host_0_0: < 1, 1GB, 1, true >
    // host_0_1: < null >
    // rack_0:   < null >                     
    // host_1_0: < 1, 1GB, 1, true >
    // host_1_1: < null >
    // rack_1:   < 1, 1GB, 1, false >          <----
    // ANY:      < 1, 1GB, 1, false >          <----
    // Availability:
    // host_0_0: 8G
    // host_0_1: 8G
    // host_1_0: 8G
    // host_1_1: 8G
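
    // node_1_0
    // Should allocate node-locally, since host_1_0 still has a live
    // node-local request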

    assignment = a.assignContainers(clusterResource, node_1_0, 
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
    assertEquals(0, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(0, app_0.getOutstandingAsksCount(schedulerKey));

  }
  
  @Test
  public void testMaxAMResourcePerQueuePercentAfterQueueRefresh()
      throws Exception {
    queues = new CSQueueStore();
    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
    final String newRootName = "root" + System.currentTimeMillis();
    setupQueueConfiguration(csConf, newRootName, false);

    Resource clusterResource = Resources.createResource(100 * 16 * GB,
        100 * 32);
    CapacitySchedulerContext csContext = mockCSContext(csConf, clusterResource);
    when(csContext.getRMContext()).thenReturn(rmContext);
    csConf.setFloat(
        CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
        0.1f);

    CapacitySchedulerQueueManager queueManager = new CapacitySchedulerQueueManager(csConf,
        rmContext.getNodeLabelManager(), null);
    when(csContext.getCapacitySchedulerQueueManager()).thenReturn(queueManager);

    CapacitySchedulerQueueContext newQueueContext = new CapacitySchedulerQueueContext(csContext);

    CSQueue root;
    root = CapacitySchedulerQueueManager.parseQueue(newQueueContext, csConf, null,
        CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Manipulate queue 'b'
    LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B));
    assertEquals(0.1f, b.getMaxAMResourcePerQueuePercent(), 1e-3f);
    // Queue b has 100 * 16 = 1600 GB of effective usable resource, so the
    // AM limit is 1600 GB * 0.1 * 0.99 = 158.4 GB, which normalizes up to
    // the 1 GB minimum allocation: 159 GB = 162816 MB
    assertEquals(Resources.createResource(162816, 1),
        b.calculateAndGetAMResourceLimit());

    csConf.setFloat(
        CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
        0.2f);
    newQueueContext.reinitialize();
    clusterResource = Resources.createResource(100 * 20 * GB, 100 * 32);
    CSQueueStore newQueues = new CSQueueStore();
    CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(newQueueContext,
        csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
        TestUtils.spyHook);
    root.reinitialize(newRoot, clusterResource);

    b = stubLeafQueue((LeafQueue) newQueues.get(B));
    assertEquals(Resources.createResource(320 * GB, 1),
        b.calculateAndGetAMResourceLimit());
  }
  
  @Test
  public void testAllocateContainerOnNodeWithoutOffSwitchSpecified()
      throws Exception {
    // Manipulate queue 'b'
    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));

    // Users
    final String user_0 = "user_0";

    // Submit applications
    final ApplicationAttemptId appAttemptId_0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app_0 =
        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 =
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app_1 =
        new FiCaSchedulerApp(appAttemptId_1, user_0, a,
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_1, user_0); // same user

    // Setup some nodes
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 =
        TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8 * GB);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0);

    final int numNodes = 1;
    Resource clusterResource =
        Resources.createResource(numNodes * (8 * GB), numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);

    // Setup resource-requests
    Priority priority = TestUtils.createMockPriority(1);
    app_0.updateResourceRequests(Arrays.asList(TestUtils.createResourceRequest(
        "127.0.0.1", 1 * GB, 3, true, priority, recordFactory), TestUtils
        .createResourceRequest(DEFAULT_RACK, 1 * GB, 3, true, priority,
            recordFactory)));
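    // Note: no off-switch (ANY) request is registered here; the point of
    // this test is that allocation must not NPE in that case.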

    try {
      applyCSAssignment(clusterResource,
          a.assignContainers(clusterResource, node_0,
          new ResourceLimits(clusterResource),
          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    } catch (NullPointerException e) {
      fail("NPE when allocating container on node but "
          + "forget to set off-switch request should be handled");
    }
  }

  @Test
  public void testFifoAssignment() throws Exception {

    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));

    a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
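    // FIFO ordering compares application priority first, so app_1
    // (priority 5) is scheduled ahead of app_0 (priority 3).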

    String host_0_0 = "127.0.0.1";
    String rack_0 = "rack_0";
    FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0,
        16 * GB);

    final int numNodes = 4;
    Resource clusterResource = Resources.createResource(numNodes * (16 * GB),
        numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    String user_0 = "user_0";

    final ApplicationAttemptId appAttemptId_0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app_0 = spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
        mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3),
        false));
    a.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 =
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app_1 = spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
        mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5),
        false));
    a.submitApplicationAttempt(app_1, user_0);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0_0.getNodeID(),
        node_0_0);

    Priority priority = TestUtils.createMockPriority(1);
    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
    List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();

    app_0_requests_0.clear();
    app_0_requests_0.add(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, priority,
            recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);

    app_1_requests_0.clear();
    app_1_requests_0.add(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
            recordFactory));
    app_1.updateResourceRequests(app_1_requests_0);

    // app_1 will get containers first as it has the higher priority
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(1 * GB, app_1.getCurrentConsumption().getMemorySize());
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());

    app_0_requests_0.clear();
    app_0_requests_0.add(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
            recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);

    app_1_requests_0.clear();
    app_1_requests_0.add(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
            recordFactory));
    app_1.updateResourceRequests(app_1_requests_0);

    // app_1 will still be assigned first as its priority is higher.
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(2 * GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize());

    // and only then will app_0
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(3 * GB, app_0.getCurrentConsumption().getMemorySize());
  }

  @Test
  public void testFifoWithPartitionsAssignment() throws Exception {
    setUpWithNodeLabels();

    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
    OrderingPolicy<FiCaSchedulerApp> policy =
        new FifoOrderingPolicyWithExclusivePartitions<>();
    policy.configure(Collections.singletonMap(
        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, LABEL));
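    // With exclusive enforced partitions, apps asking for LABEL are ordered
    // separately from default-partition apps, so the two do not compete.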
    a.setOrderingPolicy(policy);
    String host00 = "127.0.0.1";
    String rack0 = "rack_0";
    FiCaSchedulerNode node00 = TestUtils.getMockNode(host00, rack0, 0,
        16 * GB);
    when(node00.getPartition()).thenReturn(LABEL);
    String host01 = "127.0.0.2";
    FiCaSchedulerNode node01 = TestUtils.getMockNode(host01, rack0, 0,
        16 * GB);
    when(node01.getPartition()).thenReturn("");
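    // node00 belongs to the LABEL partition, node01 to the default
    // partition.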

    final int numNodes = 4;
    Resource clusterResource = Resources.createResource(numNodes * (16 * GB),
        numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);

    String user0 = "user_0";

    final ApplicationAttemptId appAttemptId0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app0 = spy(new FiCaSchedulerApp(appAttemptId0, user0, a,
        mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5),
        false));
    a.submitApplicationAttempt(app0, user0);

    final ApplicationAttemptId appAttemptId1 =
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app1 = spy(new FiCaSchedulerApp(appAttemptId1, user0, a,
        mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3),
        false));
    when(app1.getPartition()).thenReturn(LABEL);
    a.submitApplicationAttempt(app1, user0);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(),
        app1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node00.getNodeID(),
        node00, node01.getNodeID(), node01);

    Priority priority = TestUtils.createMockPriority(1);
    List<ResourceRequest> app0Requests = new ArrayList<>();
    List<ResourceRequest> app1Requests = new ArrayList<>();

    app0Requests.clear();
    app0Requests.add(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, priority,
            recordFactory));
    app0.updateResourceRequests(app0Requests);

    app1Requests.clear();
    app1Requests.add(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
            recordFactory, LABEL));
    app1.updateResourceRequests(app1Requests);

    // app_1 will get containers since it is exclusive-enforced
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node00,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(1 * GB, app1.getSchedulingResourceUsage()
        .getUsed(LABEL).getMemorySize());
    // app_0 should not get resources from node_0_0 since the labels
    // don't match
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node00,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(0 * GB, app0.getCurrentConsumption().getMemorySize());

    app1Requests.clear();
    app1Requests.add(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
            recordFactory, LABEL));
    app1.updateResourceRequests(app1Requests);

    // When node_0_1 heartbeats, app_0 should get containers
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node01,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
    assertEquals(1 * GB, app1.getSchedulingResourceUsage()
        .getUsed(LABEL).getMemorySize());

    app0Requests.clear();
    app0Requests.add(TestUtils
        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
            recordFactory));
    app0.updateResourceRequests(app0Requests);

    // When node_0_0 heartbeats, app_1 should get containers again
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node00,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
    assertEquals(2 * GB, app1.getSchedulingResourceUsage()
        .getUsed(LABEL).getMemorySize());
  }

  @Test
  public void testConcurrentAccess() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    MockRM rm = new MockRM();
    rm.init(conf);
    rm.start();

    final String queue = "default";
    final String user = "user";
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    final LeafQueue defaultQueue = (LeafQueue) cs.getQueue(queue);

    final List<FiCaSchedulerApp> listOfApps =
        createListOfApps(10000, user, defaultQueue);

    final CyclicBarrier cb = new CyclicBarrier(2);
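    // The barrier releases both threads at once so that application removal
    // and collectSchedulerApplications() race on the same collection.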
    final List<ConcurrentModificationException> conException =
        new ArrayList<ConcurrentModificationException>();

    Thread submitAndRemove = new Thread(new Runnable() {

      @Override
      public void run() {

        for (FiCaSchedulerApp fiCaSchedulerApp : listOfApps) {
          defaultQueue.submitApplicationAttempt(fiCaSchedulerApp, user);
        }
        try {
          cb.await();
        } catch (Exception e) {
          // Ignore
        }
        for (FiCaSchedulerApp fiCaSchedulerApp : listOfApps) {
          defaultQueue.finishApplicationAttempt(fiCaSchedulerApp, queue);
        }
      }
    }, "SubmitAndRemoveApplicationAttempt Thread");

    Thread getAppsInQueue = new Thread(new Runnable() {
      List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>();

      @Override
      public void run() {
        try {
          try {
            cb.await();
          } catch (Exception e) {
            // Ignore
          }
          defaultQueue.collectSchedulerApplications(apps);
        } catch (ConcurrentModificationException e) {
          conException.add(e);
        }
      }

    }, "GetAppsInQueue Thread");

    submitAndRemove.start();
    getAppsInQueue.start();

    submitAndRemove.join();
    getAppsInQueue.join();

    assertTrue(conException.isEmpty(),
        "ConcurrentModificationException was thrown");
    rm.stop();

  }

  @Test
  public void testFairAssignment() throws Exception {

    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));

    OrderingPolicy<FiCaSchedulerApp> schedulingOrder =
      new FairOrderingPolicy<FiCaSchedulerApp>();

    a.setOrderingPolicy(schedulingOrder);
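    // Fair ordering favors the app with the smaller current consumption
    // rather than submission order.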

    String host_0_0 = "127.0.0.1";
    String rack_0 = "rack_0";
    FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB);

    final int numNodes = 4;
    Resource clusterResource = Resources.createResource(
        numNodes * (16*GB), numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    String user_0 = "user_0";

    final ApplicationAttemptId appAttemptId_0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app_0 =
        spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
            mock(ActiveUsersManager.class), spyRMContext));
    a.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 =
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app_1 =
        spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
            mock(ActiveUsersManager.class), spyRMContext));
    a.submitApplicationAttempt(app_1, user_0);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0_0.getNodeID(),
        node_0_0);

    Priority priority = TestUtils.createMockPriority(1);
    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
    List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();

    app_0_requests_0.clear();
    app_0_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
            true, priority, recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);

    app_1_requests_0.clear();
    app_1_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
            true, priority, recordFactory));
    app_1.updateResourceRequests(app_1_requests_0);

    // app_0 will get containers as it was submitted first.
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());

    app_0_requests_0.clear();
    app_0_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
            true, priority, recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);

    app_1_requests_0.clear();
    app_1_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
            true, priority, recordFactory));
    app_1.updateResourceRequests(app_1_requests_0);

    // Since it already holds more resources, app_0 will not be assigned
    // first, but app_1 will
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize());

    // and only then will app_0
    applyCSAssignment(clusterResource,
        a.assignContainers(clusterResource, node_0_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
    assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());

  }
  
  @Test
  public void testLocalityDelaySkipsApplication() throws Exception {

    // Manipulate queue 'a'
    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));

    // User
    String user_0 = "user_0";
    
    // Submit applications
    final ApplicationAttemptId appAttemptId_0 = 
        TestUtils.getMockApplicationAttemptId(0, 0); 
    FiCaSchedulerApp app_0 =
        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_0, user_0);
    final ApplicationAttemptId appAttemptId_1 = 
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app_1 =
        new FiCaSchedulerApp(appAttemptId_1, user_0, a,
            mock(ActiveUsersManager.class), spyRMContext);
    a.submitApplicationAttempt(app_1, user_0);

    // Setup some nodes and racks
    String host_0 = "127.0.0.1";
    String rack_0 = "rack_0";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
    
    String host_1 = "127.0.0.2";
    String rack_1 = "rack_1";
    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
    
    String host_2 = "127.0.0.3";
    String rack_2 = "rack_2";
    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2);

    final int numNodes = 3;
    Resource clusterResource = 
        Resources.createResource(numNodes * (8*GB), numNodes * 16);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
    
    // Setup resource-requests and submit
    // App_0 has node-local requests for host_0/host_1, and app_1 has a
    // node-local request for host_2.
    Priority priority = TestUtils.createMockPriority(1);
    SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_0, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_0, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(host_1, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
            true, priority, recordFactory));
    app_0_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra
            true, priority, recordFactory));
    app_0.updateResourceRequests(app_0_requests_0);

    List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();
    app_1_requests_0.add(
        TestUtils.createResourceRequest(host_2, 1*GB, 1, 
            true, priority, recordFactory));
    app_1_requests_0.add(
        TestUtils.createResourceRequest(rack_2, 1*GB, 1, 
            true, priority, recordFactory));
    app_1_requests_0.add(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // one extra
            true, priority, recordFactory));
    app_1.updateResourceRequests(app_1_requests_0);

    // Start testing...
    // When allocating, even though app_0 was submitted earlier than app_1,
    // app_1 can still get allocated because app_0 is waiting out the
    // node-locality delay
    CSAssignment assignment = null;
    
    // Check app_0's scheduling opportunities increased and app_1 get allocated
    assignment = a.assignContainers(clusterResource, node_2,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
    applyCSAssignment(clusterResource, assignment, a, nodes, apps);
    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
    assertEquals(1, app_0.getSchedulingOpportunities(schedulerKey));
    assertEquals(3, app_0.getOutstandingAsksCount(schedulerKey));
    assertEquals(0, app_0.getLiveContainers().size());
    assertEquals(1, app_1.getLiveContainers().size());
  }

  @Test
  public void testGetTotalPendingResourcesConsideringUserLimitOneUser()
      throws Exception {
    // Manipulate queue 'e'
    LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
    // Allow queue 'e' to use 100% of cluster resources (max capacity).
    e.setMaxCapacity(1.0f);
    // When used queue resources goes above capacity (in this case, 1%), user
    // resource limit (used in calculating headroom) is calculated in small
    // increments to ensure that user-limit-percent can be met for all users in
    // a queue. Take user-limit-percent out of the equation so that user
    // resource limit will always be calculated to its max possible value.
    e.setUserLimit(1000);

    final String user_0 = "user_0";

    // Submit 2 applications for user_0
    final ApplicationAttemptId appAttemptId_0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app_0 =
        new FiCaSchedulerApp(appAttemptId_0, user_0, e,
            mock(ActiveUsersManager.class), spyRMContext);
    e.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 =
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app_1 =
        new FiCaSchedulerApp(appAttemptId_1, user_0, e,
            mock(ActiveUsersManager.class), spyRMContext);
    e.submitApplicationAttempt(app_1, user_0);  // same user

    // Setup 1 node with 100GB of memory resources.
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
        100*GB);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0);

    final int numNodes = 1;
    Resource clusterResource =
        Resources.createResource(numNodes * (100*GB), numNodes * 128);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Pending resource requests for app_0 and app_1 total 5GB.
    Priority priority = TestUtils.createMockPriority(1);
    app_0.updateResourceRequests(Collections.singletonList(
            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
                priority, recordFactory)));

    app_1.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
            priority, recordFactory)));

    // Start testing...

    // Assign 1st Container of 1GB
    applyCSAssignment(clusterResource,
        e.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    // With queue capacity set at 1% of 100GB and user-limit-factor set to 1.0,
    // all users (only user_0) in queue 'e' should be able to consume 1GB.
    // The first container should be assigned to app_0 with no headroom left
    // even though user_0's apps are still asking for a total of 4GB.
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0 * GB,
        e.getTotalPendingResourcesConsideringUserLimit(clusterResource,
            NO_LABEL, false).getMemorySize());

    // Assign 2nd container of 1GB
    applyCSAssignment(clusterResource,
        e.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    // user_0 has no headroom due to the user-limit-factor of 1.0. However,
    // the capacity scheduler will assign one container beyond the user
    // limit. This container also went to app_0, still with no headroom even
    // though app_0 and app_1 are asking for a cumulative 3GB.
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
        clusterResource, NO_LABEL, false).getMemorySize());

    // Can't allocate 3rd container due to user-limit. Headroom still 0.
    applyCSAssignment(clusterResource,
        e.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
        clusterResource, NO_LABEL, false).getMemorySize());

    // Increase user-limit-factor from 1.0 to 10.0, raising the user limit
    // from 1GB to 10GB (1% * 10 * 100GB = 10GB).
    // Total pending for app_0 and app_1 is still 3GB, so user-limit-factor
    // is no longer limiting the return value of
    // getTotalPendingResourcesConsideringUserLimit()
    e.setUserLimitFactor(10.0f);
    assertEquals(3*GB, e.getTotalPendingResourcesConsideringUserLimit(
        clusterResource, NO_LABEL, false).getMemorySize());

    applyCSAssignment(clusterResource,
        e.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    // app_0 is now satisfied, app_1 is still asking for 2GB.
    assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit(
        clusterResource, NO_LABEL, false).getMemorySize());

    // Get the last 2 containers for app_1, no more pending requests.
    applyCSAssignment(clusterResource,
        e.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    applyCSAssignment(clusterResource,
        e.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    assertEquals(3*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
        clusterResource, NO_LABEL, false).getMemorySize());

    // Release each container from app_0
    for (RMContainer rmContainer : app_0.getLiveContainers()) {
      e.completedContainer(clusterResource, app_0, node_0, rmContainer,
          ContainerStatus.newInstance(rmContainer.getContainerId(),
              ContainerState.COMPLETE, "",
              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
          RMContainerEventType.KILL, null, true);
    }

    // Release each container from app_1
    for (RMContainer rmContainer : app_1.getLiveContainers()) {
      e.completedContainer(clusterResource, app_1, node_0, rmContainer,
          ContainerStatus.newInstance(rmContainer.getContainerId(),
              ContainerState.COMPLETE, "",
              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
          RMContainerEventType.KILL, null, true);
    }
  }

  @Test
  public void testGetTotalPendingResourcesConsideringUserLimitTwoUsers()
      throws Exception {
    // Manipulate queue 'e'
    LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
    // Allow queue 'e' to use 100% of cluster resources (max capacity).
    e.setMaxCapacity(1.0f);
    // When used queue resources goes above capacity (in this case, 1%), user
    // resource limit (used in calculating headroom) is calculated in small
    // increments to ensure that user-limit-percent can be met for all users in
    // a queue. Take user-limit-percent out of the equation so that user
    // resource limit will always be calculated to its max possible value.
    e.setUserLimit(1000);

    final String user_0 = "user_0";
    final String user_1 = "user_1";

    // Submit 2 applications for user_0
    final ApplicationAttemptId appAttemptId_0 =
        TestUtils.getMockApplicationAttemptId(0, 0);
    FiCaSchedulerApp app_0 =
        new FiCaSchedulerApp(appAttemptId_0, user_0, e,
            mock(ActiveUsersManager.class), spyRMContext);
    e.submitApplicationAttempt(app_0, user_0);

    final ApplicationAttemptId appAttemptId_1 =
        TestUtils.getMockApplicationAttemptId(1, 0);
    FiCaSchedulerApp app_1 =
        new FiCaSchedulerApp(appAttemptId_1, user_0, e,
            mock(ActiveUsersManager.class), spyRMContext);
    e.submitApplicationAttempt(app_1, user_0);

    // Submit 2 applications for user_1
    final ApplicationAttemptId appAttemptId_2 =
        TestUtils.getMockApplicationAttemptId(2, 0);
    FiCaSchedulerApp app_2 =
        new FiCaSchedulerApp(appAttemptId_2, user_1, e,
            mock(ActiveUsersManager.class), spyRMContext);
    e.submitApplicationAttempt(app_2, user_1);

    final ApplicationAttemptId appAttemptId_3 =
        TestUtils.getMockApplicationAttemptId(3, 0);
    FiCaSchedulerApp app_3 =
        new FiCaSchedulerApp(appAttemptId_3, user_1, e,
            mock(ActiveUsersManager.class), spyRMContext);
    e.submitApplicationAttempt(app_3, user_1);

    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
        app_1, app_2.getApplicationAttemptId(), app_2,
        app_3.getApplicationAttemptId(), app_3);

    // Setup 1 node with 100GB of memory resources.
    String host_0 = "127.0.0.1";
    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
        100*GB);
    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
        node_0);

    final int numNodes = 1;
    Resource clusterResource =
        Resources.createResource(numNodes * (100*GB), numNodes * 128);
    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
    root.updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));

    // Pending resource requests for user_0: app_0 and app_1 total 3GB.
    Priority priority = TestUtils.createMockPriority(1);
    app_0.updateResourceRequests(Collections.singletonList(
            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
                priority, recordFactory)));

    app_1.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
            priority, recordFactory)));

    // Pending resource requests for user_1: app_2 and app_3 total 3GB.
    priority = TestUtils.createMockPriority(1);
    app_2.updateResourceRequests(Collections.singletonList(
            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
                priority, recordFactory)));

    app_3.updateResourceRequests(Collections.singletonList(
        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
            priority, recordFactory)));

    // Start testing...
    // With queue capacity set at 1% of 100GB and user-limit-factor set to 1.0,
    // queue 'e' should be able to consume 1GB per user.
    assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit(
        clusterResource, NO_LABEL, false).getMemorySize());
    // None of the apps have assigned resources
    // user_0's apps:
    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    // user_1's apps:
    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());

    // Assign 1st Container of 1GB
    applyCSAssignment(clusterResource,
        e.assignContainers(clusterResource, node_0,
            new ResourceLimits(clusterResource),
            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    // The first container was assigned to user_0's app_0. The queue's total
    // headroom has 1GB left for user_1.
    assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
        clusterResource, NO_LABEL, false).getMemorySize());
    // user_0's apps:
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
    // user_1's apps:
    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());

    // Assign 2nd container of 1GB
    applyCSAssignment(clusterResource,
        e.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    // user_0 has no headroom due to the user-limit-factor of 1.0. However,
    // the capacity scheduler will assign one container beyond the user
    // limit, so this container went to user_0's app_1. Headroom for queue
    // 'e' is still 1GB for user_1
    assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
        clusterResource, NO_LABEL, false).getMemorySize());
    // user_0's apps:
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
    // user_1's apps:
    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());

    // Assign 3rd container.
    applyCSAssignment(clusterResource,
        e.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    // Container was allocated to user_1's app_2 since user_1 still had
    // headroom. Now, no headroom is left.
    assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
        clusterResource, NO_LABEL, false).getMemorySize());
    // user_0's apps:
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
    // user_1's apps:
    assertEquals(1*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());

    // Assign 4th container.
    applyCSAssignment(clusterResource,
        e.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    // Allocated to user_1's app_2 since the scheduler allocates one
    // container above the user resource limit. Available headroom still 0.
    assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
        clusterResource, NO_LABEL, false).getMemorySize());
    // user_0's apps:
    long app_0_consumption = app_0.getCurrentConsumption().getMemorySize();
    assertEquals(1*GB, app_0_consumption);
    long app_1_consumption = app_1.getCurrentConsumption().getMemorySize();
    assertEquals(1*GB, app_1_consumption);
    // user_1's apps:
    long app_2_consumption = app_2.getCurrentConsumption().getMemorySize();
    assertEquals(2*GB, app_2_consumption);
    long app_3_consumption = app_3.getCurrentConsumption().getMemorySize();
    assertEquals(0*GB, app_3_consumption);

    // Attempt to assign 5th container. Will be a no-op.
    applyCSAssignment(clusterResource,
        e.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    // Cannot allocate 5th container because both users are above their allowed
    // user resource limit. Values should be the same as previously.
    assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
        clusterResource, NO_LABEL, false).getMemorySize());
    // user_0's apps:
    assertEquals(app_0_consumption, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(app_1_consumption, app_1.getCurrentConsumption().getMemorySize());
    // user_1's apps:
    assertEquals(app_2_consumption, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(app_3_consumption, app_3.getCurrentConsumption().getMemorySize());

    // Increase user-limit-factor from 1.0 to 10.0, raising the user limit
    // from 1GB to 10GB (1% * 10 * 100GB = 10GB).
    // Pending for both user_0 and user_1 is still 1GB each, so user-limit-
    // factor is no longer the limiting factor.
    e.setUserLimitFactor(10.0f);

    applyCSAssignment(clusterResource,
        e.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    // Next container goes to user_0's app_1, since it still wanted 1GB.
    assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit(
        clusterResource, NO_LABEL, false).getMemorySize());
    // user_0's apps:
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize());
    // user_1's apps:
    assertEquals(2*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());

    applyCSAssignment(clusterResource,
        e.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), e, nodes, apps);
    // Last container goes to user_1's app_3, since it still wanted 1GB.
    // user_0's apps:
    assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit(
        clusterResource, NO_LABEL, false).getMemorySize());
    assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
    assertEquals(2*GB, app_1.getCurrentConsumption().getMemorySize());
    // user_1's apps:
    assertEquals(2*GB, app_2.getCurrentConsumption().getMemorySize());
    assertEquals(1*GB, app_3.getCurrentConsumption().getMemorySize());

    // Release each container from app_0
    for (RMContainer rmContainer : app_0.getLiveContainers()) {
      e.completedContainer(clusterResource, app_0, node_0, rmContainer,
          ContainerStatus.newInstance(rmContainer.getContainerId(),
              ContainerState.COMPLETE, "",
              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
          RMContainerEventType.KILL, null, true);
    }

    // Release each container from app_1
    for (RMContainer rmContainer : app_1.getLiveContainers()) {
      e.completedContainer(clusterResource, app_1, node_0, rmContainer,
          ContainerStatus.newInstance(rmContainer.getContainerId(),
              ContainerState.COMPLETE, "",
              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
          RMContainerEventType.KILL, null, true);
    }
  }

  private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user,
      LeafQueue defaultQueue) {
    List<FiCaSchedulerApp> appsLists = new ArrayList<FiCaSchedulerApp>();
    for (int i = 0; i < noOfApps; i++) {
      ApplicationAttemptId appAttemptId_0 =
          TestUtils.getMockApplicationAttemptId(i, 0);
      FiCaSchedulerApp app_0 =
          new FiCaSchedulerApp(appAttemptId_0, user, defaultQueue,
              mock(ActiveUsersManager.class), spyRMContext);
      appsLists.add(app_0);
    }
    return appsLists;
  }

  private CapacitySchedulerContext mockCSContext(
      CapacitySchedulerConfiguration csConf, Resource clusterResource) {
    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
    when(csContext.getConfiguration()).thenReturn(csConf);
    when(csContext.getConf()).thenReturn(new YarnConfiguration());
    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
    when(csContext.getClusterResource()).thenReturn(clusterResource);
    when(csContext.getMinimumResourceCapability()).thenReturn(
        Resources.createResource(GB, 1));
    when(csContext.getMaximumResourceCapability()).thenReturn(
        Resources.createResource(2 * GB, 2));
    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
    return csContext;
  }

  @Test
  public void testApplicationQueuePercent()
      throws Exception {
    Resource res = Resource.newInstance(10 * 1024, 10);
    CapacityScheduler scheduler = mock(CapacityScheduler.class);
    when(scheduler.getClusterResource()).thenReturn(res);
    when(scheduler.getResourceCalculator())
        .thenReturn(new DefaultResourceCalculator());

    ApplicationAttemptId appAttId = createAppAttemptId(0, 0);
    RMContext rmContext = mock(RMContext.class);
    when(rmContext.getEpoch()).thenReturn(3L);
    when(rmContext.getScheduler()).thenReturn(scheduler);
    when(rmContext.getRMApps())
      .thenReturn(new ConcurrentHashMap<ApplicationId, RMApp>());
    RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
    when(nlm.getResourceByLabel(any(), any())).thenReturn(res);
    when(rmContext.getNodeLabelManager()).thenReturn(nlm);
    when(rmContext.getYarnConfiguration()).thenReturn(csConf);

    // Queue "test" consumes 100% of the cluster, so its capacity and absolute
    // capacity are both 1.0f.
    Queue queue = createQueue("test", "root.test", null, 1.0f, 1.0f, res);
    final String user = "user1";
    FiCaSchedulerApp app =
        new FiCaSchedulerApp(appAttId, user, queue,
            queue.getAbstractUsersManager(), rmContext);

    // Resource request
    Resource requestedResource = Resource.newInstance(1536, 2);
    app.getAppAttemptResourceUsage().incUsed(requestedResource);
    // In "test" queue, 1536 used is 15% of both the queue and the cluster
    assertEquals(15.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
        0.01f);
    assertEquals(15.0f,
        app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f);

    // Queue "test2" is a child of root and its capacity is 50% of root. As a
    // child of root, its absolute capaicty is also 50%.
    queue = createQueue("test2", "root.test2", null, 0.5f, 0.5f,
        Resources.divideAndCeil(dominantResourceCalculator, res, 2));
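    // "test2" is created with half the cluster resource (5 * 1024 MB).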
    app = new FiCaSchedulerApp(appAttId, user, queue,
        queue.getAbstractUsersManager(), rmContext);
    app.getAppAttemptResourceUsage().incUsed(requestedResource);
    // In "test2" queue, 1536 used is 30% of "test2" and 15% of the cluster.
    assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
        0.01f);
    assertEquals(15.0f,
        app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f);

    // Queue "test2.1" is 50% of queue "test2", which is 50% of the cluster.
    // Therefore, "test2.1" capacity is 50% and absolute capacity is 25%.
    AbstractCSQueue qChild =
        createQueue("test2.1", "root.test2.1", queue, 0.5f, 0.25f,
        Resources.divideAndCeil(dominantResourceCalculator, res, 4));
    app = new FiCaSchedulerApp(appAttId, user, qChild,
        qChild.getAbstractUsersManager(), rmContext);
    app.getAppAttemptResourceUsage().incUsed(requestedResource);
    // In "test2.1" queue, 1536 used is 60% of "test2.1" and 15% of the cluster.
    assertEquals(60.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
        0.01f);
    assertEquals(15.0f,
        app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f);

    // test that queueUsagePercentage returns neither NaN nor Infinity
    AbstractCSQueue zeroQueue = createQueue("test2.2", "root.test2.2", null,
        Float.MIN_VALUE, Float.MIN_VALUE,
        Resources.multiply(res, Float.MIN_VALUE));
    app = new FiCaSchedulerApp(appAttId, user, zeroQueue,
        qChild.getAbstractUsersManager(), rmContext);
    app.getAppAttemptResourceUsage().incUsed(requestedResource);
    assertEquals(0.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
        0.01f);
  }

  @Test
  public void testSetupQueueConfigsWithSpecifiedConfiguration()
      throws IOException {

    try {
      CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
          new Configuration(false), false);

      final String leafQueueName =
          "testSetupQueueConfigsWithSpecifiedConfiguration";
      final QueuePath leafQueuePath = new QueuePath(ROOT.getFullPath(), leafQueueName);
      final QueuePath bQueuePath = new QueuePath(ROOT + DOT + B);

      assertEquals(0, conf.size());
      conf.setNodeLocalityDelay(60);

      csConf.setQueues(ROOT, new String[] {leafQueueName, B});
      csConf.setCapacity(leafQueuePath, 10);
      csConf.setMaximumCapacity(leafQueuePath, 100);
      csConf.setUserLimitFactor(leafQueuePath, 0.1f);

      csConf.setCapacity(bQueuePath, 90);
      csConf.setMaximumCapacity(bQueuePath, 100);

      csConf.setNodeLocalityDelay(30);
      csConf.setGlobalMaximumApplicationsPerQueue(20);
      queueContext.reinitialize();

      // reinitialize queues
      CSQueueStore newQueues = new CSQueueStore();
      CSQueue newRoot =
          CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
              CapacitySchedulerConfiguration.ROOT,
              newQueues, queues,
              TestUtils.spyHook);
      queues = newQueues;
      root.reinitialize(newRoot, csContext.getClusterResource());
      root.updateClusterResource(csContext.getClusterResource(),
          new ResourceLimits(csContext.getClusterResource()));

      // Mock the queue
      LeafQueue leafQueue = stubLeafQueue((LeafQueue)queues.get(leafQueueName));

      assertEquals(30, leafQueue.getNodeLocalityDelay());
      assertEquals(20, leafQueue.getMaxApplications());
      assertEquals(2, leafQueue.getMaxApplicationsPerUser());

      //check queue configs
      conf.setMaximumAMResourcePercentPerPartition(leafQueue.getQueuePathObject(),
          NO_LABEL, 10);
      conf.setMaximumCapacity(leafQueue.getQueuePathObject(), 10);

      assertEquals(0.1, leafQueue.getMaxAMResourcePerQueuePercent(),
          EPSILON);
      assertEquals(1, leafQueue.getMaximumCapacity(),
          EPSILON);
      assertEquals(0.1, leafQueue.getCapacity(),
          EPSILON);
      assertEquals(0.1, leafQueue.getAbsoluteCapacity(),
          EPSILON);
      assertEquals(1.0, leafQueue.getAbsoluteMaximumCapacity(),
          EPSILON);

      // limit maximum apps by max system apps
      csConf.setMaximumSystemApplications(15);
      queueContext.reinitialize();
      leafQueue.updateClusterResource(csContext.getClusterResource(),
          new ResourceLimits(csContext.getClusterResource()));

      assertEquals(15, leafQueue.getMaxApplications());

    } finally {
      // revert config changes
      csConf.setNodeLocalityDelay(
          CapacitySchedulerConfiguration.DEFAULT_NODE_LOCALITY_DELAY);
      csConf.setGlobalMaximumApplicationsPerQueue(
          (int) CapacitySchedulerConfiguration.UNDEFINED);
    }
  }

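  /** Builds an ApplicationAttemptId with a fixed cluster timestamp of 0. */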
  private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
    ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
    return ApplicationAttemptId.newInstance(appIdImpl, attemptId);
  }

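  /**
   * Builds a Mockito-stubbed AbstractCSQueue whose metrics, capacities and
   * resource quotas all report the supplied values, so usage-percentage
   * calculations can be exercised without a real queue hierarchy.
   */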
  private AbstractCSQueue createQueue(String name, String path, Queue parent,
      float capacity, float absCap, Resource res) {
    CSQueueMetrics metrics =
        CSQueueMetrics.forQueue(name, parent, false, cs.getConf());
    QueueInfo queueInfo = QueueInfo.newInstance(name, path, capacity, 1.0f, 0,
        null, null, QueueState.RUNNING, null, "", null, false, -1.0f, 10,
        null, false);
    ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
    AbstractCSQueue queue = mock(AbstractCSQueue.class);
    when(queue.getMetrics()).thenReturn(metrics);
    when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
    when(queue.getQueueInfo(false, false)).thenReturn(queueInfo);
    QueueCapacities qCaps = mock(QueueCapacities.class);
    when(qCaps.getAbsoluteCapacity(any())).thenReturn(absCap);
    when(queue.getQueueCapacities()).thenReturn(qCaps);
    QueueResourceQuotas qQuotas = mock(QueueResourceQuotas.class);
    when(qQuotas.getConfiguredMinResource(any())).thenReturn(res);
    when(qQuotas.getConfiguredMaxResource(any())).thenReturn(res);
    when(qQuotas.getEffectiveMinResource(any())).thenReturn(res);
    when(qQuotas.getEffectiveMaxResource(any())).thenReturn(res);
    when(queue.getQueueResourceQuotas()).thenReturn(qQuotas);
    when(queue.getEffectiveCapacity(any())).thenReturn(res);
    return queue;
  }

  @Test
  public void testMaxApplicationsWithNodeLabels() throws IOException {
    CapacitySchedulerConfiguration conf = csConf;
    String rootChild = root.getChildQueues().get(0).getQueuePath();
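    // Pretend the cluster has two nodes of 16 GB and 32 vcores each.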
    when(cs.getClusterResource()).thenReturn(
        Resources.createResource(2 * 16 * GB, 2 * 32));

    conf.setCapacityByLabel(ROOT, "test", 100);
    conf.setCapacityByLabel(new QueuePath(rootChild), "test", 100);
    conf.setCapacityByLabel(new QueuePath(rootChild, A), "test", 20);
    conf.setCapacityByLabel(new QueuePath(rootChild, B), "test", 40);
    conf.setCapacityByLabel(new QueuePath(rootChild, C), "test", 10);
    conf.setCapacityByLabel(QueuePath.createFromQueues(rootChild, C, C1), "test", 100);
    conf.setCapacityByLabel(new QueuePath(rootChild, D), "test", 30);
    conf.setCapacityByLabel(new QueuePath(rootChild, E), "test", 0);
    cs.getCapacitySchedulerQueueManager().reinitConfiguredNodeLabels(conf);
    cs.setMaxRunningAppsEnforcer(new CSMaxRunningAppsEnforcer(cs));
    cs.reinitialize(conf, cs.getRMContext());

    LeafQueue e = (LeafQueue) cs.getQueue("e");
    // Maximum applications should be calculated from the default node label
    assertEquals((int)(conf.getMaximumSystemApplications() * e.getAbsoluteCapacity()),
        e.getMaxApplications(), "Maximum applications are not calculated properly");

    conf.setCapacityByLabel(new QueuePath(rootChild, A), "test", 10);
    conf.setCapacityByLabel(new QueuePath(rootChild, B), "test", 10);
    conf.setCapacityByLabel(new QueuePath(rootChild, C), "test", 10);
    conf.setCapacityByLabel(new QueuePath(rootChild, D), "test", 10);
    conf.setCapacityByLabel(new QueuePath(rootChild, E), "test", 60);
    cs.reinitialize(conf, cs.getRMContext());

    e = (LeafQueue) cs.getQueue("e");
    // Maximum applications are now determined by the "test" label, because it
    // yields a higher value than the default node label
    assertEquals((int)(conf.getMaximumSystemApplications() *
        e.getQueueCapacities().getAbsoluteCapacity("test")),
        e.getMaxApplications(), "Maximum applications are not calculated properly");
  }

  @Test
  public void testRootHasAllNodeLabelsOfItsDescendants() throws IOException {
    CapacitySchedulerConfiguration conf = csConf;
    String rootChild = root.getChildQueues().get(0).getQueuePath();

    conf.setCapacityByLabel(new QueuePath(rootChild), "test", 100);
    conf.setCapacityByLabel(new QueuePath(rootChild, A), "test", 20);
    conf.setCapacityByLabel(new QueuePath(rootChild, B), "test", 40);
    conf.setCapacityByLabel(new QueuePath(rootChild, C), "test", 10);
    conf.setCapacityByLabel(QueuePath.createFromQueues(rootChild, C, C1), "test", 100);
    conf.setCapacityByLabel(new QueuePath(rootChild, D), "test", 30);
    conf.setCapacityByLabel(new QueuePath(rootChild, E), "test", 0);

    conf.setCapacityByLabel(new QueuePath(rootChild), "test2", 100);
    conf.setCapacityByLabel(new QueuePath(rootChild, A), "test2", 20);
    conf.setCapacityByLabel(new QueuePath(rootChild, B), "test2", 40);
    conf.setCapacityByLabel(new QueuePath(rootChild, C), "test2", 10);
    conf.setCapacityByLabel(QueuePath.createFromQueues(rootChild, C, C1), "test2", 100);
    conf.setCapacityByLabel(new QueuePath(rootChild, D), "test2", 30);
    conf.setCapacityByLabel(new QueuePath(rootChild, E), "test2", 0);

    cs.getCapacitySchedulerQueueManager().reinitConfiguredNodeLabels(conf);
    cs.setMaxRunningAppsEnforcer(new CSMaxRunningAppsEnforcer(cs));
    cs.reinitialize(conf, cs.getRMContext());

    ParentQueue rootQueue = (ParentQueue) cs.getRootQueue();

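    // The root queue should report every label configured on any descendant,
    // plus the default (empty) label.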
    assertEquals(Sets.newHashSet("", "test", "test2"),
        rootQueue.queueNodeLabelsSettings.getConfiguredNodeLabels());
  }

  @AfterEach
  public void tearDown() throws Exception {
    if (cs != null) {
      cs.stop();
    }
  }

  private static class TestRMAppManager extends RMAppManager {
    TestRMAppManager(RMContext context, YarnScheduler scheduler,
        ApplicationMasterService masterService,
        ApplicationACLsManager applicationACLsManager, Configuration conf) {
      super(context, scheduler, masterService, applicationACLsManager, conf);
    }

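    // Overridden only to widen visibility so the test can call
    // submitApplication directly.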
    @Override
    public void submitApplication(
        ApplicationSubmissionContext submissionContext, long submitTime,
        UserGroupInformation userUgi) throws YarnException {
      super.submitApplication(submissionContext, submitTime, userUgi);
    }
  }

  @Test
  public void testSubmitUsingRealUserAcls() throws Exception {
    final String realUser = "AdminUser";
    final String user0 = "user0";
    final String user1 = "user1";
    final String queue = "default";

    YarnConfiguration conf = new YarnConfiguration();
    MockRM rm = new MockRM();
    rm.init(conf);
    rm.start();
    rm.getConfig().setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);

    UserGroupInformation realUserUgi =
        UserGroupInformation.createRemoteUser(realUser);
    UserGroupInformation ugi0 =
        UserGroupInformation.createProxyUserForTesting(user0, realUserUgi,
            new String[] {"group1"});
    UserGroupInformation ugi1 =
        UserGroupInformation.createProxyUserForTesting(user1, realUserUgi,
            new String[] {"group1"});
    ApplicationId applicationId0 = TestUtils.getMockApplicationId(0);
    ApplicationId applicationId1 = TestUtils.getMockApplicationId(1);
    CapacityScheduler cSched = (CapacityScheduler) rm.getResourceScheduler();

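    // Lock both the root and default queue ACLs down to the real (admin)
    // user so that proxied users are rejected unless explicitly allowed.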
    ParentQueue rootQueue = (ParentQueue) cSched.getRootQueue();
    Map<AccessType, AccessControlList> rootAcls = rootQueue.acls;
    rootAcls.put(AccessType.SUBMIT_APP, new AccessControlList(realUser));
    rootAcls.put(AccessType.ADMINISTER_QUEUE, new AccessControlList(realUser));

    LeafQueue defaultQueue = (LeafQueue)cSched.getQueue(queue);
    Map<AccessType, AccessControlList> defaultQueueAcls = defaultQueue.acls;
    defaultQueueAcls.put(AccessType.SUBMIT_APP,
        new AccessControlList(realUser));
    defaultQueueAcls.put(AccessType.ADMINISTER_QUEUE,
        new AccessControlList(realUser));

    TestRMAppManager testRmAppManager =
        new TestRMAppManager(rmContext, cSched, rm.getApplicationMasterService(),
            rm.getApplicationACLsManager(), rm.getConfig());
    ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
    ApplicationSubmissionContext asc =
        ApplicationSubmissionContext.newInstance(
            applicationId0, "myApp0", "default", Priority.newInstance(0),
            clc, false, false, 1, Resource.newInstance(1024, 1));

    // Each of the following test cases has a proxied user and a real user.
    // The proxied users are user0 and user1, depending on the test. The real
    // user is always AdminUser.

    // Ensure that user0 is not allowed to submit to the default queue when only
    // the admin user is in the submit ACL and the admin user does not have the
    // USE_REAL_ACLS character prepended.
    try {
      testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi0);
      fail(user0 + " should not be allowed to submit to the " + queue
          + " queue when only the admin user is in the submit ACL.");
    } catch (YarnException e) {
      // This is the expected behavior.
      assertTrue(e.getCause() instanceof AccessControlException,
          "Should have received an AccessControlException.");
    }

    // With only user0 in the list of users authorized to submit apps to the
    // queue, ensure that user0 is allowed to submit to the default queue.
    defaultQueueAcls.put(AccessType.SUBMIT_APP, new AccessControlList(user0));
    defaultQueueAcls.put(AccessType.ADMINISTER_QUEUE,
        new AccessControlList(realUser));
    try {
      testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi0);
    } catch (YarnException e) {
      fail(user0 + " should be allowed to submit to the " + queue + " queue.");
    }

    // With only user0 in the list of users authorized to submit apps to the
    // queue, ensure that user1 is NOT allowed to submit to the default queue
    try {
      testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi1);
      fail(user1 + " should not be allowed to submit to the "
          + queue + " queue.");
    } catch (YarnException e) {
      // This is the expected behavior.
      assertTrue(e.getCause() instanceof AccessControlException,
          "Should have received an AccessControlException.");
    }

    // Even though the admin user is in the list of users allowed to submit to
    // the default queue and user1's real user is the admin user, user1 should
    // not be allowed to submit to the queue because the ACL entry does not
    // have the special USE_REAL_ACLS character prepended in the list.
    defaultQueueAcls.put(AccessType.SUBMIT_APP,
        new AccessControlList(user0 + "," + realUser));
    try {
      testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi1);
      fail(user1 + " should not be allowed to submit to the " + queue
          + " queue.");
    } catch (YarnException e) {
      // This is the expected behavior.
      assertTrue(e.getCause() instanceof AccessControlException,
          "Should have received an AccessControlException.");
    }

    // user1 should now be allowed to submit to the default queue because the
    // admin user is in the ACL list and has the USE_REAL_ACLS character
    // prepended.
    defaultQueueAcls.put(AccessType.SUBMIT_APP,
        new AccessControlList(user0 + ","
            + AccessControlList.USE_REAL_ACLS + realUser));
    asc.setApplicationId(applicationId1);
    try {
      testRmAppManager.submitApplication(asc, System.currentTimeMillis(), ugi1);
    } catch (YarnException e) {
      LOG.error("failed to submit", e);
      fail(user1 + " should be allowed to submit to the " + queue
          + " queue when the real user is " + realUser + ".");
    }

    rm.stop();
    rm.close();
  }
}