TestSchedConfCLI.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.client.cli;

import org.glassfish.jersey.internal.inject.AbstractBinder;
import org.glassfish.jersey.jettison.JettisonFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
import org.glassfish.jersey.test.TestProperties;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Application;

import static org.apache.hadoop.yarn.webapp.JerseyTestBase.JERSEY_RANDOM_PORT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Class for testing {@link SchedConfCLI}.
 */
public class TestSchedConfCLI extends JerseyTest {

  private SchedConfCLI cli;

  private static MockRM rm;
  private static String userName;

  private static final File CONF_FILE = new File(new File("target",
      "test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE);
  private static final File OLD_CONF_FILE = new File(new File("target",
      "test-classes"), YarnConfiguration.CS_CONFIGURATION_FILE + ".tmp");

  public TestSchedConfCLI() {
  }

  @Override
  protected Application configure() {
    ResourceConfig config = new ResourceConfig();
    config.register(new JerseyBinder());
    config.register(RMWebServices.class);
    config.register(GenericExceptionHandler.class);
    config.register(GenericExceptionHandler.class);
    config.register(new JettisonFeature()).register(JAXBContextResolver.class);
    return config;
  }

  private class JerseyBinder extends AbstractBinder {
    @Override
    protected void configure() {

      Configuration conf = new YarnConfiguration();
      conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
          ResourceScheduler.class);
      conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
          YarnConfiguration.MEMORY_CONFIGURATION_STORE);

      try {
        userName = UserGroupInformation.getCurrentUser().getShortUserName();
      } catch (IOException ioe) {
        throw new RuntimeException("Unable to get current user name " + ioe.getMessage(), ioe);
      }

      CapacitySchedulerConfiguration csConf = new
          CapacitySchedulerConfiguration(new Configuration(false), false);
      setupQueueConfiguration(csConf);

      try {
        if (CONF_FILE.exists()) {
          if (!CONF_FILE.renameTo(OLD_CONF_FILE)) {
            throw new RuntimeException("Failed to rename conf file");
          }
        }
        FileOutputStream out = new FileOutputStream(CONF_FILE);
        csConf.writeXml(out);
        out.close();
      } catch (IOException e) {
        throw new RuntimeException("Failed to write XML file", e);
      }

      rm = new MockRM(conf);
      final HttpServletRequest request = mock(HttpServletRequest.class);
      final HttpServletResponse response = mock(HttpServletResponse.class);
      bind(rm).to(ResourceManager.class).named("rm");
      bind(conf).to(Configuration.class).named("conf");
      bind(request).to(HttpServletRequest.class);
      when(request.getUserPrincipal()).thenReturn(() -> userName);
      bind(response).to(HttpServletResponse.class);
      forceSet(TestProperties.CONTAINER_PORT, JERSEY_RANDOM_PORT);
    }
  }

  @BeforeEach
  public void setUp() throws Exception {
    super.setUp();
    cli = new SchedConfCLI();
  }

  private static void setupQueueConfiguration(
      CapacitySchedulerConfiguration config) {
    config.setQueues(new QueuePath(CapacitySchedulerConfiguration.ROOT),
        new String[]{"testqueue"});
    QueuePath a = new QueuePath(CapacitySchedulerConfiguration.ROOT + ".testqueue");
    config.setCapacity(a, 100f);
    config.setMaximumCapacity(a, 100f);
  }

  @AfterEach
  public void cleanUp() throws Exception {
    if (rm != null) {
      rm.stop();
    }
    CONF_FILE.delete();
    if (CONF_FILE.exists()) {
      throw new RuntimeException("Failed to delete configuration file");
    }
    if (OLD_CONF_FILE.exists()) {
      if (!OLD_CONF_FILE.renameTo(CONF_FILE)) {
        throw new RuntimeException("Failed to re-copy old" +
            " configuration file");
      }
    }
    super.tearDown();
  }

  @Test
  @Timeout(value = 10)
  public void testGetSchedulerConf() throws Exception {
    ByteArrayOutputStream sysOutStream = new ByteArrayOutputStream();
    PrintStream sysOut = new PrintStream(sysOutStream);
    System.setOut(sysOut);

    int exitCode = cli.getSchedulerConf("", target());
    assertEquals(0, exitCode, "SchedConfCLI failed to run");
    assertTrue(sysOutStream.toString().contains("testqueue"),
        "Failed to get scheduler configuration");
  }

  @Test
  @Timeout(value = 10)
  public void testFormatSchedulerConf() throws Exception {

    ResourceScheduler scheduler = rm.getResourceScheduler();
    MutableConfigurationProvider provider =
        ((MutableConfScheduler) scheduler).getMutableConfProvider();

    SchedConfUpdateInfo schedUpdateInfo = new SchedConfUpdateInfo();
    HashMap<String, String> globalUpdates = new HashMap<>();
    globalUpdates.put("schedKey1", "schedVal1");
    schedUpdateInfo.setGlobalParams(globalUpdates);

    LogMutation log = provider.logAndApplyMutation(
        UserGroupInformation.getCurrentUser(), schedUpdateInfo);
    rm.getRMContext().getRMAdminService().refreshQueues();
    provider.confirmPendingMutation(log, true);

    Configuration schedulerConf = provider.getConfiguration();
    assertEquals("schedVal1", schedulerConf.get("schedKey1"));

    int exitCode = cli.formatSchedulerConf("", target());
    assertEquals(0, exitCode);

    schedulerConf = provider.getConfiguration();
    assertNull(schedulerConf.get("schedKey1"));
  }

  @Test
  @Timeout(value = 10)
  public void testInvalidConf() throws Exception {
    ByteArrayOutputStream sysErrStream = new ByteArrayOutputStream();
    PrintStream sysErr = new PrintStream(sysErrStream);
    System.setErr(sysErr);

    // conf pair with no key should be invalid
    executeCommand(sysErrStream, "-add", "root.a:=confVal");
    executeCommand(sysErrStream, "-update", "root.a:=confVal");
    executeCommand(sysErrStream, "-add", "root.a:confKey=confVal=conf");
    executeCommand(sysErrStream, "-update", "root.a:confKey=confVal=c");
  }

  private void executeCommand(ByteArrayOutputStream sysErrStream, String op,
      String queueConf) throws Exception {
    int exitCode = cli.run(new String[] {op, queueConf});
    assertNotEquals(0, exitCode, "Should return an error code");
    assertTrue(sysErrStream.toString()
        .contains("Specify configuration key " + "value as confKey=confVal."));
  }

  @Test
  @Timeout(value = 10)
  public void testAddQueues() {
    SchedConfUpdateInfo schedUpdateInfo = new SchedConfUpdateInfo();
    cli.addQueues("root.a:a1=aVal1,a2=aVal2,a3=", schedUpdateInfo);
    Map<String, String> paramValues = new HashMap<>();
    List<QueueConfigInfo> addQueueInfo = schedUpdateInfo.getAddQueueInfo();
    paramValues.put("a1", "aVal1");
    paramValues.put("a2", "aVal2");
    paramValues.put("a3", null);
    validateQueueConfigInfo(addQueueInfo, 0, "root.a", paramValues);

    schedUpdateInfo = new SchedConfUpdateInfo();
    cli.addQueues("root.b:b1=bVal1;root.c:c1=cVal1", schedUpdateInfo);
    addQueueInfo = schedUpdateInfo.getAddQueueInfo();
    assertEquals(2, addQueueInfo.size());
    paramValues.clear();
    paramValues.put("b1", "bVal1");
    validateQueueConfigInfo(addQueueInfo, 0, "root.b", paramValues);
    paramValues.clear();
    paramValues.put("c1", "cVal1");
    validateQueueConfigInfo(addQueueInfo, 1, "root.c", paramValues);
  }

  @Test
  @Timeout(value = 10)
  public void testAddQueuesWithCommaInValue() {
    SchedConfUpdateInfo schedUpdateInfo = new SchedConfUpdateInfo();
    cli.addQueues("root.a:a1=a1Val1\\,a1Val2 a1Val3,a2=a2Val1\\,a2Val2",
        schedUpdateInfo);
    List<QueueConfigInfo> addQueueInfo = schedUpdateInfo.getAddQueueInfo();
    Map<String, String> params = new HashMap<>();
    params.put("a1", "a1Val1,a1Val2 a1Val3");
    params.put("a2", "a2Val1,a2Val2");
    validateQueueConfigInfo(addQueueInfo, 0, "root.a", params);
  }

  @Test
  @Timeout(value = 10)
  public void testRemoveQueues() {
    SchedConfUpdateInfo schedUpdateInfo = new SchedConfUpdateInfo();
    cli.removeQueues("root.a;root.b;root.c.c1", schedUpdateInfo);
    List<String> removeInfo = schedUpdateInfo.getRemoveQueueInfo();
    assertEquals(3, removeInfo.size());
    assertEquals("root.a", removeInfo.get(0));
    assertEquals("root.b", removeInfo.get(1));
    assertEquals("root.c.c1", removeInfo.get(2));
  }

  @Test
  @Timeout(value = 10)
  public void testUpdateQueues() {
    SchedConfUpdateInfo schedUpdateInfo = new SchedConfUpdateInfo();
    Map<String, String> paramValues = new HashMap<>();
    cli.updateQueues("root.a:a1=aVal1,a2=aVal2,a3=", schedUpdateInfo);
    List<QueueConfigInfo> updateQueueInfo = schedUpdateInfo
        .getUpdateQueueInfo();
    paramValues.put("a1", "aVal1");
    paramValues.put("a2", "aVal2");
    paramValues.put("a3", null);
    validateQueueConfigInfo(updateQueueInfo, 0, "root.a", paramValues);

    schedUpdateInfo = new SchedConfUpdateInfo();
    cli.updateQueues("root.b:b1=bVal1;root.c:c1=cVal1", schedUpdateInfo);
    updateQueueInfo = schedUpdateInfo.getUpdateQueueInfo();
    assertEquals(2, updateQueueInfo.size());
    paramValues.clear();
    paramValues.put("b1", "bVal1");
    validateQueueConfigInfo(updateQueueInfo, 0, "root.b", paramValues);
    paramValues.clear();
    paramValues.put("c1", "cVal1");
    validateQueueConfigInfo(updateQueueInfo, 1, "root.c", paramValues);
  }

  private void validateQueueConfigInfo(
      List<QueueConfigInfo> updateQueueInfo, int index, String queuename,
      Map<String, String> paramValues) {
    QueueConfigInfo updateInfo = updateQueueInfo.get(index);
    assertEquals(queuename, updateInfo.getQueue());
    Map<String, String> params = updateInfo.getParams();
    assertEquals(paramValues.size(), params.size());
    paramValues.forEach((k, v) -> assertEquals(v, params.get(k)));
  }

  @Test
  @Timeout(value = 10)
  public void testUpdateQueuesWithCommaInValue() {
    SchedConfUpdateInfo schedUpdateInfo = new SchedConfUpdateInfo();
    cli.updateQueues("root.a:a1=a1Val1\\,a1Val2 a1Val3,a2=a2Val1\\,a2Val2",
        schedUpdateInfo);
    List<QueueConfigInfo> updateQueueInfo = schedUpdateInfo
        .getUpdateQueueInfo();
    Map<String, String> paramValues = new HashMap<>();
    paramValues.put("a1", "a1Val1,a1Val2 a1Val3");
    paramValues.put("a2", "a2Val1,a2Val2");
    validateQueueConfigInfo(updateQueueInfo, 0, "root.a", paramValues);
  }

  @Test
  @Timeout(value = 10)
  public void testGlobalUpdate() {
    SchedConfUpdateInfo schedUpdateInfo = new SchedConfUpdateInfo();
    cli.globalUpdates("schedKey1=schedVal1,schedKey2=schedVal2",
        schedUpdateInfo);
    Map<String, String> paramValues = new HashMap<>();
    paramValues.put("schedKey1", "schedVal1");
    paramValues.put("schedKey2", "schedVal2");
    validateGlobalParams(schedUpdateInfo, paramValues);
  }

  @Test
  @Timeout(value = 10)
  public void testGlobalUpdateWithCommaInValue() {
    SchedConfUpdateInfo schedUpdateInfo = new SchedConfUpdateInfo();
    cli.globalUpdates(
        "schedKey1=schedVal1.1\\,schedVal1.2 schedVal1.3,schedKey2=schedVal2",
        schedUpdateInfo);
    Map<String, String> paramValues = new HashMap<>();
    paramValues.put("schedKey1", "schedVal1.1,schedVal1.2 schedVal1.3");
    paramValues.put("schedKey2", "schedVal2");
    validateGlobalParams(schedUpdateInfo, paramValues);
  }

  private void validateGlobalParams(SchedConfUpdateInfo schedUpdateInfo,
      Map<String, String> paramValues) {
    Map<String, String> globalInfo = schedUpdateInfo.getGlobalParams();
    assertEquals(paramValues.size(), globalInfo.size());
    paramValues.forEach((k, v) -> assertEquals(v, globalInfo.get(k)));
  }
}