TestPlacementProcessor.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.RejectionReason;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.Thread.sleep;
import static org.apache.hadoop.yarn.api.records.RejectionReason.COULD_NOT_PLACE_ON_NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * End-to-end tests of the placement constraint framework. Each test starts an
 * RM running the CapacityScheduler with the placement-processor constraint
 * handler, submits tagged {@link SchedulingRequest}s from an AM, and then
 * checks which nodes the resulting containers landed on, which requests were
 * rejected (and why), and the root queue metrics.
 */
public class TestPlacementProcessor {

  private static final int GB = 1024;

  /**
   * Maximum number of heartbeat/allocate rounds the polling helpers run before
   * giving up. Each round sleeps one second.
   */
  private static final int MAX_POLL_ATTEMPTS = 10;

  private static final Logger LOG =
      LoggerFactory.getLogger(TestPlacementProcessor.class);

  private MockRM rm;
  private DrainDispatcher dispatcher;

  /**
   * Starts the default RM for each test: placement processor enabled, with a
   * single placement retry attempt.
   */
  @BeforeEach
  public void createAndStartRM() {
    YarnConfiguration conf =
        newPlacementProcessorConf(new CapacitySchedulerConfiguration());
    conf.setInt(
        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 1);
    startRM(conf);
  }

  /**
   * Builds a configuration on top of {@code csConf} that selects the
   * CapacityScheduler and the placement-processor constraint handler.
   * The placement retry-attempt setting is left to the caller.
   */
  private static YarnConfiguration newPlacementProcessorConf(
      CapacitySchedulerConfiguration csConf) {
    YarnConfiguration conf = new YarnConfiguration(csConf);
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
    return conf;
  }

  /**
   * Starts a MockRM with {@code conf}. The RM uses a fresh DrainDispatcher,
   * which is also stored in {@link #dispatcher}.
   */
  private void startRM(final YarnConfiguration conf) {
    dispatcher = new DrainDispatcher();
    rm = new MockRM(conf) {
      @Override
      protected Dispatcher createDispatcher() {
        return dispatcher;
      }
    };
    rm.start();
  }

  /** Stops the current RM, if one was started. */
  @AfterEach
  public void stopRM() {
    if (rm != null) {
      rm.stop();
    }
  }

  @Test
  @Timeout(value = 300)
  public void testAntiAffinityPlacement() throws Exception {
    List<MockNM> nms = registerNodes(4, 4096);
    Map<NodeId, MockNM> nodes = byNodeId(nms);
    MockNM amNode = nms.get(1); // h2:1234 hosts the AM
    RMApp app1 = submitApp("default");
    // Containers with allocationTag 'foo' are restricted to 1 per NODE
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, amNode,
        Collections.singletonMap(Collections.singleton("foo"),
            PlacementConstraints.build(
                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))));
    am1.addSchedulingRequest(
        Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"),
            schedulingRequest(1, 2, 1, 512, "foo"),
            schedulingRequest(1, 3, 1, 512, "foo"),
            schedulingRequest(1, 5, 1, 512, "foo")));
    AllocateResponse allocResponse = am1.schedule(); // send the request
    List<Container> allocatedContainers =
        new ArrayList<>(allocResponse.getAllocatedContainers());
    // kick the scheduler
    waitForContainerAllocation(nodes.values(), am1,
        allocatedContainers, new ArrayList<>(), 4);
    assertEquals(4, allocatedContainers.size());
    // Ensure unique nodes (antiaffinity)
    assertEquals(4, distinctNodes(allocatedContainers).size());
    // Verify Metrics
    verifyMetrics(rm.getResourceScheduler().getRootQueueMetrics(),
        11264, 11, 5120, 5, 5);
  }

  @Test
  @Timeout(value = 300)
  public void testMutualAntiAffinityPlacement() throws Exception {
    List<MockNM> nms = registerNodes(5, 4096);
    Map<NodeId, MockNM> nodes = byNodeId(nms);
    MockNM amNode = nms.get(1); // h2:1234 hosts the AM
    RMApp app1 = submitApp("default");
    // Both 'foo' and 'bar' containers must avoid nodes that already host a
    // 'foo' container, so every container ends up on its own node.
    Map<Set<String>, PlacementConstraint> pcMap = new HashMap<>();
    pcMap.put(Collections.singleton("foo"),
        PlacementConstraints.build(
            PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
    pcMap.put(Collections.singleton("bar"),
        PlacementConstraints.build(
            PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, amNode, pcMap);
    am1.addSchedulingRequest(
        Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
            schedulingRequest(1, 2, 1, 512, "foo"),
            schedulingRequest(1, 3, 1, 512, "foo"),
            schedulingRequest(1, 4, 1, 512, "foo"),
            schedulingRequest(1, 5, 1, 512, "foo")));
    AllocateResponse allocResponse = am1.schedule(); // send the request
    List<Container> allocatedContainers =
        new ArrayList<>(allocResponse.getAllocatedContainers());
    // kick the scheduler
    waitForContainerAllocation(nodes.values(), am1,
        allocatedContainers, new ArrayList<>(), 5);
    assertEquals(5, allocatedContainers.size());
    // Ensure unique nodes (antiaffinity)
    assertEquals(5, distinctNodes(allocatedContainers).size());
    // Verify Metrics
    verifyMetrics(rm.getResourceScheduler().getRootQueueMetrics(),
        14336, 14, 6144, 6, 6);
  }

  @Test
  @Timeout(value = 300)
  public void testCardinalityPlacement() throws Exception {
    List<MockNM> nms = registerNodes(4, 8192);
    Map<NodeId, MockNM> nodes = byNodeId(nms);
    MockNM amNode = nms.get(1); // h2:1234 hosts the AM
    RMApp app1 = submitApp("default");
    // Containers with allocationTag 'foo' should not exceed 4 per NODE:
    // a new 'foo' may only go to a node that already has 0..3 of them.
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, amNode,
        Collections.singletonMap(Collections.singleton("foo"),
            PlacementConstraints.build(PlacementConstraints
                .targetCardinality(NODE, 0, 3, allocationTag("foo")))));
    am1.addSchedulingRequest(
        Arrays.asList(schedulingRequest(1, 1, 1, 512, "foo"),
            schedulingRequest(1, 2, 1, 512, "foo"),
            schedulingRequest(1, 3, 1, 512, "foo"),
            schedulingRequest(1, 4, 1, 512, "foo"),
            schedulingRequest(1, 5, 1, 512, "foo"),
            schedulingRequest(1, 6, 1, 512, "foo"),
            schedulingRequest(1, 7, 1, 512, "foo"),
            schedulingRequest(1, 8, 1, 512, "foo")));
    AllocateResponse allocResponse = am1.schedule(); // send the request
    List<Container> allocatedContainers =
        new ArrayList<>(allocResponse.getAllocatedContainers());
    // kick the scheduler
    waitForContainerAllocation(nodes.values(), am1,
        allocatedContainers, new ArrayList<>(), 8);
    assertEquals(8, allocatedContainers.size());
    // Ensure no more than 4 containers per node
    for (long count : containersPerNode(allocatedContainers).values()) {
      assertTrue(count < 5, "Too many 'foo' containers on a node: " + count);
    }
    // Verify Metrics
    verifyMetrics(rm.getResourceScheduler().getRootQueueMetrics(),
        23552, 23, 9216, 9, 9);
  }

  @Test
  @Timeout(value = 300)
  public void testAffinityPlacement() throws Exception {
    List<MockNM> nms = registerNodes(4, 8192);
    Map<NodeId, MockNM> nodes = byNodeId(nms);
    MockNM amNode = nms.get(1); // h2:1234 hosts the AM
    RMApp app1 = submitApp("default");
    // Containers with allocationTag 'foo' should be placed where
    // containers with allocationTag 'bar' are already running
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, amNode,
        Collections.singletonMap(Collections.singleton("foo"),
            PlacementConstraints.build(
                PlacementConstraints.targetIn(NODE, allocationTag("bar")))));
    am1.addSchedulingRequest(
        Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
            schedulingRequest(1, 2, 1, 512, "foo"),
            schedulingRequest(1, 3, 1, 512, "foo"),
            schedulingRequest(1, 4, 1, 512, "foo"),
            schedulingRequest(1, 5, 1, 512, "foo")));
    AllocateResponse allocResponse = am1.schedule(); // send the request
    List<Container> allocatedContainers =
        new ArrayList<>(allocResponse.getAllocatedContainers());
    // kick the scheduler
    waitForContainerAllocation(nodes.values(), am1,
        allocatedContainers, new ArrayList<>(), 5);
    assertEquals(5, allocatedContainers.size());
    // Ensure all containers end up on the same node (affinity)
    assertEquals(1, distinctNodes(allocatedContainers).size());
    // Verify Metrics
    verifyMetrics(rm.getResourceScheduler().getRootQueueMetrics(),
        26624, 26, 6144, 6, 6);
  }

  @Test
  @Timeout(value = 300)
  public void testComplexPlacement() throws Exception {
    List<MockNM> nms = registerNodes(4, 4096);
    Map<NodeId, MockNM> nodes = byNodeId(nms);
    MockNM amNode = nms.get(1); // h2:1234 hosts the AM
    RMApp app1 = submitApp("default");
    Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
    // Containers with allocationTag 'bar' should not exceed 1 per NODE
    constraintMap.put(Collections.singleton("bar"),
        PlacementConstraints.build(targetNotIn(NODE, allocationTag("bar"))));
    // Containers with allocationTag 'foo' should be placed where 'bar' exists
    constraintMap.put(Collections.singleton("foo"),
        PlacementConstraints.build(targetIn(NODE, allocationTag("bar"))));
    // Containers with allocationTag 'foo' should not exceed 2 per NODE.
    // NOTE(review): this put reuses the key {"foo"}, so it REPLACES the
    // affinity constraint above. Only the cardinality constraint is actually
    // registered for 'foo'. Expressing both would need
    // PlacementConstraints.and(...). Confirm the intent before changing it,
    // because that alters what this test exercises.
    constraintMap.put(Collections.singleton("foo"), PlacementConstraints
        .build(targetCardinality(NODE, 0, 1, allocationTag("foo"))));
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, amNode, constraintMap);
    am1.addSchedulingRequest(
        Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
            schedulingRequest(1, 2, 1, 512, "bar"),
            schedulingRequest(1, 3, 1, 512, "foo"),
            schedulingRequest(1, 4, 1, 512, "foo"),
            schedulingRequest(1, 5, 1, 512, "foo"),
            schedulingRequest(1, 6, 1, 512, "foo")));
    AllocateResponse allocResponse = am1.schedule(); // send the request
    List<Container> allocatedContainers =
        new ArrayList<>(allocResponse.getAllocatedContainers());
    // kick the scheduler
    waitForContainerAllocation(nodes.values(), am1,
        allocatedContainers, new ArrayList<>(), 6);
    assertEquals(6, allocatedContainers.size());
    // Ensure no more than 3 containers per node (1 'bar', 2 'foo')
    for (long count : containersPerNode(allocatedContainers).values()) {
      assertTrue(count < 4, "Too many containers on a node: " + count);
    }
    // Verify Metrics
    verifyMetrics(rm.getResourceScheduler().getRootQueueMetrics(),
        9216, 9, 7168, 7, 7);
  }

  @Test
  @Timeout(value = 300)
  public void testSchedulerRejection() throws Exception {
    stopRM();
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();
    QueuePath root = new QueuePath(CapacitySchedulerConfiguration.ROOT);
    QueuePath queueA =
        new QueuePath(CapacitySchedulerConfiguration.ROOT + ".a");
    QueuePath queueB =
        new QueuePath(CapacitySchedulerConfiguration.ROOT + ".b");
    csConf.setQueues(root, new String[] {"a", "b"});
    csConf.setCapacity(queueA, 15.0f);
    csConf.setCapacity(queueB, 85.0f);
    // Unlike createAndStartRM(), RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS is
    // not set here.
    startRM(newPlacementProcessorConf(csConf));
    List<MockNM> nms = registerNodes(4, 4096);
    MockNM amNode = nms.get(1); // h2:1234 hosts the AM
    RMApp app1 = submitApp("a");
    // Containers with allocationTag 'foo' are restricted to 1 per NODE
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, amNode,
        Collections.singletonMap(
            Collections.singleton("foo"),
            PlacementConstraints.build(
                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
        ));
    am1.addSchedulingRequest(
        Arrays.asList(
            schedulingRequest(1, 1, 1, 512, "foo"),
            schedulingRequest(1, 2, 1, 512, "foo"),
            schedulingRequest(1, 3, 1, 512, "foo"),
            // Same size as the others and fits on a node. The scheduler is
            // expected to reject it, presumably because the app runs in queue
            // 'a', which only has 15% capacity.
            schedulingRequest(1, 4, 1, 512, "foo"))
    );
    AllocateResponse allocResponse = am1.schedule(); // send the request
    List<Container> allocatedContainers =
        new ArrayList<>(allocResponse.getAllocatedContainers());
    List<RejectedSchedulingRequest> rejectedReqs =
        new ArrayList<>(allocResponse.getRejectedSchedulingRequests());
    // kick the scheduler
    pollAllocations(nms, am1, allocatedContainers, rejectedReqs, 3, true);
    assertEquals(3, allocatedContainers.size());
    // Ensure unique nodes
    assertEquals(3, distinctNodes(allocatedContainers).size());
    assertFalse(rejectedReqs.isEmpty(),
        "Expected the 4th scheduling request to be rejected");
    RejectedSchedulingRequest rej = rejectedReqs.get(0);
    assertEquals(4, rej.getRequest().getAllocationRequestId());
    assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
        rej.getReason());
    // Verify Metrics
    verifyMetrics(rm.getResourceScheduler().getRootQueueMetrics(),
        12288, 12, 4096, 4, 4);
  }

  @Test
  @Timeout(value = 300)
  public void testNodeCapacityRejection() throws Exception {
    List<MockNM> nms = registerNodes(4, 4096);
    MockNM amNode = nms.get(1); // h2:1234 hosts the AM
    RMApp app1 = submitApp("default");
    // Containers with allocationTag 'foo' are restricted to 1 per NODE
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, amNode,
        Collections.singletonMap(
            Collections.singleton("foo"),
            PlacementConstraints.build(
                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
        ));
    am1.addSchedulingRequest(
        Arrays.asList(
            schedulingRequest(1, 1, 1, 512, "foo"),
            schedulingRequest(1, 2, 1, 512, "foo"),
            schedulingRequest(1, 3, 1, 512, "foo"),
            // Ask for a container larger than the node (5120MB > 4096MB)
            schedulingRequest(1, 4, 1, 5120, "foo"))
    );
    AllocateResponse allocResponse = am1.schedule(); // send the request
    List<Container> allocatedContainers =
        new ArrayList<>(allocResponse.getAllocatedContainers());
    List<RejectedSchedulingRequest> rejectedReqs =
        new ArrayList<>(allocResponse.getRejectedSchedulingRequests());
    // kick the scheduler
    pollAllocations(nms, am1, allocatedContainers, rejectedReqs, 3, true);
    assertEquals(3, allocatedContainers.size());
    // Ensure unique nodes
    assertEquals(3, distinctNodes(allocatedContainers).size());
    assertFalse(rejectedReqs.isEmpty(),
        "Expected the oversized scheduling request to be rejected");
    RejectedSchedulingRequest rej = rejectedReqs.get(0);
    assertEquals(4, rej.getRequest().getAllocationRequestId());
    assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE,
        rej.getReason());
    // Verify Metrics
    verifyMetrics(rm.getResourceScheduler().getRootQueueMetrics(),
        12288, 12, 4096, 4, 4);
  }

  @Test
  @Timeout(value = 300)
  public void testRePlacementAfterSchedulerRejection() throws Exception {
    stopRM();
    YarnConfiguration conf =
        newPlacementProcessorConf(new CapacitySchedulerConfiguration());
    conf.setInt(
        YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS, 2);
    startRM(conf);
    List<MockNM> nms = registerNodes(4, 4096);
    MockNM amNode = nms.get(1); // h2:1234 hosts the AM
    // Do not register nm5 yet..
    MockNM nm5 = new MockNM(hostPort(5), 8192, rm.getResourceTrackerService());
    RMApp app1 = submitApp("default");
    // Containers with allocationTag 'foo' are restricted to 1 per NODE
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, amNode,
        Collections.singletonMap(
            Collections.singleton("foo"),
            PlacementConstraints.build(
                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
        ));
    am1.addSchedulingRequest(
        Arrays.asList(
            schedulingRequest(1, 1, 1, 512, "foo"),
            schedulingRequest(1, 2, 1, 512, "foo"),
            schedulingRequest(1, 3, 1, 512, "foo"),
            // Larger than h1..h4 (4096MB); only h5 (8192MB) can host it
            schedulingRequest(1, 4, 1, 5120, "foo"))
    );
    AllocateResponse allocResponse = am1.schedule(); // send the request
    List<Container> allocatedContainers =
        new ArrayList<>(allocResponse.getAllocatedContainers());
    List<RejectedSchedulingRequest> rejectedReqs =
        new ArrayList<>(allocResponse.getRejectedSchedulingRequests());
    // Register node5 only after first allocate - so the initial placement
    // for the large schedReq goes to some other node..
    nm5.registerNode();
    List<MockNM> allNodes = new ArrayList<>(nms);
    allNodes.add(nm5);
    // kick the scheduler
    pollAllocations(allNodes, am1, allocatedContainers, rejectedReqs, 4,
        false);
    assertEquals(4, allocatedContainers.size());
    // Ensure unique nodes
    assertEquals(4, distinctNodes(allocatedContainers).size());
    // Verify Metrics
    verifyMetrics(rm.getResourceScheduler().getRootQueueMetrics(),
        15360, 19, 9216, 5, 5);
  }

  @Test
  @Timeout(value = 300)
  public void testPlacementRejection() throws Exception {
    List<MockNM> nms = registerNodes(4, 4096);
    MockNM amNode = nms.get(1); // h2:1234 hosts the AM
    RMApp app1 = submitApp("default");
    // Containers with allocationTag 'foo' are restricted to 1 per NODE
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, amNode,
        Collections.singletonMap(
            Collections.singleton("foo"),
            PlacementConstraints.build(
                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
        ));
    am1.addSchedulingRequest(
        Arrays.asList(
            schedulingRequest(1, 1, 1, 512, "foo"),
            schedulingRequest(1, 2, 1, 512, "foo"),
            schedulingRequest(1, 3, 1, 512, "foo"),
            schedulingRequest(1, 4, 1, 512, "foo"),
            // Ask for more containers than nodes
            schedulingRequest(1, 5, 1, 512, "foo"))
    );
    AllocateResponse allocResponse = am1.schedule(); // send the request
    List<Container> allocatedContainers =
        new ArrayList<>(allocResponse.getAllocatedContainers());
    List<RejectedSchedulingRequest> rejectedReqs =
        new ArrayList<>(allocResponse.getRejectedSchedulingRequests());
    // kick the scheduler
    pollAllocations(nms, am1, allocatedContainers, rejectedReqs, 4, true);
    assertEquals(4, allocatedContainers.size());
    // Ensure unique nodes
    assertEquals(4, distinctNodes(allocatedContainers).size());
    assertFalse(rejectedReqs.isEmpty(),
        "Expected the 5th 'foo' request to be rejected");
    RejectedSchedulingRequest rej = rejectedReqs.get(0);
    assertEquals(COULD_NOT_PLACE_ON_NODE,
        rej.getReason());
    // Verify Metrics
    verifyMetrics(rm.getResourceScheduler().getRootQueueMetrics(),
        11264, 11, 5120, 5, 5);
  }

  @Test
  @Timeout(value = 300)
  public void testAndOrPlacement() throws Exception {
    List<MockNM> nms = registerNodes(4, 40960, 100);
    Map<NodeId, MockNM> nodes = byNodeId(nms);
    MockNM amNode = nms.get(1); // h2:1234 hosts the AM
    RMApp app1 = submitApp("default");
    // Register app1 with following constraints
    // 1) foo anti-affinity with foo on node
    // 2) bar anti-affinity with foo on node AND maxCardinality = 2
    // 3) moo affinity with foo OR bar
    Map<Set<String>, PlacementConstraint> app1Constraints = new HashMap<>();
    app1Constraints.put(Collections.singleton("foo"),
        PlacementConstraints.build(
            PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
    app1Constraints.put(Collections.singleton("bar"),
        PlacementConstraints.build(
            PlacementConstraints.and(
                PlacementConstraints.targetNotIn(NODE, allocationTag("foo")),
                PlacementConstraints.maxCardinality(NODE, 2, "bar"))));
    app1Constraints.put(Collections.singleton("moo"),
        PlacementConstraints.build(
            PlacementConstraints.or(
                PlacementConstraints.targetIn(NODE, allocationTag("foo")),
                PlacementConstraints.targetIn(NODE, allocationTag("bar")))));
    MockAM am1 =
        MockRM.launchAndRegisterAM(app1, rm, amNode, app1Constraints);

    // Allocates 3 foo containers on 3 different nodes,
    // in anti-affinity fashion.
    am1.addSchedulingRequest(
        Arrays.asList(
            schedulingRequest(1, 1, 1, 512, "foo"),
            schedulingRequest(1, 2, 1, 512, "foo"),
            schedulingRequest(1, 3, 1, 512, "foo")
        ));
    List<Container> allocatedContainers = new ArrayList<>();
    waitForContainerAllocation(nodes.values(), am1,
        allocatedContainers, new ArrayList<>(), 3);
    printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager());
    assertEquals(3, allocatedContainers.size());

    // Testing AND placement constraint.
    // Now allocates a bar container, as restricted by the AND constraint,
    // bar could be only allocated to the node without foo
    am1.addSchedulingRequest(
        Arrays.asList(
            schedulingRequest(1, 1, 1, 512, "bar")
        ));
    allocatedContainers.clear();
    waitForContainerAllocation(nodes.values(), am1,
        allocatedContainers, new ArrayList<>(), 1);
    printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager());
    assertEquals(1, allocatedContainers.size());
    NodeId barNode = allocatedContainers.get(0).getNodeId();

    // Sends another 3 bar request, 2 of them can be allocated
    // as maxCardinality is 2, for placed containers, they should be all
    // on the node where the last bar was placed.
    allocatedContainers.clear();
    List<RejectedSchedulingRequest> rejectedContainers = new ArrayList<>();
    am1.addSchedulingRequest(
        Arrays.asList(
            schedulingRequest(1, 2, 1, 512, "bar"),
            schedulingRequest(1, 3, 1, 512, "bar"),
            schedulingRequest(1, 4, 1, 512, "bar")
        ));
    waitForContainerAllocation(nodes.values(), am1,
        allocatedContainers, rejectedContainers, 2);
    printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager());
    assertEquals(2, allocatedContainers.size());
    assertTrue(allocatedContainers.stream().allMatch(
        container -> container.getNodeId().equals(barNode)));

    // The third request could not be satisfied because it violates
    // the cardinality constraint. Validate rejected request correctly
    // capture this.
    assertEquals(1, rejectedContainers.size());
    assertEquals(COULD_NOT_PLACE_ON_NODE,
        rejectedContainers.get(0).getReason());

    // Testing OR placement constraint.
    // Register one more NM for testing
    MockNM nm5 = new MockNM(hostPort(5), 4096, 100,
        rm.getResourceTrackerService());
    nodes.put(nm5.getNodeId(), nm5);
    nm5.registerNode();
    nm5.nodeHeartbeat(true);

    List<SchedulingRequest> mooRequests = new ArrayList<>();
    for (int i = 5; i < 25; i++) {
      mooRequests.add(schedulingRequest(1, i, 1, 100, "moo"));
    }
    am1.addSchedulingRequest(mooRequests);
    allocatedContainers.clear();
    waitForContainerAllocation(nodes.values(), am1,
        allocatedContainers, new ArrayList<>(), 20);

    // All 20 containers should be allocated onto nodes besides nm5,
    // because moo affinity to foo or bar which only exists on rest of nodes.
    assertEquals(20, allocatedContainers.size());
    NodeId nm5Id = nm5.getNodeId();
    for (Container mooContainer : allocatedContainers) {
      // nm5 has no moo allocated containers.
      assertFalse(mooContainer.getNodeId().equals(nm5Id),
          "moo container unexpectedly placed on " + nm5Id);
    }
  }

  /** Returns the mock NM id used by these tests for node {@code index}. */
  private static String hostPort(int index) {
    return "h" + index + ":1234";
  }

  /**
   * Creates {@code count} mock NMs with ids {@code h1:1234} through
   * {@code h<count>:1234}, each with {@code memoryMB} of memory and the
   * MockNM default vcores. All NMs are created before any is registered;
   * registration happens in creation order.
   *
   * @return the registered NMs in creation order (index 0 is {@code h1})
   */
  private List<MockNM> registerNodes(int count, int memoryMB)
      throws Exception {
    List<MockNM> nms = new ArrayList<>(count);
    for (int i = 1; i <= count; i++) {
      nms.add(new MockNM(hostPort(i), memoryMB,
          rm.getResourceTrackerService()));
    }
    return registerAll(nms);
  }

  /**
   * Same as {@link #registerNodes(int, int)} but with an explicit vcore
   * count per node.
   */
  private List<MockNM> registerNodes(int count, int memoryMB, int vcores)
      throws Exception {
    List<MockNM> nms = new ArrayList<>(count);
    for (int i = 1; i <= count; i++) {
      nms.add(new MockNM(hostPort(i), memoryMB, vcores,
          rm.getResourceTrackerService()));
    }
    return registerAll(nms);
  }

  /** Registers every NM in {@code nms}, in list order, and returns the list. */
  private static List<MockNM> registerAll(List<MockNM> nms) throws Exception {
    for (MockNM nm : nms) {
      nm.registerNode();
    }
    return nms;
  }

  /**
   * Indexes {@code nms} by node id.
   *
   * @return a mutable HashMap, so callers can add nodes registered later
   */
  private static Map<NodeId, MockNM> byNodeId(List<MockNM> nms) {
    Map<NodeId, MockNM> nodes = new HashMap<>();
    for (MockNM nm : nms) {
      nodes.put(nm.getNodeId(), nm);
    }
    return nodes;
  }

  /**
   * Submits an app named "app" for user "user" to {@code queue}. The app has
   * a 1GB managed AM and no ACLs.
   */
  private RMApp submitApp(String queue) throws Exception {
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue(queue)
            .withUnmanagedAM(false)
            .build();
    return MockRMAppSubmitter.submit(rm, data);
  }

  /** Returns the set of distinct nodes hosting {@code containers}. */
  private static Set<NodeId> distinctNodes(List<Container> containers) {
    return containers.stream()
        .map(Container::getNodeId)
        .collect(Collectors.toSet());
  }

  /** Counts {@code containers} per hosting node. */
  private static Map<NodeId, Long> containersPerNode(
      List<Container> containers) {
    return containers.stream().collect(
        Collectors.groupingBy(Container::getNodeId, Collectors.counting()));
  }

  /** Logs, for each node with tags, its allocation tags and their counts. */
  private static void printTags(Collection<MockNM> nodes,
      AllocationTagsManager atm) {
    for (MockNM nm : nodes) {
      Map<String, Long> nmTags = atm
          .getAllocationTagsWithCount(nm.getNodeId());
      if (nmTags == null) {
        continue;
      }
      StringBuilder sb = new StringBuilder();
      nmTags.forEach((tag, count) ->
          sb.append(tag).append('(').append(count).append("),"));
      LOG.info("nm_{}: {}", nm.getNodeId(), sb);
    }
  }

  /**
   * Heartbeats all {@code nodes}, sleeps one second and calls
   * {@code am.schedule()}, repeatedly. Allocated containers and rejected
   * requests are appended to the given lists.
   *
   * <p>Runs until {@code allocatedContainers} holds at least
   * {@code containerNum} entries, or for at most {@value #MAX_POLL_ATTEMPTS}
   * rounds. The count is checked before each round, so no round runs if it
   * is already met.
   */
  private static void waitForContainerAllocation(Collection<MockNM> nodes,
      MockAM am, List<Container> allocatedContainers,
      List<RejectedSchedulingRequest> rejectedRequests,
      int containerNum) throws Exception {
    for (int attempt = 0; attempt < MAX_POLL_ATTEMPTS
        && allocatedContainers.size() < containerNum; attempt++) {
      for (MockNM node : nodes) {
        node.nodeHeartbeat(true);
      }
      LOG.info("Waiting for containers to be created for {}...",
          am.getApplicationAttemptId().getApplicationId());
      sleep(1000);
      AllocateResponse allocResponse = am.schedule();
      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
      rejectedRequests.addAll(allocResponse.getRejectedSchedulingRequests());
    }
  }

  /**
   * Polling loop used by the rejection tests. Each round heartbeats
   * {@code nodes} in list order, sleeps one second, calls
   * {@code am.schedule()}, and appends allocations and rejections to the
   * given lists.
   *
   * <p>Unlike {@link #waitForContainerAllocation}, at least one round always
   * runs, and the stop condition is checked after each round. Polling stops
   * once at least {@code minAllocated} containers are collected and, if
   * {@code expectRejection} is set, at least one rejection has been seen.
   * It also stops after {@value #MAX_POLL_ATTEMPTS} rounds.
   */
  private static void pollAllocations(List<MockNM> nodes, MockAM am,
      List<Container> allocatedContainers,
      List<RejectedSchedulingRequest> rejectedRequests, int minAllocated,
      boolean expectRejection) throws Exception {
    for (int attempt = 0; attempt < MAX_POLL_ATTEMPTS; attempt++) {
      for (MockNM node : nodes) {
        node.nodeHeartbeat(true);
      }
      LOG.info("Waiting for containers to be created for app 1...");
      sleep(1000);
      AllocateResponse allocResponse = am.schedule();
      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
      rejectedRequests.addAll(allocResponse.getRejectedSchedulingRequests());
      boolean enoughAllocated = allocatedContainers.size() >= minAllocated;
      boolean rejectionSatisfied =
          !expectRejection || !rejectedRequests.isEmpty();
      if (enoughAllocated && rejectionSatisfied) {
        return;
      }
    }
  }

  /**
   * Builds a GUARANTEED scheduling request for a single container.
   *
   * @param priority request priority
   * @param allocReqId allocation request id
   * @param cores vcores per container
   * @param mem memory (MB) per container
   * @param tags allocation tags
   */
  protected static SchedulingRequest schedulingRequest(
      int priority, long allocReqId, int cores, int mem, String... tags) {
    return schedulingRequest(priority, allocReqId, cores, mem,
        ExecutionType.GUARANTEED, tags);
  }

  /**
   * Builds a scheduling request for a single container with the given
   * execution type. The execution type is marked as enforced.
   */
  protected static SchedulingRequest schedulingRequest(
      int priority, long allocReqId, int cores, int mem,
      ExecutionType execType, String... tags) {
    return SchedulingRequest.newBuilder()
        .priority(Priority.newInstance(priority))
        .allocationRequestId(allocReqId)
        .allocationTags(new HashSet<>(Arrays.asList(tags)))
        .executionType(ExecutionTypeRequest.newInstance(execType, true))
        .resourceSizing(
            ResourceSizing.newInstance(1, Resource.newInstance(mem, cores)))
        .build();
  }

  /** Asserts the root queue's available/allocated resource metrics. */
  private static void verifyMetrics(QueueMetrics metrics, long availableMB,
      int availableVirtualCores, long allocatedMB,
      int allocatedVirtualCores, int allocatedContainers) {
    assertEquals(availableMB, metrics.getAvailableMB(), "availableMB");
    assertEquals(availableVirtualCores,
        metrics.getAvailableVirtualCores(), "availableVirtualCores");
    assertEquals(allocatedMB, metrics.getAllocatedMB(), "allocatedMB");
    assertEquals(allocatedVirtualCores,
        metrics.getAllocatedVirtualCores(), "allocatedVirtualCores");
    assertEquals(allocatedContainers, metrics.getAllocatedContainers(),
        "allocatedContainers");
  }
}