TestNodeQueueLoadMonitor.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Unit tests for NodeQueueLoadMonitor.
*/
public class TestNodeQueueLoadMonitor {
// Extra resource type to test that all resource dimensions are considered
private static final String NETWORK_RESOURCE = "network";
private final static int DEFAULT_MAX_QUEUE_LENGTH = 200;
// Note: The following variables are private static resources
// re-initialized on each test because resource dimensions considered
// are initialized in a static method.
// Declaring them as static final will "lock-in" resource dimensions and
// disallow specification of a new resource dimension ("network") in tests.
private static Resource defaultResourceRequested;
private static Resource defaultCapacity;
static class FakeNodeId extends NodeId {
final String host;
final int port;
public FakeNodeId(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public String getHost() {
return host;
}
@Override
public int getPort() {
return port;
}
@Override
protected void setHost(String host) {}
@Override
protected void setPort(int port) {}
@Override
protected void build() {}
@Override
public String toString() {
return host + ":" + port;
}
}
private static Resource newResourceInstance(long memory, int vCores) {
return newResourceInstance(memory, vCores, 0L);
}
private static Resource newResourceInstance(
final long memory, final int vCores, final long network) {
return Resource.newInstance(memory, vCores,
ImmutableMap.of(NETWORK_RESOURCE, network));
}
private static long getNetworkResourceValue(final Resource resource) {
return resource.getResourceValue(NETWORK_RESOURCE);
}
public static void addNewTypesToResources(String... resourceTypes) {
// Initialize resource map
Map<String, ResourceInformation> riMap = new HashMap<>();
// Initialize mandatory resources
riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB);
riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES);
for (String newResource : resourceTypes) {
riMap.put(newResource, ResourceInformation
.newInstance(newResource, "", 0, ResourceTypes.COUNTABLE, 0,
Integer.MAX_VALUE));
}
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
}
@BeforeAll
public static void classSetUp() {
addNewTypesToResources(NETWORK_RESOURCE);
defaultResourceRequested = newResourceInstance(128, 1, 1);
defaultCapacity = newResourceInstance(1024, 8, 1000);
}
@Test
public void testWaitTimeSort() {
NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
NodeQueueLoadMonitor.LoadComparator.QUEUE_WAIT_TIME);
selector.updateNode(createRMNode("h1", 1, 15, 10));
selector.updateNode(createRMNode("h2", 2, 5, 10));
selector.updateNode(createRMNode("h3", 3, 10, 10));
selector.computeTask.run();
List<NodeId> nodeIds = selector.selectNodes();
assertEquals("h2:2", nodeIds.get(0).toString());
assertEquals("h3:3", nodeIds.get(1).toString());
assertEquals("h1:1", nodeIds.get(2).toString());
// Now update node3
selector.updateNode(createRMNode("h3", 3, 2, 10));
selector.computeTask.run();
nodeIds = selector.selectNodes();
assertEquals("h3:3", nodeIds.get(0).toString());
assertEquals("h2:2", nodeIds.get(1).toString());
assertEquals("h1:1", nodeIds.get(2).toString());
// Now send update with -1 wait time
selector.updateNode(createRMNode("h4", 4, -1, 10));
selector.computeTask.run();
nodeIds = selector.selectNodes();
// No change
assertEquals("h3:3", nodeIds.get(0).toString());
assertEquals("h2:2", nodeIds.get(1).toString());
assertEquals("h1:1", nodeIds.get(2).toString());
// Now update node 2 to DECOMMISSIONING state
selector
.updateNode(createRMNode("h2", 2, 1, 10, NodeState.DECOMMISSIONING));
selector.computeTask.run();
nodeIds = selector.selectNodes();
assertEquals(2, nodeIds.size());
assertEquals("h3:3", nodeIds.get(0).toString());
assertEquals("h1:1", nodeIds.get(1).toString());
// Now update node 2 back to RUNNING state
selector.updateNode(createRMNode("h2", 2, 1, 10, NodeState.RUNNING));
selector.computeTask.run();
nodeIds = selector.selectNodes();
assertEquals("h2:2", nodeIds.get(0).toString());
assertEquals("h3:3", nodeIds.get(1).toString());
assertEquals("h1:1", nodeIds.get(2).toString());
}
@Test
public void testQueueLengthSort() {
NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
selector.updateNode(createRMNode("h1", 1, -1, 15));
selector.updateNode(createRMNode("h2", 2, -1, 5));
selector.updateNode(createRMNode("h3", 3, -1, 10));
selector.computeTask.run();
List<NodeId> nodeIds = selector.selectNodes();
System.out.println("1-> " + nodeIds);
assertEquals("h2:2", nodeIds.get(0).toString());
assertEquals("h3:3", nodeIds.get(1).toString());
assertEquals("h1:1", nodeIds.get(2).toString());
// Now update node3
selector.updateNode(createRMNode("h3", 3, -1, 2));
selector.computeTask.run();
nodeIds = selector.selectNodes();
System.out.println("2-> "+ nodeIds);
assertEquals("h3:3", nodeIds.get(0).toString());
assertEquals("h2:2", nodeIds.get(1).toString());
assertEquals("h1:1", nodeIds.get(2).toString());
// Now send update with -1 wait time but valid length
selector.updateNode(createRMNode("h4", 4, -1, 20));
selector.computeTask.run();
nodeIds = selector.selectNodes();
System.out.println("3-> "+ nodeIds);
// No change
assertEquals("h3:3", nodeIds.get(0).toString());
assertEquals("h2:2", nodeIds.get(1).toString());
assertEquals("h1:1", nodeIds.get(2).toString());
assertEquals("h4:4", nodeIds.get(3).toString());
// Now update h3 and fill its queue.
selector.updateNode(createRMNode("h3", 3, -1,
DEFAULT_MAX_QUEUE_LENGTH));
selector.computeTask.run();
nodeIds = selector.selectNodes();
System.out.println("4-> "+ nodeIds);
assertEquals(3, nodeIds.size());
assertEquals("h2:2", nodeIds.get(0).toString());
assertEquals("h1:1", nodeIds.get(1).toString());
assertEquals("h4:4", nodeIds.get(2).toString());
// Now update h2 to Decommissioning state
selector.updateNode(createRMNode("h2", 2, -1,
5, NodeState.DECOMMISSIONING));
selector.computeTask.run();
nodeIds = selector.selectNodes();
assertEquals(2, nodeIds.size());
assertEquals("h1:1", nodeIds.get(0).toString());
assertEquals("h4:4", nodeIds.get(1).toString());
// Now update h2 back to Running state
selector.updateNode(createRMNode("h2", 2, -1,
5, NodeState.RUNNING));
selector.computeTask.run();
nodeIds = selector.selectNodes();
assertEquals(3, nodeIds.size());
assertEquals("h2:2", nodeIds.get(0).toString());
assertEquals("h1:1", nodeIds.get(1).toString());
assertEquals("h4:4", nodeIds.get(2).toString());
}
@Test
public void testQueueLengthThenResourcesSort() {
NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES);
// Node and queue sizes were selected such that we can determine the
// order of these nodes in the selectNodes call deterministically
// h2 -> h1 -> h3 -> h4
selector.updateNode(createRMNode(
"h1", 1, -1, 0,
Resources.multiply(defaultResourceRequested, 3), defaultCapacity));
selector.updateNode(createRMNode(
"h2", 2, -1, 0,
Resources.multiply(defaultResourceRequested, 2), defaultCapacity));
selector.updateNode(createRMNode(
"h3", 3, -1, 5,
Resources.multiply(defaultResourceRequested, 3), defaultCapacity));
selector.updateNode(createRMNode(
"h4", 4, -1, 10,
Resources.multiply(defaultResourceRequested, 2), defaultCapacity));
selector.computeTask.run();
List<NodeId> nodeIds = selector.selectNodes();
assertEquals("h2:2", nodeIds.get(0).toString());
assertEquals("h1:1", nodeIds.get(1).toString());
assertEquals("h3:3", nodeIds.get(2).toString());
assertEquals("h4:4", nodeIds.get(3).toString());
// Now update node3
// node3 should now rank after node4 since it has the same queue length
// but less resources available
selector.updateNode(createRMNode(
"h3", 3, -1, 10,
Resources.multiply(defaultResourceRequested, 3), defaultCapacity));
selector.computeTask.run();
nodeIds = selector.selectNodes();
assertEquals("h2:2", nodeIds.get(0).toString());
assertEquals("h1:1", nodeIds.get(1).toString());
assertEquals("h4:4", nodeIds.get(2).toString());
assertEquals("h3:3", nodeIds.get(3).toString());
// Now update h3 and fill its queue -- it should no longer be available
selector.updateNode(createRMNode("h3", 3, -1,
DEFAULT_MAX_QUEUE_LENGTH));
selector.computeTask.run();
nodeIds = selector.selectNodes();
// h3 is queued up, so we should only have 3 nodes left
assertEquals(3, nodeIds.size());
assertEquals("h2:2", nodeIds.get(0).toString());
assertEquals("h1:1", nodeIds.get(1).toString());
assertEquals("h4:4", nodeIds.get(2).toString());
// Now update h2 to Decommissioning state
selector.updateNode(createRMNode("h2", 2, -1,
5, NodeState.DECOMMISSIONING));
selector.computeTask.run();
nodeIds = selector.selectNodes();
// h2 is decommissioned, and h3 is full, so we should only have 2 nodes
assertEquals(2, nodeIds.size());
assertEquals("h1:1", nodeIds.get(0).toString());
assertEquals("h4:4", nodeIds.get(1).toString());
// Now update h2 back to Running state
selector.updateNode(createRMNode(
"h2", 2, -1, 0,
Resources.multiply(defaultResourceRequested, 2), defaultCapacity));
selector.computeTask.run();
nodeIds = selector.selectNodes();
assertEquals(3, nodeIds.size());
assertEquals("h2:2", nodeIds.get(0).toString());
assertEquals("h1:1", nodeIds.get(1).toString());
assertEquals("h4:4", nodeIds.get(2).toString());
// Now update h2 to have a zero queue capacity.
// Make sure that here it is still in the pool.
selector.updateNode(createRMNode(
"h2", 2, -1, 0, 0,
Resources.multiply(defaultResourceRequested, 2),
defaultCapacity));
selector.computeTask.run();
nodeIds = selector.selectNodes();
assertEquals(3, nodeIds.size());
assertEquals("h2:2", nodeIds.get(0).toString());
assertEquals("h1:1", nodeIds.get(1).toString());
assertEquals("h4:4", nodeIds.get(2).toString());
// Now update h2 to have a positive queue length but a zero queue capacity.
// Make sure that here it is no longer in the pool.
// Need to first remove the node, because node capacity is not updated.
selector.removeNode(createRMNode(
"h2", 2, -1, 0, 0,
Resources.multiply(defaultResourceRequested, 2),
defaultCapacity));
selector.updateNode(createRMNode(
"h2", 2, -1, 1, 0,
Resources.multiply(defaultResourceRequested, 2),
defaultCapacity));
selector.computeTask.run();
nodeIds = selector.selectNodes();
assertEquals(2, nodeIds.size());
assertEquals("h1:1", nodeIds.get(0).toString());
assertEquals("h4:4", nodeIds.get(1).toString());
}
/**
* Tests that when using QUEUE_LENGTH_THEN_RESOURCES decrements the amount
* of resources on the internal {@link ClusterNode} representation.
*/
@Test
public void testQueueLengthThenResourcesDecrementsAvailable() {
NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES);
RMNode node = createRMNode("h1", 1, -1, 0);
selector.addNode(null, node);
selector.updateNode(node);
selector.updateSortedNodes();
ClusterNode clusterNode = selector.getClusterNodes().get(node.getNodeID());
assertEquals(Resources.none(),
clusterNode.getAllocatedResource());
// Has enough resources
RMNode selectedNode = selector.selectAnyNode(
Collections.emptySet(), defaultResourceRequested);
assertNotNull(selectedNode);
assertEquals(node.getNodeID(), selectedNode.getNodeID());
clusterNode = selector.getClusterNodes().get(node.getNodeID());
assertEquals(defaultResourceRequested,
clusterNode.getAllocatedResource());
// Does not have enough resources, but can queue
selectedNode = selector.selectAnyNode(
Collections.emptySet(), defaultCapacity);
assertNotNull(selectedNode);
assertEquals(node.getNodeID(), selectedNode.getNodeID());
clusterNode = selector.getClusterNodes().get(node.getNodeID());
assertEquals(1, clusterNode.getQueueLength());
// Does not have enough resources and cannot queue
selectedNode = selector.selectAnyNode(
Collections.emptySet(),
Resources.add(defaultResourceRequested, defaultCapacity));
assertNull(selectedNode);
}
@Test
public void testQueueLengthThenResourcesCapabilityChange() {
NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES);
// Node sizes were selected such that we can determine the
// order of these nodes in the selectNodes call deterministically
// h1 -> h2 -> h3 -> h4
selector.updateNode(createRMNode(
"h1", 1, -1, 0,
Resources.multiply(defaultResourceRequested, 1), defaultCapacity));
selector.updateNode(createRMNode(
"h2", 2, -1, 0,
Resources.multiply(defaultResourceRequested, 2), defaultCapacity));
selector.updateNode(createRMNode(
"h3", 3, -1, 0,
Resources.multiply(defaultResourceRequested, 3), defaultCapacity));
selector.updateNode(createRMNode(
"h4", 4, -1, 0,
Resources.multiply(defaultResourceRequested, 4), defaultCapacity));
selector.computeTask.run();
List<NodeId> nodeIds = selector.selectNodes();
assertEquals("h1:1", nodeIds.get(0).toString());
assertEquals("h2:2", nodeIds.get(1).toString());
assertEquals("h3:3", nodeIds.get(2).toString());
assertEquals("h4:4", nodeIds.get(3).toString());
// Now update node1 to have only defaultResourceRequested available
// by changing its capability to 2x defaultResourceReqeusted
// node1 should now rank last
selector.updateNode(createRMNode(
"h1", 1, -1, 0,
Resources.multiply(defaultResourceRequested, 1),
Resources.multiply(defaultResourceRequested, 2)));
selector.computeTask.run();
nodeIds = selector.selectNodes();
assertEquals("h2:2", nodeIds.get(0).toString());
assertEquals("h3:3", nodeIds.get(1).toString());
assertEquals("h4:4", nodeIds.get(2).toString());
assertEquals("h1:1", nodeIds.get(3).toString());
// Now update node2 to have no resources available
// by changing its capability to 1x defaultResourceReqeusted
// node2 should now rank last
selector.updateNode(createRMNode(
"h2", 2, -1, 0,
Resources.multiply(defaultResourceRequested, 1),
Resources.multiply(defaultResourceRequested, 1)));
selector.computeTask.run();
nodeIds = selector.selectNodes();
assertEquals("h3:3", nodeIds.get(0).toString());
assertEquals("h4:4", nodeIds.get(1).toString());
assertEquals("h1:1", nodeIds.get(2).toString());
assertEquals("h2:2", nodeIds.get(3).toString());
}
@Test
public void testContainerQueuingLimit() {
NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
selector.updateNode(createRMNode("h1", 1, -1, 15));
selector.updateNode(createRMNode("h2", 2, -1, 5));
selector.updateNode(createRMNode("h3", 3, -1, 10));
// Test Mean Calculation
selector.initThresholdCalculator(0, 6, 100);
QueueLimitCalculator calculator = selector.getThresholdCalculator();
ContainerQueuingLimit containerQueuingLimit = calculator
.createContainerQueuingLimit();
assertEquals(6, containerQueuingLimit.getMaxQueueLength());
assertEquals(-1, containerQueuingLimit.getMaxQueueWaitTimeInMs());
selector.computeTask.run();
containerQueuingLimit = calculator.createContainerQueuingLimit();
assertEquals(10, containerQueuingLimit.getMaxQueueLength());
assertEquals(-1, containerQueuingLimit.getMaxQueueWaitTimeInMs());
// Test Limits do not exceed specified max
selector.updateNode(createRMNode("h1", 1, -1, 110));
selector.updateNode(createRMNode("h2", 2, -1, 120));
selector.updateNode(createRMNode("h3", 3, -1, 130));
selector.updateNode(createRMNode("h4", 4, -1, 140));
selector.updateNode(createRMNode("h5", 5, -1, 150));
selector.updateNode(createRMNode("h6", 6, -1, 160));
selector.computeTask.run();
containerQueuingLimit = calculator.createContainerQueuingLimit();
assertEquals(100, containerQueuingLimit.getMaxQueueLength());
// Test Limits do not go below specified min
selector.updateNode(createRMNode("h1", 1, -1, 1));
selector.updateNode(createRMNode("h2", 2, -1, 2));
selector.updateNode(createRMNode("h3", 3, -1, 3));
selector.updateNode(createRMNode("h4", 4, -1, 4));
selector.updateNode(createRMNode("h5", 5, -1, 5));
selector.updateNode(createRMNode("h6", 6, -1, 6));
selector.computeTask.run();
containerQueuingLimit = calculator.createContainerQueuingLimit();
assertEquals(6, containerQueuingLimit.getMaxQueueLength());
}
/**
* Tests selection of local node from NodeQueueLoadMonitor. This test covers
* selection of node based on queue limit and blacklisted nodes.
*/
@Test
public void testSelectLocalNode() {
NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
RMNode h1 = createRMNode("h1", 1, -1, 2, 5);
RMNode h2 = createRMNode("h2", 2, -1, 5, 5);
RMNode h3 = createRMNode("h3", 3, -1, 4, 5);
selector.addNode(null, h1);
selector.addNode(null, h2);
selector.addNode(null, h3);
selector.updateNode(h1);
selector.updateNode(h2);
selector.updateNode(h3);
// basic test for selecting node which has queue length less
// than queue capacity.
Set<String> blacklist = new HashSet<>();
RMNode node = selector.selectLocalNode(
"h1", blacklist, defaultResourceRequested);
assertEquals("h1", node.getHostName());
// if node has been added to blacklist
blacklist.add("h1");
node = selector.selectLocalNode(
"h1", blacklist, defaultResourceRequested);
assertNull(node);
node = selector.selectLocalNode(
"h2", blacklist, defaultResourceRequested);
assertNull(node);
node = selector.selectLocalNode(
"h3", blacklist, defaultResourceRequested);
assertEquals("h3", node.getHostName());
}
/**
* Tests selection of rack local node from NodeQueueLoadMonitor. This test
* covers selection of node based on queue limit and blacklisted nodes.
*/
@Test
public void testSelectRackLocalNode() {
NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
RMNode h1 = createRMNode("h1", 1, "rack1", -1, 2, 5);
RMNode h2 = createRMNode("h2", 2, "rack2", -1, 5, 5);
RMNode h3 = createRMNode("h3", 3, "rack2", -1, 4, 5);
selector.addNode(null, h1);
selector.addNode(null, h2);
selector.addNode(null, h3);
selector.updateNode(h1);
selector.updateNode(h2);
selector.updateNode(h3);
// basic test for selecting node which has queue length less
// than queue capacity.
Set<String> blacklist = new HashSet<>();
RMNode node = selector.selectRackLocalNode(
"rack1", blacklist, defaultResourceRequested);
assertEquals("h1", node.getHostName());
// if node has been added to blacklist
blacklist.add("h1");
node = selector.selectRackLocalNode(
"rack1", blacklist, defaultResourceRequested);
assertNull(node);
node = selector.selectRackLocalNode(
"rack2", blacklist, defaultResourceRequested);
assertEquals("h3", node.getHostName());
blacklist.add("h3");
node = selector.selectRackLocalNode(
"rack2", blacklist, defaultResourceRequested);
assertNull(node);
}
/**
* Tests selection of any node from NodeQueueLoadMonitor. This test
* covers selection of node based on queue limit and blacklisted nodes.
*/
@Test
public void testSelectAnyNode() {
NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
RMNode h1 = createRMNode("h1", 1, "rack1", -1, 2, 5);
RMNode h2 = createRMNode("h2", 2, "rack2", -1, 5, 5);
RMNode h3 = createRMNode("h3", 3, "rack2", -1, 4, 10);
selector.addNode(null, h1);
selector.addNode(null, h2);
selector.addNode(null, h3);
selector.updateNode(h1);
selector.updateNode(h2);
selector.updateNode(h3);
selector.computeTask.run();
assertEquals(2, selector.getSortedNodes().size());
// basic test for selecting node which has queue length
// less than queue capacity.
Set<String> blacklist = new HashSet<>();
RMNode node = selector.selectAnyNode(blacklist, defaultResourceRequested);
assertTrue(node.getHostName().equals("h1") ||
node.getHostName().equals("h3"));
// if node has been added to blacklist
blacklist.add("h1");
node = selector.selectAnyNode(blacklist, defaultResourceRequested);
assertEquals("h3", node.getHostName());
blacklist.add("h3");
node = selector.selectAnyNode(blacklist, defaultResourceRequested);
assertNull(node);
}
@Test
public void testQueueLengthThenResourcesComparator() {
NodeQueueLoadMonitor.LoadComparator comparator =
NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH_THEN_RESOURCES;
NodeId n1 = new FakeNodeId("n1", 5000);
NodeId n2 = new FakeNodeId("n2", 5000);
// Case 1: larger available cores should be ranked first
{
ClusterNode.Properties cn1Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(5, 5))
.setCapability(newResourceInstance(10, 10, 1000))
.setQueueLength(10);
ClusterNode cn1 = new ClusterNode(n1);
cn1.setProperties(cn1Props);
ClusterNode.Properties cn2Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(6, 6))
.setCapability(newResourceInstance(10, 10, 1000))
.setQueueLength(10);
ClusterNode cn2 = new ClusterNode(n2);
cn2.setProperties(cn2Props);
comparator.setClusterResource(
Resources.add(cn1.getCapability(), cn2.getCapability()));
assertTrue(comparator.compare(cn1, cn2) < 0);
}
// Case 2: Shorter queue should be ranked first before comparing resources
{
ClusterNode.Properties cn1Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(5, 5))
.setCapability(newResourceInstance(10, 10, 1000))
.setQueueLength(5);
ClusterNode cn1 = new ClusterNode(n1);
cn1.setProperties(cn1Props);
ClusterNode.Properties cn2Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(3, 3))
.setCapability(newResourceInstance(10, 10, 1000))
.setQueueLength(10);
ClusterNode cn2 = new ClusterNode(n2);
cn2.setProperties(cn2Props);
comparator.setClusterResource(
Resources.add(cn1.getCapability(), cn2.getCapability()));
assertTrue(comparator.compare(cn1, cn2) < 0);
}
// Case 3: No capability vs with capability,
// with capability should come first
{
ClusterNode.Properties cn1Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(Resources.none())
.setCapability(newResourceInstance(1, 1, 1000))
.setQueueLength(5);
ClusterNode cn1 = new ClusterNode(n1);
cn1.setProperties(cn1Props);
ClusterNode.Properties cn2Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(Resources.none())
.setCapability(Resources.none())
.setQueueLength(5);
ClusterNode cn2 = new ClusterNode(n2);
cn2.setProperties(cn2Props);
comparator.setClusterResource(
Resources.add(cn1.getCapability(), cn2.getCapability()));
assertTrue(comparator.compare(cn1, cn2) < 0);
}
// Case 4: Compare same values
{
ClusterNode.Properties cn1Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(5, 5))
.setCapability(newResourceInstance(10, 10, 1000))
.setQueueLength(10);
ClusterNode cn1 = new ClusterNode(n1);
cn1.setProperties(cn1Props);
ClusterNode.Properties cn2Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(5, 5))
.setCapability(newResourceInstance(10, 10, 1000))
.setQueueLength(10);
ClusterNode cn2 = new ClusterNode(n2);
cn2.setProperties(cn2Props);
comparator.setClusterResource(
Resources.add(cn1.getCapability(), cn2.getCapability()));
assertEquals(0, comparator.compare(cn1, cn2));
}
// Case 5: If ratio is the same, compare raw values
// by VCores first, then memory
{
ClusterNode.Properties cn1Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(6, 5))
.setCapability(newResourceInstance(10, 10, 1000))
.setQueueLength(10);
ClusterNode cn1 = new ClusterNode(n1);
cn1.setProperties(cn1Props);
ClusterNode.Properties cn2Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(5, 6))
.setCapability(newResourceInstance(10, 10, 1000))
.setQueueLength(10);
ClusterNode cn2 = new ClusterNode(n2);
cn2.setProperties(cn2Props);
comparator.setClusterResource(
Resources.add(cn1.getCapability(), cn2.getCapability()));
// Both are 60% allocated, but CN1 has 5 avail VCores, CN2 only has 4
assertTrue(comparator.compare(cn1, cn2) < 0);
}
// Case 6: by VCores absolute value
{
ClusterNode.Properties cn1Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(5, 5))
.setCapability(newResourceInstance(10, 10, 1000))
.setQueueLength(10);
ClusterNode cn1 = new ClusterNode(n1);
cn1.setProperties(cn1Props);
ClusterNode.Properties cn2Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(5, 6))
.setCapability(newResourceInstance(10, 12, 1000))
.setQueueLength(10);
ClusterNode cn2 = new ClusterNode(n2);
cn2.setProperties(cn2Props);
comparator.setClusterResource(
Resources.add(cn1.getCapability(), cn2.getCapability()));
assertTrue(comparator.compare(cn2, cn1) < 0);
}
// Case 7: by memory absolute value
{
ClusterNode.Properties cn1Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(5, 5))
.setCapability(newResourceInstance(10, 10, 1000))
.setQueueLength(10);
ClusterNode cn1 = new ClusterNode(n1);
cn1.setProperties(cn1Props);
ClusterNode.Properties cn2Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(6, 5))
.setCapability(newResourceInstance(12, 10, 1000))
.setQueueLength(10);
ClusterNode cn2 = new ClusterNode(n2);
cn2.setProperties(cn2Props);
comparator.setClusterResource(
Resources.add(cn1.getCapability(), cn2.getCapability()));
assertTrue(comparator.compare(cn2, cn1) < 0);
}
// Case 8: Memory should be more constraining in the overall cluster,
// so rank the node with less allocated memory first
{
ClusterNode.Properties cn1Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(5, 11))
.setCapability(newResourceInstance(10, 100, 1000))
.setQueueLength(10);
ClusterNode cn1 = new ClusterNode(n1);
cn1.setProperties(cn1Props);
ClusterNode.Properties cn2Props =
ClusterNode.Properties.newInstance()
.setAllocatedResource(newResourceInstance(6, 10))
.setCapability(newResourceInstance(10, 100, 1000))
.setQueueLength(10);
ClusterNode cn2 = new ClusterNode(n2);
cn2.setProperties(cn2Props);
comparator.setClusterResource(
Resources.add(cn1.getCapability(), cn2.getCapability()));
assertTrue(comparator.compare(cn1, cn2) < 0);
}
}
private RMNode createRMNode(String host, int port,
int waitTime, int queueLength) {
return createRMNode(host, port, waitTime, queueLength,
DEFAULT_MAX_QUEUE_LENGTH);
}
private RMNode createRMNode(String host, int port,
int waitTime, int queueLength, NodeState state) {
return createRMNode(host, port, "default", waitTime, queueLength,
DEFAULT_MAX_QUEUE_LENGTH, state);
}
private RMNode createRMNode(String host, int port,
int waitTime, int queueLength, int queueCapacity) {
return createRMNode(host, port, "default", waitTime, queueLength,
queueCapacity, NodeState.RUNNING);
}
private RMNode createRMNode(String host, int port, String rack,
int waitTime, int queueLength, int queueCapacity) {
return createRMNode(host, port, rack, waitTime, queueLength, queueCapacity,
NodeState.RUNNING);
}
private RMNode createRMNode(String host, int port, String rack,
int waitTime, int queueLength, int queueCapacity, NodeState state) {
return createRMNode(host, port, rack, waitTime, queueLength, queueCapacity,
state, Resources.none(), defaultCapacity);
}
private RMNode createRMNode(
String host, int port, int waitTime, int queueLength,
Resource allocatedResource, Resource nodeResource) {
return createRMNode(host, port, waitTime, queueLength,
DEFAULT_MAX_QUEUE_LENGTH, allocatedResource, nodeResource);
}
private RMNode createRMNode(
String host, int port, int waitTime, int queueLength, int queueCapacity,
Resource allocatedResource, Resource nodeResource) {
return createRMNode(host, port, "default", waitTime, queueLength,
queueCapacity, NodeState.RUNNING, allocatedResource, nodeResource);
}
@SuppressWarnings("parameternumber")
private RMNode createRMNode(String host, int port, String rack,
int waitTime, int queueLength, int queueCapacity, NodeState state,
Resource allocatedResource, Resource nodeResource) {
RMNode node1 = Mockito.mock(RMNode.class);
NodeId nID1 = new FakeNodeId(host, port);
Mockito.when(node1.getHostName()).thenReturn(host);
Mockito.when(node1.getRackName()).thenReturn(rack);
Mockito.when(node1.getNode()).thenReturn(new NodeBase("/" + host));
Mockito.when(node1.getNodeID()).thenReturn(nID1);
Mockito.when(node1.getState()).thenReturn(state);
Mockito.when(node1.getTotalCapability()).thenReturn(nodeResource);
Mockito.when(node1.getNodeUtilization()).thenReturn(
ResourceUtilization.newInstance(0, 0, 0));
Mockito.when(node1.getAllocatedContainerResource()).thenReturn(
allocatedResource);
OpportunisticContainersStatus status1 =
Mockito.mock(OpportunisticContainersStatus.class);
Mockito.when(status1.getEstimatedQueueWaitTime())
.thenReturn(waitTime);
Mockito.when(status1.getWaitQueueLength())
.thenReturn(queueLength);
Mockito.when(status1.getOpportQueueCapacity())
.thenReturn(queueCapacity);
Mockito.when(node1.getOpportunisticContainersStatus()).thenReturn(status1);
return node1;
}
}