TestRMWebServicesConfigurationMutation.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.webapp;

import org.glassfish.jersey.internal.inject.AbstractBinder;
import org.glassfish.jersey.jettison.JettisonFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePrefixes;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.reader.NodeLabelsInfoReader;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.writer.SchedConfUpdateInfoWriter;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;

import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import java.io.FileOutputStream;
import java.io.IOException;
import java.security.Principal;
import java.util.HashMap;
import java.util.Map;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.TestWebServiceUtil.getCapacitySchedulerConfigFileInTarget;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.TestWebServiceUtil.backupSchedulerConfigFileInTarget;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.TestWebServiceUtil.restoreSchedulerConfigFileInTarget;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.TestWebServiceUtil.toJson;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Test scheduler configuration mutation via REST API.
 */
public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
  private static final Logger LOG = LoggerFactory
      .getLogger(TestRMWebServicesConfigurationMutation.class);
  private static final String LABEL_1 = "label1";
  public static final QueuePath ROOT = new QueuePath("root");
  public static final QueuePath ROOT_A = new QueuePath("root", "a");
  public static final QueuePath ROOT_A_A1 = QueuePath.createFromQueues("root", "a", "a1");
  public static final QueuePath ROOT_A_A2 = QueuePath.createFromQueues("root", "a", "a2");
  public static final QueuePath ROOT_B = new QueuePath("root", "b");
  public static final QueuePath ROOT_C = new QueuePath("root", "c");
  public static final QueuePath ROOT_C_C1 = QueuePath.createFromQueues("root", "c", "c1");
  public static final QueuePath ROOT_D = new QueuePath("root", "d");
  private static MockRM rm;
  private static String userName;
  private static CapacitySchedulerConfiguration csConf;
  private static YarnConfiguration conf;
  private HttpServletRequest request;

  protected Application configure() {
    ResourceConfig config = new ResourceConfig();
    config.register(RMWebServices.class);
    config.register(new JerseyBinder());
    config.register(GenericExceptionHandler.class);
    config.register(NodeLabelsInfoReader.class);
    config.register(TestRMWebServicesAppsModification.TestRMCustomAuthFilter.class);
    config.register(new JettisonFeature()).register(JAXBContextResolver.class);
    return config;
  }

  private class JerseyBinder extends AbstractBinder {
    @Override
    protected void configure() {
      try {
        userName = UserGroupInformation.getCurrentUser().getShortUserName();
      } catch (IOException ioe) {
        throw new RuntimeException("Unable to get current user name "
            + ioe.getMessage(), ioe);
      }
      csConf = new CapacitySchedulerConfiguration(new Configuration(false),
          false);
      setupQueueConfiguration(csConf);
      conf = new YarnConfiguration();
      conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
          ResourceScheduler.class);
      conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
          YarnConfiguration.MEMORY_CONFIGURATION_STORE);
      conf.set(YarnConfiguration.YARN_ADMIN_ACL, userName);
      try {
        FileOutputStream out = new FileOutputStream(getCapacitySchedulerConfigFileInTarget());
        csConf.writeXml(out);
        out.close();
      } catch (IOException e) {
        throw new RuntimeException("Failed to write XML file", e);
      }
      rm = new MockRM(conf);

      request = mock(HttpServletRequest.class);
      when(request.getScheme()).thenReturn("http");
      final HttpServletResponse response = mock(HttpServletResponse.class);
      bind(rm).to(ResourceManager.class).named("rm");
      bind(csConf).to(Configuration.class).named("conf");
      Principal principal = () -> userName;
      bind(request).to(HttpServletRequest.class);
      when(request.getUserPrincipal()).thenReturn(principal);
      bind(response).to(HttpServletResponse.class);
    }
  }


  @BeforeAll
  public static void beforeClass() {
    backupSchedulerConfigFileInTarget();
  }

  @AfterAll
  public static void afterClass() {
    restoreSchedulerConfigFileInTarget();
  }

  @Override
  @BeforeEach
  public void setUp() throws Exception {
    super.setUp();
  }

  private static void setupQueueConfiguration(
      CapacitySchedulerConfiguration config) {
    config.setQueues(ROOT, new String[]{"a", "b", "c", "mappedqueue"});

    config.setCapacity(ROOT_A, 25f);
    config.setMaximumCapacity(ROOT_A, 50f);

    config.setQueues(ROOT_A, new String[]{"a1", "a2"});
    config.setCapacity(ROOT_A_A1, 100f);
    config.setCapacity(ROOT_A_A2, 0f);

    config.setCapacity(ROOT_B, 75f);

    config.setCapacity(ROOT_C, 0f);

    config.setQueues(ROOT_C, new String[] {"c1"});
    config.setCapacity(ROOT_C_C1, 0f);

    config.setCapacity(ROOT_D, 0f);
    config.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
        "g:hadoop:mappedqueue");
  }

  public TestRMWebServicesConfigurationMutation() {
  }

  private CapacitySchedulerConfiguration getSchedulerConf()
      throws JSONException {
    WebTarget r = targetWithJsonObject();
    Response response =
        r.path("ws").path("v1").path("cluster")
            .queryParam("user.name", userName).path("scheduler-conf")
            .request(MediaType.APPLICATION_JSON)
            .get(Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    JSONObject json = response.readEntity(JSONObject.class).
        getJSONObject("configuration");
    JSONArray items = (JSONArray) json.get("property");
    CapacitySchedulerConfiguration parsedConf =
        new CapacitySchedulerConfiguration();
    for (int i = 0; i < items.length(); i++) {
      JSONObject obj = (JSONObject) items.get(i);
      parsedConf.set(obj.get("name").toString(),
          obj.get("value").toString());
    }
    return parsedConf;
  }

  @Test
  public void testGetSchedulerConf() throws Exception {
    CapacitySchedulerConfiguration orgConf = getSchedulerConf();
    assertNotNull(orgConf);
    assertEquals(4, orgConf.getQueues(ROOT).size());
  }

  @Test
  public void testFormatSchedulerConf() throws Exception {
    CapacitySchedulerConfiguration newConf = getSchedulerConf();
    assertNotNull(newConf);
    assertEquals(4, newConf.getQueues(ROOT).size());

    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    Map<String, String> nearEmptyCapacity = new HashMap<>();
    nearEmptyCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "1E-4");
    QueueConfigInfo d = new QueueConfigInfo("root.formattest",
        nearEmptyCapacity);
    updateInfo.getAddQueueInfo().add(d);

    Map<String, String> stoppedParam = new HashMap<>();
    stoppedParam.put(CapacitySchedulerConfiguration.STATE,
        QueueState.STOPPED.toString());
    QueueConfigInfo stoppedInfo = new QueueConfigInfo("root.formattest",
        stoppedParam);
    updateInfo.getUpdateQueueInfo().add(stoppedInfo);

    // Add a queue root.formattest to the existing three queues
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);
    Response response = r.path("ws").path("v1").path("cluster")
        .path("scheduler-conf").queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON), Response.class);
    newConf = getSchedulerConf();
    assertNotNull(newConf);
    assertEquals(5, newConf.getQueues(ROOT).size());

    // Format the scheduler config and validate root.formattest is not present
    response = r.path("ws").path("v1").path("cluster")
        .queryParam("user.name", userName)
        .path(RMWSConsts.FORMAT_SCHEDULER_CONF)
        .request(MediaType.APPLICATION_JSON).get(Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    newConf = getSchedulerConf();
    assertEquals(4, newConf.getQueues(ROOT).size());
  }

  private long getConfigVersion() throws Exception {
    WebTarget r = targetWithJsonObject();
    Response response = r.path("ws").path("v1").path("cluster")
        .queryParam("user.name", userName)
        .path(RMWSConsts.SCHEDULER_CONF_VERSION)
        .request(MediaType.APPLICATION_JSON).get(Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());

    JSONObject json = response.readEntity(JSONObject.class).
        getJSONObject("configversion");
    return Long.parseLong(json.get("versionID").toString());
  }

  @Test
  public void testSchedulerConfigVersion() throws Exception {
    assertEquals(1, getConfigVersion());
    testAddNestedQueue();
    assertEquals(2, getConfigVersion());
  }

  @Test
  public void testAddNestedQueue() throws Exception {
    CapacitySchedulerConfiguration orgConf = getSchedulerConf();
    assertNotNull(orgConf);
    assertEquals(4, orgConf.getQueues(ROOT).size());

    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);

    Response response;

    // Add parent queue root.d with two children d1 and d2.
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    Map<String, String> d1Capacity = new HashMap<>();
    d1Capacity.put(CapacitySchedulerConfiguration.CAPACITY, "25");
    d1Capacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "25");
    Map<String, String> nearEmptyCapacity = new HashMap<>();
    nearEmptyCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "1E-4");
    nearEmptyCapacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY,
        "1E-4");
    Map<String, String> d2Capacity = new HashMap<>();
    d2Capacity.put(CapacitySchedulerConfiguration.CAPACITY, "75");
    d2Capacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "75");
    QueueConfigInfo d1 = new QueueConfigInfo("root.d.d1", d1Capacity);
    QueueConfigInfo d2 = new QueueConfigInfo("root.d.d2", d2Capacity);
    QueueConfigInfo d = new QueueConfigInfo("root.d", nearEmptyCapacity);
    updateInfo.getAddQueueInfo().add(d1);
    updateInfo.getAddQueueInfo().add(d2);
    updateInfo.getAddQueueInfo().add(d);
    response =
        r.path("ws").path("v1").path("cluster")
            .path("scheduler-conf").queryParam("user.name", userName)
            .request(MediaType.APPLICATION_JSON)
            .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON), Response.class);

    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    assertEquals(5, newCSConf.getQueues(ROOT).size());
    assertEquals(2, newCSConf.getQueues(ROOT_D).size());
    assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.d.d1")),
        0.01f);
    assertEquals(75.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.d.d2")),
        0.01f);

    CapacitySchedulerConfiguration newConf = getSchedulerConf();
    assertNotNull(newConf);
    assertEquals(5, newConf.getQueues(ROOT).size());
  }

  @Test
  public void testAddWithUpdate() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);

    Response response;

    // Add root.d with capacity 25, reducing root.b capacity from 75 to 50.
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    Map<String, String> dCapacity = new HashMap<>();
    dCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "25");
    Map<String, String> bCapacity = new HashMap<>();
    bCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "50");
    QueueConfigInfo d = new QueueConfigInfo("root.d", dCapacity);
    QueueConfigInfo b = new QueueConfigInfo("root.b", bCapacity);
    updateInfo.getAddQueueInfo().add(d);
    updateInfo.getUpdateQueueInfo().add(b);
    response =
        r.path("ws").path("v1").path("cluster")
            .path("scheduler-conf").queryParam("user.name", userName)
            .request(MediaType.APPLICATION_JSON)
            .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON), Response.class);

    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    assertEquals(5, newCSConf.getQueues(ROOT).size());
    assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.d")), 0.01f);
    assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.b")), 0.01f);
  }

  @Test
  public void testUnsetParentQueueOrderingPolicy() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);
    Response response;

    // Update ordering policy of Leaf Queue root.b to fair
    SchedConfUpdateInfo updateInfo1 = new SchedConfUpdateInfo();
    Map<String, String> updateParam = new HashMap<>();
    updateParam.put(CapacitySchedulerConfiguration.ORDERING_POLICY,
        "fair");
    QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.b", updateParam);
    updateInfo1.getUpdateQueueInfo().add(aUpdateInfo);
    response = r.path("ws").path("v1").path("cluster")
        .path("scheduler-conf").queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .put(Entity.entity(updateInfo1, MediaType.APPLICATION_JSON), Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    String bOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
        + "root.b" + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
    assertEquals("fair", newCSConf.get(bOrderingPolicy));

    stopQueue(ROOT_B);

    // Add root.b.b1 which makes root.b a Parent Queue
    SchedConfUpdateInfo updateInfo2 = new SchedConfUpdateInfo();
    Map<String, String> capacity = new HashMap<>();
    capacity.put(CapacitySchedulerConfiguration.CAPACITY, "100");
    QueueConfigInfo b1 = new QueueConfigInfo("root.b.b1", capacity);
    updateInfo2.getAddQueueInfo().add(b1);
    response = r.path("ws").path("v1").path("cluster")
        .path("scheduler-conf").queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .put(Entity.entity(updateInfo2, MediaType.APPLICATION_JSON), Response.class);

    // Validate unset ordering policy of root.b after converted to
    // Parent Queue
    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    newCSConf = ((CapacityScheduler) rm.getResourceScheduler())
        .getConfiguration();
    bOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
        + "root.b" + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
    assertNull(newCSConf.get(bOrderingPolicy),
        "Failed to unset Parent Queue OrderingPolicy");
  }

  @Test
  public void testUnsetLeafQueueOrderingPolicy() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);
    Response response;

    // Update ordering policy of Parent Queue root.c to priority-utilization
    SchedConfUpdateInfo updateInfo1 = new SchedConfUpdateInfo();
    Map<String, String> updateParam = new HashMap<>();
    updateParam.put(CapacitySchedulerConfiguration.ORDERING_POLICY,
        "priority-utilization");
    QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.c", updateParam);
    updateInfo1.getUpdateQueueInfo().add(aUpdateInfo);
    response = r.path("ws").path("v1").path("cluster")
        .path("scheduler-conf").queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .put(Entity.entity(updateInfo1, MediaType.APPLICATION_JSON), Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    String cOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
        + "root.c" + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
    assertEquals("priority-utilization", newCSConf.get(cOrderingPolicy));

    stopQueue(ROOT_C_C1);

    // Remove root.c.c1 which makes root.c a Leaf Queue
    SchedConfUpdateInfo updateInfo2 = new SchedConfUpdateInfo();
    updateInfo2.getRemoveQueueInfo().add("root.c.c1");
    response = r.path("ws").path("v1").path("cluster")
        .path("scheduler-conf").queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .put(Entity.entity(updateInfo2, MediaType.APPLICATION_JSON), Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());

    // Validate unset ordering policy of root.c after converted to
    // Leaf Queue
    newCSConf = ((CapacityScheduler) rm.getResourceScheduler())
        .getConfiguration();
    cOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
        + "root.c" + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
    assertNull(newCSConf.get(cOrderingPolicy),
        "Failed to unset Leaf Queue OrderingPolicy");
  }

  @Test
  public void testRemoveQueue() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);

    Response response;

    stopQueue(ROOT_A_A2);
    // Remove root.a.a2
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    updateInfo.getRemoveQueueInfo().add("root.a.a2");
    response =
        r.path("ws").path("v1").path("cluster")
        .path("scheduler-conf").queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON),
        Response.class);

    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    assertEquals(1, newCSConf.getQueues(ROOT_A).size(),
        "Failed to remove the queue");
    assertEquals("a1", newCSConf.getQueues(ROOT_A).get(0),
        "Failed to remove the right queue");
  }

  @Test
  public void testStopWithRemoveQueue() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);

    Response response;

    // Set state of queues to STOPPED.
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    Map<String, String> stoppedParam = new HashMap<>();
    stoppedParam.put(CapacitySchedulerConfiguration.STATE,
        QueueState.STOPPED.toString());
    QueueConfigInfo stoppedInfo = new QueueConfigInfo("root.a.a2",
        stoppedParam);
    updateInfo.getUpdateQueueInfo().add(stoppedInfo);

    updateInfo.getRemoveQueueInfo().add("root.a.a2");
    response = r.path("ws").path("v1").path("cluster")
        .path("scheduler-conf").queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON),
        Response.class);

    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    assertEquals(1, newCSConf.getQueues(ROOT_A).size());
    assertEquals("a1", newCSConf.getQueues(ROOT_A).get(0));
  }

  @Test
  public void testRemoveQueueWhichHasQueueMapping() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);

    Response response;
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

    // Validate Queue 'mappedqueue' exists before deletion
    assertNotNull(cs.getQueue("mappedqueue"),
        "Failed to setup CapacityScheduler Configuration");

    // Set state of queue 'mappedqueue' to STOPPED.
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    Map<String, String> stoppedParam = new HashMap<>();
    stoppedParam.put(CapacitySchedulerConfiguration.STATE, QueueState.STOPPED.toString());
    QueueConfigInfo stoppedInfo = new QueueConfigInfo("root.mappedqueue", stoppedParam);
    updateInfo.getUpdateQueueInfo().add(stoppedInfo);

    // Remove queue 'mappedqueue' using update scheduler-conf
    updateInfo.getRemoveQueueInfo().add("root.mappedqueue");
    response = r.path("ws").path("v1").path("cluster").path("scheduler-conf")
        .queryParam("user.name", userName).request(MediaType.APPLICATION_JSON)
        .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON), Response.class);
    String responseText = response.readEntity(String.class);

    // Queue 'mappedqueue' deletion will fail as there is queue mapping present
    assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
    assertTrue(responseText.contains(
        "Failed to re-init queues : " + "org.apache.hadoop.yarn.exceptions.YarnException:"
            + " Path root 'mappedqueue' does not exist. Path 'mappedqueue' is invalid"));

    // Validate queue 'mappedqueue' exists after above failure
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    assertEquals(4, newCSConf.getQueues(ROOT).size());
    assertNotNull(cs.getQueue("mappedqueue"),
        "CapacityScheduler Configuration is corrupt");
  }

  @Test
  public void testStopWithConvertLeafToParentQueue() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);
    Response response;

    // Set state of queues to STOPPED.
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    Map<String, String> stoppedParam = new HashMap<>();
    stoppedParam.put(CapacitySchedulerConfiguration.STATE,
        QueueState.STOPPED.toString());
    QueueConfigInfo stoppedInfo = new QueueConfigInfo("root.b",
        stoppedParam);
    updateInfo.getUpdateQueueInfo().add(stoppedInfo);

    Map<String, String> b1Capacity = new HashMap<>();
    b1Capacity.put(CapacitySchedulerConfiguration.CAPACITY, "100");
    QueueConfigInfo b1 = new QueueConfigInfo("root.b.b1", b1Capacity);
    updateInfo.getAddQueueInfo().add(b1);

    response = r.path("ws").path("v1").path("cluster")
        .path("scheduler-conf").queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON), Response.class);

    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    assertEquals(1, newCSConf.getQueues(ROOT_B).size());
    assertEquals("b1", newCSConf.getQueues(ROOT_B).get(0));
  }

  @Test
  public void testRemoveParentQueue() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);

    Response response;

    stopQueue(ROOT_C, ROOT_C_C1);
    // Remove root.c (parent queue)
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    updateInfo.getRemoveQueueInfo().add("root.c");
    response =
        r.path("ws").path("v1").path("cluster")
            .path("scheduler-conf").queryParam("user.name", userName)
            .request(MediaType.APPLICATION_JSON)
            .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON), Response.class);

    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    assertEquals(3, newCSConf.getQueues(ROOT).size());
    assertEquals(0, newCSConf.getQueues(ROOT_C).size());
  }

  @Test
  public void testRemoveParentQueueWithCapacity() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);

    Response response;

    stopQueue(ROOT_A, ROOT_A_A1, ROOT_A_A2);
    // Remove root.a (parent queue) with capacity 25
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    updateInfo.getRemoveQueueInfo().add("root.a");

    // Set root.b capacity to 100
    Map<String, String> bCapacity = new HashMap<>();
    bCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "100");
    QueueConfigInfo b = new QueueConfigInfo("root.b", bCapacity);
    updateInfo.getUpdateQueueInfo().add(b);
    response =
        r.path("ws").path("v1").path("cluster")
            .path("scheduler-conf").queryParam("user.name", userName)
            .request(MediaType.APPLICATION_JSON)
            .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON), Response.class);

    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    assertEquals(3, newCSConf.getQueues(ROOT).size());
    assertEquals(100.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.b")),
        0.01f);
  }

  @Test
  public void testRemoveMultipleQueues() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);

    Response response;

    stopQueue(ROOT_B, ROOT_C, ROOT_C_C1);
    // Remove root.b and root.c
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    updateInfo.getRemoveQueueInfo().add("root.b");
    updateInfo.getRemoveQueueInfo().add("root.c");
    Map<String, String> aCapacity = new HashMap<>();
    aCapacity.put(CapacitySchedulerConfiguration.CAPACITY, "100");
    aCapacity.put(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY, "100");
    QueueConfigInfo configInfo = new QueueConfigInfo("root.a", aCapacity);
    updateInfo.getUpdateQueueInfo().add(configInfo);
    response =
        r.path("ws").path("v1").path("cluster")
            .path("scheduler-conf").queryParam("user.name", userName)
            .request(MediaType.APPLICATION_JSON)
            .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON), Response.class);

    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    assertEquals(2, newCSConf.getQueues(ROOT).size());
  }

  private void stopQueue(QueuePath... queuePaths) throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);

    Response response;

    // Set state of queues to STOPPED.
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    Map<String, String> stoppedParam = new HashMap<>();
    stoppedParam.put(CapacitySchedulerConfiguration.STATE,
        QueueState.STOPPED.toString());
    for (QueuePath queue : queuePaths) {
      QueueConfigInfo stoppedInfo = new QueueConfigInfo(queue.getFullPath(), stoppedParam);
      updateInfo.getUpdateQueueInfo().add(stoppedInfo);
    }
    response =
        r.path("ws").path("v1").path("cluster")
            .path("scheduler-conf").queryParam("user.name", userName)
            .request(MediaType.APPLICATION_JSON)
            .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON), Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    for (QueuePath queue : queuePaths) {
      assertEquals(QueueState.STOPPED, newCSConf.getState(queue));
    }
  }

  @Test
  public void testUpdateQueue() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);

    Response response;

    // Update config value.
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    Map<String, String> updateParam = new HashMap<>();
    updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX,
        "0.2");
    QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.a", updateParam);
    updateInfo.getUpdateQueueInfo().add(aUpdateInfo);
    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

    assertEquals(CapacitySchedulerConfiguration
            .DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT,
        cs.getConfiguration()
            .getMaximumApplicationMasterResourcePerQueuePercent(ROOT_A),
        0.001f);
    response =
        r.path("ws").path("v1").path("cluster")
            .path("scheduler-conf").queryParam("user.name", userName)
            .request(MediaType.APPLICATION_JSON)
            .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON), Response.class);
    LOG.debug("Response headers: {}.", response.getHeaders());
    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf = cs.getConfiguration();
    assertEquals(0.2f, newCSConf
        .getMaximumApplicationMasterResourcePerQueuePercent(ROOT_A), 0.001f);

    // Remove config. Config value should be reverted to default.
    updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_AM_RESOURCE_SUFFIX,
        null);
    aUpdateInfo = new QueueConfigInfo("root.a", updateParam);
    updateInfo.getUpdateQueueInfo().clear();
    updateInfo.getUpdateQueueInfo().add(aUpdateInfo);
    response =
        r.path("ws").path("v1").path("cluster")
            .path("scheduler-conf").queryParam("user.name", userName)
            .request(MediaType.APPLICATION_JSON)
            .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON),
            Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    newCSConf = cs.getConfiguration();
    assertEquals(CapacitySchedulerConfiguration
        .DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT, newCSConf
            .getMaximumApplicationMasterResourcePerQueuePercent(ROOT_A),
        0.001f);
  }

  @Test
  public void testUpdateQueueCapacity() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);

    Response response;

    // Update root.a and root.b capacity to 50.
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    Map<String, String> updateParam = new HashMap<>();
    updateParam.put(CapacitySchedulerConfiguration.CAPACITY, "50");
    QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.a", updateParam);
    QueueConfigInfo bUpdateInfo = new QueueConfigInfo("root.b", updateParam);
    updateInfo.getUpdateQueueInfo().add(aUpdateInfo);
    updateInfo.getUpdateQueueInfo().add(bUpdateInfo);

    response =
        r.path("ws").path("v1").path("cluster")
            .path("scheduler-conf").queryParam("user.name", userName)
            .request(MediaType.APPLICATION_JSON)
            .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON), Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.a")), 0.01f);
    assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.b")), 0.01f);
  }

  @Test
  public void testGlobalConfChange() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);

    Response response;

    // Set maximum-applications to 30000.
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    updateInfo.getGlobalParams().put(CapacitySchedulerConfiguration.PREFIX +
        "maximum-applications", "30000");

    response = r.path("ws").path("v1").path("cluster")
        .path("scheduler-conf").queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON), Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    CapacitySchedulerConfiguration newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    assertEquals(30000, newCSConf.getMaximumSystemApplications());

    updateInfo.getGlobalParams().put(CapacitySchedulerConfiguration.PREFIX +
        "maximum-applications", null);
    // Unset maximum-applications. Should be set to default.
    response =
        r.path("ws").path("v1").path("cluster")
            .path("scheduler-conf").queryParam("user.name", userName)
            .request(MediaType.APPLICATION_JSON)
            .put(Entity.entity(updateInfo, MediaType.APPLICATION_JSON),
            Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    newCSConf =
        ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
    assertEquals(CapacitySchedulerConfiguration
        .DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS,
        newCSConf.getMaximumSystemApplications());
  }

  @Test
  public void testNodeLabelRemovalResidualConfigsAreCleared() throws Exception {
    WebTarget r = target().register(NodeLabelsInfoReader.class);
    Response response;

    // 1. Create Node Label: label1
    NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
    nodeLabelsInfo.getNodeLabelsInfo().add(new NodeLabelInfo(LABEL_1));
    WebTarget addNodeLabelsResource = r.path("ws").path("v1").path("cluster")
        .path("add-node-labels");
    WebTarget getNodeLabelsResource = r.path("ws").path("v1").path("cluster")
        .path("get-node-labels");
    WebTarget removeNodeLabelsResource = r.path("ws").path("v1").path("cluster")
        .path("remove-node-labels");
    WebTarget schedulerConfResource = r.path("ws").path("v1").path("cluster")
        .path(RMWSConsts.SCHEDULER_CONF);
    response = addNodeLabelsResource.queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .post(Entity.entity(logAndReturnJson(addNodeLabelsResource,
        toJson(nodeLabelsInfo, NodeLabelsInfo.class)), MediaType.APPLICATION_JSON), Response.class);

    // 2. Verify new Node Label
    response = getNodeLabelsResource.queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON).get(Response.class);
    assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
        response.getMediaType().toString());
    nodeLabelsInfo = response.readEntity(NodeLabelsInfo.class);
    assertEquals(1, nodeLabelsInfo.getNodeLabels().size());
    for (NodeLabelInfo nl : nodeLabelsInfo.getNodeLabelsInfo()) {
      assertEquals(LABEL_1, nl.getName());
      assertTrue(nl.getExclusivity());
    }

    // 3. Assign 'label1' to root.a
    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    Map<String, String> updateForRoot = new HashMap<>();
    updateForRoot.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, "*");
    QueueConfigInfo rootUpdateInfo = new QueueConfigInfo(ROOT.getFullPath(), updateForRoot);

    Map<String, String> updateForRootA = new HashMap<>();
    updateForRootA.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, LABEL_1);
    QueueConfigInfo rootAUpdateInfo = new QueueConfigInfo(ROOT_A.getFullPath(), updateForRootA);

    updateInfo.getUpdateQueueInfo().add(rootUpdateInfo);
    updateInfo.getUpdateQueueInfo().add(rootAUpdateInfo);

    response = schedulerConfResource
        .queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .put(Entity.entity(logAndReturnJson(schedulerConfResource, toJson(updateInfo,
        SchedConfUpdateInfo.class)), MediaType.APPLICATION_JSON), Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());

    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

    assertEquals(Sets.newHashSet("*"),
        cs.getConfiguration().getAccessibleNodeLabels(ROOT));
    assertEquals(Sets.newHashSet(LABEL_1),
        cs.getConfiguration().getAccessibleNodeLabels(ROOT_A));

    // 4. Set partition capacities to queues as below
    updateInfo = new SchedConfUpdateInfo();
    updateForRoot = new HashMap<>();
    updateForRoot.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "100");
    updateForRoot.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "100");
    rootUpdateInfo = new QueueConfigInfo(ROOT.getFullPath(), updateForRoot);

    updateForRootA = new HashMap<>();
    updateForRootA.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "100");
    updateForRootA.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "100");
    rootAUpdateInfo = new QueueConfigInfo(ROOT_A.getFullPath(), updateForRootA);

    // Avoid the following exception by adding some capacities to root.a.a1 and root.a.a2 to label1
    // Illegal capacity sum of 0.0 for children of queue a for label=label1.
    // It is set to 0, but parent percent != 0, and doesn't allow children capacity to set to 0
    Map<String, String> updateForRootA_A1 = new HashMap<>();
    updateForRootA_A1.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "20");
    updateForRootA_A1.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "20");
    QueueConfigInfo rootA_A1UpdateInfo = new QueueConfigInfo(ROOT_A_A1.getFullPath(),
        updateForRootA_A1);

    Map<String, String> updateForRootA_A2 = new HashMap<>();
    updateForRootA_A2.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "80");
    updateForRootA_A2.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "80");
    QueueConfigInfo rootA_A2UpdateInfo = new QueueConfigInfo(ROOT_A_A2.getFullPath(),
        updateForRootA_A2);


    updateInfo.getUpdateQueueInfo().add(rootUpdateInfo);
    updateInfo.getUpdateQueueInfo().add(rootAUpdateInfo);
    updateInfo.getUpdateQueueInfo().add(rootA_A1UpdateInfo);
    updateInfo.getUpdateQueueInfo().add(rootA_A2UpdateInfo);

    response = schedulerConfResource
        .queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .put(Entity.entity(logAndReturnJson(schedulerConfResource, toJson(updateInfo,
        SchedConfUpdateInfo.class)), MediaType.APPLICATION_JSON),
        Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());

    assertEquals(100.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT, LABEL_1), 0.001f);
    assertEquals(100.0, cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT, LABEL_1),
        0.001f);
    assertEquals(100.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT_A, LABEL_1), 0.001f);
    assertEquals(100.0, cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT_A, LABEL_1),
        0.001f);
    assertEquals(20.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT_A_A1, LABEL_1), 0.001f);
    assertEquals(20.0, cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT_A_A1, LABEL_1),
        0.001f);
    assertEquals(80.0, cs.getConfiguration().getLabeledQueueCapacity(ROOT_A_A2, LABEL_1), 0.001f);
    assertEquals(80.0, cs.getConfiguration().getLabeledQueueMaximumCapacity(ROOT_A_A2, LABEL_1),
        0.001f);

    //5. De-assign node label: "label1" + Remove residual properties
    updateInfo = new SchedConfUpdateInfo();
    updateForRoot = new HashMap<>();
    updateForRoot.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, "*");
    updateForRoot.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "");
    updateForRoot.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "");
    rootUpdateInfo = new QueueConfigInfo(ROOT.getFullPath(), updateForRoot);

    updateForRootA = new HashMap<>();
    updateForRootA.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, "");
    updateForRootA.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "");
    updateForRootA.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "");
    rootAUpdateInfo = new QueueConfigInfo(ROOT_A.getFullPath(), updateForRootA);

    updateForRootA_A1 = new HashMap<>();
    updateForRootA_A1.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, "");
    updateForRootA_A1.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "");
    updateForRootA_A1.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "");
    rootA_A1UpdateInfo = new QueueConfigInfo(ROOT_A_A1.getFullPath(), updateForRootA_A1);

    updateForRootA_A2 = new HashMap<>();
    updateForRootA_A2.put(CapacitySchedulerConfiguration.ACCESSIBLE_NODE_LABELS, "");
    updateForRootA_A2.put(getAccessibleNodeLabelsCapacityPropertyName(LABEL_1), "");
    updateForRootA_A2.put(getAccessibleNodeLabelsMaxCapacityPropertyName(LABEL_1), "");
    rootA_A2UpdateInfo = new QueueConfigInfo(ROOT_A_A2.getFullPath(), updateForRootA_A2);

    updateInfo.getUpdateQueueInfo().add(rootUpdateInfo);
    updateInfo.getUpdateQueueInfo().add(rootAUpdateInfo);
    updateInfo.getUpdateQueueInfo().add(rootA_A1UpdateInfo);
    updateInfo.getUpdateQueueInfo().add(rootA_A2UpdateInfo);

    response = schedulerConfResource
        .queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .put(Entity.entity(logAndReturnJson(schedulerConfResource, toJson(updateInfo,
        SchedConfUpdateInfo.class)), MediaType.APPLICATION_JSON), Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());
    assertEquals(Sets.newHashSet("*"),
        cs.getConfiguration().getAccessibleNodeLabels(ROOT));
    assertNull(cs.getConfiguration().getAccessibleNodeLabels(ROOT_A));

    //6. Remove node label 'label1'
    response =
        removeNodeLabelsResource
            .queryParam("user.name", userName)
            .queryParam("labels", LABEL_1)
            .request(MediaType.APPLICATION_JSON)
            .post(null, Response.class);

    // Verify
    response =
        getNodeLabelsResource.queryParam("user.name", userName)
            .request(MediaType.APPLICATION_JSON).get(Response.class);
    assertEquals(MediaType.APPLICATION_JSON_TYPE + ";" + JettyUtils.UTF_8,
        response.getMediaType().toString());
    nodeLabelsInfo = response.readEntity(NodeLabelsInfo.class);
    assertEquals(0, nodeLabelsInfo.getNodeLabels().size());

    //6. Check residual configs
    assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT, LABEL_1, CAPACITY));
    assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT, LABEL_1, MAXIMUM_CAPACITY));
    assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A, LABEL_1, CAPACITY));
    assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A, LABEL_1, MAXIMUM_CAPACITY));
    assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A1, LABEL_1, CAPACITY));
    assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A1, LABEL_1, MAXIMUM_CAPACITY));
    assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A2, LABEL_1, CAPACITY));
    assertNull(getConfValueForQueueAndLabelAndType(cs, ROOT_A_A2, LABEL_1, MAXIMUM_CAPACITY));
  }

  private String getConfValueForQueueAndLabelAndType(CapacityScheduler cs,
      QueuePath queuePath, String label, String type) {
    return cs.getConfiguration().get(
            QueuePrefixes.getNodeLabelPrefix(
            queuePath, label) + type);
  }

  private Object logAndReturnJson(WebTarget ws, String json) {
    LOG.info("Sending to web resource: {}, json: {}", ws, json);
    return json;
  }

  private String getAccessibleNodeLabelsCapacityPropertyName(String label) {
    return String.format("%s.%s.%s", ACCESSIBLE_NODE_LABELS, label, CAPACITY);
  }

  private String getAccessibleNodeLabelsMaxCapacityPropertyName(String label) {
    return String.format("%s.%s.%s", ACCESSIBLE_NODE_LABELS, label, MAXIMUM_CAPACITY);
  }

  @Test
  public void testValidateWithClusterMaxAllocation() throws Exception {
    WebTarget r = target().register(SchedConfUpdateInfoWriter.class);
    int clusterMax = YarnConfiguration.
        DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB * 2;
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
        clusterMax);

    SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
    Map<String, String> updateParam = new HashMap<>();
    updateParam.put(CapacitySchedulerConfiguration.MAXIMUM_APPLICATIONS_SUFFIX,
        "100");
    QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.a", updateParam);
    updateInfo.getUpdateQueueInfo().add(aUpdateInfo);

    Response response =
        r.path("ws").path("v1").path("cluster")
        .path(RMWSConsts.SCHEDULER_CONF_VALIDATE)
        .queryParam("user.name", userName)
        .request(MediaType.APPLICATION_JSON)
        .post(Entity.entity(updateInfo, MediaType.APPLICATION_JSON),
        Response.class);
    assertEquals(Status.OK.getStatusCode(), response.getStatus());
  }

  @Override
  @AfterEach
  public void tearDown() throws Exception {
    if (rm != null) {
      rm.stop();
    }
    super.tearDown();
  }
}