TestCapacitySchedulerAutoCreatedQueueBase.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.security.Groups;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.TestGroupsCaching;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels
    .NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement
    .ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .ResourceScheduler;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
    .queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common
    .QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
    .AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
    .AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
    .SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
    .NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType.ABSOLUTE_RESOURCE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .capacity.CSQueueUtils.EPSILON;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .capacity.CapacitySchedulerConfiguration.DOT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
    .capacity.CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class TestCapacitySchedulerAutoCreatedQueueBase {

  private static final Logger LOG = LoggerFactory.getLogger(
      TestCapacitySchedulerAutoCreatedQueueBase.class);
  public static final int GB = 1024;
  public static final ContainerUpdates NULL_UPDATE_REQUESTS =
      new ContainerUpdates();

  public static final String DEFAULT_PATH = CapacitySchedulerConfiguration.ROOT + ".default";
  public static final String A_PATH = CapacitySchedulerConfiguration.ROOT + ".a";
  public static final String B_PATH = CapacitySchedulerConfiguration.ROOT + ".b";
  public static final String C_PATH = CapacitySchedulerConfiguration.ROOT + ".c";
  public static final String D_PATH = CapacitySchedulerConfiguration.ROOT + ".d";
  public static final String E_PATH = CapacitySchedulerConfiguration.ROOT + ".e";

  public static final QueuePath ROOT = new QueuePath(CapacitySchedulerConfiguration.ROOT);
  public static final QueuePath DEFAULT = new QueuePath(DEFAULT_PATH);
  public static final QueuePath A = new QueuePath(A_PATH);
  public static final QueuePath B = new QueuePath(B_PATH);
  public static final QueuePath C = new QueuePath(C_PATH);
  public static final QueuePath D = new QueuePath(D_PATH);
  public static final QueuePath E = new QueuePath(E_PATH);
  public static final String ESUBGROUP1_PATH =
      CapacitySchedulerConfiguration.ROOT + ".esubgroup1";
  public static final String FGROUP_PATH =
      CapacitySchedulerConfiguration.ROOT + ".fgroup";
  public static final String A1_PATH = A_PATH + ".a1";
  public static final String A2_PATH = A_PATH + ".a2";
  public static final String B1_PATH = B_PATH + ".b1";
  public static final String B2_PATH = B_PATH + ".b2";
  public static final String B3_PATH = B_PATH + ".b3";
  public static final String B4_PATH = B_PATH + ".b4subgroup1";
  public static final String ESUBGROUP1_A_PATH = ESUBGROUP1_PATH + ".e";
  public static final String FGROUP_F_PATH = FGROUP_PATH + ".f";

  public static final QueuePath A1 = new QueuePath(A1_PATH);
  public static final QueuePath A2 = new QueuePath(A2_PATH);
  public static final QueuePath B1 = new QueuePath(B1_PATH);
  public static final QueuePath B2 = new QueuePath(B2_PATH);
  public static final QueuePath B3 = new QueuePath(B3_PATH);
  public static final QueuePath B4 = new QueuePath(B4_PATH);
  public static final QueuePath E_GROUP = new QueuePath(ESUBGROUP1_PATH);
  public static final QueuePath F_GROUP = new QueuePath(FGROUP_PATH);
  public static final QueuePath E_SG = new QueuePath(ESUBGROUP1_A_PATH);
  public static final QueuePath F_SG = new QueuePath(FGROUP_F_PATH);

  public static final float A_CAPACITY = 20f;
  public static final float B_CAPACITY = 20f;
  public static final float C_CAPACITY = 20f;
  public static final float D_CAPACITY = 20f;
  public static final float ESUBGROUP1_CAPACITY = 10f;
  public static final float FGROUP_CAPACITY = 10f;

  public static final float A1_CAPACITY = 30;
  public static final float A2_CAPACITY = 70;
  public static final float B1_CAPACITY = 60f;
  public static final float B2_CAPACITY = 20f;
  public static final float B3_CAPACITY = 10f;
  public static final float B4_CAPACITY = 10f;
  public static final int NODE_MEMORY = 16;

  public static final int NODE1_VCORES = 16;
  public static final int NODE2_VCORES = 32;
  public static final int NODE3_VCORES = 48;

  public static final String TEST_GROUP = "testusergroup";
  public static final String TEST_GROUPUSER = "testuser";
  public static final String TEST_GROUP1 = "testusergroup1";
  public static final String TEST_GROUPUSER1 = "testuser1";
  public static final String TEST_GROUP2 = "testusergroup2";
  public static final String TEST_GROUPUSER2 = "testuser2";
  public static final String USER = "user_";
  public static final String USER0 = USER + 0;
  public static final String USER1 = USER + 1;
  public static final String USER2 = USER + 2;
  public static final String USER3 = USER + 3;
  public static final String PARENT_QUEUE = "c";
  public static final QueuePath PARENT_QUEUE_PATH = new QueuePath(PARENT_QUEUE);

  public static final Set<String> accessibleNodeLabelsOnC = new HashSet<>();

  public static final String NODEL_LABEL_GPU = "GPU";
  public static final String NODEL_LABEL_SSD = "SSD";

  public static final float NODE_LABEL_GPU_TEMPLATE_CAPACITY = 30.0f;
  public static final float NODEL_LABEL_SSD_TEMPLATE_CAPACITY = 40.0f;
  public static final ImmutableSet<String> RESOURCE_TYPES = ImmutableSet.of("memory", "vcores");

  protected MockRM mockRM = null;
  protected MockNM nm1 = null;
  protected MockNM nm2 = null;
  protected MockNM nm3 = null;
  protected CapacityScheduler cs;
  protected SpyDispatcher dispatcher;
  private static EventHandler<Event> rmAppEventEventHandler;

  public static class SpyDispatcher extends AsyncDispatcher {

    public static BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();

    public static class SpyRMAppEventHandler implements EventHandler<Event> {
      public void handle(Event event) {
        eventQueue.add(event);
      }
    }

    @Override
    protected void dispatch(Event event) {
      eventQueue.add(event);
    }

    @Override
    public EventHandler<Event> getEventHandler() {
      return rmAppEventEventHandler;
    }

    void spyOnNextEvent(Event expectedEvent, long timeout)
        throws InterruptedException {

      Event event = eventQueue.poll(timeout, TimeUnit.MILLISECONDS);
      assertEquals(expectedEvent.getType(), event.getType());
      assertEquals(expectedEvent.getClass(), event.getClass());
    }
  }

  @BeforeEach
  public void setUp() throws Exception {
    QueueMetrics.clearQueueMetrics();
    CapacitySchedulerConfiguration conf = setupSchedulerConfiguration();
    setupQueueConfiguration(conf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);

    setupQueueMappings(conf, PARENT_QUEUE, true, new int[] { 0, 1, 2, 3 });

    dispatcher = new SpyDispatcher();
    rmAppEventEventHandler = new SpyDispatcher.SpyRMAppEventHandler();
    dispatcher.register(RMAppEventType.class, rmAppEventEventHandler);

    RMNodeLabelsManager mgr = setupNodeLabelManager(conf);

    mockRM = new MockRM(conf) {
      protected RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    cs = (CapacityScheduler) mockRM.getResourceScheduler();
    cs.updatePlacementRules();
    mockRM.start();
    cs.start();

    setupNodes(mockRM);
  }

  protected void setupNodes(MockRM newMockRM) throws Exception {
    NodeLabel ssdLabel = Records.newRecord(NodeLabel.class);
    ssdLabel.setName(NODEL_LABEL_SSD);
    ssdLabel.setExclusivity(true);

    nm1 = // label = SSD
        new MockNM("h1:1234",
            Resource.newInstance(NODE_MEMORY * GB, NODE1_VCORES),
            newMockRM.getResourceTrackerService(),
            YarnVersionInfo.getVersion(),
            new HashSet<NodeLabel>() {{ add(ssdLabel); }});

    nm1.registerNode();

    NodeLabel gpuLabel = Records.newRecord(NodeLabel.class);
    gpuLabel.setName(NODEL_LABEL_GPU);
    gpuLabel.setExclusivity(true);

    //Label = GPU
    nm2 = new MockNM("h2:1234",
        Resource.newInstance(NODE_MEMORY * GB, NODE2_VCORES),
        newMockRM.getResourceTrackerService(),
        YarnVersionInfo.getVersion(),
        new HashSet<NodeLabel>() {{ add(gpuLabel); }});
    nm2.registerNode();

    nm3 = // label = ""
        new MockNM("h3:1234", NODE_MEMORY * GB, NODE3_VCORES, newMockRM
            .getResourceTrackerService
                ());
    nm3.registerNode();
  }

  public static CapacitySchedulerConfiguration setupQueueMappings(
      CapacitySchedulerConfiguration conf, String parentQueue, boolean
      overrideWithQueueMappings, int[] userIds) {

    List<String> queuePlacementRules = new ArrayList<>();
    queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE);
    conf.setQueuePlacementRules(queuePlacementRules);

    List<QueueMapping> existingMappings = conf.getQueueMappings();

    //set queue mapping
    List<QueueMapping> queueMappings = new ArrayList<>();
    for (int i = 0; i < userIds.length; i++) {
      //Set C as parent queue name for auto queue creation
      QueueMapping userQueueMapping = QueueMappingBuilder.create()
                                          .type(QueueMapping.MappingType.USER)
                                          .source(USER + userIds[i])
                                          .queue(
                                              getQueueMapping(parentQueue,
                                                  USER + userIds[i]))
                                          .build();
      queueMappings.add(userQueueMapping);
    }

    existingMappings.addAll(queueMappings);
    conf.setQueueMappings(existingMappings);
    //override with queue mappings
    conf.setOverrideWithQueueMappings(overrideWithQueueMappings);
    return conf;
  }

  public static CapacitySchedulerConfiguration setupGroupQueueMappings
      (String parentQueue, CapacitySchedulerConfiguration conf, String
          leafQueueName) {

    List<QueueMapping> existingMappings = conf.getQueueMappings();

    //set queue mapping
    List<QueueMapping> queueMappings = new ArrayList<>();

    //setup group mapping
    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
        TestGroupsCaching.FakeunPrivilegedGroupMapping.class, ShellBasedUnixGroupsMapping.class);
    conf.set(CommonConfigurationKeys.HADOOP_USER_GROUP_STATIC_OVERRIDES,
        TEST_GROUPUSER +"=" + TEST_GROUP + ";" + TEST_GROUPUSER1 +"="
            + TEST_GROUP1 + ";" + TEST_GROUPUSER2 + "=" + TEST_GROUP2 + ";invalid_user=invalid_group");
    Groups.getUserToGroupsMappingServiceWithLoadedConfiguration(conf);

    QueueMapping userQueueMapping = QueueMappingBuilder.create()
                                        .type(QueueMapping.MappingType.GROUP)
                                        .source(TEST_GROUP)
                                        .queue(
                                            getQueueMapping(parentQueue,
                                                leafQueueName))
                                        .build();

    QueueMapping userQueueMapping1 = QueueMappingBuilder.create()
        .type(QueueMapping.MappingType.GROUP)
        .source(TEST_GROUP1)
        .queue(
            getQueueMapping(parentQueue,
                leafQueueName))
        .build();

    QueueMapping userQueueMapping2 = QueueMappingBuilder.create()
        .type(QueueMapping.MappingType.GROUP)
        .source(TEST_GROUP2)
        .queue(
            getQueueMapping(parentQueue,
                leafQueueName))
        .build();

    queueMappings.add(userQueueMapping);
    queueMappings.add(userQueueMapping1);
    queueMappings.add(userQueueMapping2);
    existingMappings.addAll(queueMappings);
    conf.setQueueMappings(existingMappings);
    return conf;
  }

  /**
   * @param conf, to be modified
   * @return, CS configuration which has C
   * as an auto creation enabled parent queue
   *  <p>
   * root
   * /     \      \       \
   * a        b      c    d
   * / \    /  |  \
   * a1  a2 b1  b2  b3
   */

  public static CapacitySchedulerConfiguration setupQueueConfiguration(
      CapacitySchedulerConfiguration conf) {

    //setup new queues with one of them auto enabled
    // Define top-level queues
    // Set childQueue for root
    conf.setQueues(ROOT,
        new String[] {"a", "b", "c", "d", "esubgroup1", "esubgroup2", "fgroup",
            "a1group", "ggroup", "g"});

    conf.setCapacity(A, A_CAPACITY);
    conf.setCapacity(B, B_CAPACITY);
    conf.setCapacity(C, C_CAPACITY);
    conf.setCapacity(D, D_CAPACITY);
    conf.setCapacity(E_GROUP, ESUBGROUP1_CAPACITY);
    conf.setCapacity(F_GROUP, FGROUP_CAPACITY);

    // Define 2nd-level queues
    conf.setQueues(A, new String[] { "a1", "a2" });
    conf.setCapacity(A1, A1_CAPACITY);
    conf.setUserLimitFactor(A1, 100.0f);
    conf.setCapacity(A2, A2_CAPACITY);
    conf.setUserLimitFactor(A2, 100.0f);

    conf.setQueues(B, new String[] { "b1", "b2", "b3", "b4subgroup1" });
    conf.setCapacity(B1, B1_CAPACITY);
    conf.setUserLimitFactor(B1, 100.0f);
    conf.setCapacity(B2, B2_CAPACITY);
    conf.setUserLimitFactor(B2, 100.0f);
    conf.setCapacity(B3, B3_CAPACITY);
    conf.setUserLimitFactor(B3, 100.0f);
    conf.setCapacity(B4, B4_CAPACITY);
    conf.setUserLimitFactor(B4, 100.0f);

    conf.setQueues(E_GROUP, new String[] {"e"});
    conf.setCapacity(E_SG, 100f);
    conf.setUserLimitFactor(E_SG, 100.0f);
    conf.setQueues(F_GROUP, new String[] {"f"});
    conf.setCapacity(F_SG, 100f);
    conf.setUserLimitFactor(F_SG, 100.0f);

    conf.setUserLimitFactor(C, 1.0f);
    conf.setAutoCreateChildQueueEnabled(C, true);

    //Setup leaf queue template configs
    conf.setAutoCreatedLeafQueueConfigCapacity(C, 50.0f);
    conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f);
    conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100);
    conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f);
    conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f);
    conf.setAutoCreatedLeafQueueConfigMaximumAllocation(C,
        "memory-mb=10240,vcores=6");

    conf.setAutoCreatedLeafQueueTemplateCapacityByLabel(C, NODEL_LABEL_GPU,
        NODE_LABEL_GPU_TEMPLATE_CAPACITY);
    conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, NODEL_LABEL_GPU, 100.0f);
    conf.setAutoCreatedLeafQueueTemplateCapacityByLabel(C, NODEL_LABEL_SSD,
        NODEL_LABEL_SSD_TEMPLATE_CAPACITY);
    conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, NODEL_LABEL_SSD,
        100.0f);

    conf.setDefaultNodeLabelExpression(C, NODEL_LABEL_GPU);
    conf.setAutoCreatedLeafQueueConfigDefaultNodeLabelExpression
        (C, NODEL_LABEL_SSD);


    LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue");

    conf.setUserLimitFactor(D, 1.0f);
    conf.setAutoCreateChildQueueEnabled(D, true);
    conf.setUserLimit(D, 100);
    conf.setUserLimitFactor(D, 3.0f);

    //Setup leaf queue template configs
    conf.setAutoCreatedLeafQueueConfigCapacity(D, 10.0f);
    conf.setAutoCreatedLeafQueueConfigMaxCapacity(D, 100.0f);
    conf.setAutoCreatedLeafQueueConfigUserLimit(D, 3);
    conf.setAutoCreatedLeafQueueConfigUserLimitFactor(D, 100);

    conf.set(CapacitySchedulerConfiguration.PREFIX + C + DOT
            + CapacitySchedulerConfiguration
            .AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX
            + DOT + CapacitySchedulerConfiguration.ORDERING_POLICY,
        FAIR_APP_ORDERING_POLICY);

    accessibleNodeLabelsOnC.add(NODEL_LABEL_GPU);
    accessibleNodeLabelsOnC.add(NODEL_LABEL_SSD);
    accessibleNodeLabelsOnC.add(NO_LABEL);

    conf.setAccessibleNodeLabels(C, accessibleNodeLabelsOnC);
    conf.setAccessibleNodeLabels(ROOT, accessibleNodeLabelsOnC);
    conf.setCapacityByLabel(ROOT, NODEL_LABEL_GPU, 100f);
    conf.setCapacityByLabel(ROOT, NODEL_LABEL_SSD, 100f);

    conf.setAccessibleNodeLabels(C, accessibleNodeLabelsOnC);
    conf.setCapacityByLabel(C, NODEL_LABEL_GPU, 100f);
    conf.setCapacityByLabel(C, NODEL_LABEL_SSD, 100f);

    LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue");

    return conf;
  }

  public static CapacitySchedulerConfiguration
      setupQueueConfigurationForSingleAutoCreatedLeafQueue(
      CapacitySchedulerConfiguration conf) {

    //setup new queues with one of them auto enabled
    // Define top-level queues
    // Set childQueue for root
    conf.setQueues(ROOT,
        new String[] {"c"});
    conf.setCapacity(C, 100f);

    conf.setUserLimitFactor(C, 1.0f);
    conf.setAutoCreateChildQueueEnabled(C, true);

    //Setup leaf queue template configs
    conf.setAutoCreatedLeafQueueConfigCapacity(C, 100f);
    conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f);
    conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100);
    conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f);

    return conf;
  }

  public static void setupQueueConfigurationForSingleFlexibleAutoCreatedLeafQueue(
          CapacitySchedulerConfiguration conf) {

    //setup new queues with one of them auto enabled
    // Define top-level queues
    // Set childQueue for root
    conf.setQueues(ROOT,
            new String[] {"c"});
    conf.setCapacity(C, 100f);

    conf.setUserLimitFactor(C, 1.0f);
    conf.setAutoQueueCreationV2Enabled(C, true);
  }

  @AfterEach
  public void tearDown() throws Exception {
    if (mockRM != null) {
      mockRM.stop();
    }
  }

  protected void validateCapacities(AutoCreatedLeafQueue autoCreatedLeafQueue,
      float capacity, float absCapacity, float maxCapacity,
      float absMaxCapacity) {
    assertEquals(capacity, autoCreatedLeafQueue.getCapacity(), EPSILON);
    assertEquals(absCapacity, autoCreatedLeafQueue.getAbsoluteCapacity(),
        EPSILON);
    assertEquals(maxCapacity, autoCreatedLeafQueue.getMaximumCapacity(),
        EPSILON);
    assertEquals(absMaxCapacity,
        autoCreatedLeafQueue.getAbsoluteMaximumCapacity(), EPSILON);
  }

  protected void cleanupQueue(String queueName) throws YarnException {
    AutoCreatedLeafQueue queue = (AutoCreatedLeafQueue) cs.getQueue(queueName);
    if (queue != null) {
      setEntitlement(queue, new QueueEntitlement(0.0f, 0.0f));
      ((ManagedParentQueue) queue.getParent()).removeChildQueue(
          queue.getQueuePath());
      cs.getCapacitySchedulerQueueManager().removeQueue(queue.getQueuePath());
    }
  }

  protected ApplicationId submitApp(MockRM rm, CSQueue parentQueue,
      String leafQueueName, String user, int expectedNumAppsInParentQueue,
      int expectedNumAppsInLeafQueue) throws Exception {

    CapacityScheduler capacityScheduler =
        (CapacityScheduler) rm.getResourceScheduler();
    // submit an app
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("test-auto-queue-activation")
            .withUser(user)
            .withAcls(null)
            .withQueue(leafQueueName)
            .withUnmanagedAM(false)
            .build();
    RMApp rmApp = MockRMAppSubmitter.submit(rm, data);

    // check preconditions
    List<ApplicationAttemptId> appsInParentQueue =
        capacityScheduler.getAppsInQueue(parentQueue.getQueuePath());
    assertEquals(expectedNumAppsInParentQueue, appsInParentQueue.size());

    List<ApplicationAttemptId> appsInLeafQueue =
        capacityScheduler.getAppsInQueue(leafQueueName);
    assertEquals(expectedNumAppsInLeafQueue, appsInLeafQueue.size());

    return rmApp.getApplicationId();
  }

  protected List<QueueMapping> setupQueueMapping(
      CapacityScheduler newCS, String user, String parentQueue, String queue) {
    List<QueueMapping> queueMappings = new ArrayList<>();
    queueMappings.add(QueueMappingBuilder.create()
                          .type(QueueMapping.MappingType.USER)
                          .source(user)
                          .queue(getQueueMapping(parentQueue, queue))
                          .build());
    newCS.getConfiguration().setQueueMappings(queueMappings);
    return queueMappings;
  }

  protected CapacitySchedulerConfiguration setupSchedulerConfiguration() {
    Configuration schedConf = new Configuration();
    schedConf.setInt(YarnConfiguration.RESOURCE_TYPES
        + ".vcores.minimum-allocation", 1);
    schedConf.setInt(YarnConfiguration.RESOURCE_TYPES
        + ".vcores.maximum-allocation", 8);
    schedConf.setInt(YarnConfiguration.RESOURCE_TYPES
        + ".memory-mb.minimum-allocation", 1024);
    schedConf.setInt(YarnConfiguration.RESOURCE_TYPES
        + ".memory-mb.maximum-allocation", 16384);


    return new CapacitySchedulerConfiguration(schedConf);
  }

  protected void setSchedulerMinMaxAllocation(CapacitySchedulerConfiguration conf) {
    unsetMinMaxAllocation(conf);

    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 8);
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1024);
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 18384);

  }

  private void unsetMinMaxAllocation(CapacitySchedulerConfiguration conf) {
    conf.unset(YarnConfiguration.RESOURCE_TYPES
        + ".vcores.minimum-allocation");
    conf.unset(YarnConfiguration.RESOURCE_TYPES
        + ".vcores.maximum-allocation");
    conf.unset(YarnConfiguration.RESOURCE_TYPES
        + ".memory-mb.minimum-allocation");
    conf.unset(YarnConfiguration.RESOURCE_TYPES
        + ".memory-mb.maximum-allocation");
  }

  protected MockRM setupSchedulerInstance() throws Exception {

    if (mockRM != null) {
      mockRM.stop();
    }

    CapacitySchedulerConfiguration conf = setupSchedulerConfiguration();
    setupQueueConfiguration(conf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);

    setupQueueMappings(conf, PARENT_QUEUE, true, new int[] {0, 1, 2, 3});

    RMNodeLabelsManager mgr = setupNodeLabelManager(conf);
    MockRM newMockRM = new MockRM(conf) {
      protected RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };
    newMockRM.start();
    ((CapacityScheduler) newMockRM.getResourceScheduler()).start();
    setupNodes(newMockRM);
    return newMockRM;
  }

  static String getQueueMapping(String parentQueue, String leafQueue) {
    return parentQueue + DOT + leafQueue;
  }

  protected RMNodeLabelsManager setupNodeLabelManager(
      CapacitySchedulerConfiguration conf) throws IOException {
    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
    mgr.init(conf);
    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
        ImmutableSet.of(NODEL_LABEL_SSD, NODEL_LABEL_GPU));
    mgr.addLabelsToNode(ImmutableMap
        .of(NodeId.newInstance("h1", 0),
            TestUtils.toSet(NODEL_LABEL_SSD)));
    mgr.addLabelsToNode(ImmutableMap
        .of(NodeId.newInstance("h2", 0),
            TestUtils.toSet(NODEL_LABEL_GPU)));
    return mgr;
  }

  protected ApplicationAttemptId submitApp(CapacityScheduler newCS, String user,
      String queue, String parentQueue) {
    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
    SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appId, queue, user,
        new ApplicationPlacementContext(queue, parentQueue));
    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
        appId, 1);
    SchedulerEvent addAttemptEvent = new AppAttemptAddedSchedulerEvent(
        appAttemptId, false);
    newCS.handle(addAppEvent);
    newCS.handle(addAttemptEvent);
    return appAttemptId;
  }

  protected RMApp submitApp(String user, String queue, String nodeLabel)
      throws Exception {
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, mockRM)
            .withAppName("test-auto-queue-creation" + RandomUtils.nextInt(0, 100))
            .withUser(user)
            .withAcls(null)
            .withQueue(queue)
            .withAmLabel(nodeLabel)
            .build();
    RMApp app = MockRMAppSubmitter.submit(mockRM, data);
    assertEquals(app.getAmNodeLabelExpression(), nodeLabel);
    // check preconditions
    List<ApplicationAttemptId> appsInC = cs.getAppsInQueue(PARENT_QUEUE);
    assertEquals(1, appsInC.size());
    assertNotNull(cs.getQueue(queue));

    return app;
  }

  void setEntitlement(AutoCreatedLeafQueue queue,
      QueueEntitlement entitlement) {
    queue.setCapacity(entitlement.getCapacity());
    queue.setAbsoluteCapacity(
        queue.getParent().getAbsoluteCapacity() * entitlement.getCapacity());
    // note: we currently set maxCapacity to capacity
    // this might be revised later
    queue.setMaxCapacity(entitlement.getMaxCapacity());
  }

  protected void validateUserAndAppLimits(
      AutoCreatedLeafQueue autoCreatedLeafQueue, int maxApps,
      int maxAppsPerUser) {
    assertEquals(maxApps, autoCreatedLeafQueue.getMaxApplications());
    assertEquals(maxAppsPerUser,
        autoCreatedLeafQueue.getMaxApplicationsPerUser());
  }

  protected void validateContainerLimits(
      AutoCreatedLeafQueue autoCreatedLeafQueue, int vCoreLimit,
      long memorySize) {
    assertEquals(vCoreLimit,
        autoCreatedLeafQueue.getMaximumAllocation().getVirtualCores());
    assertEquals(memorySize,
        autoCreatedLeafQueue.getMaximumAllocation().getMemorySize());
  }

  protected void validateInitialQueueEntitlement(CSQueue parentQueue,
      String leafQueueName,
      Map<String, Float> expectedTotalChildQueueAbsCapacityByLabel,
      Set<String> nodeLabels)
      throws SchedulerDynamicEditException, InterruptedException {
    validateInitialQueueEntitlement(mockRM, cs, parentQueue, leafQueueName,
        expectedTotalChildQueueAbsCapacityByLabel, nodeLabels);
  }

  protected void validateInitialQueueEntitlement(ResourceManager rm,
      CSQueue parentQueue, String leafQueueName,
      Map<String, Float> expectedTotalChildQueueAbsCapacityByLabel,
      Set<String> nodeLabels)
      throws SchedulerDynamicEditException, InterruptedException {
    validateInitialQueueEntitlement(rm,
        (CapacityScheduler) rm.getResourceScheduler(), parentQueue,
        leafQueueName, expectedTotalChildQueueAbsCapacityByLabel, nodeLabels);
  }

  protected void validateInitialQueueEntitlement(ResourceManager rm,
      CapacityScheduler capacityScheduler, CSQueue parentQueue,
      String leafQueueName,
      Map<String, Float> expectedTotalChildQueueAbsCapacityByLabel,
      Set<String> nodeLabels)
      throws SchedulerDynamicEditException, InterruptedException {
    ManagedParentQueue autoCreateEnabledParentQueue =
        (ManagedParentQueue) parentQueue;

    GuaranteedOrZeroCapacityOverTimePolicy policy =
        (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue
            .getAutoCreatedQueueManagementPolicy();

    AutoCreatedLeafQueue leafQueue =
        (AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName);

    Map<String, QueueEntitlement> expectedEntitlements = new HashMap<>();
    QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate()
        .getQueueCapacities();

    for (String label : nodeLabels) {
      validateCapacitiesByLabel(autoCreateEnabledParentQueue, leafQueue, label);
      assertEquals(true, policy.isActive(leafQueue, label));

      assertEquals(expectedTotalChildQueueAbsCapacityByLabel.get(label),
          policy.getAbsoluteActivatedChildQueueCapacity(label), EPSILON);

      QueueEntitlement expectedEntitlement = new QueueEntitlement(
          cap.getCapacity(label), cap.getMaximumCapacity(label));

      expectedEntitlements.put(label, expectedEntitlement);

      validateEffectiveMinResource(rm, capacityScheduler, leafQueue, label,
          expectedEntitlements);
    }
  }

  protected void validateCapacitiesByLabel(ManagedParentQueue
      autoCreateEnabledParentQueue, AutoCreatedLeafQueue leafQueue, String
      label) throws InterruptedException {
    assertEquals(autoCreateEnabledParentQueue.getLeafQueueTemplate()
            .getQueueCapacities().getCapacity(label),
        leafQueue.getQueueCapacities()
            .getCapacity(label), EPSILON);
    assertEquals(autoCreateEnabledParentQueue.getLeafQueueTemplate()
            .getQueueCapacities().getMaximumCapacity(label),
        leafQueue.getQueueCapacities()
            .getMaximumCapacity(label), EPSILON);
  }

  protected void validateEffectiveMinResource(ResourceManager rm,
      CapacityScheduler cs, CSQueue leafQueue, String label,
      Map<String, QueueEntitlement> expectedQueueEntitlements) {
    ManagedParentQueue parentQueue = (ManagedParentQueue) leafQueue.getParent();

    Resource resourceByLabel = rm.getRMContext().getNodeLabelManager()
        .getResourceByLabel(label, cs.getClusterResource());
    Resource effMinCapacity = Resources.multiply(resourceByLabel,
        expectedQueueEntitlements.get(label).getCapacity()
            * parentQueue.getQueueCapacities().getAbsoluteCapacity(label));
    assertEquals(effMinCapacity, Resources.multiply(resourceByLabel,
        leafQueue.getQueueCapacities().getAbsoluteCapacity(label)));

    if (expectedQueueEntitlements.get(label).getCapacity() > EPSILON) {
      if (leafQueue.getCapacityConfigType().equals(ABSOLUTE_RESOURCE)) {
        QueuePath templatePrefix = QueuePrefixes.getAutoCreatedQueueObjectTemplateConfPrefix(
            parentQueue.getQueuePathObject());
        Resource resourceTemplate = parentQueue.getLeafQueueTemplate().getLeafQueueConfigs()
            .getMinimumResourceRequirement(label, templatePrefix, RESOURCE_TYPES);
        assertEquals(resourceTemplate, leafQueue.getEffectiveCapacity(label));
      } else {
        assertEquals(effMinCapacity, leafQueue.getEffectiveCapacity(label));
      }
    } else {
      assertEquals(Resource.newInstance(0, 0),
          leafQueue.getEffectiveCapacity(label));
    }

    if (leafQueue.getQueueCapacities().getAbsoluteCapacity(label) > 0) {
      assertTrue(Resources.greaterThan(cs.getResourceCalculator(),
          cs.getClusterResource(), effMinCapacity, Resources.none()));
    } else {
      assertTrue(Resources.equals(effMinCapacity, Resources.none()));
    }
  }

  protected void validateActivatedQueueEntitlement(CSQueue parentQueue,
      String leafQueueName, Map<String, Float>
      expectedTotalChildQueueAbsCapacity,
      List<QueueManagementChange> queueManagementChanges, Set<String>
      expectedNodeLabels)
      throws SchedulerDynamicEditException {
    ManagedParentQueue autoCreateEnabledParentQueue =
        (ManagedParentQueue) parentQueue;

    GuaranteedOrZeroCapacityOverTimePolicy policy =
        (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue
            .getAutoCreatedQueueManagementPolicy();

    QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate()
        .getQueueCapacities();

    AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue)
        cs.getQueue(leafQueueName);

    Map<String, QueueEntitlement> expectedEntitlements = new HashMap<>();

    for (String label : expectedNodeLabels) {
      //validate leaf queue state
      assertEquals(true, policy.isActive(leafQueue, label));

      QueueEntitlement expectedEntitlement = new QueueEntitlement(
          cap.getCapacity(label), cap.getMaximumCapacity(label));

      //validate parent queue state
      assertEquals(expectedTotalChildQueueAbsCapacity.get(label),
          policy.getAbsoluteActivatedChildQueueCapacity(label), EPSILON);

      expectedEntitlements.put(label, expectedEntitlement);
    }

    //validate capacity
    validateQueueEntitlements(leafQueueName, expectedEntitlements,
        queueManagementChanges, expectedNodeLabels);
  }

  protected void validateDeactivatedQueueEntitlement(CSQueue parentQueue,
      String leafQueueName, Map<String, Float>
      expectedTotalChildQueueAbsCapacity,
      List<QueueManagementChange>
          queueManagementChanges)
      throws SchedulerDynamicEditException {
    QueueEntitlement expectedEntitlement =
        new QueueEntitlement(0.0f, 1.0f);

    ManagedParentQueue autoCreateEnabledParentQueue =
        (ManagedParentQueue) parentQueue;

    AutoCreatedLeafQueue leafQueue =
        (AutoCreatedLeafQueue) cs.getQueue(leafQueueName);

    GuaranteedOrZeroCapacityOverTimePolicy policy =
        (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue
            .getAutoCreatedQueueManagementPolicy();

    Map<String, QueueEntitlement> expectedEntitlements = new HashMap<>();

    for (String label : accessibleNodeLabelsOnC) {
      //validate parent queue state
      LOG.info("Validating label " + label);
      assertEquals(expectedTotalChildQueueAbsCapacity.get(label), policy
          .getAbsoluteActivatedChildQueueCapacity(label), EPSILON);

      //validate leaf queue state
      assertEquals(false, policy.isActive(leafQueue, label));
      expectedEntitlements.put(label, expectedEntitlement);
    }

    //validate capacity
    validateQueueEntitlements(leafQueueName, expectedEntitlements,
        queueManagementChanges, accessibleNodeLabelsOnC);
  }

  void validateQueueEntitlements(String leafQueueName,
      Map<String, QueueEntitlement> expectedEntitlements,
      List<QueueManagementChange>
          queueEntitlementChanges, Set<String> expectedNodeLabels) {
    AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
        leafQueueName);
    validateQueueEntitlementChanges(leafQueue, expectedEntitlements,
        queueEntitlementChanges, expectedNodeLabels);
  }

  private void validateQueueEntitlementChanges(AutoCreatedLeafQueue leafQueue,
      Map<String, QueueEntitlement> expectedQueueEntitlements,
      final List<QueueManagementChange> queueEntitlementChanges, Set<String>
      expectedNodeLabels) {
    boolean found = false;

    for (QueueManagementChange entitlementChange : queueEntitlementChanges) {
      if (leafQueue.getQueuePath().equals(
          entitlementChange.getQueue().getQueuePath())) {

        AutoCreatedLeafQueueConfig updatedQueueTemplate =
            entitlementChange.getUpdatedQueueTemplate();

        for (String label : expectedNodeLabels) {
          QueueEntitlement newEntitlement = new QueueEntitlement(
              updatedQueueTemplate.getQueueCapacities().getCapacity(label),
              updatedQueueTemplate.getQueueCapacities().getMaximumCapacity
                  (label));
          assertEquals(expectedQueueEntitlements.get(label), newEntitlement);
          validateEffectiveMinResource(mockRM, cs, leafQueue, label,
              expectedQueueEntitlements);
        }
        found = true;
        break;
      }
    }
    if (!found) {
      fail(
          "Could not find the specified leaf queue in entitlement changes : "
              + leafQueue.getQueuePath());
    }
  }

  protected Map<String, Float> populateExpectedAbsCapacityByLabelForParentQueue
      (int numLeafQueues) {
    Map<String, Float> expectedChildQueueAbsCapacity = new HashMap<>();
    expectedChildQueueAbsCapacity.put(NODEL_LABEL_GPU,
        NODE_LABEL_GPU_TEMPLATE_CAPACITY/100 * numLeafQueues);
    expectedChildQueueAbsCapacity.put(NODEL_LABEL_SSD,
        NODEL_LABEL_SSD_TEMPLATE_CAPACITY/100 * numLeafQueues);
    expectedChildQueueAbsCapacity.put(NO_LABEL, 0.1f * numLeafQueues);
    return expectedChildQueueAbsCapacity;
  }
}