TestCapacitySchedulerSurgicalPreemption.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels
.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
public class TestCapacitySchedulerSurgicalPreemption
extends CapacitySchedulerPreemptionTestBase {
// Node-manager count constant.
// NOTE(review): its use is not visible in this part of the file.
private static final int NUM_NM = 5;

// Queue paths for the root queue and its children a, b, c and d.
private static final QueuePath ROOT =
    new QueuePath(CapacitySchedulerConfiguration.ROOT);
private static final QueuePath A =
    new QueuePath(CapacitySchedulerConfiguration.ROOT + ".a");
private static final QueuePath B =
    new QueuePath(CapacitySchedulerConfiguration.ROOT + ".b");
private static final QueuePath C =
    new QueuePath(CapacitySchedulerConfiguration.ROOT + ".c");
private static final QueuePath D =
    new QueuePath(CapacitySchedulerConfiguration.ROOT + ".d");
/**
 * Runs the base-class setup, then switches on preemption-candidate selection
 * for reserved containers in the shared scheduler configuration.
 *
 * @throws Exception if the base-class setup fails
 */
@Override
@BeforeEach
public void setUp() throws Exception {
  super.setUp();
  final String reservedContainerSelectionKey =
      CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS;
  conf.setBoolean(reservedContainerSelectionKey, true);
}
/**
 * Test case: submit two applications (app1/app2) to different queues.
 * Queue structure (as described for this test suite):
 *
 * <pre>
 *             Root
 *            /  |  \
 *           a   b   c
 *          10   20  70
 * </pre>
 *
 * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
 *
 * 2) app1 is submitted to queue-a first and asks for 32 * 1G containers;
 *    16 are allocated on n1 and 16 on n2.
 *
 * 3) app2 is submitted to queue-c and asks for one 1G container (for AM).
 *
 * 4) app2 asks for another 6G container, which is reserved on n1.
 *
 * At that point:
 *   n1: 17 from app1, 1 from app2, and 1 reserved from app2
 *   n2: 16 from app1.
 *
 * After preemption we expect 4 containers of app1 on n1 to be preempted.
 *
 * @throws Exception if the scenario fails
 */
@Test
@Timeout(value = 60)
public void testSimpleSurgicalPreemption() throws Exception {
  // Both applications are submitted by the same user.
  final String sharedUser = "user";
  testSimpleSurgicalPreemption("a", "c", sharedUser, sharedUser);
}
/**
 * Runs the simple surgical-preemption scenario with the given queues and
 * users.
 *
 * <ol>
 *   <li>Two 20G nodes (n1/n2) are registered.</li>
 *   <li>app1 is submitted to {@code queue1} with a 1G AM on n1 and asks for
 *       32 * 1G containers; after allocation it holds 17 containers on n1
 *       and 16 on n2.</li>
 *   <li>app2 is submitted to {@code queue2} with a 1G AM on n1, leaving
 *       2G/4G unallocated on n1/n2.</li>
 *   <li>app2 asks for one 6G container, which is expected to be reserved on
 *       n1.</li>
 *   <li>After two preemption-policy rounds, 4 containers of app1 on n1 are
 *       expected to be killed and recorded in the root queue metrics.</li>
 * </ol>
 *
 * The resource manager is closed in a {@code finally} block so that a failed
 * assertion does not leave it running.
 *
 * @param queue1 queue that app1 is submitted to
 * @param queue2 queue that app2 is submitted to
 * @param user1 user submitting app1
 * @param user2 user submitting app2
 * @throws Exception if any step of the scenario fails
 */
protected void testSimpleSurgicalPreemption(String queue1, String queue2,
    String user1, String user2)
    throws Exception {
  MockRM rm1 = new MockRM(conf) {
    protected RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };
  try {
    rm1.start();

    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
    MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());

    // Launch app1 in queue1; its AM container is launched on nm1.
    MockRMAppSubmissionData data1 =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
            .withAppName("app")
            .withUser(user1)
            .withAcls(null)
            .withQueue(queue1)
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    am1.allocate("*", 1 * GB, 32, new ArrayList<>());

    // Do allocation for node1/node2
    for (int i = 0; i < 32; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
    }

    // App1 should have 33 containers now (32 requested + AM)
    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
        am1.getApplicationAttemptId());
    assertEquals(33, schedulerApp1.getLiveContainers().size());
    // 17 from n1 and 16 from n2
    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
        am1.getApplicationAttemptId(), 17);
    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
        am1.getApplicationAttemptId(), 16);

    // Submit app2 to queue2 and ask for a 1G container for its AM
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
            .withAppName("app")
            .withUser(user2)
            .withAcls(null)
            .withQueue(queue2)
            .withUnmanagedAM(false)
            .build();
    RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);

    // NM1/NM2 has available resource = 2G/4G
    assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
        .getUnallocatedResource().getMemorySize());
    assertEquals(4 * GB, cs.getNode(nm2.getNodeId())
        .getUnallocatedResource().getMemorySize());

    // app2 asks for one 6G container
    am2.allocate(Arrays.asList(ResourceRequest
        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
            Resources.createResource(6 * GB), 1)), null);

    // Call allocation once on n1, we should expect the container reserved
    // on n1
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());

    // Get edit policy and do one update
    SchedulingMonitorManager smm = cs.getSchedulingMonitorManager();
    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
    ProportionalCapacityPreemptionPolicy editPolicy =
        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();

    // Call edit schedule twice, and check if 4 containers from app1 at n1
    // were killed
    editPolicy.editSchedule();
    editPolicy.editSchedule();

    waitNumberOfLiveContainersFromApp(schedulerApp1, 29);

    // 13 from n1 (4 preempted) and 16 from n2
    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
        am1.getApplicationAttemptId(), 13);
    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
        am1.getApplicationAttemptId(), 16);

    // Ensure preemption metrics were recorded.
    assertEquals(4,
        cs.getQueue("root").getMetrics().getAggregatePreemptedContainers(),
        "Number of preempted containers incorrectly recorded:");
    assertEquals(4 * GB,
        cs.getQueue("root").getMetrics().getAggregateMemoryMBPreempted(),
        "Amount of preempted memory incorrectly recorded:");
    assertEquals(4,
        cs.getQueue("root").getMetrics().getAggregateVcoresPreempted(),
        "Number of preempted vcores incorrectly recorded:");
  } finally {
    // Always shut the RM down, even when an assertion above failed.
    rm1.close();
  }
}
/**
 * Test case: submit two applications (app1/app2) to different queues.
 * Queue structure (as described for this test suite):
 *
 * <pre>
 *             Root
 *            /  |  \
 *           a   b   c
 *          10   20  70
 * </pre>
 *
 * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
 *
 * 2) app1 is submitted to queue-a with a 1G AM on n1 and asks for 38 * 1G
 *    containers; it ends up with 20 containers on n1 and 19 on n2.
 *
 * 3) app2 is submitted to queue-c and asks for one 4G container (for AM).
 *
 * Expected: 3 containers of app1 are selected and then killed, app2 gets a
 * reservation, and after further preemption/allocation rounds app2 no longer
 * holds a reserved container.
 *
 * The resource manager is closed in a {@code finally} block so that a failed
 * assertion does not leave it running.
 *
 * @throws Exception if any step of the scenario fails
 */
@Test
@Timeout(value = 60)
public void testSurgicalPreemptionWithAvailableResource()
    throws Exception {
  MockRM rm1 = new MockRM(conf);
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();

    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
    MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());

    // Launch app1 in queue a; its AM container is launched on nm1.
    MockRMAppSubmissionData data1 =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("a")
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    am1.allocate("*", 1 * GB, 38, new ArrayList<>());

    // Do allocation for node1/node2
    for (int i = 0; i < 38; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
    }

    // App1 should have 39 containers now (38 requested + AM)
    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
        am1.getApplicationAttemptId());
    assertEquals(39, schedulerApp1.getLiveContainers().size());
    // 20 from n1 and 19 from n2
    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
        am1.getApplicationAttemptId(), 20);
    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
        am1.getApplicationAttemptId(), 19);

    // Submit app2 to queue-c and ask for a 4G container for its AM
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(4 * GB, rm1)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("c")
            .withUnmanagedAM(false)
            .build();
    RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));

    // Call editSchedule: containers are selected as preemption candidates
    SchedulingMonitorManager smm = cs.getSchedulingMonitorManager();
    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
    ProportionalCapacityPreemptionPolicy editPolicy =
        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
    editPolicy.editSchedule();
    assertEquals(3, editPolicy.getToPreemptContainers().size());

    // Call editSchedule again: selected containers are killed
    editPolicy.editSchedule();
    waitNumberOfLiveContainersFromApp(schedulerApp1, 36);

    // Call allocation, containers are reserved
    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
    waitNumberOfReservedContainersFromApp(schedulerApp2, 1);

    // Call editSchedule twice, then keep sending node updates (at most 10
    // rounds) until app2 has its container allocated
    editPolicy.editSchedule();
    editPolicy.editSchedule();

    int tick = 0;
    while (schedulerApp2.getLiveContainers().size() != 1 && tick < 10) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
      tick++;
      Thread.sleep(100);
    }
    waitNumberOfReservedContainersFromApp(schedulerApp2, 0);
  } finally {
    // Always shut the RM down, even when an assertion above failed.
    rm1.close();
  }
}
/**
 * Test case: submit two applications (app1/app2) to different queues.
 * Queue structure (as described for this test suite):
 *
 * <pre>
 *             Root
 *            /  |  \
 *           a   b   c
 *          10   20  70
 * </pre>
 *
 * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
 *
 * 2) app1 is submitted to queue-b first and asks for 6 * 1G containers;
 *    4 are allocated on n1 (including AM) and 3 on n2.
 *
 * 3) app2 is submitted to queue-c (priority 1) and asks for one 18G
 *    container (for AM), which gets reserved on n1.
 *
 * Expected: nothing is selected before the under-utilized preemption delay
 * has passed; afterwards 2 containers of app1 are selected and killed and
 * the AM of app2 is allocated.
 *
 * The resource manager is closed in a {@code finally} block so that a failed
 * assertion does not leave it running.
 *
 * @throws Exception if any step of the scenario fails
 */
@Test
@Timeout(value = 60)
public void testPriorityPreemptionWhenAllQueuesAreBelowGuaranteedCapacities()
    throws Exception {
  conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
  conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
  conf.setQueueOrderingPolicy(ROOT,
      CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);

  // Queue c has higher priority than a/b
  conf.setQueuePriority(C, 1);

  MockRM rm1 = new MockRM(conf);
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();

    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
    MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());

    // Launch app1 in queue b; its AM container is launched on nm1.
    MockRMAppSubmissionData data1 =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("b")
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    am1.allocate("*", 1 * GB, 6, new ArrayList<>());

    // Do allocation for node1/node2
    for (int i = 0; i < 3; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
    }

    // App1 should have 7 containers now, so the abs-used-cap of b is
    // 7 / 40 = 17.5% < 20% (guaranteed)
    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
        am1.getApplicationAttemptId());
    assertEquals(7, schedulerApp1.getLiveContainers().size());
    // 4 from n1 and 3 from n2
    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
        am1.getApplicationAttemptId(), 4);
    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
        am1.getApplicationAttemptId(), 3);

    // Submit app2 to queue-c and ask for an 18G container for its AM
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(18 * GB, rm1)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("c")
            .withUnmanagedAM(false)
            .build();
    RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));

    // Keep sending node updates to n1 until a reservation shows up there
    // (the loop is bounded only by the test timeout)
    while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() == null) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      Thread.sleep(10);
    }

    // Call editSchedule immediately: containers are not selected
    SchedulingMonitorManager smm = cs.getSchedulingMonitorManager();
    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
    ProportionalCapacityPreemptionPolicy editPolicy =
        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
    editPolicy.editSchedule();
    assertEquals(0, editPolicy.getToPreemptContainers().size());

    // Sleep the timeout interval, we should be able to see containers
    // selected
    Thread.sleep(1000);
    editPolicy.editSchedule();
    assertEquals(2, editPolicy.getToPreemptContainers().size());

    // Call editSchedule again: selected containers are killed, and new AM
    // container launched
    editPolicy.editSchedule();

    // Do allocation till reserved container allocated
    while (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      Thread.sleep(10);
    }

    waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
  } finally {
    // Always shut the RM down, even when an assertion above failed.
    rm1.close();
  }
}
/**
 * Test case: submit two applications (app1/app2) to different queues.
 * Queue structure (as described for this test suite):
 *
 * <pre>
 *             Root
 *            /  |  \
 *           a   b   c
 *          10   20  70
 * </pre>
 *
 * 1) 3 nodes in the cluster, 10G for each.
 *
 * 2) app1 is submitted to queue-b first and asks for 2G containers;
 *    it gets 2G on n1 (AM) and 2 * 2G on n2.
 *
 * 3) app2 is submitted to queue-c (priority 1) with a 2G AM container on n3
 *    and then asks for a 9G container, which is reserved on n3.
 *
 * Expected: once the preemption delay has passed, the reservation is moved
 * from n3 to n2, 2 containers are selected and killed, and app2 ends up with
 * 2 live containers while app1 keeps 1.
 *
 * The resource manager is closed in a {@code finally} block so that a failed
 * assertion does not leave it running.
 *
 * @throws Exception if any step of the scenario fails
 */
@Test
@Timeout(value = 300)
public void testPriorityPreemptionRequiresMoveReservation()
    throws Exception {
  conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
  conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
  conf.setQueueOrderingPolicy(ROOT,
      CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
  conf.setPUOrderingPolicyUnderUtilizedPreemptionMoveReservation(true);

  // Queue c has higher priority than a/b
  conf.setQueuePriority(C, 1);

  MockRM rm1 = new MockRM(conf);
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();

    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB);
    MockNM nm3 = rm1.registerNode("h3:1234", 10 * GB);

    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();

    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
    RMNode rmNode3 = rm1.getRMContext().getRMNodes().get(nm3.getNodeId());

    // Launch app1 in queue b; its AM container is launched on nm1.
    MockRMAppSubmissionData data1 =
        MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("b")
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    am1.allocate("*", 2 * GB, 2, new ArrayList<>());

    // Two rounds, each sending two node updates to node2
    for (int i = 0; i < 2; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
    }

    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
        am1.getApplicationAttemptId());
    assertEquals(3, schedulerApp1.getLiveContainers().size());

    // 1 from n1 and 2 from n2
    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
        am1.getApplicationAttemptId(), 1);
    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
        am1.getApplicationAttemptId(), 2);

    // Submit app2 to queue-c and ask for a 2G container for its AM, on n3
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(2 * GB, rm1)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("c")
            .withUnmanagedAM(false)
            .build();
    RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));

    // Asks 1 * 9G container
    am2.allocate("*", 9 * GB, 1, new ArrayList<>());

    // Do allocation for node3 once
    cs.handle(new NodeUpdateSchedulerEvent(rmNode3));

    // Make sure container reserved on node3
    assertNotNull(
        cs.getNode(rmNode3.getNodeID()).getReservedContainer());

    // Call editSchedule immediately: nothing happens
    SchedulingMonitorManager smm = cs.getSchedulingMonitorManager();
    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
    ProportionalCapacityPreemptionPolicy editPolicy =
        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
    editPolicy.editSchedule();
    assertNotNull(
        cs.getNode(rmNode3.getNodeID()).getReservedContainer());

    // Sleep the timeout interval, we should be able to see reserved
    // container moved to n2 (n1 occupied by AM)
    Thread.sleep(1000);
    editPolicy.editSchedule();
    assertNull(
        cs.getNode(rmNode3.getNodeID()).getReservedContainer());
    assertNotNull(
        cs.getNode(rmNode2.getNodeID()).getReservedContainer());
    assertEquals(am2.getApplicationAttemptId(), cs.getNode(
        rmNode2.getNodeID()).getReservedContainer().getApplicationAttemptId());

    // Do it again, we should see containers marked to be preempt
    editPolicy.editSchedule();
    assertEquals(2, editPolicy.getToPreemptContainers().size());

    // Call editSchedule again: selected containers are killed
    editPolicy.editSchedule();

    // Do allocation till reserved container allocated (the loop is bounded
    // only by the test timeout)
    while (schedulerApp2.getLiveContainers().size() < 2) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
      Thread.sleep(200);
    }

    waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
  } finally {
    // Always shut the RM down, even when an assertion above failed.
    rm1.close();
  }
}
/**
 * Test case: submit two applications (app1/app2) to different queues.
 * Queue structure (as described for this test suite):
 *
 * <pre>
 *             Root
 *            /  |  \
 *           a   b   c
 *          10   20  70
 * </pre>
 *
 * 1) 10 nodes (n0-n9) in the cluster, each of them has 10G.
 *
 * 2) app1 is submitted to queue-b first (AM on n0) and asks for 8 * 1G
 *    containers; one container ends up on each of n0-n8.
 *
 * 3) app2 is submitted to queue-c (priority 1) with a 10G AM on n9 and asks
 *    for 10 * 10G containers.
 *
 * Expected: 6 containers of app1 are selected and killed, leaving app1 with
 * 3 live containers and app2 with 7 (70% of the cluster).
 *
 * The resource manager is closed in a {@code finally} block so that a failed
 * assertion does not leave it running.
 *
 * @throws Exception if any step of the scenario fails
 */
@Test
@Timeout(value = 60)
public void testPriorityPreemptionOnlyTriggeredWhenDemandingQueueUnsatisfied()
    throws Exception {
  conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
  conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
  conf.setQueueOrderingPolicy(ROOT,
      CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);

  // Queue c has higher priority than a/b
  conf.setQueuePriority(C, 1);

  MockRM rm1 = new MockRM(conf);
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();

    MockNM[] mockNMs = new MockNM[10];
    for (int i = 0; i < 10; i++) {
      mockNMs[i] = rm1.registerNode("h" + i + ":1234", 10 * GB);
    }

    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();

    RMNode[] rmNodes = new RMNode[10];
    for (int i = 0; i < 10; i++) {
      rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
    }

    // Launch app1 in queue b; its AM container is launched on the first
    // node (mockNMs[0]).
    MockRMAppSubmissionData data1 =
        MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("b")
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[0]);

    am1.allocate("*", 1 * GB, 8, new ArrayList<>());

    // Do allocation for nm1-nm8
    for (int i = 1; i < 9; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
    }

    // App1 should have 9 containers now, so the abs-used-cap of b is 9%
    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
        am1.getApplicationAttemptId());
    assertEquals(9, schedulerApp1.getLiveContainers().size());
    // One container of app1 on each of nm0-nm8
    for (int i = 0; i < 9; i++) {
      waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()),
          am1.getApplicationAttemptId(), 1);
    }

    // Submit app2 to queue-c and ask for a 10G container for its AM
    // Launch AM in NM9
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(10 * GB, rm1)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("c")
            .withUnmanagedAM(false)
            .build();
    RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[9]);
    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));

    // Ask 10 * 10GB containers
    am2.allocate("*", 10 * GB, 10, new ArrayList<>());

    // Do allocation for nm1-nm9
    for (int i = 1; i < 10; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
    }

    // Check am2 reserved resource on nm1-nm8
    for (int i = 1; i < 9; i++) {
      assertNotNull(cs.getNode(rmNodes[i].getNodeID()).getReservedContainer(),
          "Should reserve on nm-" + i);
    }

    // Sleep the timeout interval, we should be able to see 6 containers
    // selected
    // 6 (selected) + 1 (allocated) which makes target capacity to 70%
    Thread.sleep(1000);

    SchedulingMonitorManager smm = cs.getSchedulingMonitorManager();
    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
    ProportionalCapacityPreemptionPolicy editPolicy =
        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
    editPolicy.editSchedule();
    checkNumberOfPreemptionCandidateFromApp(editPolicy, 6,
        am1.getApplicationAttemptId());

    // Call editSchedule again: selected containers are killed
    editPolicy.editSchedule();
    waitNumberOfLiveContainersFromApp(schedulerApp1, 3);

    // Do allocation for nm1-nm9
    for (int i = 1; i < 10; i++) {
      cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
    }

    waitNumberOfLiveContainersFromApp(schedulerApp2, 7);
    waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
  } finally {
    // Always shut the RM down, even when an assertion above failed.
    rm1.close();
  }
}
/**
 * Test case: submit three applications to different queues.
 * Queue structure:
 *
 * <pre>
 *             Root
 *            /  |  \
 *           a   b   c
 *          45  45  10
 * </pre>
 *
 * Priority of queue_a = 1
 * Priority of queue_b = 2
 *
 * 1) 5 nodes (n0-n4) in the cluster, each of them has 4G.
 *
 * 2) app1 is submitted to queue-c first (AM=1G) and asks for 4 * 1G
 *    containers; one container is allocated on each of n0-n4, AM on n4.
 *
 * 3) app2 is submitted to queue-a, AM container=0.5G, allocated on n0.
 *    It asks for 2 * 3.5G containers (reserved on n0/n1).
 *
 * 4) app3 is submitted to queue-b, AM container=0.5G, allocated on n2.
 *    It asks for 2 * 3.5G containers (reserved on n2/n3).
 *
 * The first container preempted is expected to be the one on n2, since it is
 * the oldest container of the highest priority queue (b).
 *
 * @throws Exception if the scenario fails
 */
@Test
@Timeout(value = 600)
public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer()
    throws Exception {
  // Queues a and b get a higher priority than c
  conf.setQueuePriority(A, 1);
  conf.setQueuePriority(B, 2);

  // Capacities: a = 45%, b = 45%, c = 10%
  conf.setCapacity(A, 45f);
  conf.setCapacity(B, 45f);
  conf.setCapacity(C, 10f);

  final String[] queueNames = {"a", "b", "c"};
  final String[] userNames = {"user", "user", "user"};
  testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer(
      queueNames, userNames);
}
protected void
testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer(String[]
queues, String[] users) throws Exception {
// Total preemption = 1G per round, which is 5% of cluster resource (20G)
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
0.05f);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setPUOrderingPolicyUnderUtilizedPreemptionEnabled(true);
conf.setPUOrderingPolicyUnderUtilizedPreemptionDelay(1000);
conf.setQueueOrderingPolicy(ROOT,
CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY);
MockRM rm1 = new MockRM(conf) {
protected RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.start();
MockNM[] mockNMs = new MockNM[5];
for (int i = 0; i < 5; i++) {
mockNMs[i] = rm1.registerNode("h" + i + ":1234", 4 * GB);
}
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode[] rmNodes = new RMNode[5];
for (int i = 0; i < 5; i++) {
rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
}
// launch an app to queue, AM container should be launched in nm1
MockRMAppSubmissionData data2 =
MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
.withAppName("app")
.withUser(users[2])
.withAcls(null)
.withQueue(queues[2])
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm1, data2);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]);
am1.allocate("*", 1 * GB, 4, new ArrayList<>());
// Do allocation for nm1-nm8
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// App1 should have 5 containers now, one for each node
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
assertEquals(5, schedulerApp1.getLiveContainers().size());
for (int i = 0; i < 5; i++) {
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNodes[i].getNodeID()),
am1.getApplicationAttemptId(), 1);
}
// Submit app2 to queue-a and asks for a 0.5G container for AM (on n0)
MockRMAppSubmissionData data1 =
MockRMAppSubmissionData.Builder.createWithMemory(512, rm1)
.withAppName("app")
.withUser(users[0])
.withAcls(null)
.withQueue(queues[0])
.withUnmanagedAM(false)
.build();
RMApp app2 = MockRMAppSubmitter.submit(rm1, data1);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]);
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
// Ask 2 * 3.5GB containers
am2.allocate("*", 3 * GB + 512, 2, new ArrayList<>());
// Do allocation for n0-n1
for (int i = 0; i < 2; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// Check am2 reserved resource from nm0-nm1
for (int i = 0; i < 2; i++) {
assertNotNull(cs.getNode(rmNodes[i].getNodeID()).getReservedContainer(),
"Should reserve on nm-" + i);
assertEquals(cs.getNode(rmNodes[i].getNodeID())
.getReservedContainer()
.getQueueName(), cs.normalizeQueueName(queues[0]));
}
// Submit app3 to queue-b and asks for a 0.5G container for AM (on n2)
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(512, rm1)
.withAppName("app")
.withUser(users[1])
.withAcls(null)
.withQueue(queues[1])
.withUnmanagedAM(false)
.build();
RMApp app3 = MockRMAppSubmitter.submit(rm1, data);
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
// Ask 2 * 3.5GB containers
am3.allocate("*", 3 * GB + 512, 2, new ArrayList<>());
// Do allocation for n2-n3
for (int i = 2; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// Check am2 reserved resource from nm2-nm3
for (int i = 2; i < 4; i++) {
assertNotNull(cs.getNode(rmNodes[i].getNodeID()).getReservedContainer(),
"Should reserve on nm-" + i);
assertEquals(cs.getNode(rmNodes[i].getNodeID())
.getReservedContainer()
.getQueueName(), cs.normalizeQueueName(queues[1]));
}
// Sleep the timeout interval, we should be able to see 1 container selected
Thread.sleep(1000);
/* 1st container preempted is on n2 */
SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
editPolicy.editSchedule();
// We should have one to-preempt container, on node[2]
Set<RMContainer> selectedToPreempt =
editPolicy.getToPreemptContainers().keySet();
assertEquals(1, selectedToPreempt.size());
assertEquals(mockNMs[2].getNodeId(),
selectedToPreempt.iterator().next().getAllocatedNode());
// Call editSchedule again: selected containers are killed
editPolicy.editSchedule();
waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
// Make sure the container killed, then do allocation for all nms
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
waitNumberOfLiveContainersFromApp(schedulerApp1, 4);
waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
waitNumberOfLiveContainersFromApp(schedulerApp3, 2);
/* 2nd container preempted is on n3 */
editPolicy.editSchedule();
// We should have one to-preempt container, on node[3]
selectedToPreempt =
editPolicy.getToPreemptContainers().keySet();
assertEquals(1, selectedToPreempt.size());
assertEquals(mockNMs[3].getNodeId(),
selectedToPreempt.iterator().next().getAllocatedNode());
// Call editSchedule again: selected containers are killed
editPolicy.editSchedule();
waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
// Do allocation for all nms
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
waitNumberOfLiveContainersFromApp(schedulerApp1, 3);
waitNumberOfLiveContainersFromApp(schedulerApp2, 1);
waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
/* 3rd container preempted is on n0 */
editPolicy.editSchedule();
// We should have one to-preempt container, on node[0]
selectedToPreempt =
editPolicy.getToPreemptContainers().keySet();
assertEquals(1, selectedToPreempt.size());
assertEquals(mockNMs[0].getNodeId(),
selectedToPreempt.iterator().next().getAllocatedNode());
// Call editSchedule again: selected containers are killed
editPolicy.editSchedule();
waitNumberOfLiveContainersFromApp(schedulerApp1, 2);
// Do allocation for all nms
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
waitNumberOfLiveContainersFromApp(schedulerApp1, 2);
waitNumberOfLiveContainersFromApp(schedulerApp2, 2);
waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
/* 4th container preempted is on n1 */
editPolicy.editSchedule();
// We should have one to-preempt container, on node[1]
selectedToPreempt =
editPolicy.getToPreemptContainers().keySet();
assertEquals(1, selectedToPreempt.size());
assertEquals(mockNMs[1].getNodeId(),
selectedToPreempt.iterator().next().getAllocatedNode());
// Call editSchedule again: selected containers are killed
editPolicy.editSchedule();
waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
// Do allocation for all nms
for (int i = 0; i < 4; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
waitNumberOfLiveContainersFromApp(schedulerApp1, 1);
waitNumberOfLiveContainersFromApp(schedulerApp2, 3);
waitNumberOfLiveContainersFromApp(schedulerApp3, 3);
rm1.close();
}
private void initializeConfProperties(CapacitySchedulerConfiguration conf)
throws IOException {
QueuePath aQueuePath = new QueuePath("root.A");
QueuePath bQueuePath = new QueuePath("root.B");
conf.setQueues(ROOT, new String[] {"A", "B"});
conf.setCapacity(aQueuePath, 50);
conf.setCapacity(bQueuePath, 50);
conf.setQueuePriority(aQueuePath, 1);
conf.setQueuePriority(bQueuePath, 2);
conf.set(PREFIX + "root.ordering-policy", "priority-utilization");
conf.set(PREFIX + "ordering-policy.priority-utilization.underutilized-preemption.enabled", "true");
conf.set(PREFIX + "ordering-policy.priority-utilization.underutilized-preemption.allow-move-reservation", "false");
conf.set(PREFIX + "ordering-policy.priority-utilization.underutilized-preemption.reserved-container-delay-ms", "0");
conf.set(PREFIX + "root.accessible-node-labels.x.capacity", "100");
// Setup queue access to node labels
conf.set(PREFIX + "root.A.accessible-node-labels", "x");
conf.set(PREFIX + "root.B.accessible-node-labels", "x");
conf.set(PREFIX + "root.A.default-node-label-expression", "x");
conf.set(PREFIX + "root.B.default-node-label-expression", "x");
conf.set(PREFIX + "root.A.accessible-node-labels.x.capacity", "50");
conf.set(PREFIX + "root.B.accessible-node-labels.x.capacity", "50");
conf.set(PREFIX + "root.A.user-limit-factor", "100");
conf.set(PREFIX + "root.B.user-limit-factor", "100");
conf.set(PREFIX + "maximum-am-resource-percent", "1");
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
conf.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "1");
conf.set(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, "1000");
conf.set(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, "1000");
conf.set(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, "0.5");
conf.set(CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, "1");
}
@Test
public void testPriorityPreemptionWithNodeLabels() throws Exception {
// set up queue priority and capacity
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
initializeConfProperties(conf);
MockRM rm1 = new MockRM(conf) {
protected RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.start();
MockNM[] mockNMs = new MockNM[NUM_NM];
for (int i = 0; i < NUM_NM; i++) {
mockNMs[i] = rm1.registerNode("h" + i + ":1234", 6144);
}
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
mgr.addToCluserNodeLabels(Arrays.asList(NodeLabel.newInstance("x")));
RMNode[] rmNodes = new RMNode[5];
for (int i = 0; i < NUM_NM; i++) {
rmNodes[i] = rm1.getRMContext().getRMNodes().get(mockNMs[i].getNodeId());
mgr.replaceLabelsOnNode(
ImmutableMap.of(rmNodes[i].getNodeID(), ImmutableSet.of("x")));
}
// launch an app to queue B, AM container launched in nm4
MockRMAppSubmissionData data2 =
MockRMAppSubmissionData.Builder.createWithMemory(4096, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("B")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm1, data2);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]);
am1.allocate("*", 4096, NUM_NM-1, new ArrayList<>());
// Do allocation for nm0-nm3
for (int i = 0; i < NUM_NM-1; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// App1 should have 5 containers now, one for each node
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
assertEquals(NUM_NM, schedulerApp1.getLiveContainers().size());
for (int i = 0; i < NUM_NM; i++) {
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(
rmNodes[i].getNodeID()), am1.getApplicationAttemptId(), 1);
}
// Submit app2 to queue A and asks for a 750MB container for AM (on n0)
MockRMAppSubmissionData data1 =
MockRMAppSubmissionData.Builder.createWithMemory(1024, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("A")
.withUnmanagedAM(false)
.build();
RMApp app2 = MockRMAppSubmitter.submit(rm1, data1);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]);
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
// Ask NUM_NM-1 * 1500MB containers
am2.allocate("*", 2048, NUM_NM-1, new ArrayList<>());
// Do allocation for n1-n4
for (int i = 1; i < NUM_NM; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// kill app1
rm1.killApp(app1.getApplicationId());
// Submit app3 to queue B and asks for a 5000MB container for AM (on n2)
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(1024, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("B")
.withUnmanagedAM(false)
.build();
RMApp app3 = MockRMAppSubmitter.submit(rm1, data);
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]);
FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
ApplicationAttemptId.newInstance(app3.getApplicationId(), 1));
// Ask NUM_NM * 5000MB containers
am3.allocate("*", 5120, NUM_NM, new ArrayList<>());
// Do allocation for n0-n4
for (int i = 0; i < NUM_NM; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[i]));
}
// Sleep the timeout interval, we should see 2 containers selected
Thread.sleep(1000);
SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
editPolicy.editSchedule();
// We should only allow to preempt 2 containers, on node1 and node2
Set<RMContainer> selectedToPreempt =
editPolicy.getToPreemptContainers().keySet();
assertEquals(2, selectedToPreempt.size());
List<NodeId> selectedToPreemptNodeIds = new ArrayList<>();
for (RMContainer rmc : selectedToPreempt) {
selectedToPreemptNodeIds.add(rmc.getAllocatedNode());
}
assertThat(selectedToPreemptNodeIds).contains(mockNMs[1].getNodeId(), mockNMs[2].getNodeId());
rm1.close();
}
@Test
@Timeout(value = 60)
public void testPreemptionForFragmentatedCluster() throws Exception {
// Set additional_balance_queue_based_on_reserved_res to true to get
// additional preemptions.
conf.setBoolean(
CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS,
true);
/**
* Two queues, a/b, each of them are 50/50
* 5 nodes in the cluster, each of them is 30G.
*
* Submit first app, AM = 3G, and 4 * 21G containers.
* Submit second app, AM = 3G, and 4 * 21G containers,
*
* We can get one container preempted from 1st app.
*/
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
this.conf);
conf.setLong(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
1024 * 21);
conf.setQueues(ROOT, new String[] {"a", "b"});
conf.setCapacity(A, 50);
conf.setUserLimitFactor(A, 100);
conf.setCapacity(B, 50);
conf.setUserLimitFactor(B, 100);
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
List<MockNM> nms = new ArrayList<>();
for (int i = 0; i < 5; i++) {
nms.add(rm1.registerNode("h" + i + ":1234", 30 * GB));
}
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
// launch an app to queue, AM container should be launched in nm1
MockRMAppSubmissionData data1 =
MockRMAppSubmissionData.Builder.createWithMemory(3 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("a")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms.get(0));
am1.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
// Do allocation for all nodes
for (int i = 0; i < 10; i++) {
MockNM mockNM = nms.get(i % nms.size());
RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
}
// App1 should have 5 containers now
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
assertEquals(5, schedulerApp1.getLiveContainers().size());
// launch an app to queue, AM container should be launched in nm1
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(3 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("b")
.withUnmanagedAM(false)
.build();
RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms.get(2));
am2.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
// Do allocation for all nodes
for (int i = 0; i < 10; i++) {
MockNM mockNM = nms.get(i % nms.size());
RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
}
// App2 should have 2 containers now
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
am2.getApplicationAttemptId());
assertEquals(2, schedulerApp2.getLiveContainers().size());
waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
// Call editSchedule twice and allocation once, container should get allocated
SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
editPolicy.editSchedule();
editPolicy.editSchedule();
int tick = 0;
while (schedulerApp2.getLiveContainers().size() != 4 && tick < 10) {
// Do allocation for all nodes
for (int i = 0; i < 10; i++) {
MockNM mockNM = nms.get(i % nms.size());
RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode));
}
tick++;
Thread.sleep(100);
}
assertEquals(3, schedulerApp2.getLiveContainers().size());
rm1.close();
}
@Test
@Timeout(value = 600)
public void testPreemptionToBalanceWithCustomTimeout() throws Exception {
/**
* Test case: Submit two application (app1/app2) to different queues, queue
* structure:
*
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* </pre>
*
* 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
*
* 2) app1 submit to queue-b, asks for 1G * 5
*
* 3) app2 submit to queue-c, ask for one 4G container (for AM)
*
* After preemption, we should expect:
* 1. Preempt 4 containers from app1
* 2. the selected containers will be killed after configured timeout.
* 3. AM of app2 successfully allocated.
*/
conf.setBoolean(
CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
true);
conf.setLong(
CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION,
20*1000);
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
this.conf);
MockRM rm1 = new MockRM(conf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// launch an app to queue, AM container should be launched in nm1
MockRMAppSubmissionData data1 =
MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("a")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm1, data1);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
am1.allocate("*", 1 * GB, 38, new ArrayList<ContainerId>());
// Do allocation for node1/node2
for (int i = 0; i < 38; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// App1 should have 39 containers now
FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
am1.getApplicationAttemptId());
assertEquals(39, schedulerApp1.getLiveContainers().size());
// 20 from n1 and 19 from n2
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
am1.getApplicationAttemptId(), 20);
waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
am1.getApplicationAttemptId(), 19);
// Submit app2 to queue-c and asks for a 4G container for AM
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(4 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("c")
.withUnmanagedAM(false)
.build();
RMApp app2 = MockRMAppSubmitter.submit(rm1, data);
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
// Call editSchedule: containers are selected to be preemption candidate
SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
editPolicy.editSchedule();
assertEquals(4, editPolicy.getToPreemptContainers().size());
// check live containers immediately, nothing happen
assertEquals(39, schedulerApp1.getLiveContainers().size());
Thread.sleep(20*1000);
// Call editSchedule again: selected containers are killed
editPolicy.editSchedule();
waitNumberOfLiveContainersFromApp(schedulerApp1, 35);
// Call allocation, containers are reserved
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
// Call editSchedule twice and allocation once, container should get allocated
editPolicy.editSchedule();
editPolicy.editSchedule();
int tick = 0;
while (schedulerApp2.getLiveContainers().size() != 1 && tick < 10) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
tick++;
Thread.sleep(100);
}
waitNumberOfReservedContainersFromApp(schedulerApp2, 0);
rm1.close();
}
}