TestAllocationFileLoaderService.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PrimaryGroupPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueuePlacementPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueuePlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.UserSettings;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Test loading the allocation file for the FairScheduler.
 */
public class TestAllocationFileLoaderService {

  private static final String A_CUSTOM_RESOURCE = "a-custom-resource";

  final static String TEST_DIR = new File(System.getProperty("test.build.data",
      "/tmp")).getAbsolutePath();

  final static String ALLOC_FILE = new File(TEST_DIR,
      "test-queues").getAbsolutePath();
  private static final String TEST_FAIRSCHED_XML = "test-fair-scheduler.xml";

  private FairScheduler scheduler;
  private Configuration conf;

  @BeforeEach
  public void setup() {
    SystemClock clock = SystemClock.getInstance();
    PlacementManager placementManager = new PlacementManager();
    FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
    RMContext rmContext = mock(RMContext.class);
    when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);

    scheduler = mock(FairScheduler.class);
    conf = new YarnConfiguration();
    when(scheduler.getClock()).thenReturn(clock);
    when(scheduler.getConf()).thenReturn(fsConf);
    when(scheduler.getConfig()).thenReturn(conf);
    when(scheduler.getRMContext()).thenReturn(rmContext);
  }

  @AfterEach
  public void teardown() {
    new File(ALLOC_FILE).delete();
  }

  @Test
  public void testGetAllocationFileFromFileSystem()
      throws IOException, URISyntaxException {
    File baseDir =
        new File(TEST_DIR + Path.SEPARATOR + "getAllocHDFS").getAbsoluteFile();
    FileUtil.fullyDelete(baseDir);
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
    MiniDFSCluster hdfsCluster = builder.build();
    String fsAllocPath = "hdfs://localhost:" + hdfsCluster.getNameNodePort()
        + Path.SEPARATOR + TEST_FAIRSCHED_XML;

    URL fschedURL = Thread.currentThread().getContextClassLoader()
        .getResource(TEST_FAIRSCHED_XML);
    FileSystem fs = FileSystem.get(conf);
    fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);

    AllocationFileLoaderService allocLoader =
        new AllocationFileLoaderService(scheduler);
    Path allocationFile = allocLoader.getAllocationFile(conf);
    assertEquals(fsAllocPath, allocationFile.toString());
    assertTrue(fs.exists(allocationFile));

    hdfsCluster.shutdown(true);
  }

  @Test
  public void testDenyGetAllocationFileFromUnsupportedFileSystem()
      throws UnsupportedFileSystemException {
    assertThrows(UnsupportedFileSystemException.class, () -> {
      conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
      AllocationFileLoaderService allocLoader =
          new AllocationFileLoaderService(scheduler);
      allocLoader.getAllocationFile(conf);
    });
  }

  @Test
  public void testGetAllocationFileFromClasspath() {
    try {
      FileSystem fs = FileSystem.get(conf);
      conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
          TEST_FAIRSCHED_XML);
      AllocationFileLoaderService allocLoader =
          new AllocationFileLoaderService(scheduler);
      Path allocationFile = allocLoader.getAllocationFile(conf);
      assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
      assertTrue(fs.exists(allocationFile));
    } catch (IOException e) {
      fail("Unable to access allocation file from classpath: " + e);
    }
  }

  @Test
  @Timeout(value = 10)
  public void testReload() throws Exception {
    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("queueA")
            .maxRunningApps(1).build())
        .addQueue(new AllocationFileQueue.Builder("queueB").build())
        .queuePlacementPolicy(new AllocationFileQueuePlacementPolicy()
            .addRule(new AllocationFileQueuePlacementRule(
                AllocationFileQueuePlacementRule.RuleName.DEFAULT)))
        .writeToFile(ALLOC_FILE);

    ControlledClock clock = new ControlledClock();
    clock.setTime(0);
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

    AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(
        clock, scheduler);
    allocLoader.reloadIntervalMs = 5;
    allocLoader.init(conf);
    ReloadListener confHolder = new ReloadListener();
    allocLoader.setReloadListener(confHolder);
    allocLoader.reloadAllocations();
    AllocationConfiguration allocConf = confHolder.allocConf;

    // Verify conf
    List<PlacementRule> rules = scheduler.getRMContext()
        .getQueuePlacementManager().getPlacementRules();
    assertEquals(1, rules.size());
    assertEquals(DefaultPlacementRule.class, rules.get(0).getClass());
    assertEquals(1, allocConf.getQueueMaxApps("root.queueA"));
    assertEquals(2, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
        .size());
    assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
        .contains("root.queueA"));
    assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
        .contains("root.queueB"));

    // reset the conf so we can detect the reload
    confHolder.allocConf = null;

    // Modify file and advance the clock
    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("queueB")
            .maxRunningApps(3).build())
        .queuePlacementPolicy(new AllocationFileQueuePlacementPolicy()
            .addRule(new AllocationFileQueuePlacementRule(
                AllocationFileQueuePlacementRule.RuleName.SPECIFIED))
            .addRule(new AllocationFileQueuePlacementRule(
                AllocationFileQueuePlacementRule.RuleName.NESTED)
            .addNestedRule(new AllocationFileQueuePlacementRule(
                AllocationFileQueuePlacementRule.RuleName.PRIMARY_GROUP))))
        .writeToFile(ALLOC_FILE);

    clock.tickMsec(System.currentTimeMillis()
        + AllocationFileLoaderService.ALLOC_RELOAD_WAIT_MS + 10000);
    allocLoader.start();

    while (confHolder.allocConf == null) {
      Thread.sleep(20);
    }

    // Verify conf
    allocConf = confHolder.allocConf;
    rules = scheduler.getRMContext().getQueuePlacementManager()
        .getPlacementRules();
    assertEquals(2, rules.size());
    assertEquals(SpecifiedPlacementRule.class, rules.get(0).getClass());
    assertEquals(UserPlacementRule.class, rules.get(1).getClass());
    assertEquals(PrimaryGroupPlacementRule.class,
        ((FSPlacementRule)(rules.get(1))).getParentRule().getClass());
    assertEquals(3, allocConf.getQueueMaxApps("root.queueB"));
    assertEquals(1, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
        .size());
    assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
        .contains("root.queueB"));
  }

  @Test
  public void testAllocationFileParsing() throws Exception {
    CustomResourceTypesConfigurationProvider.
        initResourceTypes(A_CUSTOM_RESOURCE);
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
    AllocationFileLoaderService allocLoader =
        new AllocationFileLoaderService(scheduler);

    AllocationFileWriter.create()
            // Give queue A a minimum of 1024 M
            .addQueue(new AllocationFileQueue.Builder("queueA")
                .minResources("1024mb,0vcores")
                .maxResources("2048mb,10vcores")
                .build())
            // Give queue B a minimum of 2048 M
            .addQueue(new AllocationFileQueue.Builder("queueB")
                .minResources("2048mb,0vcores")
                .maxResources("5120mb,110vcores")
                .aclAdministerApps("alice,bob admins")
                .schedulingPolicy("fair")
                .build())
            // Give queue C no minimum
            .addQueue(new AllocationFileQueue.Builder("queueC")
                .minResources("5120mb,0vcores")
                .aclSubmitApps("alice,bob admins")
                .build())
            // Give queue D a limit of 3 running apps and 0.4f maxAMShare
            .addQueue(new AllocationFileQueue.Builder("queueD")
                .maxRunningApps(3)
                .maxAMShare(0.4)
                .build())
            // Give queue E a preemption timeout of one minute
            .addQueue(new AllocationFileQueue.Builder("queueE")
                .minSharePreemptionTimeout(60)
                .build())
            // Make queue F a parent queue without configured leaf queues
            // using the 'type' attribute
            .addQueue(new AllocationFileQueue.Builder("queueF")
                .parent(true)
                .maxChildResources("2048mb,64vcores")
                .build())
            .addQueue(new AllocationFileQueue.Builder("queueG")
                .maxChildResources("2048mb,64vcores")
                .fairSharePreemptionTimeout(120)
                .minSharePreemptionTimeout(50)
                .fairSharePreemptionThreshold(0.6)
                .maxContainerAllocation(
                      "vcores=16, memory-mb=512, " + A_CUSTOM_RESOURCE + "=10")
            // Create hierarchical queues G,H, with different min/fair
            // share preemption timeouts and preemption thresholds.
            // Also add a child default to make sure it doesn't impact queue H.
                .subQueue(new AllocationFileQueue.Builder("queueH")
                    .fairSharePreemptionTimeout(180)
                    .minSharePreemptionTimeout(40)
                    .fairSharePreemptionThreshold(0.7)
                    .maxContainerAllocation("1024mb,8vcores")
                    .build())
                .build())
            // Set default limit of apps per queue to 15
            .queueMaxAppsDefault(15)
            // Set default limit of max resource per queue to 4G and 100 cores
            .queueMaxResourcesDefault("4096mb,100vcores")
            // Set default limit of apps per user to 5
            .userMaxAppsDefault(5)
            // Set default limit of AMResourceShare to 0.5f
            .queueMaxAMShareDefault(0.5)
            // Set default min share preemption timeout to 2 minutes
            .defaultMinSharePreemptionTimeout(120)
            // Set default fair share preemption timeout to 5 minutes
            .defaultFairSharePreemptionTimeout(300)
            // Set default fair share preemption threshold to 0.4
            .defaultFairSharePreemptionThreshold(0.4)
            // Set default scheduling policy to DRF
            .drfDefaultQueueSchedulingPolicy()
            // Give user1 a limit of 10 jobs
            .userSettings(new UserSettings.Builder("user1")
              .maxRunningApps(10)
              .build())
            .writeToFile(ALLOC_FILE);

    allocLoader.init(conf);
    ReloadListener confHolder = new ReloadListener();
    allocLoader.setReloadListener(confHolder);
    allocLoader.reloadAllocations();
    AllocationConfiguration queueConf = confHolder.allocConf;

    assertEquals(6, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size());
    assertEquals(Resources.createResource(0),
        queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));

    assertEquals(Resources.createResource(2048, 10),
        queueConf.getMaxResources("root.queueA").getResource());
    assertEquals(Resources.createResource(5120, 110),
        queueConf.getMaxResources("root.queueB").getResource());
    assertEquals(Resources.createResource(4096, 100),
        queueConf.getMaxResources("root.queueC").getResource());
    assertEquals(Resources.createResource(4096, 100),
        queueConf.getMaxResources("root.queueD").getResource());
    assertEquals(Resources.createResource(4096, 100),
        queueConf.getMaxResources("root.queueE").getResource());
    assertEquals(Resources.createResource(4096, 100),
        queueConf.getMaxResources("root.queueF").getResource());
    assertEquals(Resources.createResource(4096, 100),
        queueConf.getMaxResources("root.queueG").getResource());
    assertEquals(Resources.createResource(4096, 100),
        queueConf.getMaxResources("root.queueG.queueH").getResource());

    assertEquals(Resources.createResource(1024, 0),
        queueConf.getMinResources("root.queueA"));
    assertEquals(Resources.createResource(2048, 0),
        queueConf.getMinResources("root.queueB"));
    assertEquals(Resources.createResource(5120, 0),
        queueConf.getMinResources("root.queueC"));
    assertEquals(Resources.createResource(0),
        queueConf.getMinResources("root.queueD"));
    assertEquals(Resources.createResource(0),
        queueConf.getMinResources("root.queueE"));
    assertEquals(Resources.createResource(0),
        queueConf.getMinResources("root.queueF"));
    assertEquals(Resources.createResource(0),
        queueConf.getMinResources("root.queueG"));
    assertEquals(Resources.createResource(0),
        queueConf.getMinResources("root.queueG.queueH"));

    assertNull(queueConf.getMaxChildResources("root.queueA"),
        "Max child resources unexpectedly set for queue root.queueA");
    assertNull(queueConf.getMaxChildResources("root.queueB"),
        "Max child resources unexpectedly set for queue root.queueB");
    assertNull(queueConf.getMaxChildResources("root.queueC"),
        "Max child resources unexpectedly set for queue root.queueC");
    assertNull(queueConf.getMaxChildResources("root.queueD"),
        "Max child resources unexpectedly set for queue root.queueD");
    assertNull(queueConf.getMaxChildResources("root.queueE"),
        "Max child resources unexpectedly set for queue root.queueE");
    assertEquals(Resources.createResource(2048, 64),
        queueConf.getMaxChildResources("root.queueF").getResource());
    assertEquals(Resources.createResource(2048, 64),
        queueConf.getMaxChildResources("root.queueG").getResource());
    assertNull(queueConf.getMaxChildResources("root.queueG.queueH"),
        "Max child resources unexpectedly set for "
        + "queue root.queueG.queueH");

    assertEquals(15, queueConf.getQueueMaxApps("root."
        + YarnConfiguration.DEFAULT_QUEUE_NAME));
    assertEquals(15, queueConf.getQueueMaxApps("root.queueA"));
    assertEquals(15, queueConf.getQueueMaxApps("root.queueB"));
    assertEquals(15, queueConf.getQueueMaxApps("root.queueC"));
    assertEquals(3, queueConf.getQueueMaxApps("root.queueD"));
    assertEquals(15, queueConf.getQueueMaxApps("root.queueE"));
    assertEquals(10, queueConf.getUserMaxApps("user1"));
    assertEquals(5, queueConf.getUserMaxApps("user2"));

    assertEquals(.5f, queueConf.getQueueMaxAMShare("root." + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
    assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueA"), 0.01);
    assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueB"), 0.01);
    assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueC"), 0.01);
    assertEquals(.4f, queueConf.getQueueMaxAMShare("root.queueD"), 0.01);
    assertEquals(.5f, queueConf.getQueueMaxAMShare("root.queueE"), 0.01);

    Resource expectedResourceWithCustomType = Resources.createResource(512, 16);
    expectedResourceWithCustomType.setResourceValue(A_CUSTOM_RESOURCE, 10);

    assertEquals(Resources.unbounded(),
        queueConf.getQueueMaxContainerAllocation(
            "root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
    assertEquals(Resources.unbounded(),
        queueConf.getQueueMaxContainerAllocation("root.queueA"));
    assertEquals(Resources.unbounded(),
        queueConf.getQueueMaxContainerAllocation("root.queueB"));
    assertEquals(Resources.unbounded(),
        queueConf.getQueueMaxContainerAllocation("root.queueC"));
    assertEquals(Resources.unbounded(),
        queueConf.getQueueMaxContainerAllocation("root.queueD"));
    assertEquals(Resources.unbounded(),
        queueConf.getQueueMaxContainerAllocation("root.queueE"));
    assertEquals(Resources.unbounded(),
        queueConf.getQueueMaxContainerAllocation("root.queueF"));
    assertEquals(expectedResourceWithCustomType,
        queueConf.getQueueMaxContainerAllocation("root.queueG"));
    assertEquals(Resources.createResource(1024, 8),
        queueConf.getQueueMaxContainerAllocation("root.queueG.queueH"));

    assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root"));
    assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root." +
        YarnConfiguration.DEFAULT_QUEUE_NAME));
    assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueA"));
    assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueB"));
    assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueC"));
    assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueD"));
    assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE"));
    assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueF"));
    assertEquals(50000, queueConf.getMinSharePreemptionTimeout("root.queueG"));
    assertEquals(40000, queueConf.getMinSharePreemptionTimeout("root.queueG.queueH"));

    assertEquals(300000, queueConf.getFairSharePreemptionTimeout("root"));
    assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root." +
        YarnConfiguration.DEFAULT_QUEUE_NAME));
    assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueA"));
    assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueB"));
    assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC"));
    assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD"));
    assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE"));
    assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueF"));
    assertEquals(120000, queueConf.getFairSharePreemptionTimeout("root.queueG"));
    assertEquals(180000, queueConf.getFairSharePreemptionTimeout("root.queueG.queueH"));

    assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root"), 0.01);
    assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root." +
        YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
    assertEquals(-1,
        queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01);
    assertEquals(-1,
        queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01);
    assertEquals(-1,
        queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01);
    assertEquals(-1,
        queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01);
    assertEquals(-1,
        queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01);
    assertEquals(-1,
        queueConf.getFairSharePreemptionThreshold("root.queueF"), 0.01);
    assertEquals(.6f,
        queueConf.getFairSharePreemptionThreshold("root.queueG"), 0.01);
    assertEquals(.7f,
        queueConf.getFairSharePreemptionThreshold("root.queueG.queueH"), 0.01);

    assertTrue(queueConf.getConfiguredQueues()
        .get(FSQueueType.PARENT)
        .contains("root.queueF"));
    assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.PARENT)
        .contains("root.queueG"));
    assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.LEAF)
        .contains("root.queueG.queueH"));

    // Verify existing queues have default scheduling policy
    assertEquals(DominantResourceFairnessPolicy.NAME,
        queueConf.getSchedulingPolicy("root").getName());
    assertEquals(DominantResourceFairnessPolicy.NAME,
        queueConf.getSchedulingPolicy("root.queueA").getName());
    // Verify default is overriden if specified explicitly
    assertEquals(FairSharePolicy.NAME,
        queueConf.getSchedulingPolicy("root.queueB").getName());
    // Verify new queue gets default scheduling policy
    assertEquals(DominantResourceFairnessPolicy.NAME,
        queueConf.getSchedulingPolicy("root.newqueue").getName());
  }

  @Test
  public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
    AllocationFileLoaderService allocLoader =
        new AllocationFileLoaderService(scheduler);

    AllocationFileWriter.create()
        .useLegacyTagNameForQueues()
        // Give queue A a minimum of 1024 M
        .addQueue(new AllocationFileQueue.Builder("queueA")
            .minResources("1024mb,0vcores")
            .build())
        // Give queue B a minimum of 2048 M
        .addQueue(new AllocationFileQueue.Builder("queueB")
            .minResources("2048mb,0vcores")
            .aclAdministerApps("alice,bob admins")
            .build())
        // Give queue C no minimum
        .addQueue(new AllocationFileQueue.Builder("queueC")
            .aclAdministerApps("alice,bob admins")
            .build())
        // Give queue D a limit of 3 running apps
        .addQueue(new AllocationFileQueue.Builder("queueD")
            .maxRunningApps(3)
            .build())
        // Give queue E a preemption timeout of one minute and 0.3f threshold
        .addQueue(new AllocationFileQueue.Builder("queueE")
            .minSharePreemptionTimeout(60)
            .fairSharePreemptionThreshold(0.3)
            .build())
        // Set default limit of apps per queue to 15
        .queueMaxAppsDefault(15)
        // Set default limit of apps per user to 5
        .userMaxAppsDefault(5)
        // Set default limit of max resource per queue to 4G and 100 cores
        .queueMaxResourcesDefault("4096mb,100vcores")
        // Set default limit of AMResourceShare to 0.5f
        .queueMaxAMShareDefault(0.5)
        // Set default min share preemption timeout to 2 minutes
        .defaultMinSharePreemptionTimeout(120)
        // Set default fair share preemption timeout to 5 minutes
        .defaultFairSharePreemptionTimeout(300)
        // Set default fair share preemption threshold to 0.6
        .defaultFairSharePreemptionThreshold(0.6)
        // Set default scheduling policy to DRF
        .drfDefaultQueueSchedulingPolicy()
        // Give user1 a limit of 10 jobs
        .userSettings(new UserSettings.Builder("user1")
            .maxRunningApps(10)
            .build())
        .writeToFile(ALLOC_FILE);

    allocLoader.init(conf);
    ReloadListener confHolder = new ReloadListener();
    allocLoader.setReloadListener(confHolder);
    allocLoader.reloadAllocations();
    AllocationConfiguration queueConf = confHolder.allocConf;

    assertEquals(5, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size());
    assertEquals(Resources.createResource(0),
        queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
    assertEquals(Resources.createResource(0),
        queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));

    assertEquals(Resources.createResource(1024, 0),
        queueConf.getMinResources("root.queueA"));
    assertEquals(Resources.createResource(2048, 0),
        queueConf.getMinResources("root.queueB"));
    assertEquals(Resources.createResource(0),
        queueConf.getMinResources("root.queueC"));
    assertEquals(Resources.createResource(0),
        queueConf.getMinResources("root.queueD"));
    assertEquals(Resources.createResource(0),
        queueConf.getMinResources("root.queueE"));

    assertEquals(15, queueConf.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
    assertEquals(15, queueConf.getQueueMaxApps("root.queueA"));
    assertEquals(15, queueConf.getQueueMaxApps("root.queueB"));
    assertEquals(15, queueConf.getQueueMaxApps("root.queueC"));
    assertEquals(3, queueConf.getQueueMaxApps("root.queueD"));
    assertEquals(15, queueConf.getQueueMaxApps("root.queueE"));
    assertEquals(10, queueConf.getUserMaxApps("user1"));
    assertEquals(5, queueConf.getUserMaxApps("user2"));

    assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root"));
    assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root." +
        YarnConfiguration.DEFAULT_QUEUE_NAME));
    assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueA"));
    assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueB"));
    assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueC"));
    assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueD"));
    assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE"));

    assertEquals(300000, queueConf.getFairSharePreemptionTimeout("root"));
    assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root." +
        YarnConfiguration.DEFAULT_QUEUE_NAME));
    assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueA"));
    assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueB"));
    assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC"));
    assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD"));
    assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE"));

    assertEquals(.6f, queueConf.getFairSharePreemptionThreshold("root"), 0.01);
    assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root."
        + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
    assertEquals(-1,
        queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01);
    assertEquals(-1,
        queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01);
    assertEquals(-1,
        queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01);
    assertEquals(-1,
        queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01);
    assertEquals(.3f,
        queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01);
  }

  @Test
  public void testSimplePlacementPolicyFromConf() throws Exception {
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
    conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
    conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false);

    AllocationFileWriter.create().writeToFile(ALLOC_FILE);

    AllocationFileLoaderService allocLoader =
        new AllocationFileLoaderService(scheduler);
    allocLoader.init(conf);
    ReloadListener confHolder = new ReloadListener();
    allocLoader.setReloadListener(confHolder);
    allocLoader.reloadAllocations();

    List<PlacementRule> rules = scheduler.getRMContext()
        .getQueuePlacementManager().getPlacementRules();
    assertEquals(2, rules.size());
    assertEquals(SpecifiedPlacementRule.class, rules.get(0).getClass());
    assertFalse(((FSPlacementRule)rules.get(0)).getCreateFlag(),
        "Create flag was not set to false");
    assertEquals(DefaultPlacementRule.class, rules.get(1).getClass());
  }

  /**
   * Verify that you can't place queues at the same level as the root queue in
   * the allocations file.
   */
  @Test
  public void testQueueAlongsideRoot() throws Exception {
    assertThrows(AllocationConfigurationException.class, () -> {
      conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

      AllocationFileWriter.create()
          .addQueue(new AllocationFileQueue.Builder("root").build())
          .addQueue(new AllocationFileQueue.Builder("other").build())
          .writeToFile(ALLOC_FILE);

      AllocationFileLoaderService allocLoader =
          new AllocationFileLoaderService(scheduler);
      allocLoader.init(conf);
      ReloadListener confHolder = new ReloadListener();
      allocLoader.setReloadListener(confHolder);
      allocLoader.reloadAllocations();
    });
  }

  /**
   * Verify that you can't include periods as the queue name in the allocations
   * file.
   */
  @Test
  public void testQueueNameContainingPeriods() throws Exception {
    assertThrows(AllocationConfigurationException.class, () -> {
      conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

      AllocationFileWriter.create()
          .addQueue(new AllocationFileQueue.Builder("parent1.child").build())
          .writeToFile(ALLOC_FILE);

      AllocationFileLoaderService allocLoader =
          new AllocationFileLoaderService(scheduler);
      allocLoader.init(conf);
      ReloadListener confHolder = new ReloadListener();
      allocLoader.setReloadListener(confHolder);
      allocLoader.reloadAllocations();
    });
  }

  /**
   * Verify that you can't have the queue name with whitespace only in the
   * allocations file.
   */
  @Test
  public void testQueueNameContainingOnlyWhitespace() throws Exception {
    assertThrows(AllocationConfigurationException.class, () -> {
      conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

      AllocationFileWriter.create()
          .addQueue(new AllocationFileQueue.Builder("      ").build())
          .writeToFile(ALLOC_FILE);

      AllocationFileLoaderService allocLoader =
          new AllocationFileLoaderService(scheduler);
      allocLoader.init(conf);
      ReloadListener confHolder = new ReloadListener();
      allocLoader.setReloadListener(confHolder);
      allocLoader.reloadAllocations();
    });
  }

  @Test
  public void testParentTagWithReservation() throws Exception {
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("parent")
            .parent(true)
            .reservation()
            .build())
        .writeToFile(ALLOC_FILE);

    AllocationFileLoaderService allocLoader =
        new AllocationFileLoaderService(scheduler);
    allocLoader.init(conf);
    ReloadListener confHolder = new ReloadListener();
    allocLoader.setReloadListener(confHolder);
    try {
      allocLoader.reloadAllocations();
    } catch (AllocationConfigurationException ex) {
      assertEquals(ex.getMessage(), "The configuration settings for root.parent"
          + " are invalid. A queue element that contains child queue elements"
          + " or that has the type='parent' attribute cannot also include a"
          + " reservation element.");
    }
  }

  @Test
  public void testParentWithReservation() throws Exception {
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("parent")
            .parent(true)
            .subQueue(new AllocationFileQueue.Builder("child").build())
            .reservation()
            .build())
        .writeToFile(ALLOC_FILE);

    AllocationFileLoaderService allocLoader =
        new AllocationFileLoaderService(scheduler);
    allocLoader.init(conf);
    ReloadListener confHolder = new ReloadListener();
    allocLoader.setReloadListener(confHolder);
    try {
      allocLoader.reloadAllocations();
    } catch (AllocationConfigurationException ex) {
      assertEquals(ex.getMessage(), "The configuration settings for root.parent"
          + " are invalid. A queue element that contains child queue elements"
          + " or that has the type='parent' attribute cannot also include a"
          + " reservation element.");
    }
  }

  /**
   * Verify that a parent queue (type = parent) cannot have a maxAMShare element
   * as dynamic queues won't be able to inherit this setting.
   */
  @Test
  public void testParentTagWithMaxAMShare() throws Exception {
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("parent")
            .parent(true)
            .maxAMShare(0.75)
            .build())
        .writeToFile(ALLOC_FILE);

    AllocationFileLoaderService allocLoader =
        new AllocationFileLoaderService(scheduler);
    allocLoader.init(conf);
    ReloadListener confHolder = new ReloadListener();
    allocLoader.setReloadListener(confHolder);
    try {
      allocLoader.reloadAllocations();
      fail("Expect allocation parsing to fail as maxAMShare cannot be set for"
          + " a parent queue.");
    } catch (AllocationConfigurationException ex) {
      assertEquals(ex.getMessage(), "The configuration settings for root.parent"
          + " are invalid. A queue element that contains child queue elements"
          + " or that has the type='parent' attribute cannot also include a"
          + " maxAMShare element.");
    }
  }

  /**
   * Verify that a parent queue that is not explicitly tagged with "type"
   * as "parent" but has a child queue (implicit parent) cannot have a
   * maxAMShare element.
   */
  @Test
  public void testParentWithMaxAMShare() throws Exception {
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("parent")
            .parent(false)
            .maxAMShare(0.76)
            .subQueue(new AllocationFileQueue.Builder("child").build())
            .build())
        .writeToFile(ALLOC_FILE);

    AllocationFileLoaderService allocLoader =
        new AllocationFileLoaderService(scheduler);
    allocLoader.init(conf);
    ReloadListener confHolder = new ReloadListener();
    allocLoader.setReloadListener(confHolder);
    try {
      allocLoader.reloadAllocations();
      fail("Expect allocation parsing to fail as maxAMShare cannot be set for"
          + " a parent queue.");
    } catch (AllocationConfigurationException ex) {
      assertEquals(ex.getMessage(), "The configuration settings for root.parent"
          + " are invalid. A queue element that contains child queue elements"
          + " or that has the type='parent' attribute cannot also include a"
          + " maxAMShare element.");
    }
  }

  @Test
  public void testParentTagWithChild() throws Exception {
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("parent")
            .parent(true)
            .subQueue(new AllocationFileQueue.Builder("child").build())
            .build())
        .writeToFile(ALLOC_FILE);

    AllocationFileLoaderService allocLoader =
        new AllocationFileLoaderService(scheduler);
    allocLoader.init(conf);
    ReloadListener confHolder = new ReloadListener();
    allocLoader.setReloadListener(confHolder);
    allocLoader.reloadAllocations();
    AllocationConfiguration queueConf = confHolder.allocConf;
    // Check whether queue 'parent' and 'child' are loaded successfully
    assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.PARENT)
        .contains("root.parent"));
    assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.LEAF)
        .contains("root.parent.child"));
  }

  /**
   * Verify that you can't have the queue name with just a non breaking
   * whitespace in the allocations file.
   */
  @Test
  public void testQueueNameContainingNBWhitespace() throws Exception {
    assertThrows(AllocationConfigurationException.class, () -> {
      conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

      AllocationFileWriter.create()
          .addQueue(new AllocationFileQueue.Builder("\u00a0").build())
          .writeToFile(ALLOC_FILE);

      AllocationFileLoaderService allocLoader =
          new AllocationFileLoaderService(scheduler);
      allocLoader.init(conf);
      ReloadListener confHolder = new ReloadListener();
      allocLoader.setReloadListener(confHolder);
      allocLoader.reloadAllocations();
    });
  }

  /**
   * Verify that defaultQueueSchedulingMode can't accept FIFO as a value.
   */
  @Test
  public void testDefaultQueueSchedulingModeIsFIFO() throws Exception {
    assertThrows(AllocationConfigurationException.class, () -> {
      conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

      AllocationFileWriter.create()
          .fifoDefaultQueueSchedulingPolicy()
          .writeToFile(ALLOC_FILE);

      AllocationFileLoaderService allocLoader =
          new AllocationFileLoaderService(scheduler);
      allocLoader.init(conf);
      ReloadListener confHolder = new ReloadListener();
      allocLoader.setReloadListener(confHolder);
      allocLoader.reloadAllocations();
    });
  }

  @Test
  public void testReservableQueue() throws Exception {
    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

    AllocationFileWriter.create()
        .addQueue(new AllocationFileQueue.Builder("reservable")
            .reservation()
            .build())
        .addQueue(new AllocationFileQueue.Builder("other").build())
        .reservationAgent("DummyAgentName")
        .reservationPolicy("AnyAdmissionPolicy")
        .writeToFile(ALLOC_FILE);

    AllocationFileLoaderService allocLoader =
        new AllocationFileLoaderService(scheduler);
    allocLoader.init(conf);
    ReloadListener confHolder = new ReloadListener();
    allocLoader.setReloadListener(confHolder);
    allocLoader.reloadAllocations();

    AllocationConfiguration allocConf = confHolder.allocConf;
    String reservableQueueName = "root.reservable";
    String nonreservableQueueName = "root.other";
    QueuePath nonreservableQueuePath = new QueuePath(nonreservableQueueName);
    QueuePath reservableQueuePath = new QueuePath(reservableQueueName);
    assertFalse(allocConf.isReservable(nonreservableQueuePath));
    assertTrue(allocConf.isReservable(reservableQueuePath));
    Map<FSQueueType, Set<String>> configuredQueues =
        allocConf.getConfiguredQueues();
    assertTrue(configuredQueues.get(FSQueueType.PARENT).contains(reservableQueueName),
        "reservable queue is expected be to a parent queue");
    assertFalse(configuredQueues.get(FSQueueType.LEAF)
        .contains(reservableQueueName), "reservable queue should not be a leaf queue");

    assertTrue(allocConf.getMoveOnExpiry(reservableQueuePath));
    assertEquals(ReservationSchedulerConfiguration.DEFAULT_RESERVATION_WINDOW,
        allocConf.getReservationWindow(reservableQueuePath));
    assertEquals(100,
        allocConf.getInstantaneousMaxCapacity(reservableQueuePath), 0.0001);
    assertEquals("DummyAgentName",
        allocConf.getReservationAgent(reservableQueuePath));
    assertEquals(100, allocConf.getAverageCapacity(reservableQueuePath), 0.001);
    assertFalse(allocConf.getShowReservationAsQueues(reservableQueuePath));
    assertEquals("AnyAdmissionPolicy",
        allocConf.getReservationAdmissionPolicy(reservableQueuePath));
    assertEquals(ReservationSchedulerConfiguration
        .DEFAULT_RESERVATION_PLANNER_NAME,
        allocConf.getReplanner(reservableQueuePath));
    assertEquals(ReservationSchedulerConfiguration
        .DEFAULT_RESERVATION_ENFORCEMENT_WINDOW,
        allocConf.getEnforcementWindow(reservableQueuePath));
  }

  /**
   * Verify that you can't have dynamic user queue and reservable queue on
   * the same queue.
   */
  @Test
  public void testReservableCannotBeCombinedWithDynamicUserQueue()
      throws Exception {
    assertThrows(AllocationConfigurationException.class, () -> {
      conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);

      AllocationFileWriter.create()
          .addQueue(new AllocationFileQueue.Builder("notboth")
          .parent(true)
          .reservation()
          .build())
          .writeToFile(ALLOC_FILE);

      AllocationFileLoaderService allocLoader =
          new AllocationFileLoaderService(scheduler);
      allocLoader.init(conf);
      ReloadListener confHolder = new ReloadListener();
      allocLoader.setReloadListener(confHolder);
      allocLoader.reloadAllocations();
    });
  }

  private class ReloadListener implements AllocationFileLoaderService.Listener {
    private AllocationConfiguration allocConf;

    @Override
    public void onReload(AllocationConfiguration info) {
      allocConf = info;
    }

    @Override
    public void onCheck() {
    }
  }
}