TestCapacitySchedulerAutoQueueCreation.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement
.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common
.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy
.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.security
.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security
.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security
.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests for creation and reinitialization of auto created leaf queues
* and capacity management under a ManagedParentQueue.
*/
public class TestCapacitySchedulerAutoQueueCreation
extends TestCapacitySchedulerAutoCreatedQueueBase {
private static final Logger LOG = LoggerFactory.getLogger(
TestCapacitySchedulerAutoQueueCreation.class);
private static final String SPECIFIED_QUEUE_MAPPING = "%specified";
private static final String CURRENT_USER_MAPPING = "%user";
private static final Resource TEMPLATE_MAX_RES = Resource.newInstance(16 *
GB,
48);
private static final Resource TEMPLATE_MIN_RES = Resource.newInstance(1638,
4);
@Test
@Timeout(value = 20)
public void testAutoCreateLeafQueueCreation() throws Exception {
try {
// submit an app
submitApp(mockRM, cs.getQueue(PARENT_QUEUE), USER0, USER0, 1, 1);
// check preconditions
List<ApplicationAttemptId> appsInC = cs.getAppsInQueue(PARENT_QUEUE);
assertEquals(1, appsInC.size());
assertNotNull(cs.getQueue(USER0));
AutoCreatedLeafQueue autoCreatedLeafQueue =
(AutoCreatedLeafQueue) cs.getQueue(USER0);
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(
PARENT_QUEUE);
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
Map<String, Float> expectedChildQueueAbsCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(1);
validateInitialQueueEntitlement(parentQueue, USER0,
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
// The new queue calculation mode works from the effective resources
// so the absoluteCapacity and the maxApplications differs a little
// bit: 6553/16384=0.3999633789 vs 0.4
final int maxApps = cs.getConfiguration().isLegacyQueueMode() ? 4000 : 3999;
validateUserAndAppLimits(autoCreatedLeafQueue, maxApps, maxApps);
validateContainerLimits(autoCreatedLeafQueue, 6, 10240);
assertTrue(autoCreatedLeafQueue
.getOrderingPolicy() instanceof FairOrderingPolicy);
setupGroupQueueMappings("d", cs.getConfiguration(), "%user");
cs.reinitialize(cs.getConfiguration(), mockRM.getRMContext());
submitApp(mockRM, cs.getQueue("d"), TEST_GROUPUSER, TEST_GROUPUSER, 1, 1);
autoCreatedLeafQueue =
(AutoCreatedLeafQueue) cs.getQueue(TEST_GROUPUSER);
parentQueue = (ManagedParentQueue) cs.getQueue("d");
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
expectedChildQueueAbsCapacity =
new HashMap<String, Float>() {{
put(NO_LABEL, 0.02f);
}};
validateInitialQueueEntitlement(parentQueue, TEST_GROUPUSER,
expectedChildQueueAbsCapacity,
new HashSet<String>() {{ add(NO_LABEL); }});
} finally {
cleanupQueue(USER0);
cleanupQueue(TEST_GROUPUSER);
}
}
@Test
@Timeout(value = 20)
public void testAutoCreateLeafQueueCreationSchedulerMaximumAllocation()
throws Exception {
try {
// Check the minimum/maximum allocation settings via the
// yarn.scheduler.minimum/maximum-allocation-mb/vcore property
setSchedulerMinMaxAllocation(cs.getConfiguration());
cs.getConfiguration().setAutoCreatedLeafQueueConfigMaximumAllocation(C,
"memory-mb=18384,vcores=8");
cs.reinitialize(cs.getConfiguration(), mockRM.getRMContext());
// submit an app
submitApp(mockRM, cs.getQueue(PARENT_QUEUE), USER0, USER0, 1, 1);
// check preconditions
List<ApplicationAttemptId> appsInC = cs.getAppsInQueue(PARENT_QUEUE);
assertEquals(1, appsInC.size());
assertNotNull(cs.getQueue(USER0));
AutoCreatedLeafQueue autoCreatedLeafQueue =
(AutoCreatedLeafQueue) cs.getQueue(USER0);
validateContainerLimits(autoCreatedLeafQueue, 8, 18384);
} finally {
cleanupQueue(USER0);
cleanupQueue(TEST_GROUPUSER);
}
}
@Test
@Timeout(value = 20)
public void testAutoCreateLeafQueueCreationUsingFullParentPath()
throws Exception {
try {
setupGroupQueueMappings("root.d", cs.getConfiguration(), "%user");
cs.reinitialize(cs.getConfiguration(), mockRM.getRMContext());
submitApp(mockRM, cs.getQueue("d"), TEST_GROUPUSER, TEST_GROUPUSER, 1, 1);
AutoCreatedLeafQueue autoCreatedLeafQueue =
(AutoCreatedLeafQueue) cs.getQueue(TEST_GROUPUSER);
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue("d");
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
Map<String, Float> expectedChildQueueAbsCapacity =
new HashMap<String, Float>() {{
put(NO_LABEL, 0.02f);
}};
validateInitialQueueEntitlement(parentQueue, TEST_GROUPUSER,
expectedChildQueueAbsCapacity,
new HashSet<String>() {{ add(NO_LABEL); }});
} finally {
cleanupQueue(USER0);
cleanupQueue(TEST_GROUPUSER);
}
}
@Test
public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception {
try {
String host = "127.0.0.1";
RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1,
host);
cs.handle(new NodeAddedSchedulerEvent(node));
// submit an app
MockRMAppSubmissionData data1 =
MockRMAppSubmissionData.Builder.createWithMemory(GB, mockRM)
.withAppName("test-auto-queue-creation-1")
.withUser(USER0)
.withAcls(null)
.withQueue(USER0)
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(mockRM, data1);
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(GB, mockRM)
.withAppName("test-auto-queue-creation-2")
.withUser(USER1)
.withAcls(null)
.withQueue(USER1)
.withUnmanagedAM(false)
.build();
RMApp app2 = MockRMAppSubmitter.submit(mockRM, data);
// check preconditions
List<ApplicationAttemptId> appsInC = cs.getAppsInQueue(PARENT_QUEUE);
assertEquals(2, appsInC.size());
assertNotNull(cs.getQueue(USER0));
assertNotNull(cs.getQueue(USER1));
AutoCreatedLeafQueue user0Queue = (AutoCreatedLeafQueue) cs.getQueue(
USER0);
AutoCreatedLeafQueue user1Queue = (AutoCreatedLeafQueue) cs.getQueue(
USER0);
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(
PARENT_QUEUE);
assertEquals(parentQueue, user0Queue.getParent());
assertEquals(parentQueue, user1Queue.getParent());
Map<String, Float>
expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(2);
validateInitialQueueEntitlement(parentQueue, USER0,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
validateInitialQueueEntitlement(parentQueue, USER1,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
ApplicationAttemptId appAttemptId = appsInC.get(0);
Priority priority = TestUtils.createMockPriority(1);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(
null);
ResourceRequest r1 = TestUtils.createResourceRequest(ResourceRequest.ANY,
1 * GB, 1, true, priority,
recordFactory);
cs.allocate(appAttemptId, Collections.<ResourceRequest>singletonList(r1),
null, Collections.<ContainerId>emptyList(), Collections.singletonList(host),
null, NULL_UPDATE_REQUESTS);
//And this will result in container assignment for app1
CapacityScheduler.schedule(cs);
//change state to draining
user0Queue.stopQueue();
cs.killAllAppsInQueue(USER0);
mockRM.waitForState(appAttemptId, RMAppAttemptState.KILLED);
mockRM.waitForState(appAttemptId.getApplicationId(), RMAppState.KILLED);
//change state to stopped
user0Queue.stopQueue();
assertEquals(QueueState.STOPPED,
user0Queue.getQueueInfo().getQueueState());
cs.reinitialize(cs.getConf(), mockRM.getRMContext());
AutoCreatedLeafQueue user0QueueReinited =
(AutoCreatedLeafQueue) cs.getQueue(USER0);
validateCapacities(user0QueueReinited, 0.0f, 0.0f, 1.0f, 1.0f);
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
USER1);
expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(1);
validateInitialQueueEntitlement(parentQueue, leafQueue.getQueuePath(),
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
} finally {
cleanupQueue(USER0);
}
}
@Test
public void testConvertAutoCreateDisabledOnManagedParentQueueFails()
throws Exception {
CapacityScheduler newCS = new CapacityScheduler();
try {
CapacitySchedulerConfiguration newConf = setupSchedulerConfiguration();
setupQueueConfiguration(newConf);
newConf.setAutoCreateChildQueueEnabled(C, false);
newCS.setConf(new YarnConfiguration());
newCS.setRMContext(mockRM.getRMContext());
newCS.init(cs.getConf());
newCS.start();
newCS.reinitialize(newConf,
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(newConf),
new NMTokenSecretManagerInRM(newConf),
new ClientToAMTokenSecretManagerInRM(), null));
} catch (IOException e) {
//expected exception
} finally {
newCS.stop();
}
}
@Test
public void testConvertLeafQueueToParentQueueWithAutoCreate()
throws Exception {
CapacityScheduler newCS = new CapacityScheduler();
try {
CapacitySchedulerConfiguration newConf = setupSchedulerConfiguration();
setupQueueConfiguration(newConf);
newConf.setAutoCreatedLeafQueueConfigCapacity(A1, A1_CAPACITY / 10);
newConf.setAutoCreateChildQueueEnabled(A1, true);
newCS.setConf(new YarnConfiguration());
newCS.setRMContext(mockRM.getRMContext());
newCS.init(cs.getConf());
newCS.start();
final LeafQueue a1Queue = (LeafQueue) newCS.getQueue("a1");
a1Queue.stopQueue();
newCS.reinitialize(newConf,
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(newConf),
new NMTokenSecretManagerInRM(newConf),
new ClientToAMTokenSecretManagerInRM(), null));
} finally {
newCS.stop();
}
}
@Test
public void testConvertFailsFromParentQueueToManagedParentQueue()
throws Exception {
CapacityScheduler newCS = new CapacityScheduler();
try {
CapacitySchedulerConfiguration newConf = setupSchedulerConfiguration();
setupQueueConfiguration(newConf);
newConf.setAutoCreatedLeafQueueConfigCapacity(A, A_CAPACITY / 10);
newConf.setAutoCreateChildQueueEnabled(A, true);
newCS.setConf(new YarnConfiguration());
newCS.setRMContext(mockRM.getRMContext());
newCS.init(cs.getConf());
newCS.start();
final ParentQueue a1Queue = (ParentQueue) newCS.getQueue("a");
a1Queue.stopQueue();
newCS.reinitialize(newConf,
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(newConf),
new NMTokenSecretManagerInRM(newConf),
new ClientToAMTokenSecretManagerInRM(), null));
fail("Expected exception while converting a parent queue to"
+ " an auto create enabled parent queue");
} catch (IOException e) {
//expected exception
} finally {
newCS.stop();
}
}
@Test
@Timeout(value = 10)
public void testAutoCreateLeafQueueFailsWithNoQueueMapping()
throws Exception {
final String INVALID_USER = "invalid_user";
// submit an app under a different queue name which does not exist
// and queue mapping does not exist for this user
RMApp app = MockRMAppSubmitter.submit(mockRM,
MockRMAppSubmissionData.Builder.createWithMemory(GB, mockRM)
.withAppName("app")
.withUser(INVALID_USER)
.withAcls(null)
.withQueue(INVALID_USER)
.withWaitForAppAcceptedState(false)
.build());
mockRM.drainEvents();
mockRM.waitForState(app.getApplicationId(), RMAppState.FAILED);
assertEquals(RMAppState.FAILED, app.getState());
}
@Test
@Timeout(value = 10)
public void testQueueMappingValidationFailsWithInvalidParentQueueInMapping()
throws Exception {
MockRM newMockRM = setupSchedulerInstance();
try {
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
//"a" is not auto create enabled
//dynamic queue mapping
try {
setupQueueMapping(newCS, CURRENT_USER_MAPPING, "a1",
CURRENT_USER_MAPPING);
newCS.updatePlacementRules();
fail("Expected invalid parent queue mapping failure");
} catch (IOException e) {
//expected exception
assertTrue(e.getMessage().contains(
"Queue path 'a1.%user' is invalid because 'root.a.a1' " +
"is a leaf queue"));
}
//"a" is not auto create enabled and app_user does not exist as a leaf
// queue
//static queue mapping
try {
setupQueueMapping(newCS, "app_user", "INVALID_PARENT_QUEUE",
"app_user");
newCS.updatePlacementRules();
fail("Expected invalid parent queue mapping failure");
} catch (IOException e) {
//expected exception
assertTrue(e.getMessage().contains(
"Path root 'INVALID_PARENT_QUEUE' does not exist."));
}
} finally {
if (newMockRM != null) {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
}
@Test
@Timeout(value = 10)
public void testQueueMappingUpdatesFailsOnRemovalOfParentQueueInMapping()
throws Exception {
MockRM newMockRM = setupSchedulerInstance();
try {
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
setupQueueMapping(newCS, CURRENT_USER_MAPPING, "c", CURRENT_USER_MAPPING);
newCS.updatePlacementRules();
try {
setupQueueMapping(newCS, CURRENT_USER_MAPPING, "nonexistent",
CURRENT_USER_MAPPING);
newCS.updatePlacementRules();
fail("Expected invalid parent queue mapping failure");
} catch (IOException e) {
//expected exception
assertTrue(
e.getMessage().contains("Path root 'nonexistent' does not exist."));
}
} finally {
if (newMockRM != null) {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
}
@Test
public void testParentQueueUpdateInQueueMappingFailsAfterAutoCreation()
throws Exception {
MockRM newMockRM = setupSchedulerInstance();
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
try {
submitApp(newCS, USER0, USER0, PARENT_QUEUE);
assertNotNull(newCS.getQueue(USER0));
//The new placement engine's validation is a bit more
//strict so it would reject the original u:user_0:a.user_0 rule since
//it checks if that paths exists or is a managed parent, but if we use
//a.%user we can trick the engine, since it cannot validate if the actual
//value of the %user will exist or not, it allows the rule
setupQueueMapping(newCS, USER0, "a", "%user");
newCS.updatePlacementRules();
RMContext rmContext = mock(RMContext.class);
when(rmContext.getDispatcher()).thenReturn(dispatcher);
newCS.setRMContext(rmContext);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
//The new engine would return root.a as the parent queue for this
// submission so creating the ApplicationPlacementContext accordingly
SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appId, USER0,
USER0, new ApplicationPlacementContext(USER0, "root.a"));
newCS.handle(addAppEvent);
RMAppEvent event = new RMAppEvent(appId, RMAppEventType.APP_REJECTED,
"error");
dispatcher.spyOnNextEvent(event, 10000);
} finally {
if (newMockRM != null) {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
}
@Test
public void testAutoQueueCreationFailsForEmptyPathWithAQCAndWeightMode()
throws Exception {
if (mockRM != null) {
mockRM.stop();
}
//We need a special configuration we only need a V2 queue auto creation
//And weight mode, to allow dynamic auto queue creation for root
CapacitySchedulerConfiguration conf = setupSchedulerConfiguration();
conf.setAutoQueueCreationV2Enabled(ROOT, true);
conf.setCapacity(DEFAULT, "1w");
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
//Just a regular mockRM and CapacityScheduler instance
MockRM newMockRM = new MockRM(conf);
newMockRM.start();
((CapacityScheduler) newMockRM.getResourceScheduler()).start();
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
try {
//submitting to root..user, this should fail WITHOUT crashing the RM
submitApp(newCS, USER0, "user", "root.");
RMContext rmContext = mock(RMContext.class);
when(rmContext.getDispatcher()).thenReturn(dispatcher);
newCS.setRMContext(rmContext);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(
appId, "user", USER0);
newCS.handle(addAppEvent);
RMAppEvent event = new RMAppEvent(appId, RMAppEventType.APP_REJECTED,
"error");
dispatcher.spyOnNextEvent(event, 10000);
} finally {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
@Test
public void testAutoQueueCreationWithWeightModeAndMaxAppLifetimeFirstSubmittedApp()
throws Exception {
if (mockRM != null) {
mockRM.stop();
}
long maxRootLifetime = 20L;
long defaultRootLifetime = 10L;
QueuePath testQueue = new QueuePath("root.test");
CapacitySchedulerConfiguration conf = setupSchedulerConfiguration();
conf.setQueues(ROOT, new String[] {"test"});
conf.setAutoQueueCreationV2Enabled(testQueue, true);
conf.setCapacity(DEFAULT, "1w");
conf.setCapacity(testQueue, "2w");
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setMaximumLifetimePerQueue(ROOT, maxRootLifetime);
conf.setDefaultLifetimePerQueue(ROOT, defaultRootLifetime);
MockRM newMockRM = new MockRM(conf);
newMockRM.start();
((CapacityScheduler) newMockRM.getResourceScheduler()).start();
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
Priority appPriority = Priority.newInstance(0);
MockRMAppSubmissionData app = MockRMAppSubmissionData.Builder.createWithMemory(1024, newMockRM)
.withAppPriority(appPriority)
.withQueue("root.test.user")
.build();
RMApp app1 = MockRMAppSubmitter.submit(newMockRM, app);
assertEquals(newCS.getMaximumApplicationLifetime("root.test.user"), 20L);
try {
newMockRM.waitForState(app1.getApplicationId(), RMAppState.KILLED);
long totalTimeRun = app1.getFinishTime() - app1.getSubmitTime();
assertEquals(RMAppState.KILLED, app1.getState());
assertTrue(totalTimeRun > (defaultRootLifetime * 1000),
"Application killed before default lifetime value");
assertTrue(totalTimeRun < (maxRootLifetime * 1000),
"Application killed after max lifetime value " + totalTimeRun);
} finally {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
/**
* This test case checks if a mapping rule can put an application to an auto
* created queue even if an other queue with the same leaf name already
* exists.
*
* In this scenario we use the following queues
* root.a.a1 - already existing queue
* root.c - managed parent queue
*
* And the following mapping rule
* u:%user:root.c.%user - Any submission should go to root.c.USERNAME queue
*
* When user 'a1' submits a new application we expect it to go to 'root.c.a1'
* because of the mapping rule, and the queue should be created.
*
* @throws Exception - When stuff go wrong, obvious reason to fail the test
*/
@Test
public void testAutoQueueCreationWhenQueueExistsWithSameName()
throws Exception {
MockRM newMockRM = setupSchedulerInstance();
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
try {
setupQueueMapping(newCS, "%user", "root.c", "%user");
newCS.updatePlacementRules();
//making sure the target queue does not exist before submission
assertNull(newCS.getQueue("root.c.a1"));
RMApp app = MockRMAppSubmitter.submit(newMockRM,
MockRMAppSubmissionData.Builder.createWithMemory(512, newMockRM)
.withAppName("testAutoQueueCreationWhenQueueExistsWithSameName")
.withUser("a1")
.withQueue("default")
.build());
RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, newMockRM);
//checking if the target queue have been created during the submission
assertNotNull(newCS.getQueue("root.c.a1"));
//making sure the application is indeed in the right queue
assertEquals("root.c.a1", app.getQueue());
} finally {
if (newMockRM != null) {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
}
@Test
public void testAutoCreationFailsWhenParentCapacityExceeded()
throws Exception {
MockRM newMockRM = setupSchedulerInstance();
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
try {
CapacitySchedulerConfiguration conf = newCS.getConfiguration();
conf.setShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(C,
true);
newCS.reinitialize(conf, newMockRM.getRMContext());
// Test add one auto created queue dynamically and manually modify
// capacity
ManagedParentQueue parentQueue = (ManagedParentQueue) newCS.getQueue("c");
AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(newCS.getQueueContext(), "c1",
parentQueue);
newCS.addQueue(c1);
c1.setCapacity(0.5f);
c1.setAbsoluteCapacity(c1.getParent().getAbsoluteCapacity() * 1f);
c1.setMaxCapacity(1f);
setEntitlement(c1, new QueueEntitlement(0.5f, 1f));
AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(newCS.getQueueContext(), "c2",
parentQueue);
newCS.addQueue(c2);
setEntitlement(c2, new QueueEntitlement(0.5f, 1f));
try {
AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(newCS.getQueueContext(), "c3",
parentQueue);
newCS.addQueue(c3);
fail("Expected exception for auto queue creation failure");
} catch (SchedulerDynamicEditException e) {
//expected exception
}
} finally {
if (newMockRM != null) {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
}
@Test
public void testAutoCreatedQueueActivationDeactivation() throws Exception {
try {
CSQueue parentQueue = cs.getQueue(PARENT_QUEUE);
//submit app1 as USER1
ApplicationId user1AppId = submitApp(mockRM, parentQueue, USER1, USER1,
1, 1);
Map<String, Float> expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(1);
validateInitialQueueEntitlement(parentQueue, USER1,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
//submit another app2 as USER2
ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2,
1);
expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(2);
validateInitialQueueEntitlement(parentQueue, USER2,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
//submit another app3 as USER1
submitApp(mockRM, parentQueue, USER1, USER1, 3, 2);
//validate total activated abs capacity remains the same
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
(GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) parentQueue)
.getAutoCreatedQueueManagementPolicy();
for (String nodeLabel : accessibleNodeLabelsOnC) {
assertEquals(expectedAbsChildQueueCapacity.get(nodeLabel),
autoCreatedQueueManagementPolicy.getAbsoluteActivatedChildQueueCapacity(nodeLabel), EPSILON);
}
//submit user_3 app. This cant be allocated since there is no capacity
// in NO_LABEL, SSD but can be in GPU label
submitApp(mockRM, parentQueue, USER3, USER3, 4, 1);
final CSQueue user3LeafQueue = cs.getQueue(USER3);
validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.0f, 0.0f,
1.0f, 1.0f);
validateCapacitiesByLabel((ManagedParentQueue) parentQueue,
(AutoCreatedLeafQueue)
user3LeafQueue, NODEL_LABEL_GPU);
assertEquals(0.2f, autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON);
assertEquals(0.9f, autoCreatedQueueManagementPolicy.getAbsoluteActivatedChildQueueCapacity(NODEL_LABEL_GPU),
EPSILON);
//Verify that AMs can be allocated
//Node 1 has SSD and default node label expression on C is SSD.
//This validates that the default node label expression with SSD is set
// on the AM attempt
// and app attempt reaches ALLOCATED state for a dynamic queue 'USER1'
mockRM.launchAM(mockRM.getRMContext().getRMApps().get(user1AppId),
mockRM, nm1);
// //deactivate USER2 queue
cs.killAllAppsInQueue(USER2);
mockRM.waitForState(user2AppId, RMAppState.KILLED);
//Verify if USER_2 can be deactivated since it has no pending apps
List<QueueManagementChange> queueManagementChanges =
autoCreatedQueueManagementPolicy.computeQueueManagementChanges();
ManagedParentQueue managedParentQueue = (ManagedParentQueue) parentQueue;
managedParentQueue.
validateAndApplyQueueManagementChanges(queueManagementChanges);
validateDeactivatedQueueEntitlement(parentQueue, USER2,
expectedAbsChildQueueCapacity, queueManagementChanges);
//USER_3 should now get activated for SSD, NO_LABEL
Set<String> expectedNodeLabelsUpdated = new HashSet<>();
expectedNodeLabelsUpdated.add(NO_LABEL);
expectedNodeLabelsUpdated.add(NODEL_LABEL_SSD);
validateActivatedQueueEntitlement(parentQueue, USER3,
expectedAbsChildQueueCapacity , queueManagementChanges, expectedNodeLabelsUpdated);
} finally {
cleanupQueue(USER1);
cleanupQueue(USER2);
cleanupQueue(USER3);
}
}
@Test
public void testClusterResourceUpdationOnAutoCreatedLeafQueues() throws
Exception {
MockRM newMockRM = setupSchedulerInstance();
try {
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
CSQueue parentQueue = newCS.getQueue(PARENT_QUEUE);
//submit app1 as USER1
submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1);
Map<String, Float> expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(1);
validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER1,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
//submit another app2 as USER2
ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2, USER2, 2,
1);
expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(2);
validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER2,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
//validate total activated abs capacity remains the same
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
(GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue)
parentQueue)
.getAutoCreatedQueueManagementPolicy();
assertEquals(autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), 0.2f, EPSILON);
//submit user_3 app. This cant be scheduled since there is no capacity
submitApp(newMockRM, parentQueue, USER3, USER3, 3, 1);
final CSQueue user3LeafQueue = newCS.getQueue(USER3);
validateCapacities((AutoCreatedLeafQueue) user3LeafQueue, 0.0f, 0.0f,
1.0f, 1.0f);
assertEquals(autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), 0.2f, EPSILON);
// add new NM.
newMockRM.registerNode("127.0.0.3:1234", 125 * GB, 20);
// There will be change in effective resource when nodes are added
// since we deal with percentages
Resource MAX_RES = Resources.addTo(TEMPLATE_MAX_RES, Resources.createResource(125 *
GB, 20));
Resource MIN_RES = Resources.createResource(14438, 6);
assertEquals(Resources.none(), user3LeafQueue.getQueueResourceQuotas()
.getEffectiveMinResource(), "Effective Min resource for USER3 is not correct");
assertEquals(MAX_RES, user3LeafQueue
.getQueueResourceQuotas()
.getEffectiveMaxResource(), "Effective Max resource for USER3 is not correct");
CSQueue user1LeafQueue = newCS.getQueue(USER1);
CSQueue user2LeafQueue = newCS.getQueue(USER2);
assertEquals(MIN_RES, user1LeafQueue.getQueueResourceQuotas()
.getEffectiveMinResource(), "Effective Min resource for USER2 is not correct");
assertEquals(MAX_RES, user1LeafQueue.getQueueResourceQuotas().getEffectiveMaxResource(),
"Effective Max resource for USER2 is not correct");
assertEquals(MIN_RES, user2LeafQueue.getQueueResourceQuotas()
.getEffectiveMinResource(), "Effective Min resource for USER1 is not correct");
assertEquals(MAX_RES, user2LeafQueue.getQueueResourceQuotas()
.getEffectiveMaxResource(), "Effective Max resource for USER1 is not correct");
// unregister one NM.
newMockRM.unRegisterNode(nm3);
Resource MIN_RES_UPDATED = Resources.createResource(12800, 2);
Resource MAX_RES_UPDATED = Resources.createResource(128000, 20);
// After loosing one NM, resources will reduce
assertEquals(MIN_RES_UPDATED,
user1LeafQueue.getQueueResourceQuotas().getEffectiveMinResource(),
"Effective Min resource for USER2 is not correct");
assertEquals(MAX_RES_UPDATED, user2LeafQueue.getQueueResourceQuotas()
.getEffectiveMaxResource(), "Effective Max resource for USER2 is not correct");
} finally {
cleanupQueue(USER1);
cleanupQueue(USER2);
cleanupQueue(USER3);
if (newMockRM != null) {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
}
@Test
public void testReinitializeQueuesWithAutoCreatedLeafQueues()
throws Exception {
MockRM newMockRM = setupSchedulerInstance();
try {
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
CapacitySchedulerConfiguration conf = newCS.getConfiguration();
CSQueue parentQueue = newCS.getQueue(PARENT_QUEUE);
//submit app1 as USER1
submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1);
Map<String, Float> expectedChildQueueAbsCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(1);
validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER1,
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
//submit another app2 as USER2
ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2,
USER2, 2,
1);
expectedChildQueueAbsCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(2);
validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER2,
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
//update parent queue capacity
conf.setCapacity(C, 30f);
conf.setCapacity(D, 10f);
conf.setMaximumCapacity(C, 50f);
newCS.reinitialize(conf, newMockRM.getRMContext());
// validate that leaf queues abs capacity is now changed
AutoCreatedLeafQueue user0Queue = (AutoCreatedLeafQueue) newCS.getQueue(
USER1);
validateCapacities(user0Queue, 0.5f, 0.15f, 1.0f, 0.5f);
// The new queue calculation mode works from the effective resources
// so the absoluteCapacity and the maxApplications differs a little
// bit: 6553/16384=0.3999633789 vs 0.4
final int maxApps = cs.getConfiguration().isLegacyQueueMode() ? 4000 : 3999;
validateUserAndAppLimits(user0Queue, maxApps, maxApps);
//update leaf queue template capacities
conf.setAutoCreatedLeafQueueConfigCapacity(C, 30f);
conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 40f);
newCS.reinitialize(conf, newMockRM.getRMContext());
validateCapacities(user0Queue, 0.3f, 0.09f, 0.4f, 0.2f);
validateUserAndAppLimits(user0Queue, maxApps, maxApps);
//submit app1 as USER3
submitApp(newMockRM, parentQueue, USER3, USER3, 3, 1);
AutoCreatedLeafQueue user3Queue =
(AutoCreatedLeafQueue) newCS.getQueue(USER1);
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f,0.2f);
validateUserAndAppLimits(user3Queue, maxApps, maxApps);
//submit app1 as USER1 - is already activated. there should be no diff
// in capacities
submitApp(newMockRM, parentQueue, USER3, USER3, 4, 2);
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f,0.2f);
validateUserAndAppLimits(user3Queue, maxApps, maxApps);
validateContainerLimits(user3Queue, 6, 10240);
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
(GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue)
parentQueue)
.getAutoCreatedQueueManagementPolicy();
assertEquals(0.27f, autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity
(NO_LABEL), EPSILON);
} finally {
cleanupQueue(USER1);
cleanupQueue(USER2);
if (newMockRM != null) {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
}
@Test
public void testDynamicAutoQueueCreationWithTags()
throws Exception {
// This test we will reinitialize mockRM, so stop the previous initialized
// mockRM to avoid issues like MetricsSystem
if (mockRM != null) {
mockRM.stop();
}
mockRM = null;
try {
CapacitySchedulerConfiguration csConf
= new CapacitySchedulerConfiguration();
csConf.setQueues(ROOT, new String[] {"a", "b"});
csConf.setCapacity(A, 90);
csConf.setCapacity(B, 10);
csConf.setAutoCreateChildQueueEnabled(A, true);
csConf.setAutoCreatedLeafQueueConfigCapacity(A, 50);
csConf.setAutoCreatedLeafQueueConfigMaxCapacity(A, 100);
csConf.setAcl(A, QueueACL.ADMINISTER_QUEUE, "*");
csConf.setAcl(A, QueueACL.SUBMIT_APPLICATIONS, "*");
csConf.setBoolean(YarnConfiguration
.APPLICATION_TAG_BASED_PLACEMENT_ENABLED, true);
csConf.setStrings(YarnConfiguration
.APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, "hadoop");
csConf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
"u:%user:root.a.%user");
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(csConf);
mockRM = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
mockRM.start();
MockNM nm = mockRM.registerNode("127.0.0.1:1234", 16 * GB);
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(GB, mockRM)
.withAppName("apptodynamicqueue")
.withUser("hadoop")
.withAcls(null)
.withUnmanagedAM(false)
.withApplicationTags(Sets.newHashSet("userid=testuser"))
.build();
RMApp app = MockRMAppSubmitter.submit(mockRM, data);
MockRM.launchAndRegisterAM(app, mockRM, nm);
nm.nodeHeartbeat(true);
CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler();
CSQueue queue = cs.getQueue("root.a.testuser");
assertNotNull(queue, "Leaf queue has not been auto-created");
assertEquals(1, queue.getNumApplications(), "Number of running applications");
} finally {
if (mockRM != null) {
mockRM.close();
}
}
}
@Test
@Timeout(value = 10)
public void testAutoCreateLeafQueueFailsWithSpecifiedEmptyStringLeafQueue()
throws Exception {
final String invalidQueue = "";
MockRM newMockRM = setupSchedulerInstance();
CapacityScheduler newCS =
(CapacityScheduler) newMockRM.getResourceScheduler();
//queue mapping to place app in queue specified by user
setupQueueMapping(newCS, "app_user", "root.c", SPECIFIED_QUEUE_MAPPING);
newCS.updatePlacementRules();
try {
//submitting to root.c. should fail WITHOUT crashing the RM
submitApp(newCS, "app_user", invalidQueue, "root.c");
RMContext rmContext = mock(RMContext.class);
when(rmContext.getDispatcher()).thenReturn(dispatcher);
newCS.setRMContext(rmContext);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(
appId, "root.c." + invalidQueue, "app_user");
newCS.handle(addAppEvent);
RMAppEvent event = new RMAppEvent(appId, RMAppEventType.APP_REJECTED,
"error");
dispatcher.spyOnNextEvent(event, 10000);
} finally {
((CapacityScheduler) newMockRM.getResourceScheduler()).stop();
newMockRM.stop();
}
}
}