TestQueueConfigurationAutoRefreshPolicy.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class TestQueueConfigurationAutoRefreshPolicy  {

  private Configuration configuration;
  private MockRM rm = null;
  private FileSystem fs;
  private Path workingPath;
  private Path workingPathRecover;
  private Path fileSystemWorkingPath;
  private Path tmpDir;
  private QueueConfigurationAutoRefreshPolicy policy;

  static {
    YarnConfiguration.addDefaultResource(
        YarnConfiguration.CS_CONFIGURATION_FILE);
    YarnConfiguration.addDefaultResource(
        YarnConfiguration.DR_CONFIGURATION_FILE);
  }

  @BeforeEach
  public void setup() throws IOException {
    QueueMetrics.clearQueueMetrics();
    DefaultMetricsSystem.setMiniClusterMode(true);

    configuration = new YarnConfiguration();
    configuration.set(YarnConfiguration.RM_SCHEDULER,
        CapacityScheduler.class.getCanonicalName());
    fs = FileSystem.get(configuration);
    workingPath = new Path(QueueConfigurationAutoRefreshPolicy.
        class.getClassLoader().
        getResource(".").toString());
    workingPathRecover = new Path(QueueConfigurationAutoRefreshPolicy.
        class.getClassLoader().
        getResource(".").toString() + "/" + "Recover");
    fileSystemWorkingPath =
        new Path(new File("target", this.getClass().getSimpleName()
            + "-remoteDir").getAbsolutePath());

    tmpDir = new Path(new File("target", this.getClass().getSimpleName()
        + "-tmpDir").getAbsolutePath());
    fs.delete(fileSystemWorkingPath, true);
    fs.mkdirs(fileSystemWorkingPath);
    fs.delete(tmpDir, true);
    fs.mkdirs(tmpDir);

    policy =
        new QueueConfigurationAutoRefreshPolicy();
  }

  private String writeConfigurationXML(Configuration conf, String confXMLName)
      throws IOException {
    DataOutputStream output = null;
    try {
      final File confFile = new File(tmpDir.toString(), confXMLName);
      if (confFile.exists()) {
        confFile.delete();
      }
      if (!confFile.createNewFile()) {
        fail("Can not create " + confXMLName);
      }
      output = new DataOutputStream(
          new FileOutputStream(confFile));
      conf.writeXml(output);
      return confFile.getAbsolutePath();
    } finally {
      if (output != null) {
        output.close();
      }
    }
  }

  private void uploadConfiguration(Boolean isFileSystemBased,
      Configuration conf, String confFileName)
      throws IOException {
    String csConfFile = writeConfigurationXML(conf, confFileName);
    if (isFileSystemBased) {
      // upload the file into Remote File System
      uploadToRemoteFileSystem(new Path(csConfFile),
          fileSystemWorkingPath);
    } else {
      // upload the file into Work Path for Local File
      uploadToRemoteFileSystem(new Path(csConfFile),
          workingPath);
    }
  }

  private void uploadToRemoteFileSystem(Path filePath, Path remotePath)
      throws IOException {
    fs.copyFromLocalFile(filePath, remotePath);
  }

  private void uploadDefaultConfiguration(Boolean
      isFileSystemBased) throws IOException {
    Configuration conf = new Configuration();
    uploadConfiguration(isFileSystemBased,
        conf, "core-site.xml");

    YarnConfiguration yarnConf = new YarnConfiguration();

    uploadConfiguration(isFileSystemBased,
        yarnConf, "yarn-site.xml");

    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    uploadConfiguration(isFileSystemBased,
        csConf, "capacity-scheduler.xml");

    Configuration hadoopPolicyConf = new Configuration(false);
    hadoopPolicyConf
        .addResource(YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
    uploadConfiguration(isFileSystemBased,
        hadoopPolicyConf, "hadoop-policy.xml");
  }

  @Test
  public void testFileSystemBasedEditSchedule() throws Exception {
    // Test FileSystemBasedConfigurationProvider scheduled
    testCommon(true);
  }

  @Test
  public void testLocalFileBasedEditSchedule() throws Exception {
    // Prepare for recover for local file default.
    fs.mkdirs(workingPath);
    fs.copyFromLocalFile(new Path(workingPath.toString()
        + "/" + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE),
        new Path(workingPathRecover.toString()
        + "/" + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE));

    fs.copyFromLocalFile(new Path(workingPath.toString()
        + "/" + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE),
        new Path(workingPathRecover.toString()
        + "/" + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE));

    fs.copyFromLocalFile(new Path(workingPath.toString()
        + "/" + YarnConfiguration.CS_CONFIGURATION_FILE),
        new Path(workingPathRecover.toString()
        + "/" + YarnConfiguration.CS_CONFIGURATION_FILE));

    // Test LocalConfigurationProvider scheduled
    testCommon(false);

    // Recover for recover for local file default.
    fs.copyFromLocalFile(new Path(workingPathRecover.toString()
        + "/" + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE),
        new Path(workingPath.toString()
        + "/" + YarnConfiguration.CORE_SITE_CONFIGURATION_FILE));

    fs.copyFromLocalFile(new Path(workingPathRecover.toString()
        + "/" + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE),
        new Path(workingPath.toString()
        + "/" + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE));

    fs.copyFromLocalFile(new Path(workingPathRecover.toString()
        + "/" + YarnConfiguration.CS_CONFIGURATION_FILE),
        new Path(workingPath.toString()
        + "/" + YarnConfiguration.CS_CONFIGURATION_FILE));

    fs.delete(workingPathRecover, true);
  }

  public void testCommon(Boolean isFileSystemBased) throws Exception {

    // Set auto refresh interval to 1s
    configuration.setLong(CapacitySchedulerConfiguration.
            QUEUE_AUTO_REFRESH_MONITORING_INTERVAL,
        1000L);

    if (isFileSystemBased) {
      configuration.set(YarnConfiguration.FS_BASED_RM_CONF_STORE,
          fileSystemWorkingPath.toString());
    }

    //upload default configurations
    uploadDefaultConfiguration(isFileSystemBased);

    if (isFileSystemBased) {
      configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
          FileSystemBasedConfigurationProvider.class.getCanonicalName());
    } else {
      configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
          LocalConfigurationProvider.class.getCanonicalName());
    }

    // upload the auto refresh related configurations
    uploadConfiguration(isFileSystemBased,
        configuration, "yarn-site.xml");
    uploadConfiguration(isFileSystemBased,
        configuration, "capacity-scheduler.xml");

    rm = new MockRM(configuration);
    rm.init(configuration);
    policy.init(configuration,
        rm.getRMContext(),
        rm.getResourceScheduler());
    rm.start();

    CapacityScheduler cs =
        (CapacityScheduler) rm.getRMContext().getScheduler();

    int maxAppsBefore = cs.getConfiguration().getMaximumSystemApplications();

    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    csConf.setInt(CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS,
        5000);
    uploadConfiguration(isFileSystemBased,
        csConf, "capacity-scheduler.xml");

    // Refreshed first time.
    policy.editSchedule();

    // Make sure refresh successfully.
    assertFalse(policy.getLastReloadAttemptFailed());
    long oldModified = policy.getLastModified();
    long oldSuccess = policy.getLastReloadAttempt();

    assertTrue(oldSuccess > oldModified);

    int maxAppsAfter = cs.getConfiguration().getMaximumSystemApplications();
    assertEquals(maxAppsAfter, 5000);
    assertTrue(maxAppsAfter != maxAppsBefore);

    // Trigger interval for refresh.
    GenericTestUtils.waitFor(() -> (policy.getClock().getTime() -
            policy.getLastReloadAttempt()) / 1000 > 1,
        500, 3000);

    // Upload for modified.
    csConf.setInt(CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS,
        3000);
    uploadConfiguration(isFileSystemBased,
        csConf, "capacity-scheduler.xml");

    policy.editSchedule();
    // Wait for triggered refresh.
    GenericTestUtils.waitFor(() -> policy.getLastReloadAttempt() >
                    policy.getLastModified(),
        500, 3000);

    // Make sure refresh successfully.
    assertFalse(policy.getLastReloadAttemptFailed());
    oldModified = policy.getLastModified();
    oldSuccess = policy.getLastReloadAttempt();
    assertTrue(oldSuccess > oldModified);
    assertEquals(cs.getConfiguration().
        getMaximumSystemApplications(), 3000);

    // Trigger interval for refresh.
    GenericTestUtils.waitFor(() -> (policy.getClock().getTime() -
          policy.getLastReloadAttempt()) / 1000 > 1,
        500, 3000);

    // Without modified
    policy.editSchedule();
    assertEquals(oldModified,
        policy.getLastModified());
    assertEquals(oldSuccess,
        policy.getLastReloadAttempt());
  }

  @AfterEach
  public void tearDown() throws IOException {
    if (rm != null) {
      rm.stop();
    }
    fs.delete(fileSystemWorkingPath, true);
    fs.delete(tmpDir, true);
  }
}