TestCapacitySchedulerQueues.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocMb;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocVcores;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocation;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.unsetMaxAllocation;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1_B1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.checkQueueStructureCapacities;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.ExpectedCapacities;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.getDefaultCapacities;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfWithoutChildrenOfB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithB1AsParentQueue;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

public class TestCapacitySchedulerQueues {

  private static final Logger LOG =
      LoggerFactory.getLogger(TestCapacitySchedulerQueues.class);
  // Mock RM created fresh in setUp() for every test and stopped in tearDown().
  private MockRM rm;
  // Label manager handed to rm through createNodeLabelManager(); closed in
  // tearDown().
  private NullRMNodeLabelsManager mgr;
  // Scheduler configuration built in setUp(); individual tests mutate it or
  // replace it with a fresh instance before calling reinitialize().
  private CapacitySchedulerConfiguration conf;

  /**
   * Builds a fresh {@link MockRM} that uses a {@link NullRMNodeLabelsManager}
   * and a capacity scheduler configured with the default test queue
   * hierarchy, then publishes a {@code 128 * GB} memory / 128 vcore cluster
   * resource to the label manager and to the root queue.
   *
   * @throws Exception if scheduler setup or the initial refresh fails
   */
  @BeforeEach
  public void setUp() throws Exception {
    conf = new CapacitySchedulerConfiguration();
    setupQueueConfiguration(conf);
    mgr = new NullRMNodeLabelsManager();
    mgr.init(conf);
    rm = new MockRM(conf) {
      @Override
      protected RMNodeLabelsManager createNodeLabelManager() {
        // Hand the RM the label manager created above so the test can drive it.
        return mgr;
      }
    };
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

    cs.init(conf);
    cs.start();
    cs.reinitialize(conf, rm.getRMContext());

    Resource clusterResource = Resource.newInstance(128 * GB, 128);
    mgr.setResourceForLabel(CommonNodeLabelsManager.NO_LABEL, clusterResource);
    cs.getRootQueue().updateClusterResource(clusterResource,
        new ResourceLimits(clusterResource));
  }

  /**
   * Releases the per-test fixtures: stops the mock RM first, then closes the
   * node label manager. Either may be absent if setUp() failed early.
   *
   * @throws Exception if stopping or closing a fixture fails
   */
  @AfterEach
  public void tearDown() throws Exception {
    final MockRM resourceManager = rm;
    if (resourceManager != null) {
      resourceManager.stop();
    }

    final NullRMNodeLabelsManager labelsManager = mgr;
    if (labelsManager != null) {
      labelsManager.close();
    }
  }

  /**
   * Test that parseQueue throws an exception when two leaf queues have the
   * same name.
   *
   * <p>The scheduler starts with the default hierarchy. {@code a1} is then
   * reconfigured as the parent of a new leaf {@code b1}, whose short name
   * clashes with the existing leaf {@code b1} under {@code b}. Only the
   * {@code reinitialize} call is expected to fail, and the scheduler is
   * stopped whether or not it does.
   *
   * @throws IOException if scheduler setup fails unexpectedly
   */
  @Test
  public void testParseQueue() throws IOException {
    CapacityScheduler cs = new CapacityScheduler();
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(rm.getRMContext());
    cs.init(conf);
    cs.start();
    try {
      conf.setQueues(A1, new String[]{"b1"});
      conf.setCapacity(A1_B1, 100.0f);
      conf.setUserLimitFactor(A1_B1, 100.0f);

      assertThrows(IOException.class, () ->
          cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
              null, new RMContainerTokenSecretManager(conf),
              new NMTokenSecretManagerInRM(conf),
              new ClientToAMTokenSecretManagerInRM(), null)));
    } finally {
      // Previously unreachable: the scheduler was leaked after the exception.
      cs.stop();
    }
  }

  /**
   * Verifies that a queue refresh picks up new capacities for queues
   * {@code a} (80%) and {@code b} (20%).
   *
   * @throws Exception if the scheduler fails to initialize or refresh
   */
  @Test
  public void testRefreshQueues() throws Exception {
    CapacityScheduler cs = new CapacityScheduler();
    setupQueueConfiguration(conf);
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(rm.getRMContext());
    cs.init(conf);
    cs.start();
    try {
      cs.reinitialize(conf, rm.getRMContext());
      checkQueueStructureCapacities(cs);

      conf.setCapacity(A, 80f);
      conf.setCapacity(B, 20f);
      cs.reinitialize(conf, rm.getRMContext());
      checkQueueStructureCapacities(cs,
          getDefaultCapacities(80f / 100.0f, 20f / 100.0f));
    } finally {
      // Always stop the scheduler, even when an assertion fails.
      cs.stop();
    }
  }

  /**
   * Verifies that a refresh can add a new leaf queue {@code b4} under
   * {@code b}, taking its capacity from {@code b3}, and that the new queue
   * has {@code b} as its parent.
   *
   * @throws Exception if the scheduler fails to initialize or refresh
   */
  @Test
  public void testRefreshQueuesWithNewQueue() throws Exception {
    CapacityScheduler cs = new CapacityScheduler();
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(rm.getRMContext());
    cs.init(conf);
    cs.start();
    // Guard everything after start() so a failing initial check still stops cs.
    try {
      cs.reinitialize(conf, rm.getRMContext());
      checkQueueStructureCapacities(cs);

      // Add a new queue b4; b3 gives up exactly b4's share, so the capacities
      // of b's children still add up to the same total.
      final String b4Path = B + ".b4";
      final QueuePath b4 = new QueuePath(b4Path);
      final float b4Capacity = 10;
      final float modifiedB3Capacity = B3_CAPACITY - b4Capacity;

      conf.setCapacity(A, 80f);
      conf.setCapacity(B, 20f);
      conf.setQueues(B, new String[]{"b1", "b2", "b3", "b4"});
      conf.setCapacity(B1, B1_CAPACITY);
      conf.setCapacity(B2, B2_CAPACITY);
      conf.setCapacity(B3, modifiedB3Capacity);
      conf.setCapacity(b4, b4Capacity);
      cs.reinitialize(conf, rm.getRMContext());

      final float capA = 80f / 100.0f;
      final float capB = 20f / 100.0f;
      Map<String, ExpectedCapacities> expectedCapacities =
          getDefaultCapacities(capA, capB);
      expectedCapacities.put(B3.getFullPath(),
          new ExpectedCapacities(modifiedB3Capacity / 100.0f, capB));
      expectedCapacities.put(b4Path, new ExpectedCapacities(b4Capacity / 100.0f, capB));
      checkQueueStructureCapacities(cs, expectedCapacities);

      // Verify parent for B4
      CSQueue rootQueue = cs.getRootQueue();
      CSQueue queueB = findQueue(rootQueue, B.getFullPath());
      CSQueue queueB4 = findQueue(queueB, b4Path);

      assertEquals(queueB, queueB4.getParent());
    } finally {
      cs.stop();
    }
  }

  /**
   * Verifies that a refresh may not lower a queue's maximum allocation
   * (memory or vcores) below its previous value.
   *
   * @throws Exception if the scheduler fails to initialize or refresh
   */
  @Test
  public void testRefreshQueuesMaxAllocationRefresh() throws Exception {
    // queue refresh should not allow changing the maximum allocation setting
    // per queue to be smaller than previous setting
    CapacityScheduler cs = new CapacityScheduler();
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(rm.getRMContext());
    cs.init(conf);
    cs.start();
    try {
      cs.reinitialize(conf, rm.getRMContext());
      checkQueueStructureCapacities(cs);

      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
          cs.getMaximumResourceCapability().getMemorySize(), "max allocation in CS");
      assertEquals(Resources.none(),
          conf.getQueueMaximumAllocation(A1), "max allocation for A1");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
          ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize(),
          "max allocation");

      CSQueue rootQueue = cs.getRootQueue();
      CSQueue queueA = findQueue(rootQueue, A.getFullPath());
      CSQueue queueA1 = findQueue(queueA, A1.getFullPath());
      // Expected value first, so a failure reports expected/actual correctly.
      assertEquals(8192,
          ((LeafQueue) queueA1).getMaximumAllocation().getMemorySize(),
          "queue max allocation");

      // Lowering A1's max memory must be rejected.
      setMaxAllocMb(conf, A1, 4096);
      IOException memoryError = assertThrows(IOException.class,
          () -> cs.reinitialize(conf, rm.getRMContext()),
          "should have thrown exception");
      assertTrue(memoryError.getCause().toString().contains("not be decreased"),
          "max allocation exception");

      // Restoring the previous value is accepted.
      setMaxAllocMb(conf, A1, 8192);
      cs.reinitialize(conf, rm.getRMContext());

      // Lowering A1's max vcores below the default must be rejected as well.
      setMaxAllocVcores(conf, A1,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - 1);
      IOException vcoresError = assertThrows(IOException.class,
          () -> cs.reinitialize(conf, rm.getRMContext()),
          "should have thrown exception");
      assertTrue(vcoresError.getCause().toString().contains("not be decreased"),
          "max allocation exception");
    } finally {
      cs.stop();
    }
  }

  /**
   * Verifies that a queue's maximum allocation (memory or vcores) may not be
   * configured larger than the cluster-level maximum allocation.
   *
   * @throws Exception if the scheduler fails to initialize or refresh
   */
  @Test
  public void testRefreshQueuesMaxAllocationPerQueueLarge() throws Exception {
    // verify we can't set the allocation per queue larger than cluster setting
    CapacityScheduler cs = new CapacityScheduler();
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(rm.getRMContext());
    cs.init(conf);
    cs.start();
    try {
      // change max allocation for B3 queue to be larger than cluster max
      setMaxAllocMb(conf, B3,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 2048);
      IOException memoryError = assertThrows(IOException.class,
          () -> cs.reinitialize(conf, rm.getRMContext()),
          "should have thrown exception");
      assertTrue(memoryError.getCause().getMessage().contains("maximum allocation"),
          "maximum allocation exception");

      // Equal to the cluster maximum is accepted.
      setMaxAllocMb(conf, B3,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
      cs.reinitialize(conf, rm.getRMContext());

      // Same check for vcores.
      setMaxAllocVcores(conf, B3,
          YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1);
      IOException vcoresError = assertThrows(IOException.class,
          () -> cs.reinitialize(conf, rm.getRMContext()),
          "should have thrown exception");
      assertTrue(vcoresError.getCause().getMessage().contains("maximum allocation"),
          "maximum allocation exception");
    } finally {
      cs.stop();
    }
  }

  /**
   * Verifies that a refresh may raise a queue's maximum allocation. The new
   * per-queue values must show up on the existing queue object, and the
   * cluster-level maximum allocation must stay unchanged.
   *
   * @throws Exception if the scheduler fails to initialize or refresh
   */
  @Test
  public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
    // queue refresh should allow max allocation per queue to go larger
    CapacityScheduler cs = new CapacityScheduler();
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(rm.getRMContext());
    setMaxAllocMb(conf,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
    setMaxAllocVcores(conf,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
    setMaxAllocMb(conf, A1, 4096);
    setMaxAllocVcores(conf, A1, 2);
    cs.init(conf);
    cs.start();
    try {
      cs.reinitialize(conf, rm.getRMContext());
      checkQueueStructureCapacities(cs);

      CSQueue rootQueue = cs.getRootQueue();
      CSQueue queueA = findQueue(rootQueue, A.getFullPath());
      CSQueue queueA1 = findQueue(queueA, A1.getFullPath());

      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
          cs.getMaximumResourceCapability().getMemorySize(),
          "max capability MB in CS");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
          cs.getMaximumResourceCapability().getVirtualCores(),
          "max capability vcores in CS");
      assertEquals(4096, queueA1.getMaximumAllocation().getMemorySize(),
          "max allocation MB A1");
      assertEquals(2, queueA1.getMaximumAllocation().getVirtualCores(),
          "max allocation vcores A1");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
          ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize(),
          "cluster max allocation MB");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
          ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores(),
          "cluster max allocation vcores");

      setMaxAllocMb(conf, A1, 6144);
      setMaxAllocVcores(conf, A1, 3);
      cs.reinitialize(conf, null);
      // The refresh must raise A1's limits in place (queueA1 is the same object
      // looked up before the refresh), while every cluster-level maximum
      // allocation stays unchanged.
      assertEquals(6144, queueA1.getMaximumAllocation().getMemorySize(),
          "max allocation MB A1");
      assertEquals(3, queueA1.getMaximumAllocation().getVirtualCores(),
          "max allocation vcores A1");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
          ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize(),
          "max allocation MB cluster");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
          ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores(),
          "max allocation vcores cluster");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
          cs.getMaximumResourceCapability().getMemorySize(),
          "max capability MB cluster");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
          cs.getMaximumResourceCapability().getVirtualCores(),
          "cluster max capability vcores");
    } finally {
      cs.stop();
    }
  }

  /**
   * Verifies that a refresh may not lower the cluster-level maximum
   * allocation (memory or vcores) below its previous value.
   *
   * @throws Exception if the scheduler fails to initialize or refresh
   */
  @Test
  public void testRefreshQueuesMaxAllocationCSError() throws Exception {
    // Try to refresh the cluster level max allocation size to be smaller
    // and it should error out
    CapacityScheduler cs = new CapacityScheduler();
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(rm.getRMContext());
    setMaxAllocMb(conf, 10240);
    setMaxAllocVcores(conf, 10);
    setMaxAllocMb(conf, A1, 4096);
    setMaxAllocVcores(conf, A1, 4);
    cs.init(conf);
    cs.start();
    try {
      cs.reinitialize(conf, rm.getRMContext());

      checkQueueStructureCapacities(cs);

      assertEquals(10240, cs.getMaximumResourceCapability().getMemorySize(),
          "max allocation MB in CS");
      assertEquals(10, cs.getMaximumResourceCapability().getVirtualCores(),
          "max allocation vcores in CS");

      // Shrinking the cluster-level max memory must be rejected.
      setMaxAllocMb(conf, 6144);
      IOException memoryError = assertThrows(IOException.class,
          () -> cs.reinitialize(conf, rm.getRMContext()),
          "should have thrown exception");
      assertTrue(memoryError.getCause().toString().contains("not be decreased"),
          "max allocation exception");

      // Restoring the previous value is accepted.
      setMaxAllocMb(conf, 10240);
      cs.reinitialize(conf, rm.getRMContext());

      // Shrinking the cluster-level max vcores must be rejected as well.
      setMaxAllocVcores(conf, 8);
      IOException vcoresError = assertThrows(IOException.class,
          () -> cs.reinitialize(conf, rm.getRMContext()),
          "should have thrown exception");
      assertTrue(vcoresError.getCause().toString().contains("not be decreased"),
          "max allocation exception");
    } finally {
      cs.stop();
    }
  }

  /**
   * Verifies that raising the cluster-level maximum allocation on refresh
   * propagates to queues without their own setting (A2, B2). Queues with an
   * explicit per-queue setting (A1) keep their configured values.
   *
   * @throws Exception if the scheduler fails to initialize or refresh
   */
  @Test
  public void testRefreshQueuesMaxAllocationCSLarger() throws Exception {
    // Try to refresh the cluster level max allocation size to be larger
    // and verify that if there is no setting per queue it uses the
    // cluster level setting.
    CapacityScheduler cs = new CapacityScheduler();
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(rm.getRMContext());
    setMaxAllocMb(conf, 10240);
    setMaxAllocVcores(conf, 10);
    setMaxAllocMb(conf, A1, 4096);
    setMaxAllocVcores(conf, A1, 4);
    cs.init(conf);
    cs.start();
    try {
      cs.reinitialize(conf, rm.getRMContext());
      checkQueueStructureCapacities(cs);

      assertEquals(10240, cs.getMaximumResourceCapability().getMemorySize(),
          "max allocation MB in CS");
      assertEquals(10, cs.getMaximumResourceCapability().getVirtualCores(),
          "max allocation vcores in CS");

      CSQueue rootQueue = cs.getRootQueue();
      CSQueue queueA = findQueue(rootQueue, A.getFullPath());
      CSQueue queueB = findQueue(rootQueue, B.getFullPath());
      CSQueue queueA1 = findQueue(queueA, A1.getFullPath());
      CSQueue queueA2 = findQueue(queueA, A2.getFullPath());
      CSQueue queueB2 = findQueue(queueB, B2.getFullPath());

      assertEquals(4096, queueA1.getMaximumAllocation().getMemorySize(),
          "queue A1 max allocation MB");
      assertEquals(4, queueA1.getMaximumAllocation().getVirtualCores(),
          "queue A1 max allocation vcores");
      assertEquals(10240, queueA2.getMaximumAllocation().getMemorySize(),
          "queue A2 max allocation MB");
      assertEquals(10, queueA2.getMaximumAllocation().getVirtualCores(),
          "queue A2 max allocation vcores");
      assertEquals(10240, queueB2.getMaximumAllocation().getMemorySize(),
          "queue B2 max allocation MB");
      assertEquals(10, queueB2.getMaximumAllocation().getVirtualCores(),
          "queue B2 max allocation vcores");

      setMaxAllocMb(conf, 12288);
      setMaxAllocVcores(conf, 12);
      cs.reinitialize(conf, null);
      // The cluster-level setting must change. Queues without a per-queue
      // setting (A2, B2) must follow it, while A1 keeps its own limits.
      assertEquals(12288, cs.getMaximumResourceCapability().getMemorySize(),
          "max allocation MB in CS");
      assertEquals(12, cs.getMaximumResourceCapability().getVirtualCores(),
          "max allocation vcores in CS");
      assertEquals(4096, queueA1.getMaximumAllocation().getMemorySize(),
          "queue A1 max MB allocation");
      assertEquals(4, queueA1.getMaximumAllocation().getVirtualCores(),
          "queue A1 max vcores allocation");
      assertEquals(12288, queueA2.getMaximumAllocation().getMemorySize(),
          "queue A2 max MB allocation");
      assertEquals(12, queueA2.getMaximumAllocation().getVirtualCores(),
          "queue A2 max vcores allocation");
      assertEquals(12288, queueB2.getMaximumAllocation().getMemorySize(),
          "queue B2 max MB allocation");
      assertEquals(12, queueB2.getMaximumAllocation().getVirtualCores(),
          "queue B2 max vcores allocation");
    } finally {
      cs.stop();
    }
  }

  /**
   * Test for queue deletion.
   *
   * <p>Covers deleting a leaf queue ({@code b1}) and then a parent queue
   * ({@code b} together with its children). Mockito spies stub
   * {@code getState()} so the first refresh attempt sees a queue that is not
   * STOPPED and must be rejected. The next attempt sees STOPPED and must
   * succeed.
   *
   * @throws Exception if the scheduler fails to initialize or refresh
   */
  @Test
  public void testRefreshQueuesWithQueueDelete() throws Exception {
    CapacityScheduler cs = new CapacityScheduler();
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(rm.getRMContext());
    cs.init(conf);
    cs.start();
    try {
      cs.reinitialize(conf, rm.getRMContext());
      checkQueueStructureCapacities(cs);

      // test delete leaf queue when there is application running.
      Map<String, CSQueue> queues =
          cs.getCapacitySchedulerQueueManager().getShortNameQueues();
      String b1QTobeDeleted = "b1";
      LeafQueue csB1Queue = spy((LeafQueue) queues.get(b1QTobeDeleted));
      // The first stubbed getState() reports DRAINING; later calls report STOPPED.
      when(csB1Queue.getState()).thenReturn(QueueState.DRAINING)
          .thenReturn(QueueState.STOPPED);
      cs.getCapacitySchedulerQueueManager().addQueue(b1QTobeDeleted, csB1Queue);
      conf = new CapacitySchedulerConfiguration();
      setupQueueConfigurationWithoutB1(conf);
      assertThrows(IOException.class,
          () -> cs.reinitialize(conf, rm.getRMContext()),
          "Expected to throw exception when refresh queue tries to delete a"
              + " queue with running apps");

      // test delete leaf queue(root.b.b1) when there is no application running.
      conf = new CapacitySchedulerConfiguration();
      setupQueueConfigurationWithoutB1(conf);
      try {
        cs.reinitialize(conf, rm.getRMContext());
      } catch (IOException e) {
        LOG.error(
            "Expected to NOT throw exception when refresh queue tries to delete"
                + " a queue WITHOUT running apps",
            e);
        fail("Expected to NOT throw exception when refresh queue tries to delete"
            + " a queue WITHOUT running apps", e);
      }
      CSQueue rootQueue = cs.getRootQueue();
      CSQueue queueB = findQueue(rootQueue, B.getFullPath());
      CSQueue queueB1 = findQueue(queueB, B1.getFullPath());
      assertNull(queueB1, "Refresh needs to support delete of leaf queue ");

      // reset back to default configuration for testing parent queue delete
      conf = new CapacitySchedulerConfiguration();
      setupQueueConfiguration(conf);
      cs.reinitialize(conf, rm.getRMContext());
      checkQueueStructureCapacities(cs);

      // set the configurations such that it fails once but should be
      // successful next time
      queues = cs.getCapacitySchedulerQueueManager().getShortNameQueues();
      CSQueue bQueue = spy((ParentQueue) queues.get("b"));
      when(bQueue.getState()).thenReturn(QueueState.DRAINING)
          .thenReturn(QueueState.STOPPED);
      cs.getCapacitySchedulerQueueManager().addQueue("b", bQueue);

      bQueue = spy((LeafQueue) queues.get("b1"));
      when(bQueue.getState()).thenReturn(QueueState.STOPPED);
      cs.getCapacitySchedulerQueueManager().addQueue("b1", bQueue);

      bQueue = spy((LeafQueue) queues.get("b2"));
      when(bQueue.getState()).thenReturn(QueueState.STOPPED);
      cs.getCapacitySchedulerQueueManager().addQueue("b2", bQueue);

      bQueue = spy((LeafQueue) queues.get("b3"));
      when(bQueue.getState()).thenReturn(QueueState.STOPPED);
      cs.getCapacitySchedulerQueueManager().addQueue("b3", bQueue);

      // test delete Parent queue when there is application running.
      conf = new CapacitySchedulerConfiguration();
      setupQueueConfigurationWithoutB(conf);
      assertThrows(IOException.class,
          () -> cs.reinitialize(conf, rm.getRMContext()),
          "Expected to throw exception when refresh queue tries to delete a"
              + " parent queue with running apps in children queue");

      // test delete Parent queue when there is no application running.
      conf = new CapacitySchedulerConfiguration();
      setupQueueConfigurationWithoutB(conf);
      try {
        cs.reinitialize(conf, rm.getRMContext());
      } catch (IOException e) {
        fail("Expected to not throw exception when refresh queue tries to delete"
            + " a queue without running apps", e);
      }
      rootQueue = cs.getRootQueue();
      queueB = findQueue(rootQueue, B.getFullPath());
      String message =
          "Refresh needs to support delete of Parent queue and its children.";
      assertNull(queueB, message);
      assertNull(cs.getCapacitySchedulerQueueManager().getQueues().get("b"),
          message);
      assertNull(cs.getCapacitySchedulerQueueManager().getQueues().get("b1"),
          message);
      assertNull(cs.getCapacitySchedulerQueueManager().getQueues().get("b2"),
          message);
      assertNull(cs.getCapacitySchedulerQueueManager().getQueues().get("b3"),
          message);
    } finally {
      cs.stop();
    }
  }

  /**
   * Test for deletion of all child queues, which turns the parent queue into
   * a leaf queue.
   *
   * <p>{@code b1} is stubbed so that {@code getState()} first returns RUNNING
   * and then STOPPED. The first refresh must be rejected and the second must
   * succeed.
   *
   * @throws Exception if the scheduler fails to initialize or refresh
   */
  @Test
  public void testRefreshQueuesWithAllChildQueuesDeleted() throws Exception {
    CapacityScheduler cs = new CapacityScheduler();
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(rm.getRMContext());
    cs.init(conf);
    cs.start();
    try {
      cs.reinitialize(conf, rm.getRMContext());
      checkQueueStructureCapacities(cs);

      // test delete all leaf queues when there is no application running.
      Map<String, CSQueue> queues =
          cs.getCapacitySchedulerQueueManager().getShortNameQueues();

      CSQueue bQueue = spy((LeafQueue) queues.get("b1"));
      when(bQueue.getState()).thenReturn(QueueState.RUNNING)
          .thenReturn(QueueState.STOPPED);
      cs.getCapacitySchedulerQueueManager().addQueue("b1", bQueue);

      bQueue = spy((LeafQueue) queues.get("b2"));
      when(bQueue.getState()).thenReturn(QueueState.STOPPED);
      cs.getCapacitySchedulerQueueManager().addQueue("b2", bQueue);

      bQueue = spy((LeafQueue) queues.get("b3"));
      when(bQueue.getState()).thenReturn(QueueState.STOPPED);
      cs.getCapacitySchedulerQueueManager().addQueue("b3", bQueue);

      conf = new CapacitySchedulerConfiguration();
      setupQueueConfWithoutChildrenOfB(conf);

      // Converting parent queue root.b to a leaf must be rejected while one of
      // its children (b1) still reports RUNNING.
      assertThrows(IOException.class,
          () -> cs.reinitialize(conf, rm.getRMContext()),
          "Expected to throw exception when refresh queue tries to make parent"
              + " queue a child queue when one of its children is still running.");

      // test delete leaf queues(root.b.b1,b2,b3) when there is no application
      // running.
      try {
        cs.reinitialize(conf, rm.getRMContext());
      } catch (IOException e) {
        fail("Expected to NOT throw exception when refresh queue tries to delete"
            + " all children of a parent queue(without running apps).", e);
      }
      CSQueue rootQueue = cs.getRootQueue();
      CSQueue queueB = findQueue(rootQueue, B.getFullPath());
      assertNotNull(queueB, "Parent Queue B should not be deleted");
      assertTrue(queueB instanceof LeafQueue,
          "Queue B should become a leaf queue once all its children are deleted");

      String message =
          "Refresh needs to support delete of all children of Parent queue.";
      assertNull(cs.getCapacitySchedulerQueueManager().getQueues().get("b3"),
          message);
      assertNull(cs.getCapacitySchedulerQueueManager().getQueues().get("b1"),
          message);
      assertNull(cs.getCapacitySchedulerQueueManager().getQueues().get("b2"),
          message);
    } finally {
      cs.stop();
    }
  }

  /**
   * Test if we can convert a leaf queue to a parent queue.
   *
   * <p>The conversion of {@code b1} must be rejected while it is RUNNING.
   * Once {@code b1} has been set to STOPPED through configuration, the
   * conversion must succeed, and the resulting parent queue must be RUNNING
   * with at least one child.
   *
   * @throws Exception if the scheduler fails to initialize or refresh
   */
  @Test
  @Timeout(value = 10)
  public void testConvertLeafQueueToParentQueue() throws Exception {
    CapacityScheduler cs = new CapacityScheduler();
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(rm.getRMContext());
    cs.init(conf);
    cs.start();
    try {
      cs.reinitialize(conf, rm.getRMContext());
      checkQueueStructureCapacities(cs);

      String targetQueue = "b1";
      CSQueue b1 = cs.getQueue(targetQueue);
      assertEquals(QueueState.RUNNING, b1.getState());

      // test if we can convert a leaf queue which is in RUNNING state
      conf = new CapacitySchedulerConfiguration();
      setupQueueConfigurationWithB1AsParentQueue(conf);
      assertThrows(IOException.class,
          () -> cs.reinitialize(conf, rm.getRMContext()),
          "Expected to throw exception when refresh queue tries to convert"
              + " a child queue to a parent queue.");

      // now set queue state for b1 to STOPPED
      conf = new CapacitySchedulerConfiguration();
      setupQueueConfiguration(conf);
      conf.set("yarn.scheduler.capacity.root.b.b1.state", "STOPPED");
      cs.reinitialize(conf, rm.getRMContext());
      assertEquals(QueueState.STOPPED, b1.getState());

      // test if we can convert a leaf queue which is in STOPPED state
      conf = new CapacitySchedulerConfiguration();
      setupQueueConfigurationWithB1AsParentQueue(conf);
      try {
        cs.reinitialize(conf, rm.getRMContext());
      } catch (IOException e) {
        fail("Expected to NOT throw exception when refresh queue tries"
            + " to convert a leaf queue WITHOUT running apps", e);
      }
      b1 = cs.getQueue(targetQueue);
      assertTrue(b1 instanceof AbstractParentQueue);
      assertEquals(QueueState.RUNNING, b1.getState());
      assertFalse(b1.getChildQueues().isEmpty());
    } finally {
      cs.stop();
    }
  }

  /**
   * Verifies queue-level maximum allocation resolution.
   *
   * A queue uses its own configured maximum allocation when one is set.
   * Otherwise it inherits from the nearest ancestor that sets one.
   * When no queue sets a value, it falls back to the scheduler-wide default
   * maximum allocation.
   */
  @Test
  public void testQueuesMaxAllocationInheritance() throws Exception {
    // queue level max allocation is set by the queue configuration explicitly
    // or inherits from the parent.

    CapacityScheduler cs = new CapacityScheduler();
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(rm.getRMContext());
    setMaxAllocMb(conf,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
    setMaxAllocVcores(conf,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);

    // Test the child queue overrides
    setMaxAllocation(conf, ROOT,
        "memory-mb=4096,vcores=2");
    setMaxAllocation(conf, A1, "memory-mb=6144,vcores=2");
    setMaxAllocation(conf, B, "memory-mb=5120, vcores=2");
    setMaxAllocation(conf, B2, "memory-mb=1024, vcores=2");

    cs.init(conf);
    cs.start();
    // Stop the scheduler even when an assertion or reinitialize() fails, so a
    // started scheduler is not leaked into subsequent tests.
    try {
      cs.reinitialize(conf, rm.getRMContext());
      checkQueueStructureCapacities(cs);

      CSQueue rootQueue = cs.getRootQueue();
      CSQueue queueA = findQueue(rootQueue, A.getFullPath());
      CSQueue queueB = findQueue(rootQueue, B.getFullPath());
      CSQueue queueA1 = findQueue(queueA, A1.getFullPath());
      CSQueue queueA2 = findQueue(queueA, A2.getFullPath());
      CSQueue queueB1 = findQueue(queueB, B1.getFullPath());
      CSQueue queueB2 = findQueue(queueB, B2.getFullPath());

      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
          cs.getMaximumResourceCapability().getMemorySize(),
          "max capability MB in CS");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
          cs.getMaximumResourceCapability().getVirtualCores(),
          "max capability vcores in CS");
      assertEquals(6144, queueA1.getMaximumAllocation().getMemorySize(),
          "max allocation MB A1");
      assertEquals(2, queueA1.getMaximumAllocation().getVirtualCores(),
          "max allocation vcores A1");
      assertEquals(4096, queueA2.getMaximumAllocation().getMemorySize(),
          "max allocation MB A2");
      assertEquals(2, queueA2.getMaximumAllocation().getVirtualCores(),
          "max allocation vcores A2");
      assertEquals(5120, queueB.getMaximumAllocation().getMemorySize(),
          "max allocation MB B");
      assertEquals(5120, queueB1.getMaximumAllocation().getMemorySize(),
          "max allocation MB B1");
      assertEquals(1024, queueB2.getMaximumAllocation().getMemorySize(),
          "max allocation MB B2");

      // Test get the max-allocation from different parent
      unsetMaxAllocation(conf, A1);
      unsetMaxAllocation(conf, B);
      unsetMaxAllocation(conf, B1);
      setMaxAllocation(conf, ROOT,
          "memory-mb=6144,vcores=2");
      setMaxAllocation(conf, A, "memory-mb=8192,vcores=2");

      cs.reinitialize(conf, rm.getRMContext());

      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
          cs.getMaximumResourceCapability().getMemorySize(),
          "max capability MB in CS");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
          cs.getMaximumResourceCapability().getVirtualCores(),
          "max capability vcores in CS");
      // A1 now inherits from A, while B1 (with B unset) inherits from root.
      assertEquals(8192, queueA1.getMaximumAllocation().getMemorySize(),
          "max allocation MB A1");
      assertEquals(2, queueA1.getMaximumAllocation().getVirtualCores(),
          "max allocation vcores A1");
      assertEquals(6144, queueB1.getMaximumAllocation().getMemorySize(),
          "max allocation MB B1");
      assertEquals(2, queueB1.getMaximumAllocation().getVirtualCores(),
          "max allocation vcores B1");

      // Test the default
      unsetMaxAllocation(conf, ROOT);
      unsetMaxAllocation(conf, A);
      unsetMaxAllocation(conf, A1);
      cs.reinitialize(conf, rm.getRMContext());

      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
          cs.getMaximumResourceCapability().getMemorySize(),
          "max capability MB in CS");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
          cs.getMaximumResourceCapability().getVirtualCores(),
          "max capability vcores in CS");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
          queueA1.getMaximumAllocation().getMemorySize(), "max allocation MB A1");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
          queueA1.getMaximumAllocation().getVirtualCores(),
          "max allocation vcores A1");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
          queueA2.getMaximumAllocation().getMemorySize(), "max allocation MB A2");
      assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
          queueA2.getMaximumAllocation().getVirtualCores(),
          "max allocation vcores A2");
    } finally {
      cs.stop();
    }
  }

  /**
   * Verifies that reinitialization is rejected when a queue's configured
   * maximum allocation exceeds the scheduler-wide maximum.
   *
   * The rejection is checked for root memory, child-queue memory and
   * child-queue vcores.
   */
  @Test
  public void testVerifyQueuesMaxAllocationConf() throws Exception {
    // queue level max allocation can't exceed the cluster setting

    CapacityScheduler cs = new CapacityScheduler();
    cs.setConf(new YarnConfiguration());
    cs.setRMContext(rm.getRMContext());
    setMaxAllocMb(conf,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
    setMaxAllocVcores(conf,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);

    long largerMem =
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1024;
    long largerVcores =
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 10;

    cs.init(conf);
    cs.start();
    // Stop the scheduler even when an assertion or reinitialize() fails, so a
    // started scheduler is not leaked into subsequent tests.
    try {
      cs.reinitialize(conf, rm.getRMContext());
      checkQueueStructureCapacities(cs);

      // Root memory above the cluster maximum.
      setMaxAllocation(conf, ROOT,
          "memory-mb=" + largerMem + ",vcores=2");
      assertReinitializeFailsOnMaxAllocation(cs,
          "Queue Root maximum allocation can't exceed the cluster setting");

      // Valid root and A limits, but A1 memory above the cluster maximum.
      setMaxAllocation(conf, ROOT,
          "memory-mb=4096,vcores=2");
      setMaxAllocation(conf, A, "memory-mb=6144,vcores=2");
      setMaxAllocation(conf, A1, "memory-mb=" + largerMem + ",vcores=2");
      assertReinitializeFailsOnMaxAllocation(cs,
          "Queue A1 maximum allocation can't exceed the cluster setting");

      // A1 vcores above the cluster maximum.
      setMaxAllocation(conf, A1, "memory-mb=8192" + ",vcores=" + largerVcores);
      assertReinitializeFailsOnMaxAllocation(cs,
          "Queue A1 maximum allocation can't exceed the cluster setting");
    } finally {
      cs.stop();
    }
  }

  /**
   * Asserts that reinitializing {@code cs} with the test's {@code conf} is rejected.
   *
   * The rejection must carry a cause whose message mentions
   * "maximum allocation".
   *
   * @param cs the scheduler to reinitialize
   * @param failMessage message reported if reinitialization unexpectedly
   *     succeeds
   */
  private void assertReinitializeFailsOnMaxAllocation(CapacityScheduler cs,
      String failMessage) {
    try {
      cs.reinitialize(conf, rm.getRMContext());
      fail(failMessage);
    } catch (Exception e) {
      // Check for a missing cause/message explicitly, so an unexpected
      // exception shape fails the assertion instead of throwing an NPE here.
      Throwable cause = e.getCause();
      String causeMessage = cause == null ? null : cause.getMessage();
      assertTrue(causeMessage != null
              && causeMessage.contains("maximum allocation"),
          "maximum allocation exception");
    }
  }
}