TestCSAllocateCustomResource.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.Arrays;
import java.util.stream.Collectors;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
/**
 * Test cases for container allocation with custom resource types
 * in the capacity scheduler.
 */
public class TestCSAllocateCustomResource {

  private static final String A_PATH = CapacitySchedulerConfiguration.ROOT + ".a";
  private static final QueuePath ROOT = new QueuePath(CapacitySchedulerConfiguration.ROOT);
  private static final QueuePath A = new QueuePath(A_PATH);

  /** Classpath resource that holds the custom resource type definitions. */
  private static final String RESOURCE_TYPES_TEST_FILE = "resource-types-test.xml";

  /** Name under which the definitions are copied next to the source file. */
  private static final String RESOURCE_TYPES_FILE = "resource-types.xml";

  /** Multiplier used to express memory sizes in the resource requests. */
  private static final int GB = 1024;

  private YarnConfiguration conf;
  private RMNodeLabelsManager mgr;

  /** Copy created by {@link #installResourceTypesFile()}; removed in tearDown. */
  private File resourceTypesFile = null;

  /**
   * Creates a fresh configuration that selects the capacity scheduler and
   * initializes a node label manager with it.
   */
  @BeforeEach
  public void setUp() throws Exception {
    conf = new YarnConfiguration();
    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);
    mgr = new NullRMNodeLabelsManager();
    mgr.init(conf);
  }

  /**
   * Removes the resource types file installed by the test, if any.
   */
  @AfterEach
  public void tearDown() {
    // Best-effort cleanup: deleteQuietly tolerates null and missing files.
    FileUtils.deleteQuietly(resourceTypesFile);
  }

  /**
   * Resets the loaded resource types and copies the test definitions to
   * {@code resource-types.xml} in the directory of the source file.
   * The copy is remembered in {@link #resourceTypesFile} for cleanup.
   *
   * @throws IOException if the definitions cannot be copied
   */
  private void installResourceTypesFile() throws IOException {
    ResourceUtils.resetResourceTypes();
    File source = new File(
        conf.getClassLoader().getResource(RESOURCE_TYPES_TEST_FILE).getFile());
    resourceTypesFile = new File(source.getParent(), RESOURCE_TYPES_FILE);
    FileUtils.copyFile(source, resourceTypesFile);
  }

  /**
   * Selects {@link DominantResourceCalculator} as the resource calculator.
   *
   * @param csConf configuration to modify
   */
  private static void useDominantResourceCalculator(
      CapacitySchedulerConfiguration csConf) {
    csConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
        DominantResourceCalculator.class, ResourceCalculator.class);
  }

  /**
   * Builds the multi-queue test configuration on top of {@link #conf} and
   * switches it to the dominant resource calculator.
   *
   * @return the scheduler configuration to start the RM with
   */
  private CapacitySchedulerConfiguration createMultiQueueConf() {
    CapacitySchedulerConfiguration csConf =
        (CapacitySchedulerConfiguration) TestUtils
            .getConfigurationWithMultipleQueues(conf);
    useDominantResourceCalculator(csConf);
    return csConf;
  }

  /**
   * Test containers request custom resource.
   */
  @Test
  public void testCapacitySchedulerJobWhenConfigureCustomResourceType()
      throws Exception {
    installResourceTypesFile();

    CapacitySchedulerConfiguration newConf = createMultiQueueConf();
    newConf.set(QueuePrefixes.getQueuePrefix(A)
        + MAXIMUM_ALLOCATION_MB, "4096");
    // Must be false so that MockRM does not re-initialize the resource types
    // from the configuration via ResourceUtils.resetResourceTypes(conf).
    newConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES,
        false);

    // try-with-resources closes the RM even when an assertion fails.
    try (MockRM rm = new MockRM(newConf)) {
      rm.start();

      // Register a node that offers the custom resource.
      Resource nodeResource = Resources.createResource(4 * GB, 4);
      nodeResource.setResourceValue(GPU_URI, 10);
      MockNM nm1 = rm.registerNode("h1:1234", nodeResource);

      // Submit an app whose AM asks for the custom resource.
      Resource amResource = Resources.createResource(1 * GB, 1);
      amResource.setResourceValue(GPU_URI, 1);
      RMApp app1 = MockRMAppSubmitter.submit(rm,
          MockRMAppSubmissionData.Builder.createWithResource(amResource, rm)
              .withAppName("app")
              .withUser("user")
              .withAcls(null)
              .withQueue("a")
              .build());
      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);

      // The AM requests containers that also ask for the custom resource.
      // (The value has to be set on the container resource, not on the
      // already-submitted AM resource.)
      Resource cResource = Resources.createResource(1 * GB, 1);
      cResource.setResourceValue(GPU_URI, 1);
      am1.allocate("*", cResource, 2,
          new ArrayList<ContainerId>(), null);

      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
      RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
      FiCaSchedulerApp schedulerApp1 =
          cs.getApplicationAttempt(am1.getApplicationAttemptId());

      // A single node heartbeat; afterwards the app is expected to hold two
      // live containers (presumably the AM plus one requested container).
      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
      rm.drainEvents();
      assertEquals(2, schedulerApp1.getLiveContainers().size());
    }
  }

  /**
   * Test CS initialized with custom resource types loaded.
   */
  @Test
  public void testCapacitySchedulerInitWithCustomResourceType()
      throws IOException {
    installResourceTypesFile();

    CapacitySchedulerConfiguration newConf = createMultiQueueConf();

    try (MockRM rm = new MockRM(newConf)) {
      rm.start();
      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

      // Ensure the method can get custom resource type from
      // CapacitySchedulerConfiguration. The long literal keeps the
      // comparison between two long values.
      assertNotEquals(0L,
          ResourceUtils
              .fetchMaximumAllocationFromConfig(cs.getConfiguration())
              .getResourceValue(GPU_URI));

      // Ensure custom resource type exists in queue's maximumAllocation.
      assertNotEquals(0L,
          cs.getMaximumResourceCapability("a")
              .getResourceValue(GPU_URI));
    }
  }

  /**
   * Test that the cluster capability metrics (memory, vcores and GPU) grow
   * when nodes are added to a node tracker and drop back to zero once the
   * nodes are removed again.
   */
  @Test
  public void testClusterMetricsWithGPU()
      throws Exception {
    ClusterMetrics metrics = ClusterMetrics.getMetrics();
    installResourceTypesFile();

    CapacitySchedulerConfiguration newConf = createMultiQueueConf();

    try (MockRM rm = new MockRM(newConf)) {
      rm.start();
      try {
        ClusterNodeTracker<FiCaSchedulerNode> nodeTracker =
            new ClusterNodeTracker<>();
        MockNodes.resetHostIds();
        Resource nodeResource = Resource.newInstance(4096, 4,
            Collections.singletonMap(GPU_URI, 4L));
        List<RMNode> rmNodes =
            MockNodes.newNodes(2, 4, nodeResource);
        for (RMNode rmNode : rmNodes) {
          nodeTracker.addNode(new FiCaSchedulerNode(rmNode, false));
        }

        // Check GPU inc related cluster metrics.
        assertEquals(4096 * 8, metrics.getCapabilityMB(),
            "Cluster Capability Memory incorrect");
        assertEquals(4 * 8, metrics.getCapabilityVirtualCores(),
            "Cluster Capability Vcores incorrect");
        assertEquals(4 * 8,
            metrics.getCustomResourceCapability().get(GPU_URI).longValue(),
            "Cluster Capability GPUs incorrect");

        for (RMNode rmNode : rmNodes) {
          nodeTracker.removeNode(rmNode.getNodeID());
        }

        // Check GPU dec related cluster metrics.
        assertEquals(0, metrics.getCapabilityMB(),
            "Cluster Capability Memory incorrect");
        assertEquals(0, metrics.getCapabilityVirtualCores(),
            "Cluster Capability Vcores incorrect");
        assertEquals(0,
            metrics.getCustomResourceCapability().get(GPU_URI).longValue(),
            "Cluster Capability GPUs incorrect");
      } finally {
        // Always drop the metrics singleton, even after a failed assertion.
        ClusterMetrics.destroy();
      }
    }
  }

  /**
   * Test CS absolute conf with Custom resource type.
   */
  @Test
  public void testCapacitySchedulerAbsoluteConfWithCustomResourceType()
      throws IOException {
    installResourceTypesFile();

    CapacitySchedulerConfiguration newConf =
        new CapacitySchedulerConfiguration(conf);

    // Lower-cased names of the AbsoluteResourceType constants
    // (only memory vcores for first class).
    Set<String> resourceTypes = Arrays
        .stream(CapacitySchedulerConfiguration.AbsoluteResourceType.values())
        .map(value -> value.toString().toLowerCase())
        .collect(Collectors.toSet());

    Map<String, Long> valuesMin = Maps.newHashMap();
    valuesMin.put(GPU_URI, 10L);
    valuesMin.put(FPGA_URI, 10L);
    valuesMin.put("testType", 10L);

    Map<String, Long> valuesMax = Maps.newHashMap();
    valuesMax.put(GPU_URI, 100L);
    valuesMax.put(FPGA_URI, 100L);
    valuesMax.put("testType", 100L);

    Resource aMinResource = Resource.newInstance(1000, 10, valuesMin);
    Resource aMaxResource = Resource.newInstance(1000, 10, valuesMax);

    // Define top-level queues
    newConf.setQueues(ROOT,
        new String[] {"a", "b", "c"});
    newConf.setMinimumResourceRequirement("", new QueuePath("root", "a"),
        aMinResource);
    newConf.setMaximumResourceRequirement("", new QueuePath("root", "a"),
        aMaxResource);
    useDominantResourceCalculator(newConf);

    try (MockRM rm = new MockRM(newConf)) {
      rm.start();
      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

      // Check the gpu resource conf is right.
      assertEquals(aMinResource,
          cs.getConfiguration()
              .getMinimumResourceRequirement("", A, resourceTypes));
      assertEquals(aMaxResource,
          cs.getConfiguration()
              .getMaximumResourceRequirement("", A, resourceTypes));

      // Check the gpu resource of queue is right.
      assertEquals(aMinResource, cs.getQueue("root.a")
          .getQueueResourceQuotas().getConfiguredMinResource());
      assertEquals(aMaxResource, cs.getQueue("root.a")
          .getQueueResourceQuotas().getConfiguredMaxResource());
    }
  }
}