TestCapacitySchedulerConfigValidator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestCapacitySchedulerConfigValidator {
  public static final int NODE_MEMORY = 16;
  public static final int NODE1_VCORES = 8;
  public static final int NODE2_VCORES = 10;
  public static final int NODE3_VCORES = 12;
  public static final Map<String, Long> NODE_GPU = ImmutableMap.of(GPU_URI, 2L);
  public static final int GB = 1024;

  private static final String PARENT_A = "parentA";
  private static final String PARENT_B = "parentB";
  private static final String LEAF_A = "leafA";
  private static final String LEAF_B = "leafB";

  private static final QueuePath PARENT_A_FULL_PATH =
          new QueuePath(CapacitySchedulerConfiguration.ROOT + "." + PARENT_A);
  private static final QueuePath LEAF_A_FULL_PATH =
          new QueuePath(PARENT_A_FULL_PATH + "." + LEAF_A);
  private static final QueuePath PARENT_B_FULL_PATH =
          new QueuePath(CapacitySchedulerConfiguration.ROOT + "." + PARENT_B);
  private static final QueuePath LEAF_B_FULL_PATH =
          new QueuePath(PARENT_B_FULL_PATH + "." + LEAF_B);

  private final Resource A_MINRES = Resource.newInstance(16 * GB, 10);
  private final Resource B_MINRES = Resource.newInstance(32 * GB, 5);
  private final Resource FULL_MAXRES = Resource.newInstance(48 * GB, 30);
  private final Resource PARTIAL_MAXRES = Resource.newInstance(16 * GB, 10);
  private final Resource VCORE_EXCEEDED_MAXRES = Resource.newInstance(16 * GB, 50);
  private Resource A_MINRES_GPU;
  private Resource B_MINRES_GPU;
  private Resource FULL_MAXRES_GPU;
  private Resource PARTIAL_MAXRES_GPU;
  private Resource GPU_EXCEEDED_MAXRES_GPU;

  protected MockRM mockRM = null;
  protected MockNM nm1 = null;
  protected MockNM nm2 = null;
  protected MockNM nm3 = null;
  protected CapacityScheduler cs;

  public static void setupResources(boolean useGpu) {
    Map<String, ResourceInformation> riMap = new HashMap<>();

    ResourceInformation memory = ResourceInformation.newInstance(
        ResourceInformation.MEMORY_MB.getName(),
        ResourceInformation.MEMORY_MB.getUnits(),
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
    ResourceInformation vcores = ResourceInformation.newInstance(
        ResourceInformation.VCORES.getName(),
        ResourceInformation.VCORES.getUnits(),
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
    riMap.put(ResourceInformation.MEMORY_URI, memory);
    riMap.put(ResourceInformation.VCORES_URI, vcores);
    if (useGpu) {
      riMap.put(ResourceInformation.GPU_URI,
          ResourceInformation.newInstance(ResourceInformation.GPU_URI, "", 0,
              ResourceTypes.COUNTABLE, 0, 10L));
    }

    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
  }

  /**
   * Test for the case when the scheduler.minimum-allocation-mb == 0.
   */
  @Test
  public void testValidateMemoryAllocationInvalidMinMem() {
    assertThrows(YarnRuntimeException.class, () -> {
      Map<String, String> configs = new HashMap();
      configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "0");
      Configuration config = CapacitySchedulerConfigGeneratorForTest
          .createConfiguration(configs);
      CapacitySchedulerConfigValidator.validateMemoryAllocation(config);
      fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB +
          " should be > 0");
    });
  }

  /**
   * Test for the case when the scheduler.minimum-allocation-mb is greater than
   * scheduler.maximum-allocation-mb.
   */
  @Test
  public void testValidateMemoryAllocationHIgherMinThanMaxMem() {
    assertThrows(YarnRuntimeException.class, () -> {
      Map<String, String> configs = new HashMap();
      configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "8192");
      configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, "1024");
      Configuration config = CapacitySchedulerConfigGeneratorForTest
          .createConfiguration(configs);
      CapacitySchedulerConfigValidator.validateMemoryAllocation(config);
      fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB + " should be > "
          + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
    });
  }

  @Test
  public void testValidateMemoryAllocation() {
    Map<String, String> configs = new HashMap();
    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, "1024");
    configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, "8192");
    Configuration config = CapacitySchedulerConfigGeneratorForTest
            .createConfiguration(configs);
    // there is no need for assertion, since there is no further method call
    // inside the tested code and in case of a valid configuration no exception
    // is thrown
    CapacitySchedulerConfigValidator.validateMemoryAllocation(config);
  }

  /**
   * Test for the case when the scheduler.minimum-allocation-vcores == 0.
   */
  @Test
  public void testValidateVCoresInvalidMinVCore() {
    assertThrows(YarnRuntimeException.class, () -> {
      Map<String, String> configs = new HashMap();
      configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "0");
      Configuration config = CapacitySchedulerConfigGeneratorForTest
          .createConfiguration(configs);
      CapacitySchedulerConfigValidator.validateVCores(config);
      fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES
          + " should be > 0");
    });
  }

  /**
   * Test for the case when the scheduler.minimum-allocation-vcores is greater
   * than scheduler.maximum-allocation-vcores.
   */
  @Test
  public void testValidateVCoresHigherMinThanMaxVCore() {
    assertThrows(YarnRuntimeException.class, () -> {
      Map<String, String> configs = new HashMap();
      configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "4");
      configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, "1");
      Configuration config = CapacitySchedulerConfigGeneratorForTest
          .createConfiguration(configs);
      CapacitySchedulerConfigValidator.validateVCores(config);
      fail(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES +
          " should be > "
          + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
    });
  }

  @Test
  public void testValidateVCores() {
    Map<String, String> configs = new HashMap();
    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, "1");
    configs.put(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, "4");
    Configuration config = CapacitySchedulerConfigGeneratorForTest
            .createConfiguration(configs);
    // there is no need for assertion, since there is no further method call
    // inside the tested code and in case of a valid configuration no exception
    // is thrown
    CapacitySchedulerConfigValidator.validateVCores(config);
  }

  @Test
  public void testValidateCSConfigInvalidCapacity() {
    Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
            .createBasicCSConfiguration();
    Configuration newConfig = new Configuration(oldConfig);
    newConfig
            .set("yarn.scheduler.capacity.root.test1.capacity", "500");
    RMContext rmContext = prepareRMContext();
    try {
      CapacitySchedulerConfigValidator
              .validateCSConfiguration(oldConfig, newConfig, rmContext);
      fail("Invalid capacity");
    } catch (IOException e) {
      assertTrue(e.getCause().getMessage().startsWith("Illegal capacity"));
    }
  }

  @Test
  public void testValidateCSConfigDefaultRCAbsoluteModeParentMaxMemoryExceeded()
      throws Exception {
    setUpMockRM(false);
    RMContext rmContext = mockRM.getRMContext();
    CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
    CapacitySchedulerConfiguration newConfiguration =
        new CapacitySchedulerConfiguration(cs.getConfiguration());
    newConfiguration.setMaximumResourceRequirement("",
            LEAF_A_FULL_PATH, FULL_MAXRES);
    try {
      CapacitySchedulerConfigValidator
          .validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
      fail("Parent maximum capacity exceeded");
    } catch (IOException e) {
      assertTrue(e.getCause().getMessage()
          .startsWith("Max resource configuration"));
    } finally {
      mockRM.stop();
    }
  }

  @Test
  public void testValidateCSConfigDefaultRCAbsoluteModeParentMaxVcoreExceeded() throws Exception {
    setUpMockRM(false);
    RMContext rmContext = mockRM.getRMContext();
    CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
    CapacitySchedulerConfiguration newConfiguration =
        new CapacitySchedulerConfiguration(cs.getConfiguration());
    newConfiguration.setMaximumResourceRequirement("",
            LEAF_A_FULL_PATH, VCORE_EXCEEDED_MAXRES);
    try {
      CapacitySchedulerConfigValidator
          .validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
    } catch (IOException e) {
      fail("In DefaultResourceCalculator vcore limits are not enforced");
    } finally {
      mockRM.stop();
    }
  }

  @Test
  public void testValidateCSConfigDominantRCAbsoluteModeParentMaxMemoryExceeded()
      throws Exception {
    setUpMockRM(true);
    RMContext rmContext = mockRM.getRMContext();
    CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
    CapacitySchedulerConfiguration newConfiguration =
        new CapacitySchedulerConfiguration(cs.getConfiguration());
    newConfiguration.setMaximumResourceRequirement("",
            LEAF_A_FULL_PATH, FULL_MAXRES);
    try {
      CapacitySchedulerConfigValidator
          .validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
      fail("Parent maximum capacity exceeded");
    } catch (IOException e) {
      assertTrue(e.getCause().getMessage()
          .startsWith("Max resource configuration"));
    } finally {
      mockRM.stop();
    }
  }

  @Test
  public void testValidateCSConfigDominantRCAbsoluteModeParentMaxVcoreExceeded() throws Exception {
    setUpMockRM(true);
    RMContext rmContext = mockRM.getRMContext();
    CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
    CapacitySchedulerConfiguration newConfiguration =
        new CapacitySchedulerConfiguration(cs.getConfiguration());
    newConfiguration.setMaximumResourceRequirement("",
            LEAF_A_FULL_PATH, VCORE_EXCEEDED_MAXRES);
    try {
      CapacitySchedulerConfigValidator
          .validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
      fail("Parent maximum capacity exceeded");
    } catch (IOException e) {
      assertTrue(e.getCause().getMessage()
          .startsWith("Max resource configuration"));
    } finally {
      mockRM.stop();
    }
  }

  @Test
  public void testValidateCSConfigDominantRCAbsoluteModeParentMaxGPUExceeded() throws Exception {
    setUpMockRM(true);
    RMContext rmContext = mockRM.getRMContext();
    CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
    CapacitySchedulerConfiguration newConfiguration =
        new CapacitySchedulerConfiguration(cs.getConfiguration());
    newConfiguration.setMaximumResourceRequirement("",
            LEAF_A_FULL_PATH, GPU_EXCEEDED_MAXRES_GPU);
    try {
      CapacitySchedulerConfigValidator
          .validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
      fail("Parent maximum capacity exceeded");
    } catch (IOException e) {
      assertTrue(e.getCause().getMessage()
          .startsWith("Max resource configuration"));
    } finally {
      mockRM.stop();
    }
  }

  @Test
  public void testValidateCSConfigStopALeafQueue() throws IOException {
    Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
            .createBasicCSConfiguration();
    Configuration newConfig = new Configuration(oldConfig);
    newConfig
            .set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
    RMContext rmContext = prepareRMContext();
    boolean isValidConfig = CapacitySchedulerConfigValidator
            .validateCSConfiguration(oldConfig, newConfig, rmContext);
    assertTrue(isValidConfig);
  }

  /**
   * Stop the root queue if there are running child queues.
   */
  @Test
  public void testValidateCSConfigStopANonLeafQueueInvalid() {
    Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
            .createBasicCSConfiguration();
    Configuration newConfig = new Configuration(oldConfig);
    newConfig
            .set("yarn.scheduler.capacity.root.state", "STOPPED");
    RMContext rmContext = prepareRMContext();
    try {
      CapacitySchedulerConfigValidator
              .validateCSConfiguration(oldConfig, newConfig, rmContext);
      fail("There are child queues in running state");
    } catch (IOException e) {
      assertTrue(e.getCause().getMessage()
              .contains("The parent queue:root cannot be STOPPED"));
    }
  }

  @Test
  public void testValidateCSConfigStopANonLeafQueue() throws IOException {
    Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
            .createBasicCSConfiguration();
    Configuration newConfig = new Configuration(oldConfig);
    newConfig
            .set("yarn.scheduler.capacity.root.state", "STOPPED");
    newConfig
            .set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
    newConfig
            .set("yarn.scheduler.capacity.root.test2.state", "STOPPED");
    RMContext rmContext = prepareRMContext();
    Boolean isValidConfig = CapacitySchedulerConfigValidator
            .validateCSConfiguration(oldConfig, newConfig, rmContext);
    assertTrue(isValidConfig);

  }

  /**
   * Add a leaf queue without modifying the capacity of other leaf queues
   * so the total capacity != 100.
   */
  @Test
  public void testValidateCSConfigAddALeafQueueInvalid() {
    Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
            .createBasicCSConfiguration();
    CapacitySchedulerConfiguration newConfig = new CapacitySchedulerConfiguration(oldConfig);
    newConfig
            .set("yarn.scheduler.capacity.root.queues", "test1, test2, test3");
    newConfig
            .set("yarn.scheduler.capacity.root.test3.state", "RUNNING");
    newConfig
            .set("yarn.scheduler.capacity.root.test3.capacity", "30");

    RMContext rmContext = prepareRMContext();
    try {
      CapacitySchedulerConfigValidator
              .validateCSConfiguration(oldConfig, newConfig, rmContext);
      if (newConfig.isLegacyQueueMode()) {
        fail("Invalid capacity for children of queue root");
      }
    } catch (IOException e) {
      assertTrue(e.getCause().getMessage()
              .startsWith("Illegal capacity"));
    }
  }

  /**
   * Add a leaf queue by modifying the capacity of other leaf queues
   * and adjust the capacities of other leaf queues, so total capacity = 100.
   */
  @Test
  public void testValidateCSConfigAddALeafQueueValid() throws IOException {
    Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
            .createBasicCSConfiguration();
    Configuration newConfig = new Configuration(oldConfig);
    newConfig
            .set("yarn.scheduler.capacity.root.queues", "test1, test2, test3");
    newConfig
            .set("yarn.scheduler.capacity.root.test3.state", "RUNNING");
    newConfig
            .set("yarn.scheduler.capacity.root.test3.capacity", "30");
    newConfig
            .set("yarn.scheduler.capacity.root.test1.capacity", "20");

    RMContext rmContext = prepareRMContext();
    Boolean isValidConfig = CapacitySchedulerConfigValidator
            .validateCSConfiguration(oldConfig, newConfig, rmContext);
    assertTrue(isValidConfig);
  }

  @Test
  public void testValidateDoesNotModifyTheDefaultMetricsSystem() throws Exception {
    try {
      YarnConfiguration conf = new YarnConfiguration(CapacitySchedulerConfigGeneratorForTest
          .createBasicCSConfiguration());
      conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
          ResourceScheduler.class);
      mockRM = new MockRM(conf);

      cs = (CapacityScheduler) mockRM.getResourceScheduler();
      mockRM.start();
      cs.start();
      RMContext rmContext = mockRM.getRMContext();
      Configuration oldConfig = cs.getConfig();

      final Map<String, QueueMetrics> cache = QueueMetrics.getQueueMetrics();
      final MetricsSystem ms = DefaultMetricsSystem.instance();

      QueueMetrics origQM1 = cache.get("root.test1");
      QueueMetrics origQM2 = cache.get("root.test2");
      assertNotNull(origQM1, "Original queues should be found in the cache");
      assertNotNull(origQM2, "Original queues should be found in the cache");

      QueueMetrics origPQM1 = cache.get("default.root.test1");
      QueueMetrics origPQM2 = cache.get("default.root.test2");
      assertNotNull(origPQM1,
          "Original queues should be found in the cache (PartitionQueueMetrics)");
      assertNotNull(origPQM2,
          "Original queues should be found in the cache (PartitionQueueMetrics)");

      MetricsSource origMS1 =
          ms.getSource("QueueMetrics,q0=root,q1=test1");
      MetricsSource origMS2 =
          ms.getSource("QueueMetrics,q0=root,q1=test2");
      assertNotNull(origMS1,
          "Original queues should be found in the Metrics System");
      assertNotNull(origMS2,
          "Original queues should be found in the Metrics System");

      MetricsSource origPMS1 = ms
          .getSource("PartitionQueueMetrics,partition=,q0=root,q1=test1");
      MetricsSource origPMS2 = ms
          .getSource("PartitionQueueMetrics,partition=,q0=root,q1=test2");
      assertNotNull(origPMS1,
          "Original queues should be found in Metrics System (PartitionQueueMetrics)");
      assertNotNull(origPMS2,
          "Original queues should be found in Metrics System (PartitionQueueMetrics)");

      Configuration newConfig = new Configuration(oldConfig);
      newConfig
          .set("yarn.scheduler.capacity.root.queues", "test1, test2, test3");
      newConfig
          .set("yarn.scheduler.capacity.root.test3.state", "RUNNING");
      newConfig
          .set("yarn.scheduler.capacity.root.test3.capacity", "30");
      newConfig
          .set("yarn.scheduler.capacity.root.test1.capacity", "20");

      boolean isValidConfig = CapacitySchedulerConfigValidator
          .validateCSConfiguration(oldConfig, newConfig, rmContext);
      assertTrue(isValidConfig);

      assertFalse(cache.containsKey("root.test3"),
          "Validated new queue should not be in the cache");
      assertFalse(cache.containsKey("default.root.test3"),
          "Validated new queue should not be in the cache (PartitionQueueMetrics)");
      assertNull(ms.getSource("QueueMetrics,q0=root,q1=test3"),
          "Validated new queue should not be in the Metrics System");
      assertNull(ms.getSource("PartitionQueueMetrics,partition=,q0=root,q1=test3"),
          "Validated new queue should not be in Metrics System (PartitionQueueMetrics)");

      // Config validation should not change the existing
      // objects in the cache and the metrics system
      assertEquals(origQM1, cache.get("root.test1"));
      assertEquals(origQM2, cache.get("root.test2"));
      assertEquals(origPQM1, cache.get("default.root.test1"));
      assertEquals(origPQM1, cache.get("default.root.test1"));
      assertEquals(origMS1,
          ms.getSource("QueueMetrics,q0=root,q1=test1"));
      assertEquals(origMS2,
          ms.getSource("QueueMetrics,q0=root,q1=test2"));
      assertEquals(origPMS1,
          ms.getSource("PartitionQueueMetrics,partition=,q0=root,q1=test1"));
      assertEquals(origPMS2,
          ms.getSource("PartitionQueueMetrics,partition=,q0=root,q1=test2"));
    } finally {
      mockRM.stop();
    }
  }

  /**
   * Delete a running queue.
   */
  @Test
  public void testValidateCSConfigInvalidQueueDeletion() {
    Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
            .createBasicCSConfiguration();
    Configuration newConfig = new Configuration(oldConfig);
    newConfig.set("yarn.scheduler.capacity.root.queues", "test1");
    newConfig.set("yarn.scheduler.capacity.root.test1.capacity", "100");
    newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity");
    newConfig.unset("yarn.scheduler.capacity.root.test2.state");
    newConfig.set("yarn.scheduler.capacity.queue-mappings",
            "u:test1:test1");
    RMContext rmContext = prepareRMContext();
    try {
      CapacitySchedulerConfigValidator
              .validateCSConfiguration(oldConfig, newConfig, rmContext);
      fail("Invalid capacity for children of queue root");
    } catch (IOException e) {
      assertTrue(e.getCause().getMessage()
              .contains("root.test2 cannot be deleted"));
      assertTrue(e.getCause().getMessage()
              .contains("the queue is not yet in stopped state"));
    }
  }

  /**
   * Delete a queue and not adjust capacities.
   */
  @Test
  public void testValidateCSConfigInvalidQueueDeletion2() {
    Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
            .createBasicCSConfiguration();
    oldConfig.set("yarn.scheduler.capacity.root.test2.state", "STOPPED");
    CapacitySchedulerConfiguration newConfig = new CapacitySchedulerConfiguration(oldConfig);
    newConfig.set("yarn.scheduler.capacity.root.queues", "test1");
    newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity");
    newConfig.unset("yarn.scheduler.capacity.root.test2.state");
    newConfig.set("yarn.scheduler.capacity.queue-mappings",
            "u:test1:test1");
    RMContext rmContext = prepareRMContext();
    try {
      CapacitySchedulerConfigValidator
              .validateCSConfiguration(oldConfig, newConfig, rmContext);
      if (newConfig.isLegacyQueueMode()) {
        fail("Invalid capacity for children of queue root");
      }
    } catch (IOException e) {
      assertTrue(e.getCause().getMessage()
              .contains("Illegal capacity"));
    }
  }

  /**
   * Delete a queue and adjust capacities to have total capacity = 100.
   */
  @Test
  public void testValidateCSConfigValidQueueDeletion() throws IOException {
    Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
            .createBasicCSConfiguration();
    oldConfig.set("yarn.scheduler.capacity.root.test2.state", "STOPPED");
    Configuration newConfig = new Configuration(oldConfig);
    newConfig.set("yarn.scheduler.capacity.root.queues", "test1");
    newConfig.set("yarn.scheduler.capacity.root.test1.capacity", "100");
    newConfig.unset("yarn.scheduler.capacity.root.test2.maximum-capacity");
    newConfig.unset("yarn.scheduler.capacity.root.test2.state");
    newConfig.set("yarn.scheduler.capacity.queue-mappings",
            "u:test1:test1");
    RMContext rmContext = prepareRMContext();
    boolean isValidConfig = CapacitySchedulerConfigValidator
              .validateCSConfiguration(oldConfig, newConfig, rmContext);
    assertTrue(isValidConfig);

  }

  @Test
  public void testAddQueueToALeafQueue() throws IOException {
    Configuration oldConfig = CapacitySchedulerConfigGeneratorForTest
            .createBasicCSConfiguration();
    oldConfig.set("yarn.scheduler.capacity.root.test1.state", "STOPPED");
    Configuration newConfig = new Configuration(oldConfig);
    newConfig.set("yarn.scheduler.capacity.root.test1.queues", "newQueue");
    newConfig
            .set("yarn.scheduler.capacity.root.test1.newQueue.capacity", "100");
    newConfig.set("yarn.scheduler.capacity.queue-mappings",
            "u:test1:test2");
    RMContext rmContext = prepareRMContext();
    boolean isValidConfig = CapacitySchedulerConfigValidator
            .validateCSConfiguration(oldConfig, newConfig, rmContext);
    assertTrue(isValidConfig);
  }

  public static RMContext prepareRMContext() {
    setupResources(false);
    RMContext rmContext = mock(RMContext.class);
    CapacityScheduler mockCs = mock(CapacityScheduler.class);
    when(rmContext.getScheduler()).thenReturn(mockCs);
    LocalConfigurationProvider configProvider = mock(LocalConfigurationProvider.class);
    when(rmContext.getConfigurationProvider())
            .thenReturn(configProvider);
    RMNodeLabelsManager nodeLabelsManager = mock(RMNodeLabelsManager.class);
    when(rmContext.getNodeLabelManager()).thenReturn(nodeLabelsManager);
    LightWeightResource partitionResource = mock(LightWeightResource.class);
    when(nodeLabelsManager
        .getResourceByLabel(any(), any()))
        .thenReturn(partitionResource);
    PlacementManager queuePlacementManager = mock(PlacementManager.class);
    when(rmContext.getQueuePlacementManager())
        .thenReturn(queuePlacementManager);
    return rmContext;
  }

  private void setUpMockRM(boolean useDominantRC) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    setupResources(useDominantRC);
    CapacitySchedulerConfiguration csConf = setupCSConfiguration(conf, useDominantRC);

    mockRM = new MockRM(csConf);

    cs = (CapacityScheduler) mockRM.getResourceScheduler();
    mockRM.start();
    cs.start();

    setupNodes(mockRM);
  }

  private void setupNodes(MockRM newMockRM) throws Exception {
      nm1 = new MockNM("h1:1234",
          Resource.newInstance(NODE_MEMORY * GB, NODE1_VCORES, NODE_GPU),
          newMockRM.getResourceTrackerService(),
          YarnVersionInfo.getVersion());

      nm1.registerNode();

      nm2 = new MockNM("h2:1234",
          Resource.newInstance(NODE_MEMORY * GB, NODE2_VCORES, NODE_GPU),
          newMockRM.getResourceTrackerService(),
          YarnVersionInfo.getVersion());
      nm2.registerNode();

      nm3 = new MockNM("h3:1234",
          Resource.newInstance(NODE_MEMORY * GB, NODE3_VCORES, NODE_GPU),
          newMockRM.getResourceTrackerService(),
          YarnVersionInfo.getVersion());
      nm3.registerNode();
  }

  private void setupGpuResourceValues() {
    A_MINRES_GPU = Resource.newInstance(A_MINRES.getMemorySize(), A_MINRES.getVirtualCores(),
        ImmutableMap.of(GPU_URI, 2L));
    B_MINRES_GPU =  Resource.newInstance(B_MINRES.getMemorySize(), B_MINRES.getVirtualCores(),
        ImmutableMap.of(GPU_URI, 2L));
    FULL_MAXRES_GPU = Resource.newInstance(FULL_MAXRES.getMemorySize(),
        FULL_MAXRES.getVirtualCores(), ImmutableMap.of(GPU_URI, 6L));
    PARTIAL_MAXRES_GPU = Resource.newInstance(PARTIAL_MAXRES.getMemorySize(),
        PARTIAL_MAXRES.getVirtualCores(), ImmutableMap.of(GPU_URI, 4L));
    GPU_EXCEEDED_MAXRES_GPU = Resource.newInstance(PARTIAL_MAXRES.getMemorySize(),
        PARTIAL_MAXRES.getVirtualCores(), ImmutableMap.of(GPU_URI, 50L));
  }

  private CapacitySchedulerConfiguration setupCSConfiguration(YarnConfiguration configuration,
                                                              boolean useDominantRC) {
    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(configuration);
    if (useDominantRC) {
      csConf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
          DominantResourceCalculator.class.getName());
      csConf.set(YarnConfiguration.RESOURCE_TYPES, ResourceInformation.GPU_URI);
    }

    csConf.setQueues(new QueuePath(CapacitySchedulerConfiguration.ROOT),
        new String[]{PARENT_A, PARENT_B});
    csConf.setQueues(PARENT_A_FULL_PATH, new String[]{LEAF_A});
    csConf.setQueues(PARENT_B_FULL_PATH, new String[]{LEAF_B});

    if (useDominantRC) {
      setupGpuResourceValues();
      csConf.setMinimumResourceRequirement("", PARENT_A_FULL_PATH, A_MINRES_GPU);
      csConf.setMinimumResourceRequirement("", PARENT_B_FULL_PATH, B_MINRES_GPU);
      csConf.setMinimumResourceRequirement("", LEAF_A_FULL_PATH, A_MINRES_GPU);
      csConf.setMinimumResourceRequirement("", LEAF_B_FULL_PATH, B_MINRES_GPU);

      csConf.setMaximumResourceRequirement("", PARENT_A_FULL_PATH, PARTIAL_MAXRES_GPU);
      csConf.setMaximumResourceRequirement("", PARENT_B_FULL_PATH, FULL_MAXRES_GPU);
      csConf.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, PARTIAL_MAXRES_GPU);
      csConf.setMaximumResourceRequirement("", LEAF_B_FULL_PATH, FULL_MAXRES_GPU);
    } else {
      csConf.setMinimumResourceRequirement("", PARENT_A_FULL_PATH, A_MINRES);
      csConf.setMinimumResourceRequirement("", PARENT_B_FULL_PATH, B_MINRES);
      csConf.setMinimumResourceRequirement("", LEAF_A_FULL_PATH, A_MINRES);
      csConf.setMinimumResourceRequirement("", LEAF_B_FULL_PATH, B_MINRES);

      csConf.setMaximumResourceRequirement("", PARENT_A_FULL_PATH, PARTIAL_MAXRES);
      csConf.setMaximumResourceRequirement("", PARENT_B_FULL_PATH, FULL_MAXRES);
      csConf.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, PARTIAL_MAXRES);
      csConf.setMaximumResourceRequirement("", LEAF_B_FULL_PATH, FULL_MAXRES);
    }

    return csConf;
  }
}