TestClientRMService.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.security.AccessControlException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeToAttributeValue;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueConfigurations;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestClientRMService {
private static final Logger LOG =
LoggerFactory.getLogger(TestClientRMService.class);
private RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private String appType = "MockApp";
private final static String QUEUE_1 = "Q-1";
private final static String QUEUE_2 = "Q-2";
private final static String APPLICATION_TAG_SC_PREPROCESSOR ="mytag:foo";
private File resourceTypesFile = null;
private Configuration conf;
private ResourceManager resourceManager;
private YarnRPC rpc;
private ApplicationClientProtocol client;
@Test
public void testGetDecommissioningClusterNodes() throws Exception {
MockRM rm = new MockRM() {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager,
this.queueACLsManager,
this.getRMContext().getRMDelegationTokenSecretManager());
};
};
resourceManager = rm;
rm.start();
int nodeMemory = 1024;
MockNM nm1 = rm.registerNode("host1:1234", nodeMemory);
rm.sendNodeStarted(nm1);
nm1.nodeHeartbeat(true);
rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
Integer decommissioningTimeout = 600;
rm.sendNodeGracefulDecommission(nm1, decommissioningTimeout);
rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
// Create a client.
conf = new Configuration();
rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
client = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
// Make call
List<NodeReport> nodeReports = client.getClusterNodes(
GetClusterNodesRequest.newInstance(
EnumSet.of(NodeState.DECOMMISSIONING)))
.getNodeReports();
assertEquals(1, nodeReports.size());
NodeReport nr = nodeReports.iterator().next();
assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout());
assertNull(nr.getNodeUpdateType());
}
@Test
public void testGetClusterNodes() throws Exception {
MockRM rm = new MockRM() {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
this.getRMContext().getRMDelegationTokenSecretManager());
};
};
resourceManager = rm;
rm.start();
RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager();
labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
// Add a healthy node with label = x
MockNM node = rm.registerNode("host1:1234", 1024);
Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>();
map.put(node.getNodeId(), ImmutableSet.of("x"));
labelsMgr.replaceLabelsOnNode(map);
rm.sendNodeStarted(node);
node.nodeHeartbeat(true);
// Add and lose a node with label = y
MockNM lostNode = rm.registerNode("host2:1235", 1024);
rm.sendNodeStarted(lostNode);
lostNode.nodeHeartbeat(true);
rm.waitForState(lostNode.getNodeId(), NodeState.RUNNING);
rm.sendNodeLost(lostNode);
// Create a client.
conf = new Configuration();
rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
client = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
// Make call
GetClusterNodesRequest request =
GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.RUNNING));
List<NodeReport> nodeReports =
client.getClusterNodes(request).getNodeReports();
assertEquals(1, nodeReports.size());
assertNotSame(NodeState.UNHEALTHY, nodeReports.get(0).getNodeState(),
"Node is expected to be healthy!");
// Check node's label = x
assertTrue(nodeReports.get(0).getNodeLabels().contains("x"));
assertNull(nodeReports.get(0).getDecommissioningTimeout());
assertNull(nodeReports.get(0).getNodeUpdateType());
// Now make the node unhealthy.
node.nodeHeartbeat(false);
rm.waitForState(node.getNodeId(), NodeState.UNHEALTHY);
// Call again
nodeReports = client.getClusterNodes(request).getNodeReports();
assertEquals(0, nodeReports.size(),
"Unhealthy nodes should not show up by default");
// Change label of host1 to y
map = new HashMap<NodeId, Set<String>>();
map.put(node.getNodeId(), ImmutableSet.of("y"));
labelsMgr.replaceLabelsOnNode(map);
// Now query for UNHEALTHY nodes
request = GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.UNHEALTHY));
nodeReports = client.getClusterNodes(request).getNodeReports();
assertEquals(1, nodeReports.size());
assertEquals(NodeState.UNHEALTHY, nodeReports.get(0).getNodeState(),
"Node is expected to be unhealthy!");
assertTrue(nodeReports.get(0).getNodeLabels().contains("y"));
assertNull(nodeReports.get(0).getDecommissioningTimeout());
assertNull(nodeReports.get(0).getNodeUpdateType());
// Remove labels of host1
map = new HashMap<NodeId, Set<String>>();
map.put(node.getNodeId(), ImmutableSet.of("y"));
labelsMgr.removeLabelsFromNode(map);
// Query all states should return all nodes
rm.registerNode("host3:1236", 1024);
request = GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
nodeReports = client.getClusterNodes(request).getNodeReports();
assertEquals(3, nodeReports.size());
// All host1-3's label should be empty (instead of null)
for (NodeReport report : nodeReports) {
assertTrue(report.getNodeLabels() != null
&& report.getNodeLabels().isEmpty());
assertNull(report.getDecommissioningTimeout());
assertNull(report.getNodeUpdateType());
}
}
@Test
public void testNonExistingApplicationReport() throws YarnException {
RMContext rmContext = mock(RMContext.class);
when(rmContext.getRMApps()).thenReturn(
new ConcurrentHashMap<ApplicationId, RMApp>());
ClientRMService rmService = new ClientRMService(rmContext, null, null,
null, null, null);
GetApplicationReportRequest request = recordFactory
.newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(ApplicationId.newInstance(0, 0));
try {
rmService.getApplicationReport(request);
fail();
} catch (ApplicationNotFoundException ex) {
assertEquals(ex.getMessage(),
"Application with id '" + request.getApplicationId()
+ "' doesn't exist in RM. Please check that the "
+ "job submission was successful.");
}
}
@Test
public void testGetApplicationReport() throws Exception {
ResourceScheduler scheduler = mock(ResourceScheduler.class);
RMContext rmContext = mock(RMContext.class);
mockRMContext(scheduler, rmContext);
ApplicationId appId1 = getApplicationId(1);
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
when(
mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true);
ClientRMService rmService = new ClientRMService(rmContext, scheduler,
null, mockAclsManager, null, null);
try {
GetApplicationReportRequest request = recordFactory
.newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(appId1);
GetApplicationReportResponse response =
rmService.getApplicationReport(request);
ApplicationReport report = response.getApplicationReport();
ApplicationResourceUsageReport usageReport =
report.getApplicationResourceUsageReport();
assertEquals(10, usageReport.getMemorySeconds());
assertEquals(3, usageReport.getVcoreSeconds());
assertEquals("<Not set>", report.getAmNodeLabelExpression());
assertEquals("<Not set>", report.getAppNodeLabelExpression());
// if application has am node label set to blank
ApplicationId appId2 = getApplicationId(2);
when(mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
ApplicationAccessType.VIEW_APP, null, appId2)).thenReturn(true);
request.setApplicationId(appId2);
response = rmService.getApplicationReport(request);
report = response.getApplicationReport();
assertEquals(NodeLabel.DEFAULT_NODE_LABEL_PARTITION,
report.getAmNodeLabelExpression());
assertEquals(NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET,
report.getAppNodeLabelExpression());
// if application has am node label set to blank
ApplicationId appId3 = getApplicationId(3);
when(mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
ApplicationAccessType.VIEW_APP, null, appId3)).thenReturn(true);
request.setApplicationId(appId3);
response = rmService.getApplicationReport(request);
report = response.getApplicationReport();
assertEquals("high-mem", report.getAmNodeLabelExpression());
assertEquals("high-mem", report.getAppNodeLabelExpression());
// if application id is null
GetApplicationReportRequest invalidRequest = recordFactory
.newRecordInstance(GetApplicationReportRequest.class);
invalidRequest.setApplicationId(null);
try {
rmService.getApplicationReport(invalidRequest);
} catch (YarnException e) {
// rmService should return a ApplicationNotFoundException
// when a null application id is provided
assertTrue(e instanceof ApplicationNotFoundException);
}
} finally {
rmService.close();
}
}
@Test
public void testGetApplicationAttemptReport() throws YarnException,
IOException {
ClientRMService rmService = createRMService();
GetApplicationAttemptReportRequest request = recordFactory
.newRecordInstance(GetApplicationAttemptReportRequest.class);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(123456, 1), 1);
request.setApplicationAttemptId(attemptId);
try {
GetApplicationAttemptReportResponse response = rmService
.getApplicationAttemptReport(request);
assertEquals(attemptId, response.getApplicationAttemptReport()
.getApplicationAttemptId());
} catch (ApplicationNotFoundException ex) {
fail(ex.getMessage());
}
}
@Test
public void testGetApplicationResourceUsageReportDummy() throws YarnException,
IOException {
ApplicationAttemptId attemptId = getApplicationAttemptId(1);
ResourceScheduler scheduler = mockResourceScheduler();
RMContext rmContext = mock(RMContext.class);
mockRMContext(scheduler, rmContext);
when(rmContext.getDispatcher().getEventHandler()).thenReturn(
new EventHandler<Event>() {
public void handle(Event event) {
}
});
ApplicationSubmissionContext asContext =
mock(ApplicationSubmissionContext.class);
YarnConfiguration config = new YarnConfiguration();
RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
rmContext, scheduler, null, asContext, config, null, null);
ApplicationResourceUsageReport report = rmAppAttemptImpl
.getApplicationResourceUsageReport();
assertEquals(report, RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT);
}
@Test
public void testGetApplicationAttempts() throws YarnException, IOException {
ClientRMService rmService = createRMService();
GetApplicationAttemptsRequest request = recordFactory
.newRecordInstance(GetApplicationAttemptsRequest.class);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(123456, 1), 1);
request.setApplicationId(ApplicationId.newInstance(123456, 1));
try {
GetApplicationAttemptsResponse response = rmService
.getApplicationAttempts(request);
assertEquals(1, response.getApplicationAttemptList().size());
assertEquals(attemptId, response.getApplicationAttemptList()
.get(0).getApplicationAttemptId());
} catch (ApplicationNotFoundException ex) {
fail(ex.getMessage());
}
}
@Test
public void testGetContainerReport() throws YarnException, IOException {
ClientRMService rmService = createRMService();
GetContainerReportRequest request = recordFactory
.newRecordInstance(GetContainerReportRequest.class);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(123456, 1), 1);
ContainerId containerId = ContainerId.newContainerId(attemptId, 1);
request.setContainerId(containerId);
try {
GetContainerReportResponse response = rmService
.getContainerReport(request);
assertEquals(containerId, response.getContainerReport()
.getContainerId());
} catch (ApplicationNotFoundException ex) {
fail(ex.getMessage());
}
}
@Test
public void testGetContainers() throws YarnException, IOException {
ClientRMService rmService = createRMService();
GetContainersRequest request = recordFactory
.newRecordInstance(GetContainersRequest.class);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(123456, 1), 1);
ContainerId containerId = ContainerId.newContainerId(attemptId, 1);
request.setApplicationAttemptId(attemptId);
try {
GetContainersResponse response = rmService.getContainers(request);
assertEquals(containerId, response.getContainerList().get(0)
.getContainerId());
} catch (ApplicationNotFoundException ex) {
fail(ex.getMessage());
}
}
public ClientRMService createRMService() throws IOException, YarnException {
ResourceScheduler scheduler = mockResourceScheduler();
RMContext rmContext = mock(RMContext.class);
mockRMContext(scheduler, rmContext);
ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
scheduler);
when(rmContext.getRMApps()).thenReturn(apps);
when(rmContext.getYarnConfiguration()).thenReturn(new Configuration());
RMAppManager appManager = new RMAppManager(rmContext, scheduler, null,
mock(ApplicationACLsManager.class), new Configuration());
when(rmContext.getDispatcher().getEventHandler()).thenReturn(
new EventHandler<Event>() {
public void handle(Event event) {
}
});
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
when(
mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), any(RMApp.class), any(),
any())).thenReturn(true);
return new ClientRMService(rmContext, scheduler, appManager,
mockAclsManager, mockQueueACLsManager, null);
}
@Test
public void testForceKillNonExistingApplication() throws YarnException {
RMContext rmContext = mock(RMContext.class);
when(rmContext.getRMApps()).thenReturn(
new ConcurrentHashMap<ApplicationId, RMApp>());
ClientRMService rmService = new ClientRMService(rmContext, null, null,
null, null, null);
ApplicationId applicationId =
BuilderUtils.newApplicationId(System.currentTimeMillis(), 0);
KillApplicationRequest request =
KillApplicationRequest.newInstance(applicationId);
try {
rmService.forceKillApplication(request);
fail();
} catch (ApplicationNotFoundException ex) {
assertEquals(ex.getMessage(),
"Trying to kill an absent " +
"application " + request.getApplicationId());
}
}
@Test
public void testApplicationTagsValidation() throws IOException {
conf = new YarnConfiguration();
int maxtags = 3, appMaxTagLength = 5;
conf.setInt(YarnConfiguration.RM_APPLICATION_MAX_TAGS, maxtags);
conf.setInt(YarnConfiguration.RM_APPLICATION_MAX_TAG_LENGTH,
appMaxTagLength);
MockRM rm = new MockRM(conf);
resourceManager = rm;
rm.init(conf);
rm.start();
ClientRMService rmService = rm.getClientRMService();
List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3", "Tag4");
validateApplicationTag(rmService, tags,
"Too many applicationTags, a maximum of only " + maxtags
+ " are allowed!");
tags = Arrays.asList("ApplicationTag1", "ApplicationTag2",
"ApplicationTag3");
// tags are converted to lowercase in
// ApplicationSubmissionContext#setApplicationTags
validateApplicationTag(rmService, tags,
"Tag applicationtag1 is too long, maximum allowed length of a tag is "
+ appMaxTagLength);
tags = Arrays.asList("t��g1", "tag2#");
validateApplicationTag(rmService, tags,
"A tag can only have ASCII characters! Invalid tag - t��g1");
}
private void validateApplicationTag(ClientRMService rmService,
List<String> tags, String errorMsg) {
SubmitApplicationRequest submitRequest = mockSubmitAppRequest(
getApplicationId(101), MockApps.newAppName(), QUEUE_1,
new HashSet<String>(tags));
try {
rmService.submitApplication(submitRequest);
fail();
} catch (Exception ex) {
assertTrue(ex.getMessage().contains(errorMsg));
}
}
@Test
public void testForceKillApplication() throws Exception {
conf = new YarnConfiguration();
conf.setBoolean(MockRM.ENABLE_WEBAPP, true);
MockRM rm = new MockRM(conf);
resourceManager = rm;
rm.init(conf);
rm.start();
ClientRMService rmService = rm.getClientRMService();
GetApplicationsRequest getRequest = GetApplicationsRequest.newInstance(
EnumSet.of(YarnApplicationState.KILLED));
RMApp app1 = MockRMAppSubmitter.submitWithMemory(1024, rm);
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(1024, rm)
.withUnmanagedAM(true)
.build();
RMApp app2 = MockRMAppSubmitter.submit(rm, data);
assertEquals(0, rmService.getApplications(getRequest).getApplicationList().size(),
"Incorrect number of apps in the RM");
KillApplicationRequest killRequest1 =
KillApplicationRequest.newInstance(app1.getApplicationId());
String diagnostic = "message1";
killRequest1.setDiagnostics(diagnostic);
KillApplicationRequest killRequest2 =
KillApplicationRequest.newInstance(app2.getApplicationId());
int killAttemptCount = 0;
for (int i = 0; i < 100; i++) {
KillApplicationResponse killResponse1 =
rmService.forceKillApplication(killRequest1);
killAttemptCount++;
if (killResponse1.getIsKillCompleted()) {
break;
}
Thread.sleep(10);
}
assertTrue(killAttemptCount > 1,
"Kill attempt count should be greater than 1 for managed AMs");
assertEquals(1,
rmService.getApplications(getRequest).getApplicationList().size(),
"Incorrect number of apps in the RM");
assertTrue(app1.getDiagnostics().toString().contains(diagnostic),
"Diagnostic message is incorrect");
KillApplicationResponse killResponse2 =
rmService.forceKillApplication(killRequest2);
assertTrue(killResponse2.getIsKillCompleted(),
"Killing UnmanagedAM should falsely acknowledge true");
for (int i = 0; i < 100; i++) {
if (2 ==
rmService.getApplications(getRequest).getApplicationList().size()) {
break;
}
Thread.sleep(10);
}
assertEquals(2, rmService.getApplications(getRequest).getApplicationList().size(),
"Incorrect number of apps in the RM");
}
@Test
public void testMoveAbsentApplication() throws YarnException {
assertThrows(ApplicationNotFoundException.class, () -> {
RMContext rmContext = mock(RMContext.class);
when(rmContext.getRMApps()).thenReturn(
new ConcurrentHashMap<ApplicationId, RMApp>());
ClientRMService rmService = new ClientRMService(rmContext, null, null,
null, null, null);
ApplicationId applicationId =
BuilderUtils.newApplicationId(System.currentTimeMillis(), 0);
MoveApplicationAcrossQueuesRequest request =
MoveApplicationAcrossQueuesRequest.newInstance(applicationId,
"newqueue");
rmService.moveApplicationAcrossQueues(request);
});
}
@Test
public void testMoveApplicationSubmitTargetQueue() throws Exception {
// move the application as the owner
ApplicationId applicationId = getApplicationId(1);
UserGroupInformation aclUGI = UserGroupInformation.getCurrentUser();
QueueACLsManager queueACLsManager = getQueueAclManager("allowed_queue",
QueueACL.SUBMIT_APPLICATIONS, aclUGI);
ApplicationACLsManager appAclsManager = getAppAclManager();
ClientRMService rmService = createClientRMServiceForMoveApplicationRequest(
applicationId, aclUGI.getShortUserName(), appAclsManager,
queueACLsManager);
// move as the owner queue in the acl
MoveApplicationAcrossQueuesRequest moveAppRequest =
MoveApplicationAcrossQueuesRequest.
newInstance(applicationId, "allowed_queue");
rmService.moveApplicationAcrossQueues(moveAppRequest);
// move as the owner queue not in the acl
moveAppRequest = MoveApplicationAcrossQueuesRequest.newInstance(
applicationId, "not_allowed");
try {
rmService.moveApplicationAcrossQueues(moveAppRequest);
fail("The request should fail with an AccessControlException");
} catch (YarnException rex) {
assertTrue(rex.getCause() instanceof AccessControlException,
"AccessControlException is expected");
}
// ACL is owned by "moveuser", move is performed as a different user
aclUGI = UserGroupInformation.createUserForTesting("moveuser",
new String[]{});
queueACLsManager = getQueueAclManager("move_queue",
QueueACL.SUBMIT_APPLICATIONS, aclUGI);
appAclsManager = getAppAclManager();
ClientRMService rmService2 =
createClientRMServiceForMoveApplicationRequest(applicationId,
aclUGI.getShortUserName(), appAclsManager, queueACLsManager);
// access to the queue not OK: user not allowed in this queue
MoveApplicationAcrossQueuesRequest moveAppRequest2 =
MoveApplicationAcrossQueuesRequest.
newInstance(applicationId, "move_queue");
try {
rmService2.moveApplicationAcrossQueues(moveAppRequest2);
fail("The request should fail with an AccessControlException");
} catch (YarnException rex) {
assertTrue(rex.getCause() instanceof AccessControlException,
"AccessControlException is expected");
}
// execute the move as the acl owner
// access to the queue OK: user allowed in this queue
aclUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
return rmService2.moveApplicationAcrossQueues(moveAppRequest2);
}
});
}
@Test
public void testMoveApplicationAdminTargetQueue() throws Exception {
ApplicationId applicationId = getApplicationId(1);
UserGroupInformation aclUGI = UserGroupInformation.getCurrentUser();
QueueACLsManager queueAclsManager = getQueueAclManager("allowed_queue",
QueueACL.ADMINISTER_QUEUE, aclUGI);
ApplicationACLsManager appAclsManager = getAppAclManager();
ClientRMService rmService =
createClientRMServiceForMoveApplicationRequest(applicationId,
aclUGI.getShortUserName(), appAclsManager, queueAclsManager);
// user is admin move to queue in acl
MoveApplicationAcrossQueuesRequest moveAppRequest =
MoveApplicationAcrossQueuesRequest.newInstance(applicationId,
"allowed_queue");
rmService.moveApplicationAcrossQueues(moveAppRequest);
// user is admin move to queue not in acl
moveAppRequest = MoveApplicationAcrossQueuesRequest.newInstance(
applicationId, "not_allowed");
try {
rmService.moveApplicationAcrossQueues(moveAppRequest);
fail("The request should fail with an AccessControlException");
} catch (YarnException rex) {
assertTrue(rex.getCause() instanceof AccessControlException,
"AccessControlException is expected");
}
// ACL is owned by "moveuser", move is performed as a different user
aclUGI = UserGroupInformation.createUserForTesting("moveuser",
new String[]{});
queueAclsManager = getQueueAclManager("move_queue",
QueueACL.ADMINISTER_QUEUE, aclUGI);
appAclsManager = getAppAclManager();
ClientRMService rmService2 =
createClientRMServiceForMoveApplicationRequest(applicationId,
aclUGI.getShortUserName(), appAclsManager, queueAclsManager);
// no access to this queue
MoveApplicationAcrossQueuesRequest moveAppRequest2 =
MoveApplicationAcrossQueuesRequest.
newInstance(applicationId, "move_queue");
try {
rmService2.moveApplicationAcrossQueues(moveAppRequest2);
fail("The request should fail with an AccessControlException");
} catch (YarnException rex) {
assertTrue(rex.getCause() instanceof AccessControlException,
"AccessControlException is expected");
}
// execute the move as the acl owner
// access to the queue OK: user allowed in this queue
aclUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
return rmService2.moveApplicationAcrossQueues(moveAppRequest2);
}
});
}
@Test
public void testNonExistingQueue() throws Exception {
assertThrows(YarnException.class, () -> {
ApplicationId applicationId = getApplicationId(1);
UserGroupInformation aclUGI = UserGroupInformation.getCurrentUser();
QueueACLsManager queueAclsManager = getQueueAclManager();
ApplicationACLsManager appAclsManager = getAppAclManager();
ClientRMService rmService =
createClientRMServiceForMoveApplicationRequest(applicationId,
aclUGI.getShortUserName(), appAclsManager, queueAclsManager);
MoveApplicationAcrossQueuesRequest moveAppRequest =
MoveApplicationAcrossQueuesRequest.newInstance(applicationId,
"unknown_queue");
rmService.moveApplicationAcrossQueues(moveAppRequest);
});
}
/**
* Create an instance of ClientRMService for testing
* moveApplicationAcrossQueues requests.
* @param applicationId the application
* @return ClientRMService
*/
private ClientRMService createClientRMServiceForMoveApplicationRequest(
ApplicationId applicationId, String appOwner,
ApplicationACLsManager appAclsManager,
QueueACLsManager queueAclsManager) {
RMApp app = mock(RMApp.class);
when(app.getUser()).thenReturn(appOwner);
when(app.getState()).thenReturn(RMAppState.RUNNING);
when(app.getApplicationId()).thenReturn(applicationId);
ConcurrentHashMap<ApplicationId, RMApp> apps = new ConcurrentHashMap<>();
apps.put(applicationId, app);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getRMApps()).thenReturn(apps);
RMAppManager rmAppManager = mock(RMAppManager.class);
return new ClientRMService(rmContext, null, rmAppManager, appAclsManager,
queueAclsManager, null);
}
/**
* Plain application acl manager that always returns true.
* @return ApplicationACLsManager
*/
private ApplicationACLsManager getAppAclManager() {
ApplicationACLsManager aclsManager = mock(ApplicationACLsManager.class);
when(aclsManager.checkAccess(
any(UserGroupInformation.class),
any(ApplicationAccessType.class),
any(String.class),
any(ApplicationId.class))).thenReturn(true);
return aclsManager;
}
/**
* Generate the Queue acl.
* @param allowedQueue the queue to allow the move to
* @param queueACL the acl to check: submit app or queue admin
* @param aclUser the user to check
* @return QueueACLsManager
*/
private QueueACLsManager getQueueAclManager(String allowedQueue,
QueueACL queueACL, UserGroupInformation aclUser) throws IOException {
// ACL that checks the queue is allowed
QueueACLsManager queueACLsManager = mock(QueueACLsManager.class);
when(queueACLsManager.checkAccess(
any(UserGroupInformation.class),
any(QueueACL.class),
any(RMApp.class),
any(),
any())).thenAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocationOnMock) {
final UserGroupInformation user =
(UserGroupInformation) invocationOnMock.getArguments()[0];
final QueueACL acl =
(QueueACL) invocationOnMock.getArguments()[1];
return (queueACL.equals(acl) &&
aclUser.getShortUserName().equals(user.getShortUserName()));
}
});
when(queueACLsManager.checkAccess(
any(UserGroupInformation.class),
any(QueueACL.class),
any(RMApp.class),
any(),
any(),
any(String.class))).thenAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocationOnMock) {
final UserGroupInformation user =
(UserGroupInformation) invocationOnMock.getArguments()[0];
final QueueACL acl = (QueueACL) invocationOnMock.getArguments()[1];
final String queue = (String) invocationOnMock.getArguments()[5];
return (allowedQueue.equals(queue) && queueACL.equals(acl) &&
aclUser.getShortUserName().equals(user.getShortUserName()));
}
});
return queueACLsManager;
}
/**
* QueueACLsManager that always returns false when a target queue is passed
* in and true for other checks to simulate a missing queue.
* @return QueueACLsManager
*/
private QueueACLsManager getQueueAclManager() {
QueueACLsManager queueACLsManager = mock(QueueACLsManager.class);
when(queueACLsManager.checkAccess(
any(UserGroupInformation.class),
any(QueueACL.class),
any(RMApp.class),
any(String.class),
anyList(),
any(String.class))).thenReturn(false);
when(queueACLsManager.checkAccess(
any(UserGroupInformation.class),
any(QueueACL.class),
any(RMApp.class),
any(String.class),
anyList())).thenReturn(true);
return queueACLsManager;
}
@Test
public void testGetQueueInfo() throws Exception {
ResourceScheduler scheduler = mock(ResourceScheduler.class);
RMContext rmContext = mock(RMContext.class);
mockRMContext(scheduler, rmContext);
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), any(RMApp.class), any(String.class),
any()))
.thenReturn(true);
when(mockAclsManager.checkAccess(any(UserGroupInformation.class),
any(ApplicationAccessType.class), any(),
any(ApplicationId.class))).thenReturn(true);
ClientRMService rmService = new ClientRMService(rmContext, scheduler,
null, mockAclsManager, mockQueueACLsManager, null);
GetQueueInfoRequest request = recordFactory
.newRecordInstance(GetQueueInfoRequest.class);
request.setQueueName("testqueue");
request.setIncludeApplications(true);
GetQueueInfoResponse queueInfo = rmService.getQueueInfo(request);
List<ApplicationReport> applications = queueInfo.getQueueInfo()
.getApplications();
assertEquals(2, applications.size());
Map<String, QueueConfigurations> queueConfigsByPartition =
queueInfo.getQueueInfo().getQueueConfigurations();
assertEquals(1, queueConfigsByPartition.size());
assertTrue(queueConfigsByPartition.containsKey("*"));
QueueConfigurations queueConfigs = queueConfigsByPartition.get("*");
assertEquals(0.5f, queueConfigs.getCapacity(), 0.0001f);
assertEquals(0.1f, queueConfigs.getAbsoluteCapacity(), 0.0001f);
assertEquals(1.0f, queueConfigs.getMaxCapacity(), 0.0001f);
assertEquals(1.0f, queueConfigs.getAbsoluteMaxCapacity(), 0.0001f);
assertEquals(0.2f, queueConfigs.getMaxAMPercentage(), 0.0001f);
request.setQueueName("nonexistentqueue");
request.setIncludeApplications(true);
// should not throw exception on nonexistent queue
queueInfo = rmService.getQueueInfo(request);
// Case where user does not have application access
ApplicationACLsManager mockAclsManager1 =
mock(ApplicationACLsManager.class);
QueueACLsManager mockQueueACLsManager1 =
mock(QueueACLsManager.class);
when(mockQueueACLsManager1.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), any(RMApp.class), any(String.class),
any()))
.thenReturn(false);
when(mockAclsManager1.checkAccess(any(UserGroupInformation.class),
any(ApplicationAccessType.class), anyString(),
any(ApplicationId.class))).thenReturn(false);
ClientRMService rmService1 = new ClientRMService(rmContext, scheduler,
null, mockAclsManager1, mockQueueACLsManager1, null);
request.setQueueName("testqueue");
request.setIncludeApplications(true);
GetQueueInfoResponse queueInfo1 = rmService1.getQueueInfo(request);
List<ApplicationReport> applications1 = queueInfo1.getQueueInfo()
.getApplications();
assertEquals(0, applications1.size());
}
@Test
@Timeout(value = 30)
@SuppressWarnings ("rawtypes")
public void testAppSubmitWithSubmissionPreProcessor() throws Exception {
ResourceScheduler scheduler = mockResourceScheduler();
RMContext rmContext = mock(RMContext.class);
mockRMContext(scheduler, rmContext);
YarnConfiguration yConf = new YarnConfiguration();
yConf.setBoolean(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED,
true);
yConf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
// Override the YARN configuration.
when(rmContext.getYarnConfiguration()).thenReturn(yConf);
RMStateStore stateStore = mock(RMStateStore.class);
when(rmContext.getStateStore()).thenReturn(stateStore);
RMAppManager appManager = new RMAppManager(rmContext, scheduler,
null, mock(ApplicationACLsManager.class), new Configuration());
when(rmContext.getDispatcher().getEventHandler()).thenReturn(
new EventHandler<Event>() {
public void handle(Event event) {}
});
ApplicationId appId1 = getApplicationId(100);
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
when(
mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true);
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), any(RMApp.class), any(String.class),
any()))
.thenReturn(true);
ClientRMService rmService =
new ClientRMService(rmContext, scheduler, appManager,
mockAclsManager, mockQueueACLsManager, null);
File rulesFile = File.createTempFile("submission_rules", ".tmp");
rulesFile.deleteOnExit();
rulesFile.createNewFile();
yConf.set(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH,
rulesFile.getAbsolutePath());
rmService.serviceInit(yConf);
rmService.serviceStart();
BufferedWriter writer = new BufferedWriter(new FileWriter(rulesFile));
writer.write("host.cluster1.com NL=foo Q=bar TA=cluster:cluster1");
writer.newLine();
writer.write("host.cluster2.com Q=hello NL=zuess TA=cluster:cluster2");
writer.newLine();
writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg");
writer.newLine();
writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg");
writer.newLine();
writer.write("* TA=cluster:other Q=default NL=barfoo");
writer.newLine();
writer.write("host.testcluster1.com Q=default");
writer.flush();
writer.close();
rmService.getContextPreProcessor().refresh();
setupCurrentCall("host.cluster1.com");
SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
appId1, null, null);
try {
rmService.submitApplication(submitRequest1);
} catch (YarnException e) {
fail("Exception is not expected.");
}
RMApp app1 = rmContext.getRMApps().get(appId1);
assertNotNull(app1, "app doesn't exist");
assertEquals(YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName(),
"app name doesn't match");
assertTrue(app1.getApplicationTags().contains("cluster:cluster1"),
"custom tag not present");
assertEquals("bar", app1.getQueue(), "app queue doesn't match");
assertEquals("foo",
app1.getApplicationSubmissionContext().getNodeLabelExpression(),
"app node label doesn't match");
setupCurrentCall("host.cluster2.com");
ApplicationId appId2 = getApplicationId(101);
SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
appId2, null, null);
submitRequest2.getApplicationSubmissionContext().setApplicationType(
"matchType");
Set<String> aTags = new HashSet<String>();
aTags.add(APPLICATION_TAG_SC_PREPROCESSOR);
submitRequest2.getApplicationSubmissionContext().setApplicationTags(aTags);
try {
rmService.submitApplication(submitRequest2);
} catch (YarnException e) {
fail("Exception is not expected.");
}
RMApp app2 = rmContext.getRMApps().get(appId2);
assertNotNull(app2, "app doesn't exist");
assertEquals(YarnConfiguration.DEFAULT_APPLICATION_NAME, app2.getName(),
"app name doesn't match");
assertTrue(app2.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR),
"client tag not present");
assertTrue(app2.getApplicationTags().contains("cluster:cluster2"),
"custom tag not present");
assertEquals("hello", app2.getQueue(), "app queue doesn't match");
assertEquals("zuess",
app2.getApplicationSubmissionContext().getNodeLabelExpression(),
"app node label doesn't match");
// Test Default commands
setupCurrentCall("host2.cluster3.com");
ApplicationId appId3 = getApplicationId(102);
SubmitApplicationRequest submitRequest3 = mockSubmitAppRequest(
appId3, null, null);
submitRequest3.getApplicationSubmissionContext().setApplicationType(
"matchType");
submitRequest3.getApplicationSubmissionContext().setApplicationTags(aTags);
try {
rmService.submitApplication(submitRequest3);
} catch (YarnException e) {
fail("Exception is not expected.");
}
RMApp app3 = rmContext.getRMApps().get(appId3);
assertNotNull(app3, "app doesn't exist");
assertEquals(YarnConfiguration.DEFAULT_APPLICATION_NAME, app3.getName(),
"app name doesn't match");
assertTrue(app3.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR),
"client tag not present");
assertTrue(app3.getApplicationTags().contains("cluster:other"),
"custom tag not present");
assertEquals("default", app3.getQueue(), "app queue doesn't match");
assertEquals("barfoo", app3.getApplicationSubmissionContext().getNodeLabelExpression(),
"app node label doesn't match");
// Test regex
setupCurrentCall("host.cluster100.com");
ApplicationId appId4 = getApplicationId(103);
SubmitApplicationRequest submitRequest4 = mockSubmitAppRequest(
appId4, null, null);
try {
rmService.submitApplication(submitRequest4);
} catch (YarnException e) {
fail("Exception is not expected.");
}
RMApp app4 = rmContext.getRMApps().get(appId4);
assertTrue(app4.getApplicationTags().contains("cluster:reg"),
"custom tag not present");
assertEquals("reg",
app4.getApplicationSubmissionContext().getNodeLabelExpression(),
"app node label doesn't match");
testSubmissionContextWithAbsentTAG(rmService, rmContext);
rmService.serviceStop();
}
private void testSubmissionContextWithAbsentTAG(ClientRMService rmService,
RMContext rmContext) throws Exception {
setupCurrentCall("host.testcluster1.com");
ApplicationId appId5 = getApplicationId(104);
SubmitApplicationRequest submitRequest5 = mockSubmitAppRequest(
appId5, null, null);
try {
rmService.submitApplication(submitRequest5);
} catch (YarnException e) {
fail("Exception is not expected.");
}
RMApp app5 = rmContext.getRMApps().get(appId5);
assertEquals(app5.getApplicationTags().size(), 0, "custom tag present");
assertNull(app5.getApplicationSubmissionContext().getNodeLabelExpression(),
"app node label present");
assertEquals(app5.getQueue(), "default",
"Queue name is not present");
}
private void setupCurrentCall(String hostName) throws UnknownHostException {
Server.Call mockCall = mock(Server.Call.class);
when(mockCall.getHostInetAddress()).thenReturn(
InetAddress.getByAddress(hostName,
new byte[]{123, 123, 123, 123}));
Server.getCurCall().set(mockCall);
}
@Test
@Timeout(value = 30)
@SuppressWarnings ("rawtypes")
public void testAppSubmit() throws Exception {
ResourceScheduler scheduler = mockResourceScheduler();
RMContext rmContext = mock(RMContext.class);
mockRMContext(scheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
when(rmContext.getStateStore()).thenReturn(stateStore);
RMAppManager appManager = new RMAppManager(rmContext, scheduler,
null, mock(ApplicationACLsManager.class), new Configuration());
when(rmContext.getDispatcher().getEventHandler()).thenReturn(
new EventHandler<Event>() {
public void handle(Event event) {}
});
doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext)
.getRMTimelineCollectorManager();
ApplicationId appId1 = getApplicationId(100);
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
when(
mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true);
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), any(RMApp.class), any(String.class),
any()))
.thenReturn(true);
ClientRMService rmService =
new ClientRMService(rmContext, scheduler, appManager,
mockAclsManager, mockQueueACLsManager, null);
rmService.init(new Configuration());
// without name and queue
SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
appId1, null, null);
try {
rmService.submitApplication(submitRequest1);
} catch (YarnException e) {
fail("Exception is not expected.");
}
RMApp app1 = rmContext.getRMApps().get(appId1);
assertNotNull(app1, "app doesn't exist");
assertEquals(YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName(),
"app name doesn't match");
assertEquals(YarnConfiguration.DEFAULT_QUEUE_NAME, app1.getQueue(),
"app queue doesn't match");
// with name and queue
String name = MockApps.newAppName();
String queue = MockApps.newQueue();
ApplicationId appId2 = getApplicationId(101);
SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
appId2, name, queue);
submitRequest2.getApplicationSubmissionContext().setApplicationType(
"matchType");
try {
rmService.submitApplication(submitRequest2);
} catch (YarnException e) {
fail("Exception is not expected.");
}
RMApp app2 = rmContext.getRMApps().get(appId2);
assertNotNull(app2, "app doesn't exist");
assertEquals(name, app2.getName(), "app name doesn't match");
assertEquals(queue, app2.getQueue(), "app queue doesn't match");
// duplicate appId
try {
rmService.submitApplication(submitRequest2);
} catch (YarnException e) {
fail("Exception is not expected.");
}
GetApplicationsRequest getAllAppsRequest =
GetApplicationsRequest.newInstance(new HashSet<String>());
GetApplicationsResponse getAllApplicationsResponse =
rmService.getApplications(getAllAppsRequest);
assertEquals(5,
getAllApplicationsResponse.getApplicationList().size());
Set<String> appTypes = new HashSet<String>();
appTypes.add("matchType");
getAllAppsRequest = GetApplicationsRequest.newInstance(appTypes);
getAllApplicationsResponse =
rmService.getApplications(getAllAppsRequest);
assertEquals(1,
getAllApplicationsResponse.getApplicationList().size());
assertEquals(appId2,
getAllApplicationsResponse.getApplicationList()
.get(0).getApplicationId());
// Test query with uppercase appType also works
appTypes = new HashSet<String>();
appTypes.add("MATCHTYPE");
getAllAppsRequest = GetApplicationsRequest.newInstance(appTypes);
getAllApplicationsResponse =
rmService.getApplications(getAllAppsRequest);
assertEquals(1,
getAllApplicationsResponse.getApplicationList().size());
assertEquals(appId2,
getAllApplicationsResponse.getApplicationList()
.get(0).getApplicationId());
}
@Test
public void testGetApplications() throws Exception {
/**
* 1. Submit 3 applications alternately in two queues
* 2. Test each of the filters
*/
// Basic setup
ResourceScheduler scheduler = mockResourceScheduler();
RMContext rmContext = mock(RMContext.class);
mockRMContext(scheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
when(rmContext.getStateStore()).thenReturn(stateStore);
doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext)
.getRMTimelineCollectorManager();
RMAppManager appManager = new RMAppManager(rmContext, scheduler,
null, mock(ApplicationACLsManager.class), new Configuration());
when(rmContext.getDispatcher().getEventHandler()).thenReturn(
new EventHandler<Event>() {
public void handle(Event event) {}
});
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), any(RMApp.class), any(),
any()))
.thenReturn(true);
ClientRMService rmService =
new ClientRMService(rmContext, scheduler, appManager,
mockAclsManager, mockQueueACLsManager, null);
rmService.init(new Configuration());
// Initialize appnames and queues
String[] queues = {QUEUE_1, QUEUE_2};
String[] appNames =
{MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()};
ApplicationId[] appIds =
{getApplicationId(101), getApplicationId(102), getApplicationId(103)};
List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3");
long[] submitTimeMillis = new long[3];
// Submit applications
for (int i = 0; i < appIds.length; i++) {
ApplicationId appId = appIds[i];
when(mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
ApplicationAccessType.VIEW_APP, null, appId)).thenReturn(true);
SubmitApplicationRequest submitRequest = mockSubmitAppRequest(
appId, appNames[i], queues[i % queues.length],
new HashSet<String>(tags.subList(0, i + 1)));
// make sure each app is submitted at a different time
Thread.sleep(1);
rmService.submitApplication(submitRequest);
submitTimeMillis[i] = rmService.getApplicationReport(
GetApplicationReportRequest.newInstance(appId))
.getApplicationReport().getStartTime();
}
// Test different cases of ClientRMService#getApplications()
GetApplicationsRequest request = GetApplicationsRequest.newInstance();
assertEquals(6,
rmService.getApplications(request).getApplicationList().size(),
"Incorrect total number of apps");
// Check limit
request.setLimit(1L);
assertEquals(1,
rmService.getApplications(request).getApplicationList().size(),
"Failed to limit applications");
// Check start range
request = GetApplicationsRequest.newInstance();
request.setStartRange(submitTimeMillis[0] + 1, System.currentTimeMillis());
// 2 applications are submitted after first timeMills
assertEquals(2, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of matching start range");
// 1 application is submitted after the second timeMills
request.setStartRange(submitTimeMillis[1] + 1, System.currentTimeMillis());
assertEquals(1, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of matching start range");
// no application is submitted after the third timeMills
request.setStartRange(submitTimeMillis[2] + 1, System.currentTimeMillis());
assertEquals(0, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of matching start range");
// Check queue
request = GetApplicationsRequest.newInstance();
Set<String> queueSet = new HashSet<String>();
request.setQueues(queueSet);
queueSet.add(queues[0]);
assertEquals(3, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of applications in queue");
assertEquals(3, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of applications in queue");
queueSet.add(queues[1]);
assertEquals(3, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of applications in queue");
// Check user
request = GetApplicationsRequest.newInstance();
Set<String> userSet = new HashSet<String>();
request.setUsers(userSet);
userSet.add("random-user-name");
assertEquals(0, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of applications for user");
userSet.add(UserGroupInformation.getCurrentUser().getShortUserName());
assertEquals(3, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of applications for user");
rmService.setDisplayPerUserApps(true);
userSet.clear();
assertEquals(6, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of applications for user");
rmService.setDisplayPerUserApps(false);
// Check tags
request = GetApplicationsRequest.newInstance(
ApplicationsRequestScope.ALL, null, null, null, null, null, null,
null, null);
Set<String> tagSet = new HashSet<String>();
request.setApplicationTags(tagSet);
assertEquals(6, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of matching tags");
tagSet = Sets.newHashSet(tags.get(0));
request.setApplicationTags(tagSet);
assertEquals(3, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of matching tags");
tagSet = Sets.newHashSet(tags.get(1));
request.setApplicationTags(tagSet);
assertEquals(2, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of matching tags");
tagSet = Sets.newHashSet(tags.get(2));
request.setApplicationTags(tagSet);
assertEquals(1, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of matching tags");
// Check scope
request = GetApplicationsRequest.newInstance(
ApplicationsRequestScope.VIEWABLE);
assertEquals(6, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of applications for the scope");
request = GetApplicationsRequest.newInstance(
ApplicationsRequestScope.OWN);
assertEquals(3, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of applications for the scope");
}
@Test
@Timeout(value = 4)
public void testConcurrentAppSubmit()
throws IOException, InterruptedException, BrokenBarrierException,
YarnException {
ResourceScheduler scheduler = mockResourceScheduler();
RMContext rmContext = mock(RMContext.class);
mockRMContext(scheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
when(rmContext.getStateStore()).thenReturn(stateStore);
RMAppManager appManager = new RMAppManager(rmContext, scheduler,
null, mock(ApplicationACLsManager.class), new Configuration());
final ApplicationId appId1 = getApplicationId(100);
final ApplicationId appId2 = getApplicationId(101);
final SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
appId1, null, null);
final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
appId2, null, null);
final CyclicBarrier startBarrier = new CyclicBarrier(2);
final CyclicBarrier endBarrier = new CyclicBarrier(2);
EventHandler<Event> eventHandler = new EventHandler<Event>() {
@Override
public void handle(Event rawEvent) {
if (rawEvent instanceof RMAppEvent) {
RMAppEvent event = (RMAppEvent) rawEvent;
if (event.getApplicationId().equals(appId1)) {
try {
startBarrier.await();
endBarrier.await();
} catch (BrokenBarrierException e) {
LOG.warn("Broken Barrier", e);
} catch (InterruptedException e) {
LOG.warn("Interrupted while awaiting barriers", e);
}
}
}
}
};
when(rmContext.getDispatcher().getEventHandler()).thenReturn(eventHandler);
doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext)
.getRMTimelineCollectorManager();
final ClientRMService rmService =
new ClientRMService(rmContext, scheduler, appManager, null, null,
null);
rmService.init(new Configuration());
// submit an app and wait for it to block while in app submission
Thread t = new Thread() {
@Override
public void run() {
try {
rmService.submitApplication(submitRequest1);
} catch (YarnException | IOException e) {}
}
};
t.start();
// submit another app, so go through while the first app is blocked
startBarrier.await();
rmService.submitApplication(submitRequest2);
endBarrier.await();
t.join();
}
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
String name, String queue) {
return mockSubmitAppRequest(appId, name, queue, null);
}
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
String name, String queue, Set<String> tags) {
return mockSubmitAppRequest(appId, name, queue, tags, false);
}
@SuppressWarnings("deprecation")
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
String name, String queue, Set<String> tags, boolean unmanaged) {
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
Resource resource = Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
ApplicationSubmissionContext submissionContext =
recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
submissionContext.setAMContainerSpec(amContainerSpec);
submissionContext.setApplicationName(name);
submissionContext.setQueue(queue);
submissionContext.setApplicationId(appId);
submissionContext.setResource(resource);
submissionContext.setApplicationType(appType);
submissionContext.setApplicationTags(tags);
submissionContext.setUnmanagedAM(unmanaged);
submissionContext.setPriority(Priority.newInstance(0));
SubmitApplicationRequest submitRequest =
recordFactory.newRecordInstance(SubmitApplicationRequest.class);
submitRequest.setApplicationSubmissionContext(submissionContext);
return submitRequest;
}
private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext)
throws IOException {
Dispatcher dispatcher = mock(Dispatcher.class);
when(rmContext.getDispatcher()).thenReturn(dispatcher);
@SuppressWarnings("unchecked")
EventHandler<Event> eventHandler = mock(EventHandler.class);
when(dispatcher.getEventHandler()).thenReturn(eventHandler);
QueueInfo queInfo = recordFactory.newRecordInstance(QueueInfo.class);
queInfo.setQueueName("testqueue");
QueueConfigurations queueConfigs =
recordFactory.newRecordInstance(QueueConfigurations.class);
queueConfigs.setCapacity(0.5f);
queueConfigs.setAbsoluteCapacity(0.1f);
queueConfigs.setMaxCapacity(1.0f);
queueConfigs.setAbsoluteMaxCapacity(1.0f);
queueConfigs.setMaxAMPercentage(0.2f);
Map<String, QueueConfigurations> queueConfigsByPartition =
new HashMap<>();
queueConfigsByPartition.put("*", queueConfigs);
queInfo.setQueueConfigurations(queueConfigsByPartition);
when(scheduler.getQueueInfo(eq("testqueue"), anyBoolean(), anyBoolean()))
.thenReturn(queInfo);
when(scheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(),
anyBoolean())).thenThrow(new IOException("queue does not exist"));
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
ConcurrentHashMap<ApplicationId, RMApp> apps =
getRMApps(rmContext, scheduler);
when(rmContext.getRMApps()).thenReturn(apps);
when(scheduler.getAppsInQueue(eq("testqueue"))).thenReturn(
getSchedulerApps(apps));
when(rmContext.getScheduler()).thenReturn(scheduler);
}
private ConcurrentHashMap<ApplicationId, RMApp> getRMApps(
RMContext rmContext, YarnScheduler yarnScheduler) {
ConcurrentHashMap<ApplicationId, RMApp> apps =
new ConcurrentHashMap<ApplicationId, RMApp>();
ApplicationId applicationId1 = getApplicationId(1);
ApplicationId applicationId2 = getApplicationId(2);
ApplicationId applicationId3 = getApplicationId(3);
YarnConfiguration config = new YarnConfiguration();
apps.put(applicationId1, getRMApp(rmContext, yarnScheduler, applicationId1,
config, "testqueue", 10, 3, null, null));
apps.put(applicationId2, getRMApp(rmContext, yarnScheduler, applicationId2,
config, "a", 20, 2, null, ""));
apps.put(applicationId3, getRMApp(rmContext, yarnScheduler, applicationId3,
config, "testqueue", 40, 5, "high-mem", "high-mem"));
return apps;
}
private List<ApplicationAttemptId> getSchedulerApps(
Map<ApplicationId, RMApp> apps) {
List<ApplicationAttemptId> schedApps = new ArrayList<ApplicationAttemptId>();
// Return app IDs for the apps in testqueue (as defined in getRMApps)
schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(1), 0));
schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(3), 0));
return schedApps;
}
private static ApplicationId getApplicationId(int id) {
return ApplicationId.newInstance(123456, id);
}
private static ApplicationAttemptId getApplicationAttemptId(int id) {
return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
}
private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
ApplicationId applicationId3, YarnConfiguration config, String queueName,
final long memorySeconds, final long vcoreSeconds,
String appNodeLabelExpression, String amNodeLabelExpression) {
ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class);
when(asContext.getMaxAppAttempts()).thenReturn(1);
when(asContext.getNodeLabelExpression()).thenReturn(appNodeLabelExpression);
when(asContext.getPriority()).thenReturn(Priority.newInstance(0));
RMAppImpl app =
spy(new RMAppImpl(applicationId3, rmContext, config, null, null,
queueName, asContext, yarnScheduler, null,
System.currentTimeMillis(), "YARN", null,
Collections.singletonList(BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
Resource.newInstance(1024, 1), 1))){
@Override
public ApplicationReport createAndGetApplicationReport(
String clientUserName, boolean allowAccess) {
ApplicationReport report = super.createAndGetApplicationReport(
clientUserName, allowAccess);
ApplicationResourceUsageReport usageReport =
report.getApplicationResourceUsageReport();
usageReport.setMemorySeconds(memorySeconds);
usageReport.setVcoreSeconds(vcoreSeconds);
report.setApplicationResourceUsageReport(usageReport);
return report;
}
});
app.getAMResourceRequests().get(0)
.setNodeLabelExpression(amNodeLabelExpression);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(123456, 1), 1);
RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId,
rmContext, yarnScheduler, null, asContext, config, null, app));
Container container = Container.newInstance(
ContainerId.newContainerId(attemptId, 1), null,
"", null, null, null);
RMContainerImpl containerimpl = spy(new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), attemptId, null, "",
rmContext));
Map<ApplicationAttemptId, RMAppAttempt> attempts = new HashMap<>();
attempts.put(attemptId, rmAppAttemptImpl);
when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
when(app.getAppAttempts()).thenReturn(attempts);
when(app.getApplicationPriority()).thenReturn(Priority.newInstance(0));
when(rmAppAttemptImpl.getMasterContainer()).thenReturn(container);
ResourceScheduler rs = mock(ResourceScheduler.class);
when(rmContext.getScheduler()).thenReturn(rs);
when(rmContext.getScheduler().getRMContainer(any(ContainerId.class)))
.thenReturn(containerimpl);
SchedulerAppReport sAppReport = mock(SchedulerAppReport.class);
when(
rmContext.getScheduler().getSchedulerAppInfo(
any(ApplicationAttemptId.class))).thenReturn(sAppReport);
List<RMContainer> rmContainers = new ArrayList<RMContainer>();
rmContainers.add(containerimpl);
when(
rmContext.getScheduler().getSchedulerAppInfo(attemptId)
.getLiveContainers()).thenReturn(rmContainers);
ContainerStatus cs = mock(ContainerStatus.class);
when(containerimpl.completed()).thenReturn(false);
when(containerimpl.getDiagnosticsInfo()).thenReturn("N/A");
when(containerimpl.getContainerExitStatus()).thenReturn(0);
when(containerimpl.getContainerState()).thenReturn(ContainerState.COMPLETE);
return app;
}
private static ResourceScheduler mockResourceScheduler()
throws YarnException {
ResourceScheduler scheduler = mock(ResourceScheduler.class);
when(scheduler.getMinimumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
when(scheduler.getMaximumResourceCapability()).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
when(scheduler.getMaximumResourceCapability(anyString())).thenReturn(
Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
when(scheduler.getAppsInQueue(QUEUE_1)).thenReturn(
Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102)));
when(scheduler.getAppsInQueue(QUEUE_2)).thenReturn(
Arrays.asList(getApplicationAttemptId(103)));
ApplicationAttemptId attemptId = getApplicationAttemptId(1);
when(scheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
ResourceCalculator rs = mock(ResourceCalculator.class);
when(scheduler.getResourceCalculator()).thenReturn(rs);
when(scheduler.checkAndGetApplicationPriority(any(Priority.class),
any(UserGroupInformation.class), anyString(), any(ApplicationId.class)))
.thenReturn(Priority.newInstance(0));
return scheduler;
}
private ResourceManager setupResourceManager() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
ReservationSystemTestUtil.setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
MockRM rm = new MockRM(conf);
resourceManager = rm;
rm.start();
try {
rm.registerNode("127.0.0.1:1", 102400, 100);
// allow plan follower to synchronize
Thread.sleep(1050);
} catch (Exception e) {
fail(e.getMessage());
}
return rm;
}
private ReservationSubmissionRequest submitReservationTestHelper(
ClientRMService clientService, long arrival, long deadline,
long duration) {
ReservationSubmissionResponse sResponse = null;
GetNewReservationRequest newReservationRequest =
GetNewReservationRequest.newInstance();
ReservationId reservationID = null;
try {
reservationID = clientService.getNewReservation(newReservationRequest)
.getReservationId();
} catch (Exception e) {
fail(e.getMessage());
}
ReservationSubmissionRequest sRequest =
ReservationSystemTestUtil.createSimpleReservationRequest(reservationID,
4, arrival, deadline, duration);
try {
sResponse = clientService.submitReservation(sRequest);
} catch (Exception e) {
fail(e.getMessage());
}
assertNotNull(sResponse);
assertNotNull(reservationID);
System.out.println("Submit reservation response: " + reservationID);
return sRequest;
}
@Test
public void testCreateReservation() {
resourceManager = setupResourceManager();
ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
// Submit the reservation again with the same request and make sure it
// passes.
try {
clientService.submitReservation(sRequest);
} catch (Exception e) {
fail(e.getMessage());
}
// Submit the reservation with the same reservation id but different
// reservation definition, and ensure YarnException is thrown.
arrival = clock.getTime();
ReservationDefinition rDef = sRequest.getReservationDefinition();
rDef.setArrival(arrival + duration);
sRequest.setReservationDefinition(rDef);
try {
clientService.submitReservation(sRequest);
fail("Reservation submission should fail if a duplicate "
+ "reservation id is used, but the reservation definition has been "
+ "updated.");
} catch (Exception e) {
assertTrue(e instanceof YarnException);
}
}
@Test
public void testUpdateReservation() {
resourceManager = setupResourceManager();
ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
ReservationDefinition rDef = sRequest.getReservationDefinition();
ReservationRequest rr =
rDef.getReservationRequests().getReservationResources().get(0);
ReservationId reservationID = sRequest.getReservationId();
rr.setNumContainers(5);
arrival = clock.getTime();
duration = 30000;
deadline = (long) (arrival + 1.05 * duration);
rr.setDuration(duration);
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
ReservationUpdateRequest uRequest =
ReservationUpdateRequest.newInstance(rDef, reservationID);
ReservationUpdateResponse uResponse = null;
try {
uResponse = clientService.updateReservation(uRequest);
} catch (Exception e) {
fail(e.getMessage());
}
assertNotNull(uResponse);
System.out.println("Update reservation response: " + uResponse);
}
@Test
public void testListReservationsByReservationId() {
resourceManager = setupResourceManager();
ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
ReservationId reservationID = sRequest.getReservationId();
ReservationListResponse response = null;
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1,
-1, false);
try {
response = clientService.listReservations(request);
} catch (Exception e) {
fail(e.getMessage());
}
assertNotNull(response);
assertEquals(1, response.getReservationAllocationState().size());
assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
assertEquals(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size(), 0);
}
@Test
public void testListReservationsByTimeInterval() {
resourceManager = setupResourceManager();
ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
// List reservations, search by a point in time within the reservation
// range.
arrival = clock.getTime();
ReservationId reservationID = sRequest.getReservationId();
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", arrival + duration / 2,
arrival + duration / 2, true);
ReservationListResponse response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
fail(e.getMessage());
}
assertNotNull(response);
assertEquals(1, response.getReservationAllocationState().size());
assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by time within reservation interval.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, Long.MAX_VALUE, true);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
fail(e.getMessage());
}
assertNotNull(response);
assertEquals(1, response.getReservationAllocationState().size());
assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// Verify that the full resource allocations exist.
assertTrue(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size() > 0);
// Verify that the full RDL is returned.
ReservationRequests reservationRequests =
response.getReservationAllocationState().get(0)
.getReservationDefinition().getReservationRequests();
assertEquals("R_ALL",
reservationRequests.getInterpreter().toString());
assertTrue(reservationRequests.getReservationResources().get(0)
.getDuration() == duration);
}
@Test
public void testListReservationsByInvalidTimeInterval() {
resourceManager = setupResourceManager();
ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
// List reservations, search by invalid end time == -1.
ReservationListRequest request = ReservationListRequest
.newInstance(ReservationSystemTestUtil.reservationQ, "", 1, -1, true);
ReservationListResponse response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
fail(e.getMessage());
}
assertNotNull(response);
assertEquals(1, response.getReservationAllocationState().size());
assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), sRequest.getReservationId().getId());
// List reservations, search by invalid end time < -1.
request = ReservationListRequest
.newInstance(ReservationSystemTestUtil.reservationQ, "", 1, -10, true);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
fail(e.getMessage());
}
assertNotNull(response);
assertEquals(1, response.getReservationAllocationState().size());
assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), sRequest.getReservationId().getId());
}
@Test
public void testListReservationsByTimeIntervalContainingNoReservations() {
resourceManager = setupResourceManager();
ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
// List reservations, search by very large start time.
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", Long.MAX_VALUE, -1, false);
ReservationListResponse response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
fail(e.getMessage());
}
// Ensure all reservations are filtered out.
assertNotNull(response);
assertThat(response.getReservationAllocationState()).isEmpty();
duration = 30000;
deadline = sRequest.getReservationDefinition().getDeadline();
// List reservations, search by start time after the reservation
// end time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", deadline + duration,
deadline + 2 * duration, false);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
fail(e.getMessage());
}
// Ensure all reservations are filtered out.
assertNotNull(response);
assertThat(response.getReservationAllocationState()).isEmpty();
arrival = clock.getTime();
// List reservations, search by end time before the reservation start
// time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 0, arrival - duration,
false);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
fail(e.getMessage());
}
// Ensure all reservations are filtered out.
assertNotNull(response);
assertThat(response.getReservationAllocationState()).isEmpty();
// List reservations, search by very small end time.
request = ReservationListRequest
.newInstance(ReservationSystemTestUtil.reservationQ, "", 0, 1, false);
response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
fail(e.getMessage());
}
// Ensure all reservations are filtered out.
assertNotNull(response);
assertThat(response.getReservationAllocationState()).isEmpty();
}
@Test
public void testReservationDelete() {
resourceManager = setupResourceManager();
ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(clientService, arrival, deadline, duration);
ReservationId reservationID = sRequest.getReservationId();
// Delete the reservation
ReservationDeleteRequest dRequest =
ReservationDeleteRequest.newInstance(reservationID);
ReservationDeleteResponse dResponse = null;
try {
dResponse = clientService.deleteReservation(dRequest);
} catch (Exception e) {
fail(e.getMessage());
}
assertNotNull(dResponse);
System.out.println("Delete reservation response: " + dResponse);
// List reservations, search by non-existent reservationID
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1,
-1, false);
ReservationListResponse response = null;
try {
response = clientService.listReservations(request);
} catch (Exception e) {
fail(e.getMessage());
}
assertNotNull(response);
assertEquals(0, response.getReservationAllocationState().size());
}
@Test
public void testGetNodeLabels() throws Exception {
MockRM rm = new MockRM() {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager,
this.queueACLsManager, this.getRMContext()
.getRMDelegationTokenSecretManager());
};
};
resourceManager = rm;
rm.start();
NodeLabel labelX = NodeLabel.newInstance("x", false);
NodeLabel labelY = NodeLabel.newInstance("y");
RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager();
labelsMgr.addToCluserNodeLabels(ImmutableSet.of(labelX, labelY));
NodeId node1 = NodeId.newInstance("host1", 1234);
NodeId node2 = NodeId.newInstance("host2", 1234);
Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>();
map.put(node1, ImmutableSet.of("x"));
map.put(node2, ImmutableSet.of("y"));
labelsMgr.replaceLabelsOnNode(map);
// Create a client.
conf = new Configuration();
rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
client = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
// Get node labels collection
GetClusterNodeLabelsResponse response = client
.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance());
assertTrue(response.getNodeLabelList().containsAll(
Arrays.asList(labelX, labelY)));
// Get node labels mapping
GetNodesToLabelsResponse response1 = client
.getNodeToLabels(GetNodesToLabelsRequest.newInstance());
Map<NodeId, Set<String>> nodeToLabels = response1.getNodeToLabels();
assertTrue(nodeToLabels.keySet().containsAll(
Arrays.asList(node1, node2)));
assertTrue(nodeToLabels.get(node1)
.containsAll(Arrays.asList(labelX.getName())));
assertTrue(nodeToLabels.get(node2)
.containsAll(Arrays.asList(labelY.getName())));
// Below label "x" is not present in the response as exclusivity is true
assertFalse(nodeToLabels.get(node1).containsAll(
Arrays.asList(NodeLabel.newInstance("x"))));
}
@Test
public void testGetLabelsToNodes() throws Exception {
MockRM rm = new MockRM() {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager,
this.queueACLsManager, this.getRMContext()
.getRMDelegationTokenSecretManager());
};
};
resourceManager = rm;
rm.start();
NodeLabel labelX = NodeLabel.newInstance("x", false);
NodeLabel labelY = NodeLabel.newInstance("y", false);
NodeLabel labelZ = NodeLabel.newInstance("z", false);
RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager();
labelsMgr.addToCluserNodeLabels(ImmutableSet.of(labelX, labelY, labelZ));
NodeId node1A = NodeId.newInstance("host1", 1234);
NodeId node1B = NodeId.newInstance("host1", 5678);
NodeId node2A = NodeId.newInstance("host2", 1234);
NodeId node3A = NodeId.newInstance("host3", 1234);
NodeId node3B = NodeId.newInstance("host3", 5678);
Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>();
map.put(node1A, ImmutableSet.of("x"));
map.put(node1B, ImmutableSet.of("z"));
map.put(node2A, ImmutableSet.of("y"));
map.put(node3A, ImmutableSet.of("y"));
map.put(node3B, ImmutableSet.of("z"));
labelsMgr.replaceLabelsOnNode(map);
// Create a client.
conf = new Configuration();
rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
client = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
// Get node labels collection
GetClusterNodeLabelsResponse response = client
.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance());
assertTrue(response.getNodeLabelList().containsAll(
Arrays.asList(labelX, labelY, labelZ)));
// Get labels to nodes mapping
GetLabelsToNodesResponse response1 = client
.getLabelsToNodes(GetLabelsToNodesRequest.newInstance());
Map<String, Set<NodeId>> labelsToNodes = response1.getLabelsToNodes();
assertTrue(labelsToNodes.keySet().containsAll(
Arrays.asList(labelX.getName(), labelY.getName(), labelZ.getName())));
assertTrue(labelsToNodes.get(labelX.getName()).containsAll(
Arrays.asList(node1A)));
assertTrue(labelsToNodes.get(labelY.getName()).containsAll(
Arrays.asList(node2A, node3A)));
assertTrue(labelsToNodes.get(labelZ.getName()).containsAll(
Arrays.asList(node1B, node3B)));
// Get labels to nodes mapping for specific labels
Set<String> setlabels = new HashSet<String>(Arrays.asList(new String[]{"x",
"z"}));
GetLabelsToNodesResponse response2 = client
.getLabelsToNodes(GetLabelsToNodesRequest.newInstance(setlabels));
labelsToNodes = response2.getLabelsToNodes();
assertTrue(labelsToNodes.keySet().containsAll(
Arrays.asList(labelX.getName(), labelZ.getName())));
assertTrue(labelsToNodes.get(labelX.getName()).containsAll(
Arrays.asList(node1A)));
assertTrue(labelsToNodes.get(labelZ.getName()).containsAll(
Arrays.asList(node1B, node3B)));
assertThat(labelsToNodes.get(labelY.getName())).isNull();
}
@Test
@Timeout(value = 120)
public void testGetClusterNodeAttributes() throws IOException, YarnException {
Configuration newConf = NodeAttributeTestUtils.getRandomDirConf(null);
MockRM rm = new MockRM(newConf) {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
this.applicationACLsManager, this.queueACLsManager,
this.getRMContext().getRMDelegationTokenSecretManager());
}
};
resourceManager = rm;
rm.start();
NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager();
NodeId host1 = NodeId.newInstance("host1", 0);
NodeId host2 = NodeId.newInstance("host2", 0);
NodeAttribute gpu = NodeAttribute
.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
NodeAttributeType.STRING, "nvida");
NodeAttribute os = NodeAttribute
.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS",
NodeAttributeType.STRING, "windows64");
NodeAttribute docker = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
NodeAttributeType.STRING, "docker0");
Map<String, Set<NodeAttribute>> nodes = new HashMap<>();
nodes.put(host1.getHost(), ImmutableSet.of(gpu, os));
nodes.put(host2.getHost(), ImmutableSet.of(docker));
mgr.addNodeAttributes(nodes);
// Create a client.
conf = new Configuration();
rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
client = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
GetClusterNodeAttributesRequest request =
GetClusterNodeAttributesRequest.newInstance();
GetClusterNodeAttributesResponse response =
client.getClusterNodeAttributes(request);
Set<NodeAttributeInfo> attributes = response.getNodeAttributes();
assertEquals(3, attributes.size(), "Size not correct");
assertTrue(attributes.contains(NodeAttributeInfo.newInstance(gpu)));
assertTrue(attributes.contains(NodeAttributeInfo.newInstance(os)));
assertTrue(attributes.contains(NodeAttributeInfo.newInstance(docker)));
}
@Test
@Timeout(value = 120)
public void testGetAttributesToNodes() throws IOException, YarnException {
Configuration newConf = NodeAttributeTestUtils.getRandomDirConf(null);
MockRM rm = new MockRM(newConf) {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
this.applicationACLsManager, this.queueACLsManager,
this.getRMContext().getRMDelegationTokenSecretManager());
}
};
resourceManager = rm;
rm.start();
NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager();
String node1 = "host1";
String node2 = "host2";
NodeAttribute gpu =
NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
NodeAttributeType.STRING, "nvidia");
NodeAttribute os =
NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS",
NodeAttributeType.STRING, "windows64");
NodeAttribute docker =
NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
NodeAttributeType.STRING, "docker0");
NodeAttribute dist =
NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION",
NodeAttributeType.STRING, "3_0_2");
Map<String, Set<NodeAttribute>> nodes = new HashMap<>();
nodes.put(node1, ImmutableSet.of(gpu, os, dist));
nodes.put(node2, ImmutableSet.of(docker, dist));
mgr.addNodeAttributes(nodes);
// Create a client.
conf = new Configuration();
rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
client = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
GetAttributesToNodesRequest request =
GetAttributesToNodesRequest.newInstance();
GetAttributesToNodesResponse response =
client.getAttributesToNodes(request);
Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs =
response.getAttributesToNodes();
assertThat(response.getAttributesToNodes()).hasSize(4);
assertThat(attrs.get(dist.getAttributeKey())).hasSize(2);
assertThat(attrs.get(os.getAttributeKey())).hasSize(1);
assertThat(attrs.get(gpu.getAttributeKey())).hasSize(1);
assertTrue(findHostnameAndValInMapping(node1, "3_0_2",
attrs.get(dist.getAttributeKey())));
assertTrue(findHostnameAndValInMapping(node2, "3_0_2",
attrs.get(dist.getAttributeKey())));
assertTrue(findHostnameAndValInMapping(node2, "docker0",
attrs.get(docker.getAttributeKey())));
GetAttributesToNodesRequest request2 = GetAttributesToNodesRequest
.newInstance(ImmutableSet.of(docker.getAttributeKey()));
GetAttributesToNodesResponse response2 =
client.getAttributesToNodes(request2);
Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs2 =
response2.getAttributesToNodes();
assertThat(attrs2).hasSize(1);
assertTrue(findHostnameAndValInMapping(node2, "docker0",
attrs2.get(docker.getAttributeKey())));
GetAttributesToNodesRequest request3 =
GetAttributesToNodesRequest.newInstance(
ImmutableSet.of(docker.getAttributeKey(), os.getAttributeKey()));
GetAttributesToNodesResponse response3 =
client.getAttributesToNodes(request3);
Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs3 =
response3.getAttributesToNodes();
assertThat(attrs3).hasSize(2);
assertTrue(findHostnameAndValInMapping(node1, "windows64",
attrs3.get(os.getAttributeKey())));
assertTrue(findHostnameAndValInMapping(node2, "docker0",
attrs3.get(docker.getAttributeKey())));
}
private boolean findHostnameAndValInMapping(String hostname, String attrVal,
List<NodeToAttributeValue> mappingVals) {
for (NodeToAttributeValue value : mappingVals) {
if (value.getHostname().equals(hostname)) {
return attrVal.equals(value.getAttributeValue());
}
}
return false;
}
@Test
@Timeout(value = 120)
public void testGetNodesToAttributes() throws IOException, YarnException {
Configuration newConf = NodeAttributeTestUtils.getRandomDirConf(null);
MockRM rm = new MockRM(newConf) {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
this.applicationACLsManager, this.queueACLsManager,
this.getRMContext().getRMDelegationTokenSecretManager());
}
};
resourceManager = rm;
rm.start();
NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager();
String node1 = "host1";
String node2 = "host2";
NodeAttribute gpu = NodeAttribute
.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
NodeAttributeType.STRING, "nvida");
NodeAttribute os = NodeAttribute
.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS",
NodeAttributeType.STRING, "windows64");
NodeAttribute docker = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
NodeAttributeType.STRING, "docker0");
NodeAttribute dist = NodeAttribute
.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION",
NodeAttributeType.STRING, "3_0_2");
Map<String, Set<NodeAttribute>> nodes = new HashMap<>();
nodes.put(node1, ImmutableSet.of(gpu, os, dist));
nodes.put(node2, ImmutableSet.of(docker, dist));
mgr.addNodeAttributes(nodes);
// Create a client.
conf = new Configuration();
rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
client = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
// Specify null for hostnames.
GetNodesToAttributesRequest request1 =
GetNodesToAttributesRequest.newInstance(null);
GetNodesToAttributesResponse response1 =
client.getNodesToAttributes(request1);
Map<String, Set<NodeAttribute>> hostToAttrs =
response1.getNodeToAttributes();
assertEquals(2, hostToAttrs.size());
assertTrue(hostToAttrs.get(node2).contains(dist));
assertTrue(hostToAttrs.get(node2).contains(docker));
assertTrue(hostToAttrs.get(node1).contains(dist));
// Specify particular node
GetNodesToAttributesRequest request2 =
GetNodesToAttributesRequest.newInstance(ImmutableSet.of(node1));
GetNodesToAttributesResponse response2 =
client.getNodesToAttributes(request2);
hostToAttrs = response2.getNodeToAttributes();
assertEquals(1, response2.getNodeToAttributes().size());
assertTrue(hostToAttrs.get(node1).contains(dist));
// Test queury with empty set
GetNodesToAttributesRequest request3 =
GetNodesToAttributesRequest.newInstance(Collections.emptySet());
GetNodesToAttributesResponse response3 =
client.getNodesToAttributes(request3);
hostToAttrs = response3.getNodeToAttributes();
assertEquals(2, hostToAttrs.size());
assertTrue(hostToAttrs.get(node2).contains(dist));
assertTrue(hostToAttrs.get(node2).contains(docker));
assertTrue(hostToAttrs.get(node1).contains(dist));
// test invalid hostname
GetNodesToAttributesRequest request4 =
GetNodesToAttributesRequest.newInstance(ImmutableSet.of("invalid"));
GetNodesToAttributesResponse response4 =
client.getNodesToAttributes(request4);
hostToAttrs = response4.getNodeToAttributes();
assertEquals(0, hostToAttrs.size());
}
@Test
@Timeout(value = 120)
public void testUpdatePriorityAndKillAppWithZeroClusterResource()
throws Exception {
int maxPriority = 10;
int appPriority = 5;
conf = new YarnConfiguration();
assumeFalse(conf.get(YarnConfiguration.RM_SCHEDULER).equals(FairScheduler.class.getName()),
"FairScheduler does not support Application Priorities");
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
maxPriority);
MockRM rm = new MockRM(conf);
resourceManager = rm;
rm.init(conf);
rm.start();
MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder
.createWithMemory(1024, rm)
.withAppPriority(Priority.newInstance(appPriority))
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
ClientRMService rmService = rm.getClientRMService();
testApplicationPriorityUpdation(rmService, app1, appPriority, appPriority);
rm.killApp(app1.getApplicationId());
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
}
@Test
@Timeout(value = 120)
public void testUpdateApplicationPriorityRequest() throws Exception {
int maxPriority = 10;
int appPriority = 5;
conf = new YarnConfiguration();
assumeFalse(conf.get(YarnConfiguration.RM_SCHEDULER).equals(FairScheduler.class.getName()),
"FairScheduler does not support Application Priorities");
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
maxPriority);
MockRM rm = new MockRM(conf);
resourceManager = rm;
rm.init(conf);
rm.start();
rm.registerNode("host1:1234", 1024);
// Start app1 with appPriority 5
MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder
.createWithMemory(1024, rm)
.withAppPriority(Priority.newInstance(appPriority))
.build();
RMApp app1 = MockRMAppSubmitter.submit(rm, data);
assertEquals(appPriority, app1.getApplicationPriority().getPriority(),
"Incorrect priority has been set to application");
appPriority = 11;
ClientRMService rmService = rm.getClientRMService();
testApplicationPriorityUpdation(rmService, app1, appPriority, maxPriority);
appPriority = 9;
testApplicationPriorityUpdation(rmService, app1, appPriority, appPriority);
rm.killApp(app1.getApplicationId());
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
// Update priority request for invalid application id.
ApplicationId invalidAppId = ApplicationId.newInstance(123456789L, 3);
UpdateApplicationPriorityRequest updateRequest =
UpdateApplicationPriorityRequest.newInstance(invalidAppId,
Priority.newInstance(appPriority));
try {
rmService.updateApplicationPriority(updateRequest);
fail("ApplicationNotFoundException should be thrown "
+ "for invalid application id");
} catch (ApplicationNotFoundException e) {
// Expected
}
updateRequest =
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
Priority.newInstance(11));
assertEquals(appPriority, rmService.updateApplicationPriority(updateRequest)
.getApplicationPriority().getPriority(),
"Incorrect priority has been set to application");
}
private void testApplicationPriorityUpdation(ClientRMService rmService,
RMApp app1, int tobeUpdatedPriority, int expected) throws YarnException,
IOException {
UpdateApplicationPriorityRequest updateRequest =
UpdateApplicationPriorityRequest.newInstance(app1.getApplicationId(),
Priority.newInstance(tobeUpdatedPriority));
UpdateApplicationPriorityResponse updateApplicationPriority =
rmService.updateApplicationPriority(updateRequest);
assertEquals(expected, app1.getApplicationSubmissionContext().getPriority()
.getPriority(), "Incorrect priority has been set to application");
assertEquals(expected, updateApplicationPriority.getApplicationPriority().getPriority(),
"Incorrect priority has been returned");
}
private File createExcludeFile(File testDir) throws IOException {
File excludeFile = new File(testDir, "excludeFile");
try (FileOutputStream out = new FileOutputStream(excludeFile)) {
out.write("decommisssionedHost".getBytes(UTF_8));
}
return excludeFile;
}
@Test
public void testRMStartWithDecommissionedNode() throws Exception {
File testDir = GenericTestUtils.getRandomizedTestDir();
assertTrue(testDir.mkdirs(),
"Failed to create test directory: " + testDir.getAbsolutePath());
try {
File excludeFile = createExcludeFile(testDir);
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeFile.getAbsolutePath());
MockRM rm = new MockRM(conf) {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
this.getRMContext().getRMDelegationTokenSecretManager());
};
};
resourceManager = rm;
rm.start();
rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
client = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
// Make call
GetClusterNodesRequest request =
GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
List<NodeReport> nodeReports = client.getClusterNodes(request).getNodeReports();
assertEquals(1, nodeReports.size());
} finally {
FileUtil.fullyDelete(testDir);
}
}
@Test
public void testGetResourceTypesInfoWhenResourceProfileDisabled()
throws Exception {
conf = new YarnConfiguration();
MockRM rm = new MockRM(conf) {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
this.getRMContext().getRMDelegationTokenSecretManager());
}
};
resourceManager = rm;
rm.start();
rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
client = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
// Make call
GetAllResourceTypeInfoRequest request =
GetAllResourceTypeInfoRequest.newInstance();
GetAllResourceTypeInfoResponse response = client.getResourceTypeInfo(request);
assertEquals(2, response.getResourceTypeInfo().size());
// Check memory
assertEquals(ResourceInformation.MEMORY_MB.getName(),
response.getResourceTypeInfo().get(0).getName());
assertEquals(ResourceInformation.MEMORY_MB.getUnits(),
response.getResourceTypeInfo().get(0).getDefaultUnit());
// Check vcores
assertEquals(ResourceInformation.VCORES.getName(),
response.getResourceTypeInfo().get(1).getName());
assertEquals(ResourceInformation.VCORES.getUnits(),
response.getResourceTypeInfo().get(1).getDefaultUnit());
}
@Test
public void testGetApplicationsWithPerUserApps()
throws IOException, YarnException {
/*
* Submit 3 applications alternately in two queues
*/
// Basic setup
ResourceScheduler scheduler = mockResourceScheduler();
RMContext rmContext = mock(RMContext.class);
mockRMContext(scheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
when(rmContext.getStateStore()).thenReturn(stateStore);
doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext)
.getRMTimelineCollectorManager();
RMAppManager appManager = new RMAppManager(rmContext, scheduler, null,
mock(ApplicationACLsManager.class), new Configuration());
when(rmContext.getDispatcher().getEventHandler())
.thenReturn(new EventHandler<Event>() {
public void handle(Event event) {
}
});
// Simulate Queue ACL manager which returns false always
QueueACLsManager queueAclsManager = mock(QueueACLsManager.class);
when(queueAclsManager.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), any(RMApp.class), any(String.class),
anyList())).thenReturn(false);
// Simulate app ACL manager which returns false always
ApplicationACLsManager appAclsManager = mock(ApplicationACLsManager.class);
when(appAclsManager.checkAccess(eq(UserGroupInformation.getCurrentUser()),
any(ApplicationAccessType.class), any(String.class),
any(ApplicationId.class))).thenReturn(false);
ClientRMService rmService = new ClientRMService(rmContext, scheduler,
appManager, appAclsManager, queueAclsManager, null);
rmService.init(new Configuration());
// Initialize appnames and queues
String[] queues = {QUEUE_1, QUEUE_2};
String[] appNames = {MockApps.newAppName(), MockApps.newAppName(),
MockApps.newAppName()};
ApplicationId[] appIds = {getApplicationId(101), getApplicationId(102),
getApplicationId(103)};
List<String> tags = Arrays.asList("Tag1", "Tag2", "Tag3");
long[] submitTimeMillis = new long[3];
// Submit applications
for (int i = 0; i < appIds.length; i++) {
ApplicationId appId = appIds[i];
SubmitApplicationRequest submitRequest = mockSubmitAppRequest(appId,
appNames[i], queues[i % queues.length],
new HashSet<String>(tags.subList(0, i + 1)));
rmService.submitApplication(submitRequest);
submitTimeMillis[i] = System.currentTimeMillis();
}
// Test different cases of ClientRMService#getApplications()
GetApplicationsRequest request = GetApplicationsRequest.newInstance();
assertEquals(6, rmService.getApplications(request).getApplicationList().size(),
"Incorrect total number of apps");
rmService.setDisplayPerUserApps(true);
assertEquals(0, rmService.getApplications(request).getApplicationList().size(),
"Incorrect number of applications for user");
rmService.setDisplayPerUserApps(false);
}
@Test
public void testRegisterNMWithDiffUnits() throws Exception {
ResourceUtils.resetResourceTypes();
Configuration yarnConf = new YarnConfiguration();
String resourceTypesFileName = "resource-types-4.xml";
InputStream source =
yarnConf.getClassLoader().getResourceAsStream(resourceTypesFileName);
resourceTypesFile = new File(yarnConf.getClassLoader().
getResource(".").getPath(), "resource-types.xml");
FileUtils.copyInputStreamToFile(source, resourceTypesFile);
ResourceUtils.getResourceTypes();
yarnConf.setClass(
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class, ResourceCalculator.class);
MockRM rm = new MockRM(yarnConf) {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
this.getRMContext().getRMDelegationTokenSecretManager());
};
};
resourceManager = rm;
rm.start();
Resource resource = Resources.createResource(976562);
resource.setResourceInformation("memory-mb",
ResourceInformation.newInstance("memory-mb", "G", 976562));
resource.setResourceInformation("resource1",
ResourceInformation.newInstance("resource1", "T", 1));
resource.setResourceInformation("resource2",
ResourceInformation.newInstance("resource2", "M", 1));
MockNM node = rm.registerNode("host1:1234", resource);
node.nodeHeartbeat(true);
// Create a client.
conf = new Configuration();
rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
client = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
// Make call
GetClusterNodesRequest request =
GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.RUNNING));
List<NodeReport> nodeReports =
client.getClusterNodes(request).getNodeReports();
assertEquals(1, nodeReports.size());
assertNotSame(NodeState.UNHEALTHY, nodeReports.get(0).getNodeState(),
"Node is expected to be healthy!");
assertEquals(1, nodeReports.size());
//Resource 'resource1' has been passed as 1T while registering NM.
//1T should be converted to 1000G
assertEquals("G", nodeReports.get(0).getCapability().
getResourceInformation("resource1").getUnits());
assertEquals(1000, nodeReports.get(0).getCapability().
getResourceInformation("resource1").getValue());
//Resource 'resource2' has been passed as 1M while registering NM
//1M should be converted to 1000000000M
assertEquals("m", nodeReports.get(0).getCapability().
getResourceInformation("resource2").getUnits());
assertEquals(1000000000, nodeReports.get(0).getCapability().
getResourceInformation("resource2").getValue());
//Resource 'memory-mb' has been passed as 976562G while registering NM
//976562G should be converted to 976562Mi
assertEquals("Mi", nodeReports.get(0).getCapability().
getResourceInformation("memory-mb").getUnits());
assertEquals(976562, nodeReports.get(0).getCapability().
getResourceInformation("memory-mb").getValue());
}
@Test
public void testGetClusterMetrics() throws Exception {
MockRM rm = new MockRM() {
protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler,
this.rmAppManager, this.applicationACLsManager, this.queueACLsManager,
this.getRMContext().getRMDelegationTokenSecretManager());
};
};
resourceManager = rm;
rm.start();
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
clusterMetrics.incrDecommissioningNMs();
repeat(2, clusterMetrics::incrDecommisionedNMs);
repeat(3, clusterMetrics::incrNumActiveNodes);
repeat(4, clusterMetrics::incrNumLostNMs);
repeat(5, clusterMetrics::incrNumUnhealthyNMs);
repeat(6, clusterMetrics::incrNumRebootedNMs);
repeat(7, clusterMetrics::incrNumShutdownNMs);
// Create a client.
conf = new Configuration();
rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress);
client = (ApplicationClientProtocol) rpc.getProxy(
ApplicationClientProtocol.class, rmAddress, conf);
YarnClusterMetrics ymetrics = client.getClusterMetrics(
GetClusterMetricsRequest.newInstance()).getClusterMetrics();
assertEquals(0, ymetrics.getNumNodeManagers());
assertEquals(1, ymetrics.getNumDecommissioningNodeManagers());
assertEquals(2, ymetrics.getNumDecommissionedNodeManagers());
assertEquals(3, ymetrics.getNumActiveNodeManagers());
assertEquals(4, ymetrics.getNumLostNodeManagers());
assertEquals(5, ymetrics.getNumUnhealthyNodeManagers());
assertEquals(6, ymetrics.getNumRebootedNodeManagers());
assertEquals(7, ymetrics.getNumShutdownNodeManagers());
}
@AfterEach
public void tearDown() throws Exception {
if (resourceTypesFile != null && resourceTypesFile.exists()) {
resourceTypesFile.delete();
}
ClusterMetrics.destroy();
DefaultMetricsSystem.shutdown();
if (conf != null && client != null && rpc != null) {
rpc.stopProxy(client, conf);
}
if (resourceManager != null) {
resourceManager.close();
}
}
private static void repeat(int n, Runnable r) {
for (int i = 0; i < n; ++i) {
r.run();
}
}
}