/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.waitforNMRegistered;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSorter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Test class for multi-node scheduling in the CapacityScheduler.
 */
public class TestCapacitySchedulerMultiNodes {
private static final Logger LOG = LoggerFactory
.getLogger(TestCapacitySchedulerMultiNodes.class);
private static final QueuePath DEFAULT = new QueuePath("root.default");
private CapacitySchedulerConfiguration conf;
private static final String POLICY_CLASS_NAME =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy";
@BeforeEach
public void setUp() {
CapacitySchedulerConfiguration config =
new CapacitySchedulerConfiguration();
config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getName());
conf = new CapacitySchedulerConfiguration(config);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
"resource-based");
conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
"resource-based");
String policyName =
CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+ ".resource-based" + ".class";
conf.set(policyName, POLICY_CLASS_NAME);
conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED,
true);
conf.setInt("yarn.scheduler.minimum-allocation-mb", 512);
conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1);
}
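// For reference, the multi-node settings above resolve to roughly the
// following capacity-scheduler.xml properties (a sketch, assuming the
// standard "yarn.scheduler.capacity." prefix used by
// CapacitySchedulerConfiguration):
//
//   yarn.scheduler.capacity.multi-node-sorting.policy.names = resource-based
//   yarn.scheduler.capacity.multi-node-sorting.policy = resource-based
//   yarn.scheduler.capacity.multi-node-sorting.policy.resource-based.class =
//       org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy
//   yarn.scheduler.capacity.multi-node-placement-enabled = true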
@Test
public void testMultiNodeSorterForScheduling() throws Exception {
MockRM rm = new MockRM(conf);
rm.start();
rm.registerNode("127.0.0.1:1234", 10 * GB);
rm.registerNode("127.0.0.1:1235", 10 * GB);
rm.registerNode("127.0.0.1:1236", 10 * GB);
rm.registerNode("127.0.0.1:1237", 10 * GB);
ResourceScheduler scheduler = rm.getRMContext().getScheduler();
waitforNMRegistered(scheduler, 4, 5);
MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
.getMultiNodeSortingManager();
MultiNodeSorter<SchedulerNode> sorter = mns
.getMultiNodePolicy(POLICY_CLASS_NAME);
sorter.reSortClusterNodes();
Set<SchedulerNode> nodes = sorter.getMultiNodeLookupPolicy()
.getNodesPerPartition("");
assertEquals(4, nodes.size());
rm.stop();
}
@Test
public void testMultiNodeSorterForSchedulingWithOrdering() throws Exception {
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 10);
MockNM nm2 = rm.registerNode("127.0.0.2:1235", 10 * GB, 10);
MockNM nm3 = rm.registerNode("127.0.0.3:1236", 10 * GB, 10);
MockNM nm4 = rm.registerNode("127.0.0.4:1237", 10 * GB, 10);
ResourceScheduler scheduler = rm.getRMContext().getScheduler();
waitforNMRegistered(scheduler, 4, 5);
MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
.getMultiNodeSortingManager();
MultiNodeSorter<SchedulerNode> sorter = mns
.getMultiNodePolicy(POLICY_CLASS_NAME);
sorter.reSortClusterNodes();
Set<SchedulerNode> nodes = sorter.getMultiNodeLookupPolicy()
.getNodesPerPartition("");
assertEquals(4, nodes.size());
MockRMAppSubmissionData data1 =
MockRMAppSubmissionData.Builder.createWithMemory(2048, rm)
.withAppName("app-1")
.withUser("user1")
.withAcls(null)
.withQueue("default")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
SchedulerNodeReport reportNm1 =
rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
// check node report
assertEquals(2 * GB, reportNm1.getUsedResource().getMemorySize());
assertEquals(8 * GB,
reportNm1.getAvailableResource().getMemorySize());
// Ideally the sorter thread would do this on its own, but it only runs
// once per second. Hence force a recompute of the nodes here.
sorter.reSortClusterNodes();
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
.withAppName("app-2")
.withUser("user2")
.withAcls(null)
.withQueue("default")
.withUnmanagedAM(false)
.build();
RMApp app2 = MockRMAppSubmitter.submit(rm, data);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
SchedulerNodeReport reportNm2 =
rm.getResourceScheduler().getNodeReport(nm2.getNodeId());
// check node report
assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
assertEquals(9 * GB,
reportNm2.getAvailableResource().getMemorySize());
// Ideally the sorter thread would do this on its own, but it only runs
// once per second. Hence force a recompute of the nodes here.
sorter.reSortClusterNodes();
// Node1 and Node2 now have used resources, so ensure these two come
// later in the list.
nodes = sorter.getMultiNodeLookupPolicy()
.getNodesPerPartition("");
List<NodeId> currentNodes = new ArrayList<>();
currentNodes.add(nm3.getNodeId());
currentNodes.add(nm4.getNodeId());
currentNodes.add(nm2.getNodeId());
currentNodes.add(nm1.getNodeId());
Iterator<SchedulerNode> it = nodes.iterator();
SchedulerNode current;
int i = 0;
while (it.hasNext()) {
current = it.next();
assertEquals(currentNodes.get(i++), current.getNodeID());
}
rm.stop();
}
@Test
@Timeout(value = 30)
public void testExcessReservationWillBeUnreserved() throws Exception {
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration(conf);
newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+ ".resource-based.sorting-interval.ms", 0);
newConf.setMaximumApplicationMasterResourcePerQueuePercent(DEFAULT,
1.0f);
MockRM rm1 = new MockRM(newConf);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
// launch an app to the queue; its AM container should be launched on nm1
RMApp app1 = MockRMAppSubmitter.submit(rm1,
MockRMAppSubmissionData.Builder.createWithMemory(5 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch another app to the queue; its AM container should be launched on nm2
RMApp app2 = MockRMAppSubmitter.submit(rm1,
MockRMAppSubmissionData.Builder.createWithMemory(5 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.build());
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
/*
 * Verify that a reserved container will be unreserved
 * after its ask has been cancelled, when the used capacity
 * of the root queue is 1.0.
 */
// Request a 6GB container for app1; nm2 will reserve a container for it.
// The last node from the node iterator gets the RESERVED container.
am1.allocate("*", 6 * GB, 1, new ArrayList<>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// Check containers of app1 and app2.
assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
assertEquals(1, schedulerApp1.getLiveContainers().size());
assertEquals(1, schedulerApp1.getReservedContainers().size());
assertEquals(1, schedulerApp2.getLiveContainers().size());
// Cancel ask of the reserved container.
am1.allocate("*", 6 * GB, 0, new ArrayList<>());
// Request another 2GB container for app2.
am2.allocate("*", 2 * GB, 1, new ArrayList<>());
// Trigger scheduling to release reserved container
// whose ask has been cancelled.
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
assertEquals(1, schedulerApp1.getLiveContainers().size());
assertEquals(0, schedulerApp1.getReservedContainers().size());
assertEquals(2, schedulerApp2.getLiveContainers().size());
assertEquals(7 * GB,
cs.getNode(nm1.getNodeId()).getAllocatedResource().getMemorySize());
assertEquals(12 * GB,
cs.getRootQueue().getQueueResourceUsage().getUsed().getMemorySize());
assertEquals(0,
cs.getRootQueue().getQueueResourceUsage().getReserved()
.getMemorySize());
assertEquals(0,
leafQueue.getQueueResourceUsage().getReserved().getMemorySize());
rm1.close();
}
@Test
@Timeout(value = 30)
public void testAllocateForReservedContainer() throws Exception {
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration(conf);
newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+ ".resource-based.sorting-interval.ms", 0);
newConf.setMaximumApplicationMasterResourcePerQueuePercent(DEFAULT,
1.0f);
MockRM rm1 = new MockRM(newConf);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
// launch an app to the queue; its AM container should be launched on nm1
RMApp app1 = MockRMAppSubmitter.submit(rm1,
MockRMAppSubmissionData.Builder.createWithMemory(5 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch another app to the queue; its AM container should be launched on nm2
RMApp app2 = MockRMAppSubmitter.submit(rm1,
MockRMAppSubmissionData.Builder.createWithMemory(5 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.build());
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
/*
 * Verify that a reserved container will be allocated
 * once a node has sufficient resources.
 */
// Request a 6GB container for app2; nm2 will reserve a container for it.
// The last node from the node iterator gets the RESERVED container.
am2.allocate("*", 6 * GB, 1, new ArrayList<>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// Check containers of app1 and app2.
assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
assertEquals(1, schedulerApp1.getLiveContainers().size());
assertEquals(1, schedulerApp2.getLiveContainers().size());
assertEquals(1, schedulerApp2.getReservedContainers().size());
// Kill app1 to release resources on nm1.
rm1.killApp(app1.getApplicationId());
// Trigger scheduling to allocate the reserved container on nm1.
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
assertEquals(2, schedulerApp2.getLiveContainers().size());
rm1.close();
}
@Test
@Timeout(value = 30)
public void testAllocateOfReservedContainerFromAnotherNode()
throws Exception {
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration(conf);
newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+ ".resource-based.sorting-interval.ms", 0);
newConf.setMaximumApplicationMasterResourcePerQueuePercent(DEFAULT,
1.0f);
MockRM rm1 = new MockRM(newConf);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 12 * GB, 2);
MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB, 2);
// launch app1 to the queue; its AM container will be launched on nm1
RMApp app1 = MockRMAppSubmitter.submit(rm1,
MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch app2 to the queue; its AM container will be launched on nm2
RMApp app2 = MockRMAppSubmitter.submit(rm1,
MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.build());
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
// Submit app3; a container will be reserved for its AM
RMApp app3 = MockRMAppSubmitter.submit(rm1,
MockRMAppSubmissionData.Builder.createWithMemory(8 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("default")
.build());
final AtomicBoolean result = new AtomicBoolean(false);
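// MockRM.launchAndRegisterAM blocks until app3's AM container is allocated.
// Since the cluster can currently only RESERVE that container, run the
// launch on a separate thread and let it signal completion through
// 'result' once the reservation turns into an allocation.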
Thread t = new Thread(() -> {
try {
MockRM.launchAndRegisterAM(app3, rm1, nm1);
result.set(true);
} catch (Exception e) {
fail("Failed to allocate the reserved container");
}
});
t.start();
Thread.sleep(1000);
// Validate that app3 got a RESERVED container
FiCaSchedulerApp schedulerApp =
cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
assertEquals(1, schedulerApp.getReservedContainers().size(),
"App3 failed to get reserved container");
// Free up space on the other node, where the reservation has not happened
if (cs.getNode(rmNode1.getNodeID()).getReservedContainer() != null) {
rm1.killApp(app2.getApplicationId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
} else {
rm1.killApp(app1.getApplicationId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
// Wait until the reserved AM of app3 gets allocated on the node
// where space is available
while (!result.get()) {
Thread.sleep(100);
}
// Validate release of reserved containers
schedulerApp =
cs.getApplicationAttempt(app3.getCurrentAppAttempt().getAppAttemptId());
assertEquals(0, schedulerApp.getReservedContainers().size(),
"App3 failed to release Reserved container");
assertNull(cs.getNode(rmNode1.getNodeID()).getReservedContainer());
assertNull(cs.getNode(rmNode2.getNodeID()).getReservedContainer());
rm1.close();
}
@Test
public void testMultiNodeSorterAfterHeartbeatInterval() throws Exception {
MockRM rm = new MockRM(conf);
rm.start();
rm.registerNode("127.0.0.1:1234", 10 * GB);
rm.registerNode("127.0.0.2:1234", 10 * GB);
rm.registerNode("127.0.0.3:1234", 10 * GB);
rm.registerNode("127.0.0.4:1234", 10 * GB);
Set<SchedulerNode> nodes = new HashSet<>();
String partition = "";
ResourceScheduler scheduler = rm.getRMContext().getScheduler();
waitforNMRegistered(scheduler, 4, 5);
MultiNodeSortingManager<SchedulerNode> mns = rm.getRMContext()
.getMultiNodeSortingManager();
MultiNodeSorter<SchedulerNode> sorter = mns
.getMultiNodePolicy(POLICY_CLASS_NAME);
sorter.reSortClusterNodes();
Iterator<SchedulerNode> nodeIterator = mns.getMultiNodeSortIterator(
nodes, partition, POLICY_CLASS_NAME);
assertEquals(4, Iterators.size(nodeIterator));
// Validate the count after missing 3 node heartbeats
Thread.sleep(YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS * 3);
nodeIterator = mns.getMultiNodeSortIterator(
nodes, partition, POLICY_CLASS_NAME);
assertEquals(0, Iterators.size(nodeIterator));
rm.stop();
}
@Test
@Timeout(value = 30)
public void testSkipAllocationOnNodeReservedByAnotherApp() throws Exception {
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration(conf);
newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+ ".resource-based.sorting-interval.ms", 0);
newConf.setMaximumApplicationMasterResourcePerQueuePercent(DEFAULT, 1.0f);
newConf.set(CapacitySchedulerConfiguration.SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS,
"true");
MockRM rm1 = new MockRM(newConf);
rm1.start();
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8 * GB);
MockNM nm2 = rm1.registerNode("127.0.0.2:1235", 8 * GB);
// launch an app to the queue; its AM container should be launched on nm1
RMApp app1 = MockRMAppSubmitter.submit(rm1, MockRMAppSubmissionData.Builder
.createWithMemory(5 * GB, rm1)
.withAppName("app")
.withUser("user")
.withQueue("default")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch another app to the queue; its AM container should be launched on nm2
RMApp app2 = MockRMAppSubmitter.submit(rm1, MockRMAppSubmissionData.Builder
.createWithMemory(5 * GB, rm1)
.withAppName("app")
.withUser("user")
.withQueue("default")
.build());
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
// Request a 4GB container for app1; it will be reserved on one of the nodes.
am1.allocate("*", 4 * GB, 1, new ArrayList<>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
// Check containers of app1 and app2.
Set<RMNode> reservedContainers = checkReservedContainers(cs,
rm1.getRMContext().getRMNodes(), 1);
assertEquals(1, reservedContainers.size());
RMNode nodeWithReservedContainer = reservedContainers.iterator().next();
LOG.debug("Reserved container on: {}", nodeWithReservedContainer);
// Move the reservation to nm1 for easier testing
if (nodeWithReservedContainer.getNodeID().getHost().startsWith("127.0.0.2")) {
moveReservation(cs, rm1, nm1, nm2, am1);
}
assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
assertNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
assertEquals(1, schedulerApp1.getLiveContainers().size());
assertEquals(1, schedulerApp2.getLiveContainers().size());
assertEquals(1, schedulerApp1.getReservedContainers().size());
// Make sure there is available headroom on the child queue
// (see RegularContainerAllocator#checkHeadroom); otherwise
// RegularContainerAllocator#preCheckForNodeCandidateSet may return
// ContainerAllocation.QUEUE_SKIPPED.
MockNM nm3 = rm1.registerNode("127.0.0.3:1235", 3 * GB);
// Request a container for app2; we expect it to be reserved on nm2, since
// nm1 already holds a reservation for another app
am2.allocate("*", 4 * GB, 1, new ArrayList<>());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
rm1.close();
}
// In a scheduling scenario with 2000 candidate nodes and an unsatisfied
// request that reaches the headroom, the global scheduler used to check
// the request repeatedly, once per node, causing a scheduling cycle to
// exceed 200ms. With the improvement of YARN-11798, the global scheduler
// checks the request only once before evaluating all nodes, reducing the
// scheduling cycle to less than 2ms.
@Test
@Timeout(value = 30)
public void testCheckRequestOnceForUnsatisfiedRequest() throws Exception {
QueuePath defaultQueuePath =
new QueuePath(CapacitySchedulerConfiguration.ROOT, "default");
String resourceLimit = "[vcores=3,memory=3096]";
conf.setCapacity(defaultQueuePath, resourceLimit);
conf.setMaximumCapacityByLabel(defaultQueuePath, "", resourceLimit);
conf.setInt(YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER, 1000);
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("test:1234", 10 * GB);
ResourceScheduler scheduler = rm.getRMContext().getScheduler();
waitforNMRegistered(scheduler, 1, 5);
MockRMAppSubmissionData data1 =
MockRMAppSubmissionData.Builder.createWithMemory(2048, rm)
.withAppName("app-1")
.withUser("user1")
.withAcls(null)
.withQueue("default")
.withUnmanagedAM(false)
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
SchedulerNodeReport reportNm1 =
rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
// check node report
assertEquals(2 * GB, reportNm1.getUsedResource().getMemorySize());
assertEquals(8 * GB,
reportNm1.getAvailableResource().getMemorySize());
// mock node tracker with 2000 nodes
// to simulate the scenario where there are many nodes in the cluster
List<FiCaSchedulerNode> mockNodes = new ArrayList<>();
for (int i = 0; i < 2000; i++) {
FiCaSchedulerNode node =
TestUtils.getMockNode("host" + i + ":1234", "", 0, 10 * GB, 10);
mockNodes.add(node);
}
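// A ClusterNodeTracker stub that serves the 2000 mock nodes for every
// partition.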
ClusterNodeTracker<FiCaSchedulerNode> mockNodeTracker =
new ClusterNodeTracker<FiCaSchedulerNode>() {
@Override
public List<FiCaSchedulerNode> getNodesPerPartition(String partition) {
return mockNodes;
}
};
// replace the scheduler with a spy scheduler with mocked node-tracker
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
final CapacityScheduler spyCs = Mockito.spy(cs);
when(spyCs.getNodeTracker()).thenReturn(mockNodeTracker);
RMContext rmContext = rm.getRMContext();
((RMContextImpl) rmContext).setScheduler(spyCs);
MultiNodeSortingManager<SchedulerNode> mns =
rmContext.getMultiNodeSortingManager();
MultiNodeSorter<SchedulerNode> sorter =
mns.getMultiNodePolicy(POLICY_CLASS_NAME);
sorter.reSortClusterNodes();
// verify that the number of nodes is correct
Set<SchedulerNode> nodes =
sorter.getMultiNodeLookupPolicy().getNodesPerPartition("");
assertEquals(mockNodes.size(), nodes.size());
// create an unsatisfied request which will reach the headroom
am1.allocate("*", 2 * GB, 10, new ArrayList<>());
List<Long> elapsedMsLst = new ArrayList<>();
try {
GenericTestUtils.waitFor(() -> {
// verify that when headroom is reached for an unsatisfied request,
// scheduler should only check the request once before checking all nodes.
CandidateNodeSet<FiCaSchedulerNode> candidates =
new SimpleCandidateNodeSet<>(Collections.emptyMap(), "");
int numSchedulingCycles = 10;
long startTime = System.currentTimeMillis();
for (int i = 0; i < numSchedulingCycles; i++) {
spyCs.allocateContainersToNode(candidates, false);
}
long avgElapsedMs =
(System.currentTimeMillis() - startTime) / numSchedulingCycles;
LOG.info("Average elapsed time for a scheduling cycle: {} ms",
avgElapsedMs);
elapsedMsLst.add(avgElapsedMs);
// verify that the scheduling cycle is less than 10ms,
// ideally the latency should be less than 2ms.
return avgElapsedMs < 10;
}, 500, 3000);
} catch (TimeoutException e) {
fail("Scheduling cycle expected to be less than 10ms, " +
"but took too long, elapsedMs:" + elapsedMsLst);
} finally {
rm.stop();
}
}
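/**
 * Moves the reservation am1 holds on nm2 over to nm1, so the test can
 * proceed with a fixed reservation layout regardless of which node the
 * scheduler originally picked.
 */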
private static void moveReservation(CapacityScheduler cs,
MockRM rm1, MockNM nm1, MockNM nm2, MockAM am1) {
RMNode sourceNode = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
RMNode targetNode = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
SchedulerApplicationAttempt firstSchedulerAppAttempt =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp app = (FiCaSchedulerApp) firstSchedulerAppAttempt;
RMContainer reservedContainer = cs.getNode(sourceNode.getNodeID()).getReservedContainer();
LOG.debug("Moving reservation");
app.moveReservation(reservedContainer,
cs.getNode(sourceNode.getNodeID()), cs.getNode(targetNode.getNodeID()));
}
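/**
 * Returns the RMNodes that currently hold a reserved container, asserting
 * that their number matches the expected count.
 */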
private static Set<RMNode> checkReservedContainers(CapacityScheduler cs,
ConcurrentMap<NodeId, RMNode> rmNodes, int expectedNumberOfContainers) {
Set<RMNode> result = new HashSet<>();
for (Map.Entry<NodeId, RMNode> entry : rmNodes.entrySet()) {
if (cs.getNode(entry.getKey()).getReservedContainer() != null) {
result.add(entry.getValue());
}
}
assertEquals(expectedNumberOfContainers, result.size());
return result;
}
}