TestFSQueueConverter.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_MAX_PARALLEL_APPLICATIONS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
* Unit tests for FSQueueConverter.
*
*/
@RunWith(MockitoJUnitRunner.class)
public class TestFSQueueConverter {
private static final float MAX_AM_SHARE_DEFAULT = 0.16f;
private static final int MAX_APPS_DEFAULT = 15;
private static final Resource CLUSTER_RESOURCE =
Resource.newInstance(16384, 16);
private final static Set<String> ALL_QUEUES =
Sets.newHashSet("root",
"root.default",
"root.admins",
"root.users",
"root.admins.alice",
"root.admins.bob",
"root.users.joe",
"root.users.john",
"root.misc",
"root.misc.a",
"root.misc.b");
private static final String FILE_PREFIX = "file:";
private static final String FAIR_SCHEDULER_XML =
prepareFileName("fair-scheduler-conversion.xml");
private static String prepareFileName(String f) {
return FILE_PREFIX + new File("src/test/resources/" + f).getAbsolutePath();
}
private FSQueueConverter converter;
private Configuration yarnConfig;
private CapacitySchedulerConfiguration csConfig;
private FairScheduler fs;
private FSQueue rootQueue;
private ConversionOptions conversionOptions;
private DryRunResultHolder dryRunResultHolder;
private FSQueueConverterBuilder builder;
private String key;
private static final QueuePath ROOT = new QueuePath("root");
private static final QueuePath DEFAULT = new QueuePath("root.default");
private static final QueuePath USERS = new QueuePath("root.users");
private static final QueuePath USERS_JOE = new QueuePath("root.users.joe");
private static final QueuePath USERS_JOHN = new QueuePath("root.users.john");
private static final QueuePath ADMINS = new QueuePath("root.admins");
private static final QueuePath ADMINS_ALICE = new QueuePath("root.admins.alice");
private static final QueuePath ADMINS_BOB = new QueuePath("root.admins.bob");
private static final QueuePath MISC = new QueuePath("root.misc");
private static final QueuePath MISC_A = new QueuePath("root.misc.a");
private static final QueuePath MISC_B = new QueuePath("root.misc.b");
@Mock
private FSConfigToCSConfigRuleHandler ruleHandler;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setup() {
yarnConfig = new Configuration(false);
yarnConfig.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FAIR_SCHEDULER_XML);
yarnConfig.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
csConfig = new CapacitySchedulerConfiguration(
new Configuration(false));
dryRunResultHolder = new DryRunResultHolder();
conversionOptions =
new ConversionOptions(dryRunResultHolder, false);
fs = createFairScheduler();
createBuilder();
rootQueue = fs.getQueueManager().getRootQueue();
}
@After
public void tearDown() throws IOException {
if (fs != null) {
fs.close();
}
}
private FairScheduler createFairScheduler() {
RMContext ctx = new RMContextImpl();
PlacementManager placementManager = new PlacementManager();
ctx.setQueuePlacementManager(placementManager);
FairScheduler fairScheduler = new FairScheduler();
fairScheduler.setRMContext(ctx);
fairScheduler.init(yarnConfig);
return fairScheduler;
}
private void createBuilder() {
builder = FSQueueConverterBuilder.create()
.withRuleHandler(ruleHandler)
.withCapacitySchedulerConfig(csConfig)
.withPreemptionEnabled(false)
.withSizeBasedWeight(false)
.withClusterResource(CLUSTER_RESOURCE)
.withQueueMaxAMShareDefault(MAX_AM_SHARE_DEFAULT)
.withQueueMaxAppsDefault(MAX_APPS_DEFAULT)
.withConversionOptions(conversionOptions);
}
@Test
public void testConvertQueueHierarchy() {
converter = builder.build();
converter.convertQueueHierarchy(rootQueue);
// root children
assertEquals("root children", Arrays.asList("admins", "users", "misc", "default"),
csConfig.getQueues(ROOT));
// root.admins children
assertEquals("root.admins children", Arrays.asList("bob", "alice"),
csConfig.getQueues(ADMINS));
// root.default children - none
assertTrue("root.default children",
csConfig.getQueues(DEFAULT).isEmpty());
// root.users children
assertEquals("root.users children", Arrays.asList("john", "joe"),
csConfig.getQueues(USERS));
Set<String> leafs = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root",
"root.default",
"root.admins",
"root.users",
"root.misc"));
for (String queue : leafs) {
key = PREFIX + queue + ".queues";
assertTrue("Key " + key + " has value, but it should be empty",
csConfig.getQueues(new QueuePath(queue)).isEmpty());
}
}
@Test
public void testQueueMaxAMShare() {
converter = builder.build();
converter.convertQueueHierarchy(rootQueue);
// root.admins.bob
assertEquals("root.admins.bob AM share", 1.0f,
csConfig.getMaximumApplicationMasterResourcePerQueuePercent(
ADMINS_BOB), 0.0f);
// root.admins.alice
assertEquals("root.admins.alice AM share", 0.15f,
csConfig.getMaximumApplicationMasterResourcePerQueuePercent(
ADMINS_ALICE), 0.0f);
Set<String> remaining = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root.admins.bob", "root.admins.alice"));
for (String queue : remaining) {
key = PREFIX + queue + ".maximum-am-resource-percent";
assertEquals("Key " + key + " has different value",
0.1f, csConfig
.getMaximumApplicationMasterResourcePerQueuePercent(new QueuePath(queue)), 0.0f);
}
}
@Test
public void testQueueMaxParallelApps() {
converter = builder.build();
converter.convertQueueHierarchy(rootQueue);
assertEquals("root.admins.alice max apps", 2,
csConfig.getMaxParallelAppsForQueue(ADMINS_ALICE), 0);
Set<String> remaining = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root.admins.alice"));
for (String queue : remaining) {
key = PREFIX + queue + ".max-parallel-apps";
assertEquals("Key " + key + " has different value",
DEFAULT_MAX_PARALLEL_APPLICATIONS, csConfig
.getMaxParallelAppsForQueue(new QueuePath(queue)), 0);
}
}
@Test
public void testQueueMaxAllocations() {
converter = builder.build();
converter.convertQueueHierarchy(rootQueue);
// root.admins vcores + mb
assertEquals("root.admins max vcores", 3,
csConfig.getQueueMaximumAllocationVcores(ADMINS));
assertEquals("root.admins max memory", 4096,
csConfig.getQueueMaximumAllocationMb(ADMINS));
// root.users.john max vcores + mb
assertEquals("root.users.john max vcores", 2,
csConfig.getQueueMaximumAllocationVcores(USERS_JOHN));
assertEquals("root.users.john max memory", 8192,
csConfig.getQueueMaximumAllocationMb(USERS_JOHN));
Set<String> remaining = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root.admins", "root.users.john"));
for (String queue : remaining) {
key = PREFIX + queue + ".maximum-allocation-vcores";
assertEquals("Key " + key + " has different value",
-1.0, csConfig
.getQueueMaximumAllocationVcores(new QueuePath(queue)), 0.0f);
key = PREFIX + queue + ".maximum-allocation-mb";
assertEquals("Key " + key + " has different value",
-1.0, csConfig
.getQueueMaximumAllocationMb(new QueuePath(queue)), 0.0f);
}
}
@Test
public void testQueuePreemptionDisabled() {
converter = builder.withPreemptionEnabled(true).build();
converter.convertQueueHierarchy(rootQueue);
assertTrue("root.admins.alice preemption setting",
csConfig.getPreemptionDisabled(
ADMINS_ALICE, false));
assertTrue("root.users.joe preemption setting",
csConfig.getPreemptionDisabled(
USERS_JOE, false));
Set<String> remaining = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root.admins.alice", "root.users.joe"));
for (String queue : remaining) {
key = PREFIX + queue + ".disable_preemption";
assertEquals("Key " + key + " has different value",
false, csConfig.getPreemptionDisabled(new QueuePath(queue), false));
}
}
@Test
public void testQueuePreemptionDisabledWhenGlobalPreemptionDisabled() {
converter = builder.build();
converter.convertQueueHierarchy(rootQueue);
for (String queue : ALL_QUEUES) {
key = PREFIX + queue + ".disable_preemption";
assertEquals("Key " + key + " has different value",
false, csConfig.getPreemptionDisabled(new QueuePath(queue), false));
}
}
@Test
public void testChildCapacityInCapacityMode() {
converter = builder.withPercentages(true).build();
converter.convertQueueHierarchy(rootQueue);
// root
assertEquals("root.default capacity", 33.333f,
csConfig.getNonLabeledQueueCapacity(DEFAULT), 0.0f);
assertEquals("root.admins capacity", 33.333f,
csConfig.getNonLabeledQueueCapacity(ADMINS), 0.0f);
assertEquals("root.users capacity", 33.334f,
csConfig.getNonLabeledQueueCapacity(USERS), 0.0f);
// root.users
assertEquals("root.users.john capacity", 25.000f,
csConfig.getNonLabeledQueueCapacity(USERS_JOHN), 0.0f);
assertEquals("root.users.joe capacity", 75.000f,
csConfig.getNonLabeledQueueCapacity(USERS_JOE), 0.0f);
// root.admins
assertEquals("root.admins.alice capacity", 75.000f,
csConfig.getNonLabeledQueueCapacity(ADMINS_ALICE), 0.0f);
assertEquals("root.admins.bob capacity", 25.000f,
csConfig.getNonLabeledQueueCapacity(ADMINS_BOB), 0.0f);
// root.misc
assertEquals("root.misc capacity", 0.000f,
csConfig.getNonLabeledQueueCapacity(MISC), 0.000f);
assertEquals("root.misc.a capacity", 0.000f,
csConfig.getNonLabeledQueueCapacity(MISC_A), 0.000f);
assertEquals("root.misc.b capacity", 0.000f,
csConfig.getNonLabeledQueueCapacity(MISC_B), 0.000f);
}
@Test
public void testChildCapacityInWeightMode() {
converter = builder.withPercentages(false).build();
converter.convertQueueHierarchy(rootQueue);
// root
assertEquals("root.default weight", 1.0f,
csConfig.getNonLabeledQueueWeight(DEFAULT), 0.01f);
assertEquals("root.admins weight", 1.0f,
csConfig.getNonLabeledQueueWeight(ADMINS), 0.01f);
assertEquals("root.users weight", 1.0f,
csConfig.getNonLabeledQueueWeight(USERS), 0.01f);
// root.users
assertEquals("root.users.john weight", 1.0f,
csConfig.getNonLabeledQueueWeight(USERS_JOHN), 0.01f);
assertEquals("root.users.joe weight", 3.0f,
csConfig.getNonLabeledQueueWeight(USERS_JOE), 0.01f);
// root.admins
assertEquals("root.admins.alice weight", 3.0f,
csConfig.getNonLabeledQueueWeight(ADMINS_ALICE), 0.01f);
assertEquals("root.admins.bob weight", 1.0f,
csConfig.getNonLabeledQueueWeight(ADMINS_BOB), 0.01f);
// root.misc
assertEquals("root.misc weight", 0.0f,
csConfig.getNonLabeledQueueWeight(MISC), 0.00f);
assertEquals("root.misc.a weight", 0.0f,
csConfig.getNonLabeledQueueWeight(MISC_A), 0.00f);
assertEquals("root.misc.b weight", 0.0f,
csConfig.getNonLabeledQueueWeight(MISC_B), 0.00f);
}
@Test
public void testAutoCreateV2FlagsInWeightMode() {
converter = builder.withPercentages(false).build();
converter.convertQueueHierarchy(rootQueue);
assertTrue("root autocreate v2 flag",
csConfig.isAutoQueueCreationV2Enabled(ROOT));
assertTrue("root.admins autocreate v2 flag",
csConfig.isAutoQueueCreationV2Enabled(ADMINS));
assertTrue("root.admins.alice autocreate v2 flag",
csConfig.isAutoQueueCreationV2Enabled(ADMINS_ALICE));
assertTrue("root.users autocreate v2 flag",
csConfig.isAutoQueueCreationV2Enabled(USERS));
assertTrue("root.misc autocreate v2 flag",
csConfig.isAutoQueueCreationV2Enabled(MISC));
//leaf queue root.admins.alice is removed from the below list
//adding reservation to a leaf, it's queueType changes to FSParentQueue
Set<String> leafs = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root",
"root.admins",
"root.users",
"root.misc",
"root.admins.alice"));
for (String queue : leafs) {
key = PREFIX + queue + ".auto-queue-creation-v2.enabled";
assertEquals("Key " + key + " has different value",
false, csConfig
.isAutoQueueCreationV2Enabled(new QueuePath(queue)));
}
}
@Test
public void testZeroSumCapacityValidation() {
converter = builder.withPercentages(true).build();
converter.convertQueueHierarchy(rootQueue);
Set<String> noZeroSumAllowedQueues = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root.misc"));
for (String queue : noZeroSumAllowedQueues) {
key = PREFIX + queue + ".allow-zero-capacity-sum";
assertEquals("Key " + key + " has different value",
false, csConfig
.getAllowZeroCapacitySum(new QueuePath(queue)));
}
assertTrue("root.misc allow zero capacities",
csConfig.getAllowZeroCapacitySum(MISC));
}
@Test
public void testQueueMaximumCapacity() {
converter = builder.build();
converter.convertQueueHierarchy(rootQueue);
for (String queue : ALL_QUEUES) {
key = PREFIX + queue + ".maximum-capacity";
assertEquals("Key " + key + " has different value",
100.0, csConfig
.getNonLabeledQueueMaximumCapacity(new QueuePath(queue)), 0.0f);
}
verify(ruleHandler, times(3)).handleMaxResources();
}
@Test
public void testQueueMinimumCapacity() {
converter = builder.build();
converter.convertQueueHierarchy(rootQueue);
verify(ruleHandler, times(2)).handleMinResources();
}
@Test
public void testQueueWithNoAutoCreateChildQueue() {
converter = builder
.withCapacitySchedulerConfig(csConfig)
.build();
converter.convertQueueHierarchy(rootQueue);
for (String queue : ALL_QUEUES) {
key = PREFIX + queue + ".auto-create-child-queue.enabled";
assertEquals("Key " + key + " has different value",
false, csConfig.isAutoCreateChildQueueEnabled(new QueuePath(queue)));
}
}
@Test
public void testQueueSizeBasedWeightEnabled() {
converter = builder.withSizeBasedWeight(true).build();
converter.convertQueueHierarchy(rootQueue);
for (String queue : ALL_QUEUES) {
key = PREFIX + queue + ".ordering-policy.fair.enable-size-based-weight";
assertTrue("Key " + key + " has different value",
csConfig.getBoolean(key, false));
}
}
@Test
public void testQueueSizeBasedWeightDisabled() {
converter = builder.build();
converter.convertQueueHierarchy(rootQueue);
for (String queue : ALL_QUEUES) {
key = PREFIX + queue + ".ordering-policy.fair.enable-size-based-weight";
assertNull("Key " + key + " has different value",
csConfig.get(key));
}
}
@Test
public void testQueueOrderingPolicy() throws Exception {
converter = builder.build();
String absolutePath =
new File("src/test/resources/fair-scheduler-orderingpolicy.xml")
.getAbsolutePath();
yarnConfig.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FILE_PREFIX + absolutePath);
fs.close();
fs = createFairScheduler();
rootQueue = fs.getQueueManager().getRootQueue();
converter.convertQueueHierarchy(rootQueue);
// root
assertEquals("root ordering policy", "fifo",
csConfig.getAppOrderingPolicy(ROOT).getConfigName());
assertEquals("root.default ordering policy", "fair",
csConfig.getAppOrderingPolicy(DEFAULT).getConfigName());
assertEquals("root.admins ordering policy", "fifo",
csConfig.getAppOrderingPolicy(ADMINS).getConfigName());
assertEquals("root.users ordering policy", "fifo",
csConfig.getAppOrderingPolicy(USERS).getConfigName());
// root.users
assertEquals("root.users.joe ordering policy", "fair",
csConfig.getAppOrderingPolicy(USERS_JOE).getConfigName());
assertEquals("root.users.john ordering policy", "fifo",
csConfig.getAppOrderingPolicy(USERS_JOHN).getConfigName());
// root.admins
assertEquals("root.admins.alice ordering policy", "fifo",
csConfig.getAppOrderingPolicy(ADMINS_ALICE).getConfigName());
assertEquals("root.admins.bob ordering policy", "fair",
csConfig.getAppOrderingPolicy(ADMINS_BOB).getConfigName());
}
@Test
public void testQueueUnsupportedMixedOrderingPolicy() throws IOException {
converter = builder.withDrfUsed(true).build();
String absolutePath =
new File("src/test/resources/fair-scheduler-orderingpolicy-mixed.xml")
.getAbsolutePath();
yarnConfig.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FILE_PREFIX + absolutePath);
fs.close();
fs = createFairScheduler();
rootQueue = fs.getQueueManager().getRootQueue();
converter.convertQueueHierarchy(rootQueue);
verify(ruleHandler, times(5)).handleFairAsDrf(anyString());
}
@Test
public void testQueueMaxChildCapacityNotSupported() {
converter = builder.build();
expectedException.expect(UnsupportedPropertyException.class);
expectedException.expectMessage("test");
Mockito.doThrow(new UnsupportedPropertyException("test"))
.when(ruleHandler).handleMaxChildCapacity();
converter.convertQueueHierarchy(rootQueue);
}
@Test
public void testReservationSystemNotSupported() {
converter = builder.build();
expectedException.expect(UnsupportedPropertyException.class);
expectedException.expectMessage("maxCapacity");
Mockito.doThrow(new UnsupportedPropertyException("maxCapacity"))
.when(ruleHandler).handleMaxChildCapacity();
yarnConfig.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
true);
converter.convertQueueHierarchy(rootQueue);
}
}