TestAbstractYarnScheduler.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.createResourceRequest;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@SuppressWarnings("unchecked")
public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
/**
 * Per-test setup hook: forwards the scheduler type of the current
 * parameterized run to the base class initializer.
 *
 * @param type scheduler type supplied by the parameterized test source
 * @throws IOException propagated from initParameterizedSchedulerTestBase
 */
public void initTestAbstractYarnScheduler(SchedulerType type) throws IOException {
initParameterizedSchedulerTestBase(type);
}
/**
 * Checks the scheduler's reported maximum memory allocation while nodes join
 * and leave, under two settings of the work-preserving-recovery scheduling
 * wait.
 *
 * <p>Run 1 uses a wait of 1000 * 1000 ms; every checkpoint is expected to
 * report the configured maximum. Run 2 uses a wait of 0; the checkpoints
 * taken while the two smaller nodes are registered are expected to report
 * the node-derived values instead.
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
public void testMaximimumAllocationMemory(SchedulerType type) throws Exception {
  initTestAbstractYarnScheduler(type);

  final int cfgMaxMb = 10 * 1024;
  final int firstNodeMb = 15 * 1024;
  final int secondNodeMb = 5 * 1024;
  final int thirdNodeMb = 6 * 1024;

  YarnConfiguration conf = getConf();
  conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, cfgMaxMb);

  // Run 1: long scheduling wait, configured maximum expected throughout.
  conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
      1000 * 1000);
  MockRM waitingRm = new MockRM(conf);
  try {
    waitingRm.start();
    testMaximumAllocationMemoryHelper(waitingRm.getResourceScheduler(),
        firstNodeMb, secondNodeMb, thirdNodeMb,
        cfgMaxMb, cfgMaxMb, cfgMaxMb,
        cfgMaxMb, cfgMaxMb, cfgMaxMb);
  } finally {
    waitingRm.stop();
  }

  // Run 2: no scheduling wait, node sizes are expected to show through.
  conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0);
  MockRM immediateRm = new MockRM(conf);
  try {
    immediateRm.start();
    testMaximumAllocationMemoryHelper(immediateRm.getResourceScheduler(),
        firstNodeMb, secondNodeMb, thirdNodeMb,
        cfgMaxMb, cfgMaxMb, cfgMaxMb,
        secondNodeMb, thirdNodeMb, secondNodeMb);
  } finally {
    immediateRm.stop();
  }
}
/**
 * Drives a scheduler through a fixed add/remove sequence of three nodes and
 * checks the reported maximum memory at six checkpoints.
 *
 * <p>Checkpoints, in order: no nodes; node1 added; node1 removed; node2
 * added; node3 added; node3 removed. node2 is removed at the end so the
 * scheduler is left with no nodes.
 *
 * @param scheduler scheduler under test; asserted to start with zero nodes
 * @param node1MaxMemory memory value used to build the first node's resource
 * @param node2MaxMemory memory value used to build the second node's resource
 * @param node3MaxMemory memory value used to build the third node's resource
 * @param expectedMaxMemory exactly six expected maxima, one per checkpoint
 * @throws Exception if the scheduler fails while handling an event
 */
private void testMaximumAllocationMemoryHelper(
    YarnScheduler scheduler,
    final int node1MaxMemory, final int node2MaxMemory,
    final int node3MaxMemory, final int... expectedMaxMemory)
    throws Exception {
  assertEquals(6, expectedMaxMemory.length);

  // Checkpoint 0: empty cluster.
  assertEquals(0, scheduler.getNumClusterNodes());
  assertEquals(expectedMaxMemory[0],
      scheduler.getMaximumResourceCapability().getMemorySize());

  // Checkpoint 1: node1 joins.
  RMNode firstNode = MockNodes.newNodeInfo(
      0, Resources.createResource(node1MaxMemory), 1, "127.0.0.2");
  scheduler.handle(new NodeAddedSchedulerEvent(firstNode));
  assertEquals(1, scheduler.getNumClusterNodes());
  assertEquals(expectedMaxMemory[1],
      scheduler.getMaximumResourceCapability().getMemorySize());

  // Checkpoint 2: node1 leaves.
  scheduler.handle(new NodeRemovedSchedulerEvent(firstNode));
  assertEquals(0, scheduler.getNumClusterNodes());
  assertEquals(expectedMaxMemory[2],
      scheduler.getMaximumResourceCapability().getMemorySize());

  // Checkpoint 3: node2 joins.
  RMNode secondNode = MockNodes.newNodeInfo(
      0, Resources.createResource(node2MaxMemory), 2, "127.0.0.3");
  scheduler.handle(new NodeAddedSchedulerEvent(secondNode));
  assertEquals(1, scheduler.getNumClusterNodes());
  assertEquals(expectedMaxMemory[3],
      scheduler.getMaximumResourceCapability().getMemorySize());

  // Checkpoint 4: node3 joins alongside node2.
  RMNode thirdNode = MockNodes.newNodeInfo(
      0, Resources.createResource(node3MaxMemory), 3, "127.0.0.4");
  scheduler.handle(new NodeAddedSchedulerEvent(thirdNode));
  assertEquals(2, scheduler.getNumClusterNodes());
  assertEquals(expectedMaxMemory[4],
      scheduler.getMaximumResourceCapability().getMemorySize());

  // Checkpoint 5: node3 leaves, node2 remains.
  scheduler.handle(new NodeRemovedSchedulerEvent(thirdNode));
  assertEquals(1, scheduler.getNumClusterNodes());
  assertEquals(expectedMaxMemory[5],
      scheduler.getMaximumResourceCapability().getMemorySize());

  // Clean up: remove the last node.
  scheduler.handle(new NodeRemovedSchedulerEvent(secondNode));
  assertEquals(0, scheduler.getNumClusterNodes());
}
/**
 * Checks the scheduler's reported maximum vcore allocation while nodes join
 * and leave, under two settings of the work-preserving-recovery scheduling
 * wait.
 *
 * <p>Run 1 uses a wait of 1000 * 1000 ms; every checkpoint is expected to
 * report the configured maximum. Run 2 uses a wait of 0; the checkpoints
 * taken while the two smaller nodes are registered are expected to report
 * the node-derived values instead.
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
public void testMaximimumAllocationVCores(SchedulerType type) throws Exception {
  initTestAbstractYarnScheduler(type);

  final int cfgMaxCores = 10;
  final int firstNodeCores = 15;
  final int secondNodeCores = 5;
  final int thirdNodeCores = 6;

  YarnConfiguration conf = getConf();
  conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, cfgMaxCores);

  // Run 1: long scheduling wait, configured maximum expected throughout.
  conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
      1000 * 1000);
  MockRM waitingRm = new MockRM(conf);
  try {
    waitingRm.start();
    testMaximumAllocationVCoresHelper(waitingRm.getResourceScheduler(),
        firstNodeCores, secondNodeCores, thirdNodeCores,
        cfgMaxCores, cfgMaxCores, cfgMaxCores,
        cfgMaxCores, cfgMaxCores, cfgMaxCores);
  } finally {
    waitingRm.stop();
  }

  // Run 2: no scheduling wait, node sizes are expected to show through.
  conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0);
  MockRM immediateRm = new MockRM(conf);
  try {
    immediateRm.start();
    testMaximumAllocationVCoresHelper(immediateRm.getResourceScheduler(),
        firstNodeCores, secondNodeCores, thirdNodeCores,
        cfgMaxCores, cfgMaxCores, cfgMaxCores,
        secondNodeCores, thirdNodeCores, secondNodeCores);
  } finally {
    immediateRm.stop();
  }
}
/**
 * Drives a scheduler through a fixed add/remove sequence of three nodes and
 * checks the reported maximum vcores at six checkpoints.
 *
 * <p>Checkpoints, in order: no nodes; node1 added; node1 removed; node2
 * added; node3 added; node3 removed. node2 is removed at the end so the
 * scheduler is left with no nodes. Every node is created with a memory
 * value of 1024.
 *
 * @param scheduler scheduler under test; asserted to start with zero nodes
 * @param node1MaxVCores vcores of the first node
 * @param node2MaxVCores vcores of the second node
 * @param node3MaxVCores vcores of the third node
 * @param expectedMaxVCores exactly six expected maxima, one per checkpoint
 * @throws Exception if the scheduler fails while handling an event
 */
private void testMaximumAllocationVCoresHelper(
    YarnScheduler scheduler,
    final int node1MaxVCores, final int node2MaxVCores,
    final int node3MaxVCores, final int... expectedMaxVCores)
    throws Exception {
  assertEquals(6, expectedMaxVCores.length);

  // Checkpoint 0: empty cluster.
  assertEquals(0, scheduler.getNumClusterNodes());
  assertEquals(expectedMaxVCores[0],
      scheduler.getMaximumResourceCapability().getVirtualCores());

  // Checkpoint 1: node1 joins.
  RMNode firstNode = MockNodes.newNodeInfo(
      0, Resources.createResource(1024, node1MaxVCores), 1, "127.0.0.2");
  scheduler.handle(new NodeAddedSchedulerEvent(firstNode));
  assertEquals(1, scheduler.getNumClusterNodes());
  assertEquals(expectedMaxVCores[1],
      scheduler.getMaximumResourceCapability().getVirtualCores());

  // Checkpoint 2: node1 leaves.
  scheduler.handle(new NodeRemovedSchedulerEvent(firstNode));
  assertEquals(0, scheduler.getNumClusterNodes());
  assertEquals(expectedMaxVCores[2],
      scheduler.getMaximumResourceCapability().getVirtualCores());

  // Checkpoint 3: node2 joins.
  RMNode secondNode = MockNodes.newNodeInfo(
      0, Resources.createResource(1024, node2MaxVCores), 2, "127.0.0.3");
  scheduler.handle(new NodeAddedSchedulerEvent(secondNode));
  assertEquals(1, scheduler.getNumClusterNodes());
  assertEquals(expectedMaxVCores[3],
      scheduler.getMaximumResourceCapability().getVirtualCores());

  // Checkpoint 4: node3 joins alongside node2.
  RMNode thirdNode = MockNodes.newNodeInfo(
      0, Resources.createResource(1024, node3MaxVCores), 3, "127.0.0.4");
  scheduler.handle(new NodeAddedSchedulerEvent(thirdNode));
  assertEquals(2, scheduler.getNumClusterNodes());
  assertEquals(expectedMaxVCores[4],
      scheduler.getMaximumResourceCapability().getVirtualCores());

  // Checkpoint 5: node3 leaves, node2 remains.
  scheduler.handle(new NodeRemovedSchedulerEvent(thirdNode));
  assertEquals(1, scheduler.getNumClusterNodes());
  assertEquals(expectedMaxVCores[5],
      scheduler.getMaximumResourceCapability().getVirtualCores());

  // Clean up: remove the last node.
  scheduler.handle(new NodeRemovedSchedulerEvent(secondNode));
  assertEquals(0, scheduler.getNumClusterNodes());
}
/**
 * Exercises the auto-correct container allocation feature of
 * {@link AbstractYarnScheduler} against a started {@link MockRM}.
 *
 * <p>The test registers one node, wires a mocked application and attempt
 * into the RM context, submits them to the scheduler, and then runs a series
 * of ask/allocation scenarios through the private helper methods.
 *
 * <p>The RM is stopped in a {@code finally} block so a failing scenario does
 * not leave a running RM behind for subsequent tests.
 *
 * @param type scheduler type supplied by the parameterized test source
 * @throws IOException propagated from the fixture initialization
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
public void testAutoCorrectContainerAllocation(SchedulerType type) throws IOException {
  initTestAbstractYarnScheduler(type);
  Configuration conf = new Configuration(getConf());
  conf.setBoolean(YarnConfiguration.RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION, true);
  conf.setBoolean("yarn.scheduler.capacity.root.auto-create-child-queue.enabled",
      true);
  MockRM rm = new MockRM(conf);
  try {
    rm.start();
    AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm.getResourceScheduler();

    String host = "127.0.0.1";
    RMNode node =
        MockNodes.newNodeInfo(0, MockNodes.newResource(4 * 1024), 1, host);
    scheduler.handle(new NodeAddedSchedulerEvent(node));

    // Build a mocked application with a single mocked attempt.
    ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
        appId1, 1);

    RMAppAttemptMetrics attemptMetric1 =
        new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
    RMAppImpl app1 = mock(RMAppImpl.class);
    when(app1.getApplicationId()).thenReturn(appId1);
    RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
    Container container = mock(Container.class);
    when(attempt1.getMasterContainer()).thenReturn(container);
    ApplicationSubmissionContext submissionContext = mock(
        ApplicationSubmissionContext.class);
    when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
    when(attempt1.getAppAttemptId()).thenReturn(appAttemptId);
    when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
    when(app1.getCurrentAppAttempt()).thenReturn(attempt1);

    rm.getRMContext().getRMApps().put(appId1, app1);

    // Register the application and its attempt with the scheduler.
    ApplicationPlacementContext apc = new ApplicationPlacementContext("user",
        "root");
    SchedulerEvent addAppEvent1 =
        new AppAddedSchedulerEvent(appId1, "user", "user", apc);
    scheduler.handle(addAppEvent1);
    SchedulerEvent addAttemptEvent1 =
        new AppAttemptAddedSchedulerEvent(appAttemptId, false);
    scheduler.handle(addAttemptEvent1);

    SchedulerApplicationAttempt application = scheduler.getApplicationAttempt(appAttemptId);
    SchedulerNode schedulerNode = scheduler.getSchedulerNode(node.getNodeID());
    Priority priority = Priority.newInstance(0);
    NodeId nodeId = NodeId.newInstance("foo.bar.org", 1234);

    // Run each combination of container ask and newly allocated containers.
    // The attempt id stubbed on attempt1 is appAttemptId, so it is passed
    // directly instead of being read back through the mocks.
    testContainerAskAndNewlyAllocatedContainerZero(scheduler, application, priority);

    testContainerAskAndNewlyAllocatedContainerOne(scheduler, application, schedulerNode,
        nodeId, priority, appAttemptId);

    testContainerAskZeroAndNewlyAllocatedContainerOne(scheduler, application, schedulerNode,
        nodeId, priority, appAttemptId);

    testContainerAskFourAndNewlyAllocatedContainerEight(scheduler, application, schedulerNode,
        nodeId, priority, appAttemptId);

    testContainerAskFourAndNewlyAllocatedContainerSix(scheduler, application, schedulerNode,
        nodeId, priority, appAttemptId);
  } finally {
    rm.stop();
  }
}
/**
 * Builds a Mockito-backed {@link RMContainer} that wraps a mocked
 * {@link Container} reporting the supplied attributes.
 *
 * <p>The wrapped container is given a resource of {@code memory} and one
 * vcore, and a placeholder container token.
 *
 * @param containerId numeric id used to derive the {@link ContainerId}
 * @param nodeId node the container reports as its location
 * @param appAttemptId attempt the container id is derived from
 * @param allocationId allocation request id reported by the container
 * @param memory memory (in MB) reported in the container's resource
 * @param priority priority reported by the container
 * @param executionType execution type reported by the container
 * @return the stubbed RMContainer mock
 */
private RMContainer createMockRMContainer(int containerId, NodeId nodeId,
    ApplicationAttemptId appAttemptId, long allocationId, int memory,
    Priority priority, ExecutionType executionType) {
  // Inner record: the Container the RMContainer will hand out.
  Container stubbedContainer = mock(Container.class);
  when(stubbedContainer.getId())
      .thenReturn(ContainerId.newContainerId(appAttemptId, containerId));
  when(stubbedContainer.getNodeId()).thenReturn(nodeId);
  when(stubbedContainer.getResource())
      .thenReturn(Resource.newInstance(memory, 1));
  when(stubbedContainer.getPriority()).thenReturn(priority);
  when(stubbedContainer.getAllocationRequestId()).thenReturn(allocationId);
  when(stubbedContainer.getExecutionType()).thenReturn(executionType);
  when(stubbedContainer.getContainerToken())
      .thenReturn(Token.newInstance(new byte[0], "kind", new byte[0], "service"));

  // Outer wrapper: only the two accessors below are stubbed.
  RMContainer wrapper = mock(RMContainerImpl.class);
  when(wrapper.getContainerId())
      .thenReturn(ContainerId.newContainerId(appAttemptId, containerId));
  when(wrapper.getContainer()).thenReturn(stubbedContainer);
  return wrapper;
}
/**
 * Scenario: one container is asked for and nothing has been newly allocated.
 * The ask is expected to stay at one container and the application is
 * expected to have no newly allocated containers.
 *
 * @param scheduler scheduler whose auto-correct logic is invoked
 * @param application application attempt the ask belongs to
 * @param priority priority used for the resource request
 */
private void testContainerAskAndNewlyAllocatedContainerZero(AbstractYarnScheduler scheduler,
    SchedulerApplicationAttempt application, Priority priority) {
  // Single GUARANTEED request: 1024 MB, 1 vcore, 1 container, any host.
  ExecutionTypeRequest guaranteed =
      ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED);
  ResourceRequest singleAsk = createResourceRequest(1024, 1, 1,
      priority, 0, guaranteed, ResourceRequest.ANY);
  List<ResourceRequest> containerAsk =
      new ArrayList<>(Collections.singletonList(singleAsk));

  scheduler.autoCorrectContainerAllocation(containerAsk, application);

  // Nothing was allocated, so the ask must be untouched.
  assertEquals(1, containerAsk.get(0).getNumContainers());
  assertEquals(0, application.pullNewlyAllocatedContainers().size());
}
/**
 * Scenario: one container is asked for and one matching container has been
 * newly allocated. The ask is expected to drop to zero and the application
 * is expected to still hold the one newly allocated container.
 *
 * @param scheduler scheduler whose auto-correct logic is invoked
 * @param application application attempt the ask belongs to
 * @param schedulerNode node the allocated container is registered against
 * @param nodeId node id reported by the mocked container
 * @param priority priority used for the request and the container
 * @param appAttemptId attempt id used to derive the container id
 */
private void testContainerAskAndNewlyAllocatedContainerOne(AbstractYarnScheduler scheduler,
    SchedulerApplicationAttempt application,
    SchedulerNode schedulerNode, NodeId nodeId,
    Priority priority, ApplicationAttemptId appAttemptId) {
  // Single GUARANTEED request: 1024 MB, 1 vcore, 1 container, any host.
  ExecutionTypeRequest guaranteed =
      ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED);
  ResourceRequest singleAsk = createResourceRequest(1024, 1, 1,
      priority, 0L, guaranteed, ResourceRequest.ANY);
  List<ResourceRequest> containerAsk =
      new ArrayList<>(Collections.singletonList(singleAsk));

  // One already-allocated container with the same shape as the request.
  RMContainer allocated = createMockRMContainer(1, nodeId, appAttemptId,
      0L, 1024, priority, ExecutionType.GUARANTEED);
  application.addToNewlyAllocatedContainers(schedulerNode, allocated);

  scheduler.autoCorrectContainerAllocation(containerAsk, application);

  // The allocation satisfies the ask, so the ask is zeroed.
  assertEquals(0, containerAsk.get(0).getNumContainers());
  assertEquals(1, application.pullNewlyAllocatedContainers().size());
}
/**
 * Scenario: zero containers are asked for while one container has been newly
 * allocated. The ask is expected to stay at zero and the application is
 * expected to end up with no newly allocated containers.
 *
 * @param scheduler scheduler whose auto-correct logic is invoked
 * @param application application attempt the ask belongs to
 * @param schedulerNode node the allocated container is registered against
 * @param nodeId node id reported by the mocked container
 * @param priority priority used for the request and the container
 * @param appAttemptId attempt id used to derive the container id
 */
private void testContainerAskZeroAndNewlyAllocatedContainerOne(AbstractYarnScheduler scheduler,
    SchedulerApplicationAttempt application, SchedulerNode schedulerNode, NodeId nodeId,
    Priority priority, ApplicationAttemptId appAttemptId) {
  // GUARANTEED request for 1024 MB, 1 vcore, but zero containers.
  ExecutionTypeRequest guaranteed =
      ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED);
  ResourceRequest resourceRequest = createResourceRequest(1024, 1, 0,
      priority, 0L, guaranteed, ResourceRequest.ANY);
  List<ResourceRequest> containerAsk =
      new ArrayList<>(Collections.singletonList(resourceRequest));

  // One allocated container that nobody is asking for any more.
  RMContainer surplus = createMockRMContainer(1, nodeId, appAttemptId,
      0L, 1024, priority, ExecutionType.GUARANTEED);
  application.addToNewlyAllocatedContainers(schedulerNode, surplus);

  scheduler.autoCorrectContainerAllocation(containerAsk, application);

  // The ask stays at zero and the surplus container is gone.
  assertEquals(0, resourceRequest.getNumContainers());
  assertEquals(0, application.pullNewlyAllocatedContainers().size());
}
/**
 * Scenario: four distinct single-container requests are asked for while
 * eight containers (two matching each request) have been newly allocated.
 * Every ask is expected to drop to zero and four newly allocated containers
 * are expected to remain.
 *
 * <p>The four requests differ in memory size, allocation request id, or
 * execution type.
 *
 * @param scheduler scheduler whose auto-correct logic is invoked
 * @param application application attempt the asks belong to
 * @param schedulerNode node the allocated containers are registered against
 * @param nodeId node id reported by the mocked containers
 * @param priority priority used for the requests and the containers
 * @param appAttemptId attempt id used to derive the container ids
 */
private void testContainerAskFourAndNewlyAllocatedContainerEight(AbstractYarnScheduler scheduler,
    SchedulerApplicationAttempt application, SchedulerNode schedulerNode,
    NodeId nodeId, Priority priority, ApplicationAttemptId appAttemptId) {
  // Four requests, one container each, each unique in some dimension.
  ResourceRequest baseAsk = createResourceRequest(1024, 1, 1,
      priority, 0L,
      ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
  ResourceRequest largerMemoryAsk = createResourceRequest(2048, 1, 1,
      priority, 0L,
      ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
  ResourceRequest otherAllocationIdAsk = createResourceRequest(1024, 1, 1,
      priority, 1L,
      ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
  ResourceRequest opportunisticAsk = createResourceRequest(1024, 1, 1,
      priority, 0L,
      ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC), ResourceRequest.ANY);
  List<ResourceRequest> ask4 = new ArrayList<>(Arrays.asList(
      baseAsk, largerMemoryAsk, otherAllocationIdAsk, opportunisticAsk));

  // Eight mocked containers: ids 1-8, two per request shape.
  List<RMContainer> allocated = Arrays.asList(
      createMockRMContainer(1, nodeId, appAttemptId,
          0L, 1024, priority, ExecutionType.GUARANTEED),
      createMockRMContainer(2, nodeId, appAttemptId,
          0L, 1024, priority, ExecutionType.GUARANTEED),
      createMockRMContainer(3, nodeId, appAttemptId,
          0L, 2048, priority, ExecutionType.GUARANTEED),
      createMockRMContainer(4, nodeId, appAttemptId,
          0L, 2048, priority, ExecutionType.GUARANTEED),
      createMockRMContainer(5, nodeId, appAttemptId,
          1L, 1024, priority, ExecutionType.GUARANTEED),
      createMockRMContainer(6, nodeId, appAttemptId,
          1L, 1024, priority, ExecutionType.GUARANTEED),
      createMockRMContainer(7, nodeId, appAttemptId,
          0L, 1024, priority, ExecutionType.OPPORTUNISTIC),
      createMockRMContainer(8, nodeId, appAttemptId,
          0L, 1024, priority, ExecutionType.OPPORTUNISTIC));
  for (RMContainer rmContainer : allocated) {
    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer);
  }

  scheduler.autoCorrectContainerAllocation(ask4, application);

  // Every request is satisfied; only one container per request is kept.
  ask4.forEach(rr -> assertEquals(0, rr.getNumContainers()));
  assertEquals(4, application.pullNewlyAllocatedContainers().size());
}
/**
 * Scenario: two requests of four containers each (one for any host, one for
 * a specific host) are asked for while six identical containers have been
 * newly allocated. Both asks are expected to drop to zero and four newly
 * allocated containers are expected to remain.
 *
 * @param scheduler scheduler whose auto-correct logic is invoked
 * @param application application attempt the asks belong to
 * @param schedulerNode node the allocated containers are registered against
 * @param nodeId node id reported by the mocked containers; its host is also
 *               used as the resource name of the host-specific request
 * @param priority priority used for the requests and the containers
 * @param appAttemptId attempt id used to derive the container ids
 */
private void testContainerAskFourAndNewlyAllocatedContainerSix(AbstractYarnScheduler scheduler,
    SchedulerApplicationAttempt application, SchedulerNode schedulerNode,
    NodeId nodeId, Priority priority, ApplicationAttemptId appAttemptId) {
  // Same shape requested twice: once for ANY, once for the node's host.
  ResourceRequest anyHostAsk = createResourceRequest(1024, 1, 4,
      priority, 0L,
      ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
  ResourceRequest specificHostAsk = createResourceRequest(1024, 1, 4,
      priority, 0L,
      ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), nodeId.getHost());
  List<ResourceRequest> containerAsk =
      new ArrayList<>(Arrays.asList(anyHostAsk, specificHostAsk));

  // Six identical mocked containers with ids 1-6; build them all first,
  // then hand them to the application in id order.
  List<RMContainer> allocated = new ArrayList<>();
  for (int id = 1; id <= 6; id++) {
    allocated.add(createMockRMContainer(id, nodeId, appAttemptId,
        0L, 1024, priority, ExecutionType.GUARANTEED));
  }
  for (RMContainer rmContainer : allocated) {
    application.addToNewlyAllocatedContainers(schedulerNode, rmContainer);
  }

  scheduler.autoCorrectContainerAllocation(containerAsk, application);

  // Both asks are zeroed and the two excess containers are dropped.
  containerAsk.forEach(rr -> assertEquals(0, rr.getNumContainers()));
  assertEquals(4, application.pullNewlyAllocatedContainers().size());
}
/**
 * Verifies that the scheduler's maximum resource capability follows the
 * total resource of the tracked nodes, not their unallocated resource.
 *
 * <p>Both mocked nodes report zero unallocated resource and a non-zero total
 * resource. The test expects the maximum capability to match the larger
 * total while nodes are tracked, and to fall back to the configured maximum
 * once all nodes are removed.
 *
 * @param type scheduler type supplied by the parameterized test source
 * @throws IOException propagated from the fixture initialization
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
public void testUpdateMaxAllocationUsesTotal(SchedulerType type) throws IOException {
  initTestAbstractYarnScheduler(type);
  final int configuredMaxVCores = 20;
  final int configuredMaxMemory = 10 * 1024;
  Resource configuredMaximumResource = Resource.newInstance
      (configuredMaxMemory, configuredMaxVCores);

  YarnConfiguration conf = getConf();
  conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
      configuredMaxVCores);
  conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
      configuredMaxMemory);
  conf.setLong(
      YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
      0);

  MockRM rm = new MockRM(conf);
  try {
    rm.start();
    AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm
        .getResourceScheduler();

    Resource emptyResource = Resource.newInstance(0, 0);
    Resource fullResource1 = Resource.newInstance(1024, 5);
    Resource fullResource2 = Resource.newInstance(2048, 10);

    SchedulerNode mockNode1 = mock(SchedulerNode.class);
    when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("foo", 8080));
    when(mockNode1.getUnallocatedResource()).thenReturn(emptyResource);
    when(mockNode1.getTotalResource()).thenReturn(fullResource1);

    SchedulerNode mockNode2 = mock(SchedulerNode.class);
    // Each node needs its own id: the tracker is keyed by NodeId, and
    // removeNode() below looks nodes up by mockNodeN.getNodeID().
    when(mockNode2.getNodeID()).thenReturn(NodeId.newInstance("bar", 8081));
    when(mockNode2.getUnallocatedResource()).thenReturn(emptyResource);
    when(mockNode2.getTotalResource()).thenReturn(fullResource2);

    verifyMaximumResourceCapability(configuredMaximumResource, scheduler);

    scheduler.nodeTracker.addNode(mockNode1);
    verifyMaximumResourceCapability(fullResource1, scheduler);

    scheduler.nodeTracker.addNode(mockNode2);
    verifyMaximumResourceCapability(fullResource2, scheduler);

    scheduler.nodeTracker.removeNode(mockNode2.getNodeID());
    verifyMaximumResourceCapability(fullResource1, scheduler);

    scheduler.nodeTracker.removeNode(mockNode1.getNodeID());
    verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
  } finally {
    rm.stop();
  }
}
/**
 * Verifies how the scheduler's maximum resource capability reacts when the
 * resources of registered nodes are updated.
 *
 * <p>Two nodes are registered, one larger than the other. The test expects
 * the maximum to follow the larger node when that node is resized, and to
 * stay unchanged when the smaller node is resized to values that remain
 * below the larger node.
 *
 * @param type scheduler type supplied by the parameterized test source
 * @throws IOException propagated from the fixture initialization
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
public void testMaxAllocationAfterUpdateNodeResource(SchedulerType type) throws IOException {
  initTestAbstractYarnScheduler(type);
  final int cfgMaxCores = 20;
  final int cfgMaxMb = 10 * 1024;
  Resource configuredMax = Resource.newInstance(cfgMaxMb, cfgMaxCores);

  YarnConfiguration conf = getConf();
  conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, cfgMaxCores);
  conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, cfgMaxMb);
  conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0);

  MockRM rm = new MockRM(conf);
  try {
    rm.start();
    AbstractYarnScheduler scheduler =
        (AbstractYarnScheduler) rm.getResourceScheduler();
    verifyMaximumResourceCapability(configuredMax, scheduler);

    Resource bigNodeBase = Resource.newInstance(2048, 5);
    Resource bigNodeGrown = Resource.newInstance(4096, 10);
    Resource smallNodeBase = Resource.newInstance(512, 1);
    Resource smallNodeGrown = Resource.newInstance(1024, 2);

    // Register both nodes at their base sizes.
    RMNode bigNode = MockNodes.newNodeInfo(0, bigNodeBase, 1, "127.0.0.2");
    scheduler.handle(new NodeAddedSchedulerEvent(bigNode));
    RMNode smallNode = MockNodes.newNodeInfo(0, smallNodeBase, 2, "127.0.0.3");
    scheduler.handle(new NodeAddedSchedulerEvent(smallNode));
    verifyMaximumResourceCapability(bigNodeBase, scheduler);

    // Grow the larger node: the maximum is expected to grow with it.
    ResourceOption growBig = ResourceOption.newInstance(bigNodeGrown, 0);
    scheduler.updateNodeResource(bigNode, growBig);
    verifyMaximumResourceCapability(bigNodeGrown, scheduler);

    // Shrink the larger node back: the maximum is expected to shrink too.
    ResourceOption shrinkBig = ResourceOption.newInstance(bigNodeBase, 0);
    scheduler.updateNodeResource(bigNode, shrinkBig);
    verifyMaximumResourceCapability(bigNodeBase, scheduler);

    // Grow the smaller node, still below the larger one: no change expected.
    ResourceOption growSmall = ResourceOption.newInstance(smallNodeGrown, 0);
    scheduler.updateNodeResource(smallNode, growSmall);
    verifyMaximumResourceCapability(bigNodeBase, scheduler);

    // Shrink the smaller node back: no change expected.
    ResourceOption shrinkSmall = ResourceOption.newInstance(smallNodeBase, 0);
    scheduler.updateNodeResource(smallNode, shrinkSmall);
    verifyMaximumResourceCapability(bigNodeBase, scheduler);
  } finally {
    rm.stop();
  }
}
/**
 * Tests that pending-release containers are cleared from an application's
 * current attempt even if another application in the scheduler's
 * application map has no current attempt (i.e. it is null).
 *
 * @param type scheduler implementation the test is parameterized with
 * @throws Exception if the mock cluster cannot be set up or driven
 */
@SuppressWarnings({ "rawtypes" })
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
@Timeout(10)
public void testReleasedContainerIfAppAttemptisNull(SchedulerType type) throws Exception {
  initTestAbstractYarnScheduler(type);
  YarnConfiguration conf = getConf();
  MockRM rm1 = new MockRM(conf);
  try {
    rm1.start();
    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
    nm1.registerNode();

    AbstractYarnScheduler scheduler =
        (AbstractYarnScheduler) rm1.getResourceScheduler();

    // Mock App without attempt
    RMApp mockAPp =
        new MockRMApp(125, System.currentTimeMillis(), RMAppState.NEW);
    SchedulerApplication<FiCaSchedulerApp> application =
        new SchedulerApplication<FiCaSchedulerApp>(null, mockAPp.getUser(),
            false);

    // Second app with one app attempt
    RMApp app = MockRMAppSubmitter.submitWithMemory(200, rm1);
    MockAM am1 = MockRM.launchAndRegisterAM(app, rm1, nm1);
    final ContainerId runningContainer =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    // Ask to release the container so it lands in the pending-release set.
    am1.allocate(null, Arrays.asList(runningContainer));

    Map schedulerApplications = scheduler.getSchedulerApplications();
    SchedulerApplication schedulerApp =
        (SchedulerApplication) scheduler.getSchedulerApplications().get(
            app.getApplicationId());
    // Inject the attempt-less application next to the real one.
    schedulerApplications.put(mockAPp.getApplicationId(), application);
    scheduler.clearPendingContainerCache();

    // JUnit 5 order is (expected, actual, message); the original had the
    // first two swapped, which would garble the failure report.
    assertEquals(0,
        schedulerApp.getCurrentAppAttempt().getPendingRelease().size(),
        "Pending containers are not released "
            + "when one of the application attempt is null !");
  } finally {
    // rm1 is assigned before the try block and is never null here.
    rm1.stop();
  }
}
/**
 * Verifies allocation-tag bookkeeping when a started container is released:
 * the tag must still be present after the AM releases the container, and it
 * must be gone once the NM reports the container as complete, while the
 * tags of the remaining containers stay untouched.
 *
 * @param type scheduler implementation the test is parameterized with
 * @throws Exception if the mock cluster cannot be set up or driven
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
@Timeout(30)
public void testContainerReleaseWithAllocationTags(SchedulerType type) throws Exception {
  initTestAbstractYarnScheduler(type);
  // Currently only can be tested against capacity scheduler.
  if (!getSchedulerType().equals(SchedulerType.CAPACITY)) {
    return;
  }
  final String testTag1 = "some-tag";
  final String testTag2 = "some-other-tag";
  YarnConfiguration conf = getConf();
  conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, "scheduler");
  final MockRM rm1 = new MockRM(conf);
  // The original never stopped the RM; guard everything with try/finally so
  // it is shut down even when an assertion fails.
  try {
    rm1.start();
    final MockNM nm1 = new MockNM("127.0.0.1:1234",
        10240, rm1.getResourceTrackerService());
    nm1.registerNode();
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(200, rm1)
            .withAppName("name")
            .withUser("user")
            .withAcls(new HashMap<>())
            .withUnmanagedAM(false)
            .withQueue("default")
            .withMaxAppAttempts(-1)
            .withCredentials(null)
            .withAppType("Test")
            .withWaitForAppAcceptedState(false)
            .withKeepContainers(true)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm1, data);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // allocate 1 container with tag1
    SchedulingRequest sr = SchedulingRequest
        .newInstance(1L, Priority.newInstance(1),
            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
            Sets.newHashSet(testTag1),
            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
            null);

    // allocate 3 containers with tag2
    SchedulingRequest sr1 = SchedulingRequest
        .newInstance(2L, Priority.newInstance(1),
            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
            Sets.newHashSet(testTag2),
            ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
            null);

    AllocateRequest ar = AllocateRequest.newBuilder()
        .schedulingRequests(Lists.newArrayList(sr, sr1)).build();
    am1.allocate(ar);
    nm1.nodeHeartbeat(true);

    // Poll until all four containers have been handed out; the @Timeout on
    // the method bounds this loop.
    List<Container> allocated = new ArrayList<>();
    while (allocated.size() < 4) {
      AllocateResponse rsp = am1
          .allocate(new ArrayList<>(), new ArrayList<>());
      allocated.addAll(rsp.getAllocatedContainers());
      nm1.nodeHeartbeat(true);
      Thread.sleep(1000);
    }

    assertEquals(4, allocated.size());

    // Exactly one container belongs to the tag1 request (request id 1).
    Set<Container> containers = allocated.stream()
        .filter(container -> container.getAllocationRequestId() == 1L)
        .collect(Collectors.toSet());
    assertNotNull(containers);
    assertEquals(1, containers.size());
    ContainerId cid = containers.iterator().next().getId();

    // mock container start
    rm1.getRMContext().getScheduler()
        .getSchedulerNode(nm1.getNodeId()).containerStarted(cid);

    // verifies the allocation is made with correct number of tags
    Map<String, Long> nodeTags = rm1.getRMContext()
        .getAllocationTagsManager()
        .getAllocationTagsWithCount(nm1.getNodeId());
    assertNotNull(nodeTags.get(testTag1));
    assertEquals(1, nodeTags.get(testTag1).intValue());

    // release a container
    am1.allocate(new ArrayList<>(), Lists.newArrayList(cid));

    // before NM confirms, the tag should still exist
    nodeTags = rm1.getRMContext().getAllocationTagsManager()
        .getAllocationTagsWithCount(nm1.getNodeId());
    assertNotNull(nodeTags);
    assertNotNull(nodeTags.get(testTag1));
    assertEquals(1, nodeTags.get(testTag1).intValue());

    // NM reports back that container is released
    // RM should cleanup the tag
    ContainerStatus cs = ContainerStatus.newInstance(cid,
        ContainerState.COMPLETE, "", 0);
    nm1.nodeHeartbeat(Lists.newArrayList(cs), true);

    // Wait on condition
    // 1) tag1 doesn't exist anymore
    // 2) num of tag2 is still 3
    // The comparison is null-safe so that a missing map or tag makes the
    // condition false (and is retried) instead of throwing an NPE.
    GenericTestUtils.waitFor(() -> {
      Map<String, Long> tags = rm1.getRMContext()
          .getAllocationTagsManager()
          .getAllocationTagsWithCount(nm1.getNodeId());
      return tags != null
          && tags.get(testTag1) == null
          && Long.valueOf(3L).equals(tags.get(testTag2));
    }, 500, 3000);
  } finally {
    rm1.stop();
  }
}
/**
 * Verifies that the allocation tags recorded for a node are dropped as soon
 * as the scheduler handles a {@code NodeRemovedSchedulerEvent} for it.
 *
 * @param type scheduler implementation the test is parameterized with
 * @throws Exception if the mock cluster cannot be set up or driven
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
@Timeout(30)
public void testNodeRemovedWithAllocationTags(SchedulerType type) throws Exception {
  initTestAbstractYarnScheduler(type);
  // Currently only can be tested against capacity scheduler.
  if (!getSchedulerType().equals(SchedulerType.CAPACITY)) {
    return;
  }
  final String testTag1 = "some-tag";
  YarnConfiguration conf = getConf();
  conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, "scheduler");
  MockRM rm1 = new MockRM(conf);
  // The original never stopped the RM; guard everything with try/finally so
  // it is shut down even when an assertion fails.
  try {
    rm1.start();
    MockNM nm1 = new MockNM("127.0.0.1:1234",
        10240, rm1.getResourceTrackerService());
    nm1.registerNode();
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(200, rm1)
            .withAppName("name")
            .withUser("user")
            .withAcls(new HashMap<>())
            .withUnmanagedAM(false)
            .withQueue("default")
            .withMaxAppAttempts(-1)
            .withCredentials(null)
            .withAppType("Test")
            .withWaitForAppAcceptedState(false)
            .withKeepContainers(true)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm1, data);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // allocate 1 container with tag1
    SchedulingRequest sr = SchedulingRequest
        .newInstance(1L, Priority.newInstance(1),
            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
            Sets.newHashSet(testTag1),
            ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
            null);

    AllocateRequest ar = AllocateRequest.newBuilder()
        .schedulingRequests(Lists.newArrayList(sr)).build();
    am1.allocate(ar);
    nm1.nodeHeartbeat(true);

    // Poll until the container has been handed out; the @Timeout on the
    // method bounds this loop.
    List<Container> allocated = new ArrayList<>();
    while (allocated.size() < 1) {
      AllocateResponse rsp = am1
          .allocate(new ArrayList<>(), new ArrayList<>());
      allocated.addAll(rsp.getAllocatedContainers());
      nm1.nodeHeartbeat(true);
      Thread.sleep(1000);
    }

    assertEquals(1, allocated.size());

    Set<Container> containers = allocated.stream()
        .filter(container -> container.getAllocationRequestId() == 1L)
        .collect(Collectors.toSet());
    assertNotNull(containers);
    assertEquals(1, containers.size());
    ContainerId cid = containers.iterator().next().getId();

    // mock container start
    rm1.getRMContext().getScheduler()
        .getSchedulerNode(nm1.getNodeId()).containerStarted(cid);

    // verifies the allocation is made with correct number of tags
    Map<String, Long> nodeTags = rm1.getRMContext()
        .getAllocationTagsManager()
        .getAllocationTagsWithCount(nm1.getNodeId());
    assertNotNull(nodeTags.get(testTag1));
    assertEquals(1, nodeTags.get(testTag1).intValue());

    // remove the node
    RMNode node1 = MockNodes.newNodeInfo(
        0, Resources.createResource(nm1.getMemory()), 1, "127.0.0.1", 1234);
    rm1.getRMContext().getScheduler().handle(
        new NodeRemovedSchedulerEvent(node1));

    // Once the node is removed, the tag should be removed immediately
    nodeTags = rm1.getRMContext().getAllocationTagsManager()
        .getAllocationTagsWithCount(nm1.getNodeId());
    assertNull(nodeTags);
  } finally {
    rm1.stop();
  }
}
/**
 * Exercises container release from the AM in two situations: a container
 * that was never launched on the node (a replacement can be allocated right
 * away) and a container that is running on the node (a replacement must
 * wait until the node reports the old container as complete).
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
@Timeout(60)
public void testContainerReleasedByNode(SchedulerType type) throws Exception {
  initTestAbstractYarnScheduler(type);
  System.out.println("Starting testContainerReleasedByNode");
  YarnConfiguration conf = getConf();
  MockRM rm1 = new MockRM(conf);
  try {
    rm1.start();
    MockRMAppSubmissionData submission =
        MockRMAppSubmissionData.Builder.createWithMemory(200, rm1)
            .withAppName("name")
            .withUser("user")
            .withAcls(new HashMap<ApplicationAccessType, String>())
            .withUnmanagedAM(false)
            .withQueue("default")
            .withMaxAppAttempts(-1)
            .withCredentials(null)
            .withAppType("Test")
            .withWaitForAppAcceptedState(false)
            .withKeepContainers(true)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm1, submission);
    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
    nm1.registerNode();
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // Request one container that takes more than half of the node.
    am1.allocate("127.0.0.1", 8192, 1, new ArrayList<ContainerId>());
    nm1.nodeHeartbeat(true);

    // Poll until the request has been satisfied.
    List<Container> granted;
    for (granted = am1.allocate(new ArrayList<>(), new ArrayList<>())
             .getAllocatedContainers();
         granted.isEmpty();
         granted = am1.allocate(new ArrayList<>(), new ArrayList<>())
             .getAllocatedContainers()) {
      Thread.sleep(10);
      nm1.nodeHeartbeat(true);
    }

    // Release it from the AM side and wait for the completion report.
    ContainerId cid = granted.get(0).getId();
    List<ContainerId> toRelease = new ArrayList<>(1);
    toRelease.add(cid);
    List<ContainerStatus> finished;
    for (finished = am1.allocate(new ArrayList<>(), toRelease)
             .getCompletedContainersStatuses();
         finished.isEmpty();
         finished = am1.allocate(new ArrayList<>(), toRelease)
             .getCompletedContainersStatuses()) {
      Thread.sleep(10);
    }

    // The released container never launched on the node, so a new one can
    // be allocated straight away.
    granted = am1.allocate("127.0.0.1", 8192, 1,
        new ArrayList<ContainerId>()).getAllocatedContainers();
    nm1.nodeHeartbeat(true);
    while (granted.isEmpty()) {
      Thread.sleep(10);
      nm1.nodeHeartbeat(true);
      granted = am1.allocate(new ArrayList<>(), new ArrayList<>())
          .getAllocatedContainers();
    }

    // Launch the new container on the node.
    cid = granted.get(0).getId();
    nm1.nodeHeartbeat(cid.getApplicationAttemptId(), cid.getContainerId(),
        ContainerState.RUNNING);
    rm1.waitForState(nm1, cid, RMContainerState.RUNNING);

    // Release the running container from the AM side.
    toRelease.clear();
    toRelease.add(cid);
    for (finished = am1.allocate(new ArrayList<>(), toRelease)
             .getCompletedContainersStatuses();
         finished.isEmpty();
         finished = am1.allocate(new ArrayList<>(), toRelease)
             .getCompletedContainersStatuses()) {
      Thread.sleep(10);
    }

    // The node has not released the old container yet, so no replacement
    // may be allocated for now.
    granted = am1.allocate("127.0.0.1", 8192, 1,
        new ArrayList<ContainerId>()).getAllocatedContainers();
    nm1.nodeHeartbeat(true);
    assertTrue(granted.isEmpty(),
        "new container allocated before node freed old");
    int attempt = 0;
    while (attempt < 10) {
      Thread.sleep(10);
      granted = am1.allocate(new ArrayList<>(), new ArrayList<>())
          .getAllocatedContainers();
      nm1.nodeHeartbeat(true);
      assertTrue(granted.isEmpty(),
          "new container allocated before node freed old");
      ++attempt;
    }

    // Let the node report the old container as complete.
    nm1.nodeHeartbeat(cid.getApplicationAttemptId(), cid.getContainerId(),
        ContainerState.COMPLETE);

    // Now the replacement is expected to be allocated.
    for (granted = am1.allocate(new ArrayList<>(), new ArrayList<>())
             .getAllocatedContainers();
         granted.isEmpty();
         granted = am1.allocate(new ArrayList<>(), new ArrayList<>())
             .getAllocatedContainers()) {
      Thread.sleep(10);
      nm1.nodeHeartbeat(true);
    }
  } finally {
    rm1.stop();
    System.out.println("Stopping testContainerReleasedByNode");
  }
}
/**
 * Checks that the resource request of a container killed while in the
 * ALLOCATED state (here because its NodeManager re-registers) is restored,
 * so that the AM gets a container again on later heartbeats.
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
@Timeout(60)
public void testResourceRequestRestoreWhenRMContainerIsAtAllocated(SchedulerType type)
    throws Exception {
  initTestAbstractYarnScheduler(type);
  YarnConfiguration conf = getConf();
  MockRM rm1 = new MockRM(conf);
  try {
    rm1.start();
    MockRMAppSubmissionData submission =
        MockRMAppSubmissionData.Builder.createWithMemory(200, rm1)
            .withAppName("name")
            .withUser("user")
            .withAcls(new HashMap<ApplicationAccessType, String>())
            .withUnmanagedAM(false)
            .withQueue("default")
            .withMaxAppAttempts(-1)
            .withCredentials(null)
            .withAppType("Test")
            .withWaitForAppAcceptedState(false)
            .withKeepContainers(true)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm1, submission);

    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
    nm1.registerNode();
    MockNM nm2 =
        new MockNM("127.0.0.1:2351", 10240, rm1.getResourceTrackerService());
    nm2.registerNode();

    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    final int numContainers = 1;
    // Ask for numContainers containers.
    am1.allocate("127.0.0.1", 1024, numContainers,
        new ArrayList<ContainerId>());
    nm1.nodeHeartbeat(true);

    // Keep polling until the requested number of containers has arrived.
    List<Container> granted =
        am1.allocate(new ArrayList<>(), new ArrayList<>())
            .getAllocatedContainers();
    while (granted.size() != numContainers) {
      nm1.nodeHeartbeat(true);
      AllocateResponse response =
          am1.allocate(new ArrayList<>(), new ArrayList<>());
      granted.addAll(response.getAllocatedContainers());
      Thread.sleep(200);
    }

    // Start container #2 so that a running container exists.
    nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
        ContainerState.RUNNING);
    ContainerId secondContainer =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
    rm1.waitForState(nm1, secondContainer, RMContainerState.RUNNING);

    // Container #3 is left in the ALLOCATED state on nm2.
    am1.allocate("127.0.0.1", 1024, numContainers,
        new ArrayList<ContainerId>());
    nm2.nodeHeartbeat(true);
    ContainerId thirdContainer =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
    rm1.waitForState(nm2, thirdContainer, RMContainerState.ALLOCATED);

    // Simulate a NodeManager restart by registering nm2 again.
    nm2.registerNode();
    // The restart kills all allocated and running containers on that node.
    rm1.waitForState(nm2, thirdContainer, RMContainerState.KILLED);

    // The request of the killed container should have been restored, so
    // further heartbeats must eventually hand out a container again.
    granted = am1.allocate(new ArrayList<>(), new ArrayList<>())
        .getAllocatedContainers();
    while (granted.size() != numContainers) {
      nm2.nodeHeartbeat(true);
      AllocateResponse response =
          am1.allocate(new ArrayList<>(), new ArrayList<>());
      granted.addAll(response.getAllocatedContainers());
      Thread.sleep(200);
    }

    nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 4,
        ContainerState.RUNNING);
    ContainerId fourthContainer =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 4);
    rm1.waitForState(nm2, fourthContainer, RMContainerState.RUNNING);
  } finally {
    rm1.stop();
  }
}
/**
 * Verifies that the ResourceRequests of a killed container are recovered
 * back to the app-attempt that originally asked for them rather than to the
 * new attempt created after the AM container completes (YARN-4502).
 *
 * @throws Exception if the mock cluster cannot be set up or driven
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
public void testResourceRequestRecoveryToTheRightAppAttempt(SchedulerType type)
    throws Exception {
  initTestAbstractYarnScheduler(type);
  YarnConfiguration conf = getConf();
  MockRM rm = new MockRM(conf);
  try {
    rm.start();
    MockRMAppSubmissionData submission =
        MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
            .withAppName("name")
            .withUser("user")
            .withAcls(new HashMap<ApplicationAccessType, String>())
            .withUnmanagedAM(false)
            .withQueue("default")
            .withMaxAppAttempts(-1)
            .withCredentials(null)
            .withAppType("Test")
            .withWaitForAppAcceptedState(false)
            .withKeepContainers(true)
            .build();
    RMApp rmApp = MockRMAppSubmitter.submit(rm, submission);

    MockNM node =
        new MockNM("127.0.0.1:1234", 10240, rm.getResourceTrackerService());
    node.registerNode();

    MockAM am1 = MockRM.launchAndRegisterAM(rmApp, rm, node);
    ApplicationAttemptId firstAttemptId = am1.getApplicationAttemptId();
    ContainerId am1ContainerID =
        ContainerId.newContainerId(firstAttemptId, 1);

    // Ask for a single container.
    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
    node.nodeHeartbeat(true);

    // Keep polling until that container has been handed out.
    List<Container> granted =
        am1.allocate(new ArrayList<>(), new ArrayList<>())
            .getAllocatedContainers();
    while (granted.size() != 1) {
      node.nodeHeartbeat(true);
      AllocateResponse response =
          am1.allocate(new ArrayList<>(), new ArrayList<>());
      granted.addAll(response.getAllocatedContainers());
      Thread.sleep(200);
    }

    // Start container #2; it is the one expected to be transferred later.
    node.nodeHeartbeat(firstAttemptId, 2, ContainerState.RUNNING);
    ContainerId runningContainerID =
        ContainerId.newContainerId(firstAttemptId, 2);
    rm.waitForState(node, runningContainerID, RMContainerState.RUNNING);

    // Container #3 is requested with a distinctive priority and left in
    // the ALLOCATED state.
    final int allocatedContainerPriority = 1047;
    am1.allocate("127.0.0.1", 1024, 1, allocatedContainerPriority,
        new ArrayList<ContainerId>(), null);
    node.nodeHeartbeat(true);
    ContainerId allocatedContainerID =
        ContainerId.newContainerId(firstAttemptId, 3);
    rm.waitForState(node, allocatedContainerID, RMContainerState.ALLOCATED);
    RMContainer allocatedContainer =
        rm.getResourceScheduler().getRMContainer(allocatedContainerID);

    // Remember the scheduler-side attempt before the AM goes away.
    AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> scheduler =
        (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
            .getResourceScheduler();
    SchedulerApplicationAttempt firstSchedulerAppAttempt =
        scheduler.getApplicationAttempt(firstAttemptId);

    // Report the AM container as complete; a new app-attempt is created.
    node.nodeHeartbeat(firstAttemptId, 1, ContainerState.COMPLETE);
    rm.drainEvents();
    RMAppAttempt rmAppAttempt2 = MockRM.waitForAttemptScheduled(rmApp, rm);
    ApplicationAttemptId secondAttemptId = rmAppAttempt2.getAppAttemptId();
    assertEquals(2, secondAttemptId.getAttemptId());

    // Outstanding allocated containers are killed regardless of the
    // keep-containers-across-attempts setting.
    assertEquals(RMContainerState.KILLED,
        allocatedContainer.getState());

    // Core check: the killed container's ResourceRequests are recovered to
    // the original app-attempt, not the new one.
    for (SchedulerRequestKey key : firstSchedulerAppAttempt.getSchedulerKeys()) {
      final int priority = key.getPriority().getPriority();
      if (priority == 0) {
        assertEquals(0,
            firstSchedulerAppAttempt.getOutstandingAsksCount(key));
        continue;
      }
      if (priority == allocatedContainerPriority) {
        assertEquals(1,
            firstSchedulerAppAttempt.getOutstandingAsksCount(key));
      }
    }

    // After the new AM launches, exactly the one running container should
    // be transferred.
    MockRM.launchAM(rmApp, rm, node);
    List<Container> transferredContainers =
        rm.getResourceScheduler().getTransferredContainers(secondAttemptId);
    assertEquals(1, transferredContainers.size());
    Container transferred = transferredContainers.get(0);
    assertEquals(runningContainerID, transferred.getId());
  } finally {
    rm.stop();
  }
}
/**
 * Asserts that the scheduler's maximum resource capability has the same
 * memory size and virtual core count as the expected resource.
 *
 * @param expectedMaximumResource resource whose memory and vcores are expected
 * @param scheduler scheduler whose maximum capability is queried
 */
private void verifyMaximumResourceCapability(
    Resource expectedMaximumResource, YarnScheduler scheduler) {
  final Resource actualMaximum = scheduler.getMaximumResourceCapability();

  // Memory is compared first, then virtual cores.
  assertEquals(
      expectedMaximumResource.getMemorySize(),
      actualMaximum.getMemorySize());
  assertEquals(
      expectedMaximumResource.getVirtualCores(),
      actualMaximum.getVirtualCores());
}
/**
 * Scheduler event handler that does nothing but, when {@code sleepFlag} is
 * set, sleeps for {@code sleepTime} milliseconds on every event. Tests use
 * it to hold up the dispatcher it is registered with.
 */
private class SleepHandler implements EventHandler<SchedulerEvent> {
  // When true, every handled event sleeps for sleepTime milliseconds.
  boolean sleepFlag = false;
  // Sleep duration in milliseconds, as passed to Thread.sleep.
  int sleepTime = 20;

  @Override
  public void handle(SchedulerEvent event) {
    if (!sleepFlag) {
      return;
    }
    try {
      Thread.sleep(sleepTime);
    } catch (InterruptedException ie) {
      // Still best-effort (nothing is thrown), but restore the interrupt
      // status instead of silently dropping it so the calling thread can
      // observe that it was interrupted.
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Builds and starts a {@code ResourceTrackerService} that is backed by its
 * own {@code RMContextImpl} and the supplied dispatcher. The sleep handler
 * and the RM's scheduler are both registered for scheduler events on that
 * dispatcher, and the RM's scheduler is pointed at the private context.
 *
 * @param privateDispatcher dispatcher to register handlers on and to start
 * @param rm resource manager whose scheduler is wired to the private context
 * @param sleepHandler handler registered for scheduler events
 * @return the started resource tracker service
 */
private ResourceTrackerService getPrivateResourceTrackerService(
    Dispatcher privateDispatcher, ResourceManager rm,
    SleepHandler sleepHandler) {
  final Configuration conf = getConf();

  final RMContext context =
      new RMContextImpl(privateDispatcher, null, null, null, null, null, null,
          null, null, null);
  context.setNodeLabelManager(mock(RMNodeLabelsManager.class));

  // Handler registration order is kept: sleep handler first, then the
  // scheduler, then the node event dispatcher.
  privateDispatcher.register(SchedulerEventType.class, sleepHandler);
  privateDispatcher.register(SchedulerEventType.class,
      rm.getResourceScheduler());
  privateDispatcher.register(RMNodeEventType.class,
      new ResourceManager.NodeEventDispatcher(context));
  final Service dispatcherService = (Service) privateDispatcher;
  dispatcherService.init(conf);
  dispatcherService.start();

  final NMLivelinessMonitor livelinessMonitor =
      new NMLivelinessMonitor(privateDispatcher);
  livelinessMonitor.init(conf);
  livelinessMonitor.start();

  final NodesListManager nodesList = new NodesListManager(context);
  nodesList.init(conf);

  final RMContainerTokenSecretManager containerTokens =
      new RMContainerTokenSecretManager(conf);
  containerTokens.start();

  final NMTokenSecretManagerInRM nmTokens =
      new NMTokenSecretManagerInRM(conf);
  nmTokens.start();

  final ResourceTrackerService trackerService =
      new ResourceTrackerService(context, nodesList,
          livelinessMonitor, containerTokens,
          nmTokens);
  trackerService.init(conf);
  trackerService.start();

  rm.getResourceScheduler().setRMContext(context);
  return trackerService;
}
/**
 * Test the behavior of the scheduler when a node reconnects
 * with changed capabilities. This test is to catch any race conditions
 * that might occur due to the use of the RMNode object.
 *
 * @param type scheduler implementation the test is parameterized with
 * @throws Exception if the mock cluster cannot be set up or driven
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
@Timeout(60)
public void testNodemanagerReconnect(SchedulerType type) throws Exception {
  initTestAbstractYarnScheduler(type);
  Configuration conf = getConf();
  MockRM rm = new MockRM(conf);
  // Declared outside the try block so it can be stopped in finally even
  // when an assertion fails half-way through.
  ResourceTrackerService privateResourceTrackerService = null;
  try {
    rm.start();
    DrainDispatcher privateDispatcher = new DrainDispatcher();
    privateDispatcher.disableExitOnDispatchException();
    SleepHandler sleepHandler = new SleepHandler();
    privateResourceTrackerService =
        getPrivateResourceTrackerService(privateDispatcher, rm, sleepHandler);

    // Register node1
    String hostname1 = "localhost1";
    Resource capability = Resources.createResource(4096, 4);
    RecordFactory recordFactory =
        RecordFactoryProvider.getRecordFactory(null);

    RegisterNodeManagerRequest request1 =
        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
    NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
    NodeStatus mockNodeStatus = createMockNodeStatus();

    request1.setNodeId(nodeId1);
    request1.setHttpPort(0);
    request1.setResource(capability);
    request1.setNodeStatus(mockNodeStatus);
    privateResourceTrackerService.registerNodeManager(request1);
    privateDispatcher.await();
    Resource clusterResource =
        rm.getResourceScheduler().getClusterResource();
    assertEquals(capability,
        clusterResource, "Initial cluster resources don't match");

    Resource newCapability = Resources.createResource(1024);
    RegisterNodeManagerRequest request2 =
        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
    request2.setNodeId(nodeId1);
    request2.setHttpPort(0);
    request2.setResource(newCapability);
    // hold up the dispatcher and register the same node with lower capability
    sleepHandler.sleepFlag = true;
    privateResourceTrackerService.registerNodeManager(request2);
    privateDispatcher.await();
    assertEquals(newCapability, rm.getResourceScheduler().getClusterResource(),
        "Cluster resources don't match");
  } finally {
    // Stop the private tracker service before the RM, as the original did
    // on the success path; the nested finally guarantees the RM is stopped
    // even if stopping the tracker service throws.
    try {
      if (privateResourceTrackerService != null) {
        privateResourceTrackerService.stop();
      }
    } finally {
      rm.stop();
    }
  }
}
/**
 * Checks the life cycle of the scheduler's update thread: for the FAIR
 * scheduler it must be alive after start and terminate after the scheduler
 * is stopped; for the CAPACITY scheduler it must not have been created.
 *
 * @param type scheduler implementation the test is parameterized with
 * @throws Exception if the mock RM cannot be driven or the wait is interrupted
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
@Timeout(10)
public void testUpdateThreadLifeCycle(SchedulerType type) throws Exception {
  initTestAbstractYarnScheduler(type);
  MockRM rm = new MockRM(getConf());
  try {
    rm.start();
    AbstractYarnScheduler scheduler =
        (AbstractYarnScheduler) rm.getResourceScheduler();

    if (getSchedulerType().equals(SchedulerType.FAIR)) {
      Thread updateThread = scheduler.updateThread;
      assertTrue(updateThread.isAlive());
      scheduler.stop();

      // Wait up to 5 seconds (the same budget as the former 100 x 50 ms
      // polling loop) for the thread to terminate.
      updateThread.join(5000);

      // Assert on the thread state itself. The former check compared a
      // post-decremented retry counter with 0: the counter ended at -1 when
      // retries ran out (so the check passed with the thread still alive)
      // and at 0 when the thread died on the last retry (so it failed
      // although the thread was dead).
      assertTrue(!updateThread.isAlive(), "The Update thread is still alive");
    } else if (getSchedulerType().equals(SchedulerType.CAPACITY)) {
      assertNull(scheduler.updateThread,
          "updateThread shouldn't have been created");
    } else {
      fail("Unhandled SchedulerType, " + getSchedulerType() +
          ", please update this unit test.");
    }
  } finally {
    rm.stop();
  }
}
/**
 * Verifies that a container reported by a newly added node (via the
 * container reports of a {@code NodeAddedSchedulerEvent}) is known to the
 * scheduler afterwards and carries the queue name of its application.
 *
 * @param type scheduler implementation the test is parameterized with
 * @throws Exception if the mock cluster cannot be set up or driven
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
@Timeout(60)
public void testContainerRecoveredByNode(SchedulerType type) throws Exception {
  initTestAbstractYarnScheduler(type);
  System.out.println("Starting testContainerRecoveredByNode");
  final int maxMemory = 10 * 1024;
  YarnConfiguration conf = getConf();
  conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
  conf.setBoolean(
      YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
  conf.set(
      YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
  MockRM rm1 = new MockRM(conf);
  try {
    rm1.start();
    MockRMAppSubmissionData data =
        MockRMAppSubmissionData.Builder.createWithMemory(200, rm1)
            .withAppName("name")
            .withUser("user")
            .withAcls(new HashMap<ApplicationAccessType, String>())
            .withUnmanagedAM(false)
            .withQueue("default")
            .withMaxAppAttempts(-1)
            .withCredentials(null)
            .withAppType("Test")
            .withWaitForAppAcceptedState(false)
            .withKeepContainers(true)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm1, data);
    MockNM nm1 =
        new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
    nm1.registerNode();

    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
    am1.allocate("127.0.0.1", 8192, 1, new ArrayList<ContainerId>());

    YarnScheduler scheduler = rm1.getResourceScheduler();

    // Add a second node that reports an already running container (#2 of
    // the current attempt).
    RMNode node1 = MockNodes.newNodeInfo(
        0, Resources.createResource(maxMemory), 1, "127.0.0.2");
    ContainerId containerId = ContainerId.newContainerId(
        app1.getCurrentAppAttempt().getAppAttemptId(), 2);
    NMContainerStatus containerReport =
        NMContainerStatus.newInstance(containerId, 0, ContainerState.RUNNING,
            Resource.newInstance(1024, 1), "recover container", 0,
            Priority.newInstance(0), 0);
    List<NMContainerStatus> containerReports = new ArrayList<>();
    containerReports.add(containerReport);
    scheduler.handle(new NodeAddedSchedulerEvent(node1, containerReports));
    RMContainer rmContainer = scheduler.getRMContainer(containerId);

    // Verify the queue name of the recovered container. The original code
    // branched on the scheduler type but asserted exactly the same thing in
    // both branches, so a single assertion is equivalent.
    assertEquals(app1.getQueue(), rmContainer.getQueueName());
  } finally {
    rm1.stop();
    System.out.println("Stopping testContainerRecoveredByNode");
  }
}
/**
 * Checks the ordering of the containers returned for killing after each
 * newly allocated container. The expected ordering is the one described in
 * {@link SchedulerNode#getContainersToKill()}.
 */
@ParameterizedTest(name = "{0}")
@MethodSource("getParameters")
public void testGetRunningContainersToKill(SchedulerType type) throws IOException {
  initTestAbstractYarnScheduler(type);
  final SchedulerNode node = new MockSchedulerNode();
  List<RMContainer> expectedOrder;

  // Nothing allocated yet.
  assertEquals(Collections.emptyList(), node.getContainersToKill());

  // Expect: AM0
  RMContainer am0 = newMockRMContainer(true, ExecutionType.GUARANTEED, "AM0");
  node.allocateContainer(am0);
  expectedOrder = Arrays.asList(am0);
  assertEquals(expectedOrder, node.getContainersToKill());

  // Expect: OPPORTUNISTIC0, AM0
  RMContainer opp0 =
      newMockRMContainer(false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC0");
  node.allocateContainer(opp0);
  expectedOrder = Arrays.asList(opp0, am0);
  assertEquals(expectedOrder, node.getContainersToKill());

  // Expect: OPPORTUNISTIC0, GUARANTEED0, AM0
  RMContainer regular0 =
      newMockRMContainer(false, ExecutionType.GUARANTEED, "GUARANTEED0");
  node.allocateContainer(regular0);
  expectedOrder = Arrays.asList(opp0, regular0, am0);
  assertEquals(expectedOrder, node.getContainersToKill());

  // Expect: OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM0
  RMContainer opp1 =
      newMockRMContainer(false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC1");
  node.allocateContainer(opp1);
  expectedOrder = Arrays.asList(opp1, opp0, regular0, am0);
  assertEquals(expectedOrder, node.getContainersToKill());

  // Expect: OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM1, AM0
  RMContainer am1 = newMockRMContainer(true, ExecutionType.GUARANTEED, "AM1");
  node.allocateContainer(am1);
  expectedOrder = Arrays.asList(opp1, opp0, regular0, am1, am0);
  assertEquals(expectedOrder, node.getContainersToKill());

  // Expect: OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED1, GUARANTEED0, AM1, AM0
  RMContainer regular1 =
      newMockRMContainer(false, ExecutionType.GUARANTEED, "GUARANTEED1");
  node.allocateContainer(regular1);
  expectedOrder = Arrays.asList(opp1, opp0, regular1, regular0, am1, am0);
  assertEquals(expectedOrder, node.getContainersToKill());
}
// Creation time handed to the most recently built mock container.
private static long LAST_TIMESTAMP = 0L;

/**
 * Builds a Mockito mock of {@code RMContainer} with the given attributes.
 * The creation time is taken from {@code Time.now()} and is always strictly
 * greater than the one given to the previously created mock.
 *
 * @param isAMContainer value returned by the mock's {@code isAMContainer()}
 * @param executionType value returned by the mock's {@code getExecutionType()}
 * @param name value returned by the mock's {@code toString()}
 * @return the configured mock container
 */
private static RMContainer newMockRMContainer(boolean isAMContainer,
    ExecutionType executionType, String name) {
  // Spin until the clock has moved past the previous timestamp so that
  // every mock gets a distinct, increasing creation time.
  long creationTime;
  do {
    creationTime = Time.now();
  } while (creationTime <= LAST_TIMESTAMP);
  LAST_TIMESTAMP = creationTime;

  final RMContainer mocked = mock(RMContainer.class);
  when(mocked.isAMContainer()).thenReturn(isAMContainer);
  when(mocked.getExecutionType()).thenReturn(executionType);
  when(mocked.getCreationTime()).thenReturn(creationTime);
  when(mocked.toString()).thenReturn(name);
  return mocked;
}
/**
 * {@link SchedulerNode} stand-in for tests: allocated containers are kept
 * in a local list that is also reported as the launched containers, and
 * reservation calls are no-ops.
 */
static class MockSchedulerNode extends SchedulerNode {
  // Every container passed to allocateContainer, in shuffled order.
  private final List<RMContainer> containers = new ArrayList<>();

  MockSchedulerNode() {
    super(MockNodes.newNodeInfo(0, Resource.newInstance(1, 1)), false);
  }

  @Override
  protected List<RMContainer> getLaunchedContainers() {
    return this.containers;
  }

  @Override
  public void allocateContainer(RMContainer rmContainer) {
    this.containers.add(rmContainer);
    // Shuffle so that insertion order does not leak into the list order.
    Collections.shuffle(this.containers);
  }

  @Override
  public void reserveResource(SchedulerApplicationAttempt attempt,
      SchedulerRequestKey schedulerKey, RMContainer container) {
    // Intentionally empty.
  }

  @Override
  public void unreserveResource(SchedulerApplicationAttempt attempt) {
    // Intentionally empty.
  }
}
}