TestQueueParsing.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;

import static java.util.Collections.emptySet;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

public class TestQueueParsing {

  // Logger used by the queue-setup helpers to report their progress.
  private static final Logger LOG =
      LoggerFactory.getLogger(TestQueueParsing.class);

  // Dotted configuration paths of the queues referenced by the tests,
  // built level by level on top of the root queue path.
  private static final String A_PATH = CapacitySchedulerConfiguration.ROOT + ".a";
  private static final String B_PATH = CapacitySchedulerConfiguration.ROOT + ".b";
  private static final String C_PATH = CapacitySchedulerConfiguration.ROOT + ".c";
  private static final String A1_PATH = A_PATH + ".a1";
  private static final String A2_PATH = A_PATH + ".a2";
  private static final String B1_PATH = B_PATH + ".b1";
  private static final String B2_PATH = B_PATH + ".b2";
  private static final String B3_PATH = B_PATH + ".b3";
  private static final String B4_PATH = B_PATH + ".b4";
  private static final String C1_PATH = C_PATH + ".c1";
  private static final String C2_PATH = C_PATH + ".c2";
  private static final String C3_PATH = C_PATH + ".c3";
  private static final String C4_PATH = C_PATH + ".c4";
  private static final String C11_PATH = C1_PATH + ".c11";
  private static final String C12_PATH = C1_PATH + ".c12";
  private static final String C13_PATH = C1_PATH + ".c13";
  // Paths of the "x"/"y" queues; unlike the ones above these are written out
  // literally instead of being derived from CapacitySchedulerConfiguration.ROOT.
  private static final String AX_PATH = "root.a.x";
  private static final String AY_PATH = "root.a.y";
  private static final String X_PATH = "root.x";

  // QueuePath objects wrapping the string paths above; these are what the
  // CapacitySchedulerConfiguration setters in this class are called with.
  private static final QueuePath ROOT = new QueuePath(CapacitySchedulerConfiguration.ROOT);
  private static final QueuePath A = new QueuePath(A_PATH);
  private static final QueuePath B = new QueuePath(B_PATH);
  private static final QueuePath C = new QueuePath(C_PATH);
  private static final QueuePath A1 = new QueuePath(A1_PATH);
  private static final QueuePath A2 = new QueuePath(A2_PATH);
  private static final QueuePath B1 = new QueuePath(B1_PATH);
  private static final QueuePath B2 = new QueuePath(B2_PATH);
  private static final QueuePath B3 = new QueuePath(B3_PATH);
  private static final QueuePath B4 = new QueuePath(B4_PATH);
  private static final QueuePath C1 = new QueuePath(C1_PATH);
  private static final QueuePath C2 = new QueuePath(C2_PATH);
  private static final QueuePath C3 = new QueuePath(C3_PATH);
  private static final QueuePath C4 = new QueuePath(C4_PATH);
  private static final QueuePath C11 = new QueuePath(C11_PATH);
  private static final QueuePath C12 = new QueuePath(C12_PATH);
  private static final QueuePath C13 = new QueuePath(C13_PATH);
  private static final QueuePath AX = new QueuePath(AX_PATH);
  private static final QueuePath AY = new QueuePath(AY_PATH);
  private static final QueuePath X = new QueuePath(X_PATH);
  // Tolerance passed to the floating-point assertEquals calls.
  private static final double DELTA = 0.001;
  // Multiplier used when sizing memory (e.g. 8 * GB); presumably the number of
  // megabytes in a gigabyte -- TODO confirm the unit expected by the callees.
  private static final int GB = 1024;

  // Node label manager shared with every MockRM created by a test; it is
  // re-created in setup() before each test.
  private RMNodeLabelsManager nodeLabelManager;

  /**
   * Creates a fresh {@link NullRMNodeLabelsManager} before each test,
   * initializes it with a configuration that selects the
   * {@link CapacityScheduler}, and sets the resource of the
   * {@link CommonNodeLabelsManager#NO_LABEL} partition.
   */
  @BeforeEach
  public void setup() throws Exception {
    final YarnConfiguration yarnConf = new YarnConfiguration();
    yarnConf.setClass(YarnConfiguration.RM_SCHEDULER,
        CapacityScheduler.class, ResourceScheduler.class);
    final Resource noLabelResource = Resources.createResource(2 * 8 * GB, 2 * 16);

    // Keep a typed handle so the label-resource setter needs no downcast.
    final NullRMNodeLabelsManager nullLabelsManager = new NullRMNodeLabelsManager();
    nodeLabelManager = nullLabelsManager;
    nullLabelsManager.init(yarnConf);
    nullLabelsManager.setResourceForLabel(
        CommonNodeLabelsManager.NO_LABEL, noLabelResource);
  }

  /**
   * Fills {@code conf} with a three-level queue hierarchy:
   * root -> a, b, c; a -> a1, a2; b -> b1, b2, b3; c -> c1, c2, c3, c4;
   * c1 -> c11, c12, c13. Every queue receives a capacity and, except for
   * queue b, a maximum capacity.
   *
   * @param conf the scheduler configuration to populate
   */
  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
    // Level 1: children of root
    final String[] rootChildren = {"a", "b", "c"};
    conf.setQueues(ROOT, rootChildren);

    conf.setCapacity(A, 10);
    conf.setMaximumCapacity(A, 15);

    conf.setCapacity(B, 20);

    conf.setCapacity(C, 70);
    conf.setMaximumCapacity(C, 70);

    LOG.info("Setup top-level queues");

    // Level 2: children of a
    final String[] childrenOfA = {"a1", "a2"};
    conf.setQueues(A, childrenOfA);

    conf.setCapacity(A1, 30);
    conf.setMaximumCapacity(A1, 45);

    conf.setCapacity(A2, 70);
    conf.setMaximumCapacity(A2, 85);

    // Level 2: children of b
    final String[] childrenOfB = {"b1", "b2", "b3"};
    conf.setQueues(B, childrenOfB);

    conf.setCapacity(B1, 50);
    conf.setMaximumCapacity(B1, 85);

    conf.setCapacity(B2, 30);
    conf.setMaximumCapacity(B2, 35);

    conf.setCapacity(B3, 20);
    conf.setMaximumCapacity(B3, 35);

    // Level 2: children of c
    final String[] childrenOfC = {"c1", "c2", "c3", "c4"};
    conf.setQueues(C, childrenOfC);

    conf.setCapacity(C1, 50);
    conf.setMaximumCapacity(C1, 55);

    conf.setCapacity(C2, 10);
    conf.setMaximumCapacity(C2, 25);

    conf.setCapacity(C3, 35);
    conf.setMaximumCapacity(C3, 38);

    conf.setCapacity(C4, 5);
    conf.setMaximumCapacity(C4, 5);

    LOG.info("Setup 2nd-level queues");

    // Level 3: children of c1
    final String[] childrenOfC1 = {"c11", "c12", "c13"};
    conf.setQueues(C1, childrenOfC1);

    conf.setCapacity(C11, 15);
    conf.setMaximumCapacity(C11, 30);

    conf.setCapacity(C12, 45);
    conf.setMaximumCapacity(C12, 70);

    conf.setCapacity(C13, 40);
    conf.setMaximumCapacity(C13, 40);

    LOG.info("Setup 3rd-level queues");
  }

  /**
   * Configures the top-level queues a, b and c, writing the child list of
   * root as the raw string {@code " a ,b, c"} so that the queue names carry
   * surrounding whitespace.
   *
   * @param conf the scheduler configuration to populate
   */
  private void setupQueueConfigurationWithSpacesShouldBeTrimmed(
          CapacitySchedulerConfiguration conf) {
    // Write the raw property so the padded names are stored as given.
    final String rootQueuesKey =
        QueuePrefixes.getQueuePrefix(ROOT) + CapacitySchedulerConfiguration.QUEUES;
    conf.set(rootQueuesKey, " a ,b, c");

    conf.setCapacity(A, 10);
    conf.setMaximumCapacity(A, 15);

    conf.setCapacity(B, 20);

    conf.setCapacity(C, 70);
    conf.setMaximumCapacity(C, 70);
  }

  /**
   * Configures root -> a, b, c and a -> a1, a2, writing both child lists as
   * raw strings ({@code " a ,b, c"} and {@code "a1, a2 "}) so that the queue
   * names carry surrounding whitespace.
   *
   * @param conf the scheduler configuration to populate
   */
  private void setupNestedQueueConfigurationWithSpacesShouldBeTrimmed(
          CapacitySchedulerConfiguration conf) {
    // Children of root, written as a raw padded string.
    final String rootQueuesKey =
        QueuePrefixes.getQueuePrefix(ROOT) + CapacitySchedulerConfiguration.QUEUES;
    conf.set(rootQueuesKey, " a ,b, c");

    conf.setCapacity(A, 10);
    conf.setMaximumCapacity(A, 15);

    conf.setCapacity(B, 20);

    conf.setCapacity(C, 70);
    conf.setMaximumCapacity(C, 70);

    // Children of a, again written as a raw padded string.
    final String queuesOfAKey =
        QueuePrefixes.getQueuePrefix(A) + CapacitySchedulerConfiguration.QUEUES;
    conf.set(queuesOfAKey, "a1, a2 ");

    conf.setCapacity(A1, 60);
    conf.setCapacity(A2, 40);
  }

  /**
   * Fills {@code conf} with a two-level hierarchy (root -> a, b;
   * a -> a1, a2; b -> b1, b2, b3) without any node-label settings.
   *
   * @param conf the scheduler configuration to populate
   */
  private void setupQueueConfigurationWithoutLabels(CapacitySchedulerConfiguration conf) {
    // Level 1: children of root
    final String[] rootChildren = {"a", "b"};
    conf.setQueues(ROOT, rootChildren);

    conf.setCapacity(A, 10);
    conf.setMaximumCapacity(A, 15);

    conf.setCapacity(B, 90);

    LOG.info("Setup top-level queues");

    // Level 2: children of a
    final String[] childrenOfA = {"a1", "a2"};
    conf.setQueues(A, childrenOfA);

    conf.setCapacity(A1, 30);
    conf.setMaximumCapacity(A1, 45);

    conf.setCapacity(A2, 70);
    conf.setMaximumCapacity(A2, 85);

    // Level 2: children of b
    final String[] childrenOfB = {"b1", "b2", "b3"};
    conf.setQueues(B, childrenOfB);

    conf.setCapacity(B1, 50);
    conf.setMaximumCapacity(B1, 85);

    conf.setCapacity(B2, 30);
    conf.setMaximumCapacity(B2, 35);

    conf.setCapacity(B3, 20);
    conf.setMaximumCapacity(B3, 35);
  }

  /**
   * Fills {@code conf} with the hierarchy root -> a, b; a -> a1, a2;
   * b -> b1, b2, b3 and adds per-label capacities for the labels "red" and
   * "blue". Queues a and b are given both labels as accessible labels, a2 is
   * restricted to "red", and the remaining queues get no explicit
   * accessible-label setting.
   *
   * @param conf the scheduler configuration to populate
   */
  private void setupQueueConfigurationWithLabels(CapacitySchedulerConfiguration conf) {
    final String red = "red";
    final String blue = "blue";
    final Set<String> redAndBlue = ImmutableSet.of("red", "blue");

    // Level 1: children of root, root owns 100% of both labels
    final String[] rootChildren = {"a", "b"};
    conf.setQueues(ROOT, rootChildren);
    conf.setCapacityByLabel(ROOT, red, 100);
    conf.setCapacityByLabel(ROOT, blue, 100);

    conf.setCapacity(A, 10);
    conf.setMaximumCapacity(A, 15);

    conf.setCapacity(B, 90);

    LOG.info("Setup top-level queues");

    // Queue a and its children
    final String[] childrenOfA = {"a1", "a2"};
    conf.setQueues(A, childrenOfA);
    conf.setAccessibleNodeLabels(A, redAndBlue);
    conf.setCapacityByLabel(A, red, 50);
    conf.setMaximumCapacityByLabel(A, red, 50);
    conf.setCapacityByLabel(A, blue, 50);

    conf.setCapacity(A1, 30);
    conf.setMaximumCapacity(A1, 45);
    conf.setCapacityByLabel(A1, red, 50);
    conf.setCapacityByLabel(A1, blue, 100);

    conf.setCapacity(A2, 70);
    conf.setMaximumCapacity(A2, 85);
    conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red"));
    conf.setCapacityByLabel(A2, red, 50);
    conf.setMaximumCapacityByLabel(A2, red, 60);

    // Queue b and its children
    final String[] childrenOfB = {"b1", "b2", "b3"};
    conf.setQueues(B, childrenOfB);
    conf.setAccessibleNodeLabels(B, redAndBlue);
    conf.setCapacityByLabel(B, red, 50);
    conf.setCapacityByLabel(B, blue, 50);

    conf.setCapacity(B1, 50);
    conf.setMaximumCapacity(B1, 85);
    conf.setCapacityByLabel(B1, red, 50);
    conf.setCapacityByLabel(B1, blue, 50);

    conf.setCapacity(B2, 30);
    conf.setMaximumCapacity(B2, 35);
    conf.setCapacityByLabel(B2, red, 25);
    conf.setCapacityByLabel(B2, blue, 25);

    conf.setCapacity(B3, 20);
    conf.setMaximumCapacity(B3, 35);
    conf.setCapacityByLabel(B3, red, 25);
    conf.setCapacityByLabel(B3, blue, 25);
  }

  /**
   * Fills {@code conf} with the hierarchy root -> a, b; a -> a1, a2;
   * b -> b1, b2, b3 including "red"/"blue" label capacities. Several leaf
   * queues are deliberately given a capacity above their maximum capacity
   * (b2: 80 vs. 40; a1 and b1 for "red": 60 vs. 30).
   *
   * @param conf the scheduler configuration to populate
   */
  private void setupQueueConfigurationWithLabelsAndReleaseCheck(
      CapacitySchedulerConfiguration conf) {
    final String red = "red";
    final String blue = "blue";
    final Set<String> redAndBlue = ImmutableSet.of("red", "blue");

    // Level 1: children of root, root owns 100% of both labels
    final String[] rootChildren = {"a", "b"};
    conf.setQueues(ROOT, rootChildren);
    conf.setCapacityByLabel(ROOT, red, 100);
    conf.setCapacityByLabel(ROOT, blue, 100);

    // Original author's note for this configuration: the
    // capacity <= maximum-capacity check is not needed.
    conf.setCapacity(A, 50);
    conf.setMaximumCapacity(A, 100);

    conf.setCapacity(B, 50);
    conf.setMaximumCapacity(B, 100);

    LOG.info("Setup top-level queues");

    // Queue a and its children
    final String[] childrenOfA = {"a1", "a2"};
    conf.setQueues(A, childrenOfA);
    conf.setAccessibleNodeLabels(A, redAndBlue);
    conf.setCapacityByLabel(A, red, 50);
    conf.setMaximumCapacityByLabel(A, red, 100);
    conf.setCapacityByLabel(A, blue, 30);
    conf.setMaximumCapacityByLabel(A, blue, 50);

    conf.setCapacity(A1, 60);
    conf.setMaximumCapacity(A1, 60);
    conf.setCapacityByLabel(A1, red, 60);
    conf.setMaximumCapacityByLabel(A1, red, 30);
    conf.setCapacityByLabel(A1, blue, 100);
    conf.setMaximumCapacityByLabel(A1, blue, 100);

    conf.setCapacity(A2, 40);
    conf.setMaximumCapacity(A2, 85);
    conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red"));
    conf.setCapacityByLabel(A2, red, 40);
    conf.setMaximumCapacityByLabel(A2, red, 60);

    // Queue b and its children
    final String[] childrenOfB = {"b1", "b2", "b3"};
    conf.setQueues(B, childrenOfB);
    conf.setAccessibleNodeLabels(B, redAndBlue);
    conf.setCapacityByLabel(B, red, 50);
    conf.setMaximumCapacityByLabel(B, red, 100);
    conf.setCapacityByLabel(B, blue, 70);
    conf.setMaximumCapacityByLabel(B, blue, 100);

    conf.setCapacity(B1, 10);
    conf.setMaximumCapacity(B1, 10);
    conf.setCapacityByLabel(B1, red, 60);
    conf.setMaximumCapacityByLabel(B1, red, 30);
    conf.setCapacityByLabel(B1, blue, 50);
    conf.setMaximumCapacityByLabel(B1, blue, 100);

    conf.setCapacity(B2, 80);
    conf.setMaximumCapacity(B2, 40);
    conf.setCapacityByLabel(B2, red, 30);
    conf.setCapacityByLabel(B2, blue, 25);

    conf.setCapacity(B3, 10);
    conf.setMaximumCapacity(B3, 25);
    conf.setCapacityByLabel(B3, red, 10);
    conf.setCapacityByLabel(B3, blue, 25);
  }

  /**
   * Fills {@code conf} with root -> a, b and a -> a1, a2. Queue a is given
   * "red" and "blue" as accessible labels, queue b is given
   * {@link CommonNodeLabelsManager#EMPTY_STRING_SET}, a2 is restricted to
   * "red", and a1 gets no explicit accessible-label setting.
   *
   * @param conf the scheduler configuration to populate
   */
  private void setupQueueConfigurationWithLabelsInherit(
          CapacitySchedulerConfiguration conf) {
    final String red = "red";
    final String blue = "blue";

    // Level 1: children of root, root owns 100% of both labels
    final String[] rootChildren = {"a", "b"};
    conf.setQueues(ROOT, rootChildren);
    conf.setCapacityByLabel(ROOT, red, 100);
    conf.setCapacityByLabel(ROOT, blue, 100);

    // Configuration of queue a
    conf.setCapacity(A, 10);
    conf.setMaximumCapacity(A, 15);
    final String[] childrenOfA = {"a1", "a2"};
    conf.setQueues(A, childrenOfA);
    conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
    conf.setCapacityByLabel(A, red, 100);
    conf.setCapacityByLabel(A, blue, 100);

    // Configuration of queue b
    conf.setCapacity(B, 90);
    conf.setAccessibleNodeLabels(B, CommonNodeLabelsManager.EMPTY_STRING_SET);

    // Level 2: children of a
    conf.setCapacity(A1, 30);
    conf.setMaximumCapacity(A1, 45);
    conf.setCapacityByLabel(A1, red, 50);
    conf.setCapacityByLabel(A1, blue, 100);

    conf.setCapacity(A2, 70);
    conf.setMaximumCapacity(A2, 85);
    conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red"));
    conf.setCapacityByLabel(A2, red, 50);
  }

  /**
   * Fills {@code conf} with a single level of queues (root -> a, b), both
   * given "red" and "blue" as accessible labels, with label capacities of
   * 90 for a and 10 for b.
   *
   * @param conf the scheduler configuration to populate
   */
  private void setupQueueConfigurationWithSingleLevel(
          CapacitySchedulerConfiguration conf) {
    final String red = "red";
    final String blue = "blue";
    final Set<String> redAndBlue = ImmutableSet.of("red", "blue");

    // Children of root
    final String[] rootChildren = {"a", "b"};
    conf.setQueues(ROOT, rootChildren);

    // Configuration of queue a
    conf.setCapacity(A, 10);
    conf.setMaximumCapacity(A, 15);
    conf.setAccessibleNodeLabels(A, redAndBlue);
    conf.setCapacityByLabel(A, red, 90);
    conf.setCapacityByLabel(A, blue, 90);

    // Configuration of queue b
    conf.setCapacity(B, 90);
    conf.setAccessibleNodeLabels(B, redAndBlue);
    conf.setCapacityByLabel(B, red, 10);
    conf.setCapacityByLabel(B, blue, 10);
  }

  /**
   * Creates and starts a {@link MockRM} for {@code conf} without adding any
   * node labels; delegates to
   * {@link #createMockRMWithLabels(YarnConfiguration, Set)} with an empty set.
   *
   * @param conf configuration handed to the MockRM
   * @return the started MockRM
   */
  public MockRM createMockRMWithoutLabels(YarnConfiguration conf) throws Exception {
    final Set<String> noLabels = emptySet();
    return createMockRMWithLabels(conf, noLabels);
  }

  /**
   * Creates and starts a {@link MockRM} that uses this test's node label
   * manager. Every entry of {@code nodeLabels} is added to the cluster labels
   * and attached to a node of its own ("h1", "h2", ..., port 0); after the RM
   * has been started each of those nodes is registered with it, passing
   * {@code 8 * GB}.
   *
   * @param conf configuration handed to the MockRM
   * @param nodeLabels labels to add to the label manager, one node per label
   * @return the started MockRM
   */
  public MockRM createMockRMWithLabels(
      YarnConfiguration conf, Set<String> nodeLabels) throws Exception {
    nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(nodeLabels);

    final Map<NodeId, Set<String>> labelsByNode = new HashMap<>();
    int hostIndex = 0;
    for (String nodeLabel : nodeLabels) {
      hostIndex++;
      final NodeId nodeId = NodeId.newInstance("h" + hostIndex, 0);
      labelsByNode.put(nodeId, toSet(nodeLabel));
    }
    nodeLabelManager.addLabelsToNode(labelsByNode);

    // Hand the pre-populated label manager to the RM instead of letting it
    // create its own.
    final MockRM mockRm = new MockRM(conf) {
      @Override
      protected RMNodeLabelsManager createNodeLabelManager() {
        return nodeLabelManager;
      }
    };
    mockRm.getRMContext().setNodeLabelManager(nodeLabelManager);
    mockRm.start();

    for (NodeId registeredNode : labelsByNode.keySet()) {
      mockRm.registerNode(registeredNode.toString(), 8 * GB);
    }
    return mockRm;
  }

  /**
   * Starts an RM with the three-level configuration from
   * {@link #setupQueueConfiguration} and checks the absolute capacity and
   * absolute maximum capacity of queues a, b1 and c12.
   */
  @Test
  public void testQueueParsing() throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    setupQueueConfiguration(csConf);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    MockRM rm = createMockRMWithoutLabels(conf);
    try {
      CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();

      CSQueue a = capacityScheduler.getQueue("a");
      assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
      assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);

      CSQueue b1 = capacityScheduler.getQueue("b1");
      assertEquals(0.2 * 0.5, b1.getAbsoluteCapacity(), DELTA);
      assertEquals(
          0.85, b1.getAbsoluteMaximumCapacity(), DELTA, "Parent B has no MAX_CAP");

      CSQueue c12 = capacityScheduler.getQueue("c12");
      assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
      assertEquals(0.7 * 0.55 * 0.7,
          c12.getAbsoluteMaximumCapacity(), DELTA);
    } finally {
      // The RM was started by the helper; stop it even when an assertion
      // fails so it does not leak into the following tests.
      ServiceOperations.stopQuietly(rm);
    }
  }

  /**
   * Setting a capacity other than 100 percent on the root queue is expected
   * to be rejected with an {@link IllegalArgumentException}.
   */
  @Test
  public void testRootQueueParsing() throws Exception {
    // A non-100 percent value for root must throw IllegalArgumentException.
    assertThrows(IllegalArgumentException.class,
        () -> new CapacitySchedulerConfiguration().setCapacity(ROOT, 90));
  }
  
  /**
   * Starts an RM with a label-free queue configuration on a cluster that has
   * the labels "red" and "blue", reinitializes the scheduler with the labeled
   * configuration, and verifies the resulting labels and capacities through
   * {@link #checkQueueLabels}.
   */
  @Test
  public void testQueueParsingReinitializeWithLabels() throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    setupQueueConfigurationWithoutLabels(csConf);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    MockRM rm = createMockRMWithLabels(conf, ImmutableSet.of("red", "blue"));
    try {
      CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();
      csConf = new CapacitySchedulerConfiguration();
      setupQueueConfigurationWithLabels(csConf);
      conf = new YarnConfiguration(csConf);
      capacityScheduler.reinitialize(conf, rm.getRMContext());
      checkQueueLabels(capacityScheduler);
    } finally {
      // Stop the RM even if reinitialize or an assertion fails.
      ServiceOperations.stopQuietly(rm);
    }
  }

  /**
   * Starts an RM with the labeled queue configuration on a cluster that has
   * the labels "red" and "blue" and verifies the parsed labels and capacities
   * through {@link #checkQueueLabels}.
   */
  @Test
  public void testQueueParsingWithLabels() throws Exception {
    CapacitySchedulerConfiguration csConf =
            new CapacitySchedulerConfiguration();
    setupQueueConfigurationWithLabels(csConf);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    MockRM rm = createMockRMWithLabels(conf, ImmutableSet.of("red", "blue"));
    try {
      CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();
      checkQueueLabels(capacityScheduler);
    } finally {
      // Stop the RM even if one of the assertions fails.
      ServiceOperations.stopQuietly(rm);
    }
  }

  /**
   * Asserts the accessible node labels of queues a, a1, a2, b and b2 and the
   * (absolute/maximum) capacities of a2 and b3 that are expected after
   * loading the configuration built by
   * {@link #setupQueueConfigurationWithLabels}.
   *
   * @param capacityScheduler scheduler whose queues are inspected
   */
  private void checkQueueLabels(CapacityScheduler capacityScheduler) {
    final String red = "red";
    final Set<String> redAndBlue = ImmutableSet.of("red", "blue");

    // a is configured with red and blue; a1 has no setting of its own and is
    // expected to expose the same labels.
    for (String queueName : new String[] {"a", "a1"}) {
      assertTrue(capacityScheduler.getQueue(queueName)
          .getAccessibleNodeLabels().containsAll(redAndBlue));
    }

    // a2 is restricted to exactly one label: red
    final CSQueue queueA2 = capacityScheduler.getQueue("a2");
    assertEquals(1, queueA2.getAccessibleNodeLabels().size());
    assertTrue(queueA2.getAccessibleNodeLabels().contains(red));

    // b is configured with red and blue; b2 has no setting of its own and is
    // expected to expose the same labels.
    for (String queueName : new String[] {"b", "b2"}) {
      assertTrue(capacityScheduler.getQueue(queueName)
          .getAccessibleNodeLabels().containsAll(redAndBlue));
    }

    // Capacities of a2, for the default partition and for red
    assertEquals(0.7, queueA2.getCapacity(), DELTA);
    assertEquals(0.5,
        queueA2.getQueueCapacities().getCapacity(red), DELTA);
    assertEquals(0.07, queueA2.getAbsoluteCapacity(), DELTA);
    assertEquals(0.25,
        queueA2.getQueueCapacities().getAbsoluteCapacity(red), DELTA);
    assertEquals(0.1275, queueA2.getAbsoluteMaximumCapacity(), DELTA);
    assertEquals(0.3,
        queueA2.getQueueCapacities().getAbsoluteMaximumCapacity(red), DELTA);

    // Capacities of b3, for the default partition and for red
    final CSQueue queueB3 = capacityScheduler.getQueue("b3");
    assertEquals(0.18, queueB3.getAbsoluteCapacity(), DELTA);
    assertEquals(0.125,
        queueB3.getQueueCapacities().getAbsoluteCapacity(red), DELTA);
    assertEquals(0.35, queueB3.getAbsoluteMaximumCapacity(), DELTA);
    assertEquals(1,
        queueB3.getQueueCapacities().getAbsoluteMaximumCapacity(red), DELTA);
  }

  /**
   * Starts an RM with the configuration from
   * {@link #setupQueueConfigurationWithLabelsAndReleaseCheck} on a cluster
   * with the labels "red" and "blue" and verifies the resulting labels and
   * capacities through
   * {@link #checkQueueLabelsWithLeafQueueDisableElasticity}.
   */
  @Test
  public void testQueueParsingWithLeafQueueDisableElasticity() throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    setupQueueConfigurationWithLabelsAndReleaseCheck(csConf);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    MockRM rm = createMockRMWithLabels(conf, ImmutableSet.of("red", "blue"));
    try {
      CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();
      checkQueueLabelsWithLeafQueueDisableElasticity(capacityScheduler);
    } finally {
      // Stop the RM even if one of the assertions fails.
      ServiceOperations.stopQuietly(rm);
    }
  }

  /**
   * Asserts the accessible node labels and the (absolute/maximum) capacities
   * expected after loading the configuration built by
   * {@link #setupQueueConfigurationWithLabelsAndReleaseCheck}, in which some
   * leaf queues have a capacity above their maximum capacity.
   *
   * @param capacityScheduler scheduler whose queues are inspected
   */
  private void checkQueueLabelsWithLeafQueueDisableElasticity(
      CapacityScheduler capacityScheduler) {
    final String red = "red";
    final String blue = "blue";
    final Set<String> redAndBlue = ImmutableSet.of("red", "blue");

    // a is configured with red and blue; a1 has no setting of its own and is
    // expected to expose the same labels.
    for (String queueName : new String[] {"a", "a1"}) {
      assertTrue(capacityScheduler.getQueue(queueName)
          .getAccessibleNodeLabels().containsAll(redAndBlue));
    }

    // a2 is restricted to exactly one label: red
    final CSQueue queueA2 = capacityScheduler.getQueue("a2");
    assertEquals(1, queueA2.getAccessibleNodeLabels().size());
    assertTrue(queueA2.getAccessibleNodeLabels().contains(red));

    // b is configured with red and blue; b2 has no setting of its own and is
    // expected to expose the same labels.
    for (String queueName : new String[] {"b", "b2"}) {
      assertTrue(capacityScheduler.getQueue(queueName)
          .getAccessibleNodeLabels().containsAll(redAndBlue));
    }

    // Capacities of a2, for the default partition and for red
    assertEquals(0.4, queueA2.getCapacity(), DELTA);
    assertEquals(0.4,
        queueA2.getQueueCapacities().getCapacity(red), DELTA);
    assertEquals(0.2, queueA2.getAbsoluteCapacity(), DELTA);
    assertEquals(0.2,
        queueA2.getQueueCapacities().getAbsoluteCapacity(red), DELTA);
    assertEquals(0.85, queueA2.getAbsoluteMaximumCapacity(), DELTA);
    assertEquals(0.6,
        queueA2.getQueueCapacities().getAbsoluteMaximumCapacity(red), DELTA);

    // Disabled elasticity on a leaf queue, default partition
    final CSQueue queueB2 = capacityScheduler.getQueue("b2");
    assertEquals(0.4, queueB2.getAbsoluteCapacity(), DELTA);
    assertEquals(0.4, queueB2.getAbsoluteMaximumCapacity(), DELTA);

    // Disabled elasticity on leaf queues, red partition
    final CSQueue queueA1 = capacityScheduler.getQueue("a1");
    assertEquals(0.3,
        queueA1.getQueueCapacities().getAbsoluteCapacity(red), DELTA);
    assertEquals(0.3,
        queueA1.getQueueCapacities().getAbsoluteMaximumCapacity(red), DELTA);

    final CSQueue queueB1 = capacityScheduler.getQueue("b1");
    assertEquals(0.3,
        queueB1.getQueueCapacities().getAbsoluteCapacity(red), DELTA);
    assertEquals(0.3,
        queueB1.getQueueCapacities().getAbsoluteMaximumCapacity(red), DELTA);

    // Capacities of b3, for the default partition and for blue
    final CSQueue queueB3 = capacityScheduler.getQueue("b3");
    assertEquals(0.05, queueB3.getAbsoluteCapacity(), DELTA);
    assertEquals(0.175,
        queueB3.getQueueCapacities().getAbsoluteCapacity(blue), DELTA);
    assertEquals(0.25, queueB3.getAbsoluteMaximumCapacity(), DELTA);
    assertEquals(1,
        queueB3.getQueueCapacities().getAbsoluteMaximumCapacity(blue), DELTA);
  }

  /**
   * Starts an RM with the configuration from
   * {@link #setupQueueConfigurationWithLabelsInherit} on a cluster with the
   * labels "red" and "blue" and verifies the accessible labels of the queues
   * through {@link #checkQueueLabelsInheritConfig}.
   */
  @Test
  public void testQueueParsingWithLabelsInherit() throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    setupQueueConfigurationWithLabelsInherit(csConf);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    MockRM rm = createMockRMWithLabels(conf, ImmutableSet.of("red", "blue"));
    try {
      CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();
      checkQueueLabelsInheritConfig(capacityScheduler);
    } finally {
      // Stop the RM even if one of the assertions fails.
      ServiceOperations.stopQuietly(rm);
    }
  }

  /**
   * Asserts the accessible node labels expected after loading the
   * configuration built by {@link #setupQueueConfigurationWithLabelsInherit}:
   * a and a1 expose red and blue, a2 exposes only red, and b exposes none.
   *
   * @param capacityScheduler scheduler whose queues are inspected
   */
  private void checkQueueLabelsInheritConfig(CapacityScheduler capacityScheduler) {
    final Set<String> redAndBlue = ImmutableSet.of("red", "blue");

    // a is configured with red and blue; a1 has no setting of its own and is
    // expected to expose the same labels.
    for (String queueName : new String[] {"a", "a1"}) {
      assertTrue(capacityScheduler.getQueue(queueName)
          .getAccessibleNodeLabels().containsAll(redAndBlue));
    }

    // a2 is restricted to exactly one label: red
    final CSQueue queueA2 = capacityScheduler.getQueue("a2");
    assertEquals(1, queueA2.getAccessibleNodeLabels().size());
    assertTrue(queueA2.getAccessibleNodeLabels().contains("red"));

    // b was configured with the empty label set, so it exposes no labels
    final CSQueue queueB = capacityScheduler.getQueue("b");
    assertTrue(queueB.getAccessibleNodeLabels().isEmpty());
  }
  
  /**
   * The queue configuration refers to the labels "red" and "blue" while no
   * label is added to the node label manager; the test passes when the RM
   * can still be created and started without throwing.
   */
  @Test
  public void testQueueParsingWhenLabelsNotExistedInNodeLabelManager() throws Exception {
    final CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
    setupQueueConfigurationWithLabels(schedulerConf);
    final YarnConfiguration yarnConf = new YarnConfiguration(schedulerConf);

    final MockRM mockRm = createMockRMWithoutLabels(yarnConf);
    ServiceOperations.stopQuietly(mockRm);
  }
  
  /**
   * Uses the label configuration from
   * {@link #setupQueueConfigurationWithLabelsInherit} while no label is added
   * to the node label manager; the test passes when the RM can still be
   * created and started without throwing.
   */
  @Test
  public void testQueueParsingWhenLabelsInheritedNotExistedInNodeLabelManager() throws Exception {
    final CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
    setupQueueConfigurationWithLabelsInherit(schedulerConf);
    final YarnConfiguration yarnConf = new YarnConfiguration(schedulerConf);

    final MockRM mockRm = createMockRMWithoutLabels(yarnConf);
    ServiceOperations.stopQuietly(mockRm);
  }
  
  /**
   * Uses the single-level label configuration from
   * {@link #setupQueueConfigurationWithSingleLevel} while no label is added
   * to the node label manager; the test passes when the RM can still be
   * created and started without throwing.
   */
  @Test
  public void testSingleLevelQueueParsingWhenLabelsNotExistedInNodeLabelManager() throws Exception {
    final CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
    setupQueueConfigurationWithSingleLevel(schedulerConf);
    final YarnConfiguration yarnConf = new YarnConfiguration(schedulerConf);

    final MockRM mockRm = createMockRMWithoutLabels(yarnConf);
    ServiceOperations.stopQuietly(mockRm);
  }
  
  /**
   * Uses the label configuration from
   * {@link #setupQueueConfigurationWithLabels} while no label is added to the
   * node label manager; the test passes when the RM can still be created and
   * started without throwing.
   */
  @Test
  public void testQueueParsingWhenLabelsNotExist() throws Exception {
    final CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
    setupQueueConfigurationWithLabels(schedulerConf);
    final YarnConfiguration yarnConf = new YarnConfiguration(schedulerConf);

    final MockRM mockRm = createMockRMWithoutLabels(yarnConf);
    ServiceOperations.stopQuietly(mockRm);
  }
  
  /**
   * Initializes a cluster that has the labels "red" and "blue" with a queue
   * configuration that sets no per-label capacities; reinitializing the
   * scheduler must not fail, the root queue's capacity for both labels must
   * be zero, and the default-partition capacities of a, b1 and c12 must be
   * as configured.
   */
  @Test
  public void testQueueParsingWithUnusedLabels() throws Exception {
    Set<String> labels = ImmutableSet.of("red", "blue");
    CapacitySchedulerConfiguration csConf =
            new CapacitySchedulerConfiguration();
    setupQueueConfiguration(csConf);
    csConf.setAccessibleNodeLabels(ROOT, labels);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    MockRM rm = createMockRMWithLabels(conf, labels);
    try {
      CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();
      capacityScheduler.reinitialize(csConf, rm.getRMContext());

      // check root queue's capacity by label -- they should be all zero
      CSQueue root = capacityScheduler.getQueue(CapacitySchedulerConfiguration.ROOT);
      assertEquals(0, root.getQueueCapacities().getCapacity("red"), DELTA);
      assertEquals(0, root.getQueueCapacities().getCapacity("blue"), DELTA);

      CSQueue a = capacityScheduler.getQueue("a");
      assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
      assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);

      CSQueue b1 = capacityScheduler.getQueue("b1");
      assertEquals(0.2 * 0.5, b1.getAbsoluteCapacity(), DELTA);
      assertEquals(0.85,
          b1.getAbsoluteMaximumCapacity(), DELTA, "Parent B has no MAX_CAP");

      CSQueue c12 = capacityScheduler.getQueue("c12");
      assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
      assertEquals(0.7 * 0.55 * 0.7, c12.getAbsoluteMaximumCapacity(),
          DELTA);
    } finally {
      // Stop the RM even if reinitialize or an assertion fails.
      ServiceOperations.stopQuietly(rm);
    }
  }
  
  /**
   * The child list of root is configured as {@code " a ,b, c"}; queues "a"
   * and "c" must nevertheless be found under their trimmed names with the
   * configured capacities.
   */
  @Test
  public void testQueueParsingShouldTrimSpaces() throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    setupQueueConfigurationWithSpacesShouldBeTrimmed(csConf);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    MockRM rm = createMockRMWithoutLabels(conf);
    try {
      CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();
      CSQueue a = capacityScheduler.getQueue("a");
      assertNotNull(a);
      assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
      assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);

      CSQueue c = capacityScheduler.getQueue("c");
      assertNotNull(c);
      assertEquals(0.70, c.getAbsoluteCapacity(), DELTA);
      assertEquals(0.70, c.getAbsoluteMaximumCapacity(), DELTA);
    } finally {
      // The RM was started by the helper; stop it even when an assertion
      // fails so it does not leak into the following tests.
      ServiceOperations.stopQuietly(rm);
    }
  }
  
  /**
   * The child lists of root and of queue a are configured with surrounding
   * whitespace ({@code " a ,b, c"} and {@code "a1, a2 "}); queues a, c, a1
   * and a2 must nevertheless be found under their trimmed names with the
   * expected absolute capacities.
   */
  @Test
  public void testNestedQueueParsingShouldTrimSpaces() throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    setupNestedQueueConfigurationWithSpacesShouldBeTrimmed(csConf);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    MockRM rm = createMockRMWithoutLabels(conf);
    try {
      CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();

      CSQueue a = capacityScheduler.getQueue("a");
      assertNotNull(a);
      assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
      assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);

      CSQueue c = capacityScheduler.getQueue("c");
      assertNotNull(c);
      assertEquals(0.70, c.getAbsoluteCapacity(), DELTA);
      assertEquals(0.70, c.getAbsoluteMaximumCapacity(), DELTA);

      CSQueue a1 = capacityScheduler.getQueue("a1");
      assertNotNull(a1);
      assertEquals(0.10 * 0.6, a1.getAbsoluteCapacity(), DELTA);
      assertEquals(0.15, a1.getAbsoluteMaximumCapacity(), DELTA);

      CSQueue a2 = capacityScheduler.getQueue("a2");
      assertNotNull(a2);
      assertEquals(0.10 * 0.4, a2.getAbsoluteCapacity(), DELTA);
      assertEquals(0.15, a2.getAbsoluteMaximumCapacity(), DELTA);
    } finally {
      // The RM was started by the helper; stop it even when an assertion
      // fails so it does not leak into the following tests.
      ServiceOperations.stopQuietly(rm);
    }
  }
  
  /**
   * Initializes a queue configuration in which the non-labeled capacities of
   * the children of queue c do not add up to 100% (c2 is lowered from 10 to
   * 5). In legacy queue mode creating the RM is expected to throw a
   * {@link ServiceStateException}; in the new queue mode the test is skipped.
   */
  @Test
  public void testQueueParsingFailWhenSumOfChildrenNonLabeledCapacityNot100Percent()
      throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();

    // If the new queue mode is used it's allowed to leave
    // some of the resources of a parent queue unallocated.
    // The assumption must be evaluated outside of assertThrows: inside the
    // lambda its TestAbortedException would be reported as an unexpected
    // exception type and fail the test instead of skipping it.
    assumeTrue(csConf.isLegacyQueueMode());

    setupQueueConfiguration(csConf);
    csConf.setCapacity(C2, 5);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    assertThrows(ServiceStateException.class, () -> {
      MockRM rm = createMockRMWithLabels(conf, ImmutableSet.of("red", "blue"));
      // Only reached when the RM unexpectedly comes up; release it before
      // assertThrows reports the missing exception.
      ServiceOperations.stopQuietly(rm);
    });
  }
  
  /**
   * Initializes a queue configuration in which the "red" capacities of the
   * children of queue b do not add up to 100% (b3 is lowered from 25 to 24).
   * In legacy queue mode creating the RM is expected to throw a
   * {@link ServiceStateException}; in the new queue mode the test is skipped.
   */
  @Test
  public void testQueueParsingFailWhenSumOfChildrenLabeledCapacityNot100Percent()
      throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();

    // If the new queue mode is used it's allowed to leave
    // some of the resources of a parent queue unallocated.
    // The assumption must be evaluated outside of assertThrows: inside the
    // lambda its TestAbortedException would be reported as an unexpected
    // exception type and fail the test instead of skipping it.
    assumeTrue(csConf.isLegacyQueueMode());

    setupQueueConfigurationWithLabels(csConf);
    csConf.setCapacityByLabel(B3, "red", 24);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    assertThrows(ServiceStateException.class, () -> {
      MockRM rm = createMockRMWithLabels(conf, ImmutableSet.of("red", "blue"));
      // Only reached when the RM unexpectedly comes up; release it before
      // assertThrows reports the missing exception.
      ServiceOperations.stopQuietly(rm);
    });
  }

  /**
   * Same as the labeled-capacity failure case (the "red" capacities of the
   * children of queue b add up to 99%), but with the accessible node labels
   * of root and b set to the wildcard {@link RMNodeLabelsManager#ANY}.
   * In legacy queue mode creating the RM is expected to throw a
   * {@link ServiceStateException}; in the new queue mode the test is skipped.
   */
  @Test
  public void testQueueParsingWithSumOfChildLabelCapacityNot100PercentWithWildCard()
      throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();

    // If the new queue mode is used it's allowed to leave
    // some of the resources of a parent queue unallocated.
    // The assumption must be evaluated outside of assertThrows: inside the
    // lambda its TestAbortedException would be reported as an unexpected
    // exception type and fail the test instead of skipping it.
    assumeTrue(csConf.isLegacyQueueMode());

    setupQueueConfigurationWithLabels(csConf);
    csConf.setCapacityByLabel(B3, "red", 24);
    csConf.setAccessibleNodeLabels(ROOT, ImmutableSet.of(RMNodeLabelsManager.ANY));
    csConf.setAccessibleNodeLabels(B, ImmutableSet.of(RMNodeLabelsManager.ANY));
    YarnConfiguration conf = new YarnConfiguration(csConf);

    assertThrows(ServiceStateException.class, () -> {
      MockRM rm = createMockRMWithLabels(conf, ImmutableSet.of("red", "blue"));
      // Only reached when the RM unexpectedly comes up; release it before
      // assertThrows reports the missing exception.
      ServiceOperations.stopQuietly(rm);
    });
  }
  
  /**
   * Starts an RM whose configuration places queues "x" and "y" under A, then
   * reinitializes the scheduler with a configuration in which "x" is listed
   * under ROOT instead. The reinitialization is expected to fail with an
   * IOException.
   */
  @Test
  public void testQueueParsingWithMoveQueue() throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    csConf.setQueues(ROOT, new String[] {"a"});
    csConf.setQueues(A, new String[] {"x", "y"});
    csConf.setCapacity(A, 100);
    csConf.setCapacity(AX, 50);
    csConf.setCapacity(AY, 50);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    // The initial start-up must succeed; it is deliberately kept outside of
    // assertThrows so that an IOException raised here cannot satisfy the test.
    MockRM rm = createMockRMWithoutLabels(conf);
    try {
      CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();

      // New layout: "x" is now a direct child of ROOT, A only keeps "y"
      csConf.setQueues(ROOT, new String[] {"a", "x"});
      csConf.setQueues(A, new String[] {"y"});
      csConf.setCapacity(X, 50);
      csConf.setCapacity(A, 50);
      csConf.setCapacity(AY, 100);

      assertThrows(IOException.class,
          () -> capacityScheduler.reinitialize(csConf, rm.getRMContext()));
    } finally {
      // Stop the RM even though reinitialize is expected to throw
      ServiceOperations.stopQuietly(rm);
    }
  }

  /**
   * Configures a single queue A under ROOT whose capacity is 100 for labels
   * "x" and "y" but only 70 for label "z". Creating the RM is expected to
   * fail with a ServiceStateException. The test only applies to legacy queue
   * mode.
   */
  @Test
  @Timeout(value = 60)
  public void testRMStartWrongNodeCapacity() throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    Set<String> labels = ImmutableSet.of("x", "y", "z");

    // If the new queue mode is used it's allowed to leave
    // some of the resources of a parent queue unallocated.
    // The assumption is evaluated outside of assertThrows on purpose: the
    // exception it raises to abort the test is not a ServiceStateException,
    // so inside the lambda it would be reported as a failure, not a skip.
    assumeTrue(csConf.isLegacyQueueMode());

    // Define top-level queues
    csConf.setQueues(ROOT, new String[] {"a"});
    csConf.setCapacityByLabel(ROOT, "x", 100);
    csConf.setCapacityByLabel(ROOT, "y", 100);
    csConf.setCapacityByLabel(ROOT, "z", 100);

    csConf.setCapacity(A, 100);
    csConf.setAccessibleNodeLabels(A, labels);
    csConf.setCapacityByLabel(A, "x", 100);
    csConf.setCapacityByLabel(A, "y", 100);
    csConf.setCapacityByLabel(A, "z", 70);

    YarnConfiguration conf = new YarnConfiguration(csConf);

    // Only the RM creation is expected to throw
    assertThrows(ServiceStateException.class, () -> {
      MockRM rm = createMockRMWithLabels(conf, labels);
      ServiceOperations.stopQuietly(rm);
    });
  }


  /**
   * Checks that adding queue "b4" under B and reinitializing the scheduler
   * updates both the child queue list of queue "b" and the queue list held by
   * its queue ordering policy.
   */
  @Test
  public void testQueueOrderingPolicyUpdatedAfterReinitialize() throws Exception {
    final String[] childrenBeforeRefresh = {"b1", "b2", "b3"};
    final String[] childrenAfterRefresh = {"b1", "b2", "b3", "b4"};

    CapacitySchedulerConfiguration schedulerConf = new CapacitySchedulerConfiguration();
    setupQueueConfigurationWithoutLabels(schedulerConf);
    YarnConfiguration yarnConf = new YarnConfiguration(schedulerConf);

    MockRM mockRM = createMockRMWithoutLabels(yarnConf);
    CapacityScheduler scheduler = (CapacityScheduler) mockRM.getResourceScheduler();

    // Describe the new b4 queue in the configuration
    schedulerConf.setQueues(B, childrenAfterRefresh);
    schedulerConf.setCapacity(B4, 0f);

    // Until reinitialize is called, "b" must still report its original children
    ParentQueue parentB = (ParentQueue) scheduler.getQueue("b");
    checkEqualsToQueueSet(parentB.getChildQueues(), childrenBeforeRefresh);

    scheduler.reinitialize(schedulerConf, mockRM.getRMContext());

    // Both the parent queue and its ordering policy must now know about b4
    checkEqualsToQueueSet(parentB.getChildQueues(), childrenAfterRefresh);
    checkEqualsToQueueSet(
        ((PriorityUtilizationQueueOrderingPolicy) parentB.getQueueOrderingPolicy()).getQueues(),
        childrenAfterRefresh);

    ServiceOperations.stopQuietly(mockRM);
  }

  /**
   * Configures a single queue A under ROOT using weights (100 for the
   * non-labeled partition and labels "x" and "y", 70 for label "z") and
   * verifies that the absolute capacity of both ROOT and A is 1 for the
   * non-labeled partition and for every label.
   */
  @Test
  @Timeout(value = 60)
  public void testQueueCapacityWithWeight() throws Exception {
    Set<String> nodeLabels = ImmutableSet.of("x", "y", "z");

    CapacitySchedulerConfiguration schedulerConf =
        new CapacitySchedulerConfiguration();

    // Top-level queue layout and the weights of ROOT for each label
    schedulerConf.setQueues(ROOT, new String[] {"a"});
    schedulerConf.setLabeledQueueWeight(ROOT, "x", 100);
    schedulerConf.setLabeledQueueWeight(ROOT, "y", 100);
    schedulerConf.setLabeledQueueWeight(ROOT, "z", 100);

    // Weights of queue A; note the differing weight for label "z"
    schedulerConf.setNonLabeledQueueWeight(A, 100);
    schedulerConf.setAccessibleNodeLabels(A, nodeLabels);
    schedulerConf.setLabeledQueueWeight(A, "x", 100);
    schedulerConf.setLabeledQueueWeight(A, "y", 100);
    schedulerConf.setLabeledQueueWeight(A, "z", 70);

    MockRM mockRM = createMockRMWithLabels(new YarnConfiguration(schedulerConf), nodeLabels);

    // "" denotes the non-labeled partition
    final String[] partitions = {"", "x", "y", "z"};
    final String[] queuePaths = {CapacitySchedulerConfiguration.ROOT, A_PATH};
    for (String queuePath : queuePaths) {
      for (String partition : partitions) {
        verifyQueueAbsCapacity(mockRM, queuePath, partition, 1f);
      }
    }

    ServiceOperations.stopQuietly(mockRM);
  }

  /**
   * Verifies how user limit and user limit factor are resolved for leaf
   * queues. Queue A sets both values explicitly (15 and 1.5) while queue B
   * sets neither. With defaults of 20 and 2.0 configured, B is expected to
   * report those defaults; after reinitializing with a configuration that
   * sets no defaults, B is expected to report 100 and 1.
   */
  @Test
  public void testQueueParsingWithDefaultUserLimitValues() throws Exception {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();

    // Define top-level queues
    csConf.setQueues(ROOT, new String[] {"a", "b"});

    // Set default value
    csConf.setDefaultUserLimit(20);
    csConf.setDefaultUserLimitFactor(2.0f);

    // Set A configuration and let B use default values
    csConf.setCapacity(A, 50);
    csConf.setUserLimit(A, 15);
    csConf.setUserLimitFactor(A, 1.5f);
    csConf.setCapacity(B, 50);
    YarnConfiguration conf = new YarnConfiguration(csConf);

    MockRM rm = createMockRMWithoutLabels(conf);
    try {
      // Test
      CapacityScheduler capacityScheduler = (CapacityScheduler) rm.getResourceScheduler();

      assertEquals(15,
          ((LeafQueue) capacityScheduler.getQueue(A_PATH)).getUserLimit(), DELTA);
      assertEquals(1.5,
          ((LeafQueue) capacityScheduler.getQueue(A_PATH)).getUserLimitFactor(), DELTA);
      assertEquals(20,
          ((LeafQueue) capacityScheduler.getQueue(B_PATH)).getUserLimit(), DELTA);
      assertEquals(2.0,
          ((LeafQueue) capacityScheduler.getQueue(B_PATH)).getUserLimitFactor(), DELTA);

      // Use hadoop default value
      csConf = new CapacitySchedulerConfiguration();

      csConf.setQueues(ROOT, new String[] {"a", "b"});
      csConf.setCapacity(A, 50);
      csConf.setUserLimit(A, 15);
      csConf.setUserLimitFactor(A, 1.5f);
      csConf.setCapacity(B, 50);

      capacityScheduler.reinitialize(csConf, rm.getRMContext());

      assertEquals(15,
          ((LeafQueue) capacityScheduler.getQueue(A_PATH)).getUserLimit(), DELTA);
      assertEquals(1.5,
          ((LeafQueue) capacityScheduler.getQueue(A_PATH)).getUserLimitFactor(), DELTA);
      assertEquals(100,
          ((LeafQueue) capacityScheduler.getQueue(B_PATH)).getUserLimit(), DELTA);
      assertEquals(1,
          ((LeafQueue) capacityScheduler.getQueue(B_PATH)).getUserLimitFactor(), DELTA);
    } finally {
      // Stop the RM whether or not the assertions pass, like the sibling tests do
      ServiceOperations.stopQuietly(rm);
    }
  }

  /**
   * Asserts that the absolute capacity of a queue for a label matches the
   * expected value within a tolerance of 1e-6.
   *
   * @param rm the RM whose resource scheduler is queried as a CapacityScheduler
   * @param queuePath the name passed to the scheduler's queue lookup
   * @param label the label whose absolute capacity is checked
   * @param expectedAbsCapacity the expected absolute capacity
   */
  private void verifyQueueAbsCapacity(MockRM rm, String queuePath, String label,
      float expectedAbsCapacity) {
    CSQueue targetQueue =
        ((CapacityScheduler) rm.getResourceScheduler()).getQueue(queuePath);
    assertEquals(
        expectedAbsCapacity,
        targetQueue.getQueueCapacities().getAbsoluteCapacity(label),
        1e-6);
  }

  /**
   * Asserts that the short names of the given queues are exactly the given
   * names: every expected name must be present, and no other name may remain.
   *
   * @param queues the queues whose short names are collected
   * @param queueNames the expected short names
   */
  private void checkEqualsToQueueSet(List<CSQueue> queues, String[] queueNames) {
    Set<String> remainingNames = new HashSet<>();
    queues.forEach(queue -> remainingNames.add(queue.getQueueShortName()));

    // Cross off each expected name; a missing name fails immediately
    for (String expectedName : queueNames) {
      assertTrue(remainingNames.remove(expectedName));
    }

    // Anything left over is a queue that was not expected
    assertTrue(remainingNames.isEmpty());
  }
}