MockDefaultRequestInterceptorREST.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.net.ConnectException;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.thirdparty.com.google.common.net.HttpHeaders;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.MutableCSConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterUserInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteResponseInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeAllocationInfo;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.dao.ConfInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.DEDICATED_QUEUE_PATH;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.DEFAULT_QUEUE_PATH;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.ROOT_QUEUE_PATH;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEFAULT;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED;
import static org.apache.hadoop.yarn.server.router.webapp.BaseRouterWebServicesTest.QUEUE_DEDICATED_FULL;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* This class mocks the RESTRequestInterceptor.
*/
public class MockDefaultRequestInterceptorREST
extends DefaultRequestInterceptorREST {
private static final Logger LOG =
LoggerFactory.getLogger(MockDefaultRequestInterceptorREST.class);
final private AtomicInteger applicationCounter = new AtomicInteger(0);
// True if the Mock RM is running, false otherwise.
// This property allows us to write tests for specific scenario as YARN RM
// down e.g. network issue, failover.
private boolean isRunning = true;
private Map<ApplicationId, ApplicationReport> applicationMap = new HashMap<>();
public static final String APP_STATE_RUNNING = "RUNNING";
// duration(milliseconds), 1mins
public static final long DURATION = 60*1000;
// Containers 4
public static final int NUM_CONTAINERS = 4;
private Map<ReservationId, SubClusterId> reservationMap = new HashMap<>();
private AtomicLong resCounter = new AtomicLong();
private MockRM mockRM = null;
private void validateRunning() throws ConnectException {
if (!isRunning) {
throw new ConnectException("RM is stopped");
}
}
@Override
public Response createNewApplication(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
validateRunning();
ApplicationId applicationId =
ApplicationId.newInstance(Integer.parseInt(getSubClusterId().getId()),
applicationCounter.incrementAndGet());
NewApplication appId =
new NewApplication(applicationId.toString(), new ResourceInfo());
Response response = Mockito.mock(Response.class);
Mockito.when(response.readEntity(NewApplication.class)).thenReturn(appId);
Mockito.when(response.getStatus()).thenReturn(HttpServletResponse.SC_OK);
return response;
}
@Override
public Response submitApplication(ApplicationSubmissionContextInfo newApp,
HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
validateRunning();
ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
LOG.info("Application submitted: " + appId);
// Initialize appReport
ApplicationReport appReport = ApplicationReport.newInstance(
appId, ApplicationAttemptId.newInstance(appId, 1), null, newApp.getQueue(), null, null, 0,
null, YarnApplicationState.ACCEPTED, "", null, 0, 0, null, null, null, 0,
newApp.getApplicationType(), null, null, false, Priority.newInstance(newApp.getPriority()),
null, null);
// Initialize appTimeoutsMap
HashMap<ApplicationTimeoutType, ApplicationTimeout> appTimeoutsMap = new HashMap<>();
ApplicationTimeoutType timeoutType = ApplicationTimeoutType.LIFETIME;
ApplicationTimeout appTimeOut =
ApplicationTimeout.newInstance(ApplicationTimeoutType.LIFETIME, "UNLIMITED", 10);
appTimeoutsMap.put(timeoutType, appTimeOut);
appReport.setApplicationTimeouts(appTimeoutsMap);
applicationMap.put(appId, appReport);
return Response.status(Status.ACCEPTED).header(HttpHeaders.LOCATION, "")
.entity(getSubClusterId()).build();
}
@Override
public AppInfo getApp(HttpServletRequest hsr, String appId,
Set<String> unselectedFields) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
return new AppInfo();
}
@Override
public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
Set<String> statesQuery, String finalStatusQuery, String userQuery,
String queueQuery, String count, String startedBegin, String startedEnd,
String finishBegin, String finishEnd, Set<String> applicationTypes,
Set<String> applicationTags, String name, Set<String> unselectedFields) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
AppsInfo appsInfo = new AppsInfo();
AppInfo appInfo = new AppInfo();
appInfo.setAppId(
ApplicationId.newInstance(Integer.parseInt(getSubClusterId().getId()),
applicationCounter.incrementAndGet()).toString());
appInfo.setAMHostHttpAddress("http://i_am_the_AM:1234");
appsInfo.add(appInfo);
return appsInfo;
}
@Override
public Response updateAppState(AppState targetState, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
validateRunning();
ApplicationId applicationId = ApplicationId.fromString(appId);
if (applicationMap.remove(applicationId) == null) {
throw new ApplicationNotFoundException(
"Trying to kill an absent application: " + appId);
}
if (targetState == null) {
return Response.status(Status.BAD_REQUEST).build();
}
LOG.info("Force killing application: " + appId);
AppState ret = new AppState();
ret.setState(targetState.toString());
return Response.status(Status.OK).entity(ret).build();
}
@Override
public NodeInfo getNode(String nodeId) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
NodeInfo node = null;
SubClusterId subCluster = getSubClusterId();
String subClusterId = subCluster.getId();
if (nodeId == null || nodeId.contains(subClusterId) || nodeId.contains("test")) {
node = new NodeInfo();
node.setId(nodeId);
node.setLastHealthUpdate(Integer.parseInt(getSubClusterId().getId()));
}
return node;
}
@Override
public NodesInfo getNodes(String states) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
NodeInfo node = new NodeInfo();
node.setId("Node " + Integer.valueOf(getSubClusterId().getId()));
node.setLastHealthUpdate(Integer.parseInt(getSubClusterId().getId()));
NodesInfo nodes = new NodesInfo();
nodes.add(node);
return nodes;
}
@Override
public ResourceInfo updateNodeResource(HttpServletRequest hsr,
String nodeId, ResourceOptionInfo resourceOption) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
Resource resource = resourceOption.getResourceOption().getResource();
return new ResourceInfo(resource);
}
@Override
public ClusterMetricsInfo getClusterMetricsInfo() {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ClusterMetricsInfo metrics = new ClusterMetricsInfo();
metrics.setAppsSubmitted(Integer.parseInt(getSubClusterId().getId()));
metrics.setAppsCompleted(Integer.parseInt(getSubClusterId().getId()));
metrics.setAppsPending(Integer.parseInt(getSubClusterId().getId()));
metrics.setAppsRunning(Integer.parseInt(getSubClusterId().getId()));
metrics.setAppsFailed(Integer.parseInt(getSubClusterId().getId()));
metrics.setAppsKilled(Integer.parseInt(getSubClusterId().getId()));
return metrics;
}
@Override
public AppState getAppState(HttpServletRequest hsr, String appId)
throws AuthorizationException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
return new AppState(APP_STATE_RUNNING);
}
public void setSubClusterId(int subClusterId) {
setSubClusterId(SubClusterId.newInstance(Integer.toString(subClusterId)));
}
public boolean isRunning() {
return isRunning;
}
public void setRunning(boolean runningMode) {
this.isRunning = runningMode;
}
@Override
public ContainersInfo getContainers(HttpServletRequest req, HttpServletResponse res,
String appId, String appAttemptId) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
// Try format conversion for app_id
ApplicationId applicationId = null;
try {
applicationId = ApplicationId.fromString(appId);
} catch (Exception e) {
throw new BadRequestException(e);
}
// Try format conversion for app_attempt_id
ApplicationAttemptId applicationAttemptId = null;
try {
applicationAttemptId =
ApplicationAttemptId.fromString(appAttemptId);
} catch (Exception e) {
throw new BadRequestException(e);
}
// We avoid to check if the Application exists in the system because we need
// to validate that each subCluster returns 1 container.
ContainersInfo containers = new ContainersInfo();
int subClusterId = Integer.valueOf(getSubClusterId().getId());
ContainerId containerId = ContainerId.newContainerId(
ApplicationAttemptId.fromString(appAttemptId), subClusterId);
Resource allocatedResource =
Resource.newInstance(subClusterId, subClusterId);
NodeId assignedNode = NodeId.newInstance("Node", subClusterId);
Priority priority = Priority.newInstance(subClusterId);
long creationTime = subClusterId;
long finishTime = subClusterId;
String diagnosticInfo = "Diagnostic " + subClusterId;
String logUrl = "Log " + subClusterId;
int containerExitStatus = subClusterId;
ContainerState containerState = ContainerState.COMPLETE;
String nodeHttpAddress = "HttpAddress " + subClusterId;
ContainerReport containerReport = ContainerReport.newInstance(
containerId, allocatedResource, assignedNode, priority,
creationTime, finishTime, diagnosticInfo, logUrl,
containerExitStatus, containerState, nodeHttpAddress);
ContainerInfo container = new ContainerInfo(containerReport);
containers.add(container);
return containers;
}
@Override
public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr) throws IOException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
NodeLabelsInfo cpuNode = new NodeLabelsInfo(Collections.singleton("CPU"));
NodeLabelsInfo gpuNode = new NodeLabelsInfo(Collections.singleton("GPU"));
HashMap<String, NodeLabelsInfo> nodeLabels = new HashMap<>();
nodeLabels.put("node1", cpuNode);
nodeLabels.put("node2", gpuNode);
return new NodeToLabelsInfo(nodeLabels);
}
@Override
public LabelsToNodesInfo getLabelsToNodes(Set<String> labels) throws IOException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
Map<NodeLabelInfo, NodeIDsInfo> labelsToNodes = new HashMap<>();
NodeLabel labelX = NodeLabel.newInstance("x", false);
NodeLabelInfo nodeLabelInfoX = new NodeLabelInfo(labelX);
ArrayList<String> hostsX = new ArrayList<>(Arrays.asList("host1A", "host1B"));
Resource resourceX = Resource.newInstance(20*1024, 10);
NodeIDsInfo nodeIDsInfoX = new NodeIDsInfo(hostsX, resourceX);
labelsToNodes.put(nodeLabelInfoX, nodeIDsInfoX);
NodeLabel labelY = NodeLabel.newInstance("y", false);
NodeLabelInfo nodeLabelInfoY = new NodeLabelInfo(labelY);
ArrayList<String> hostsY = new ArrayList<>(Arrays.asList("host2A", "host2B"));
Resource resourceY = Resource.newInstance(40*1024, 20);
NodeIDsInfo nodeIDsInfoY = new NodeIDsInfo(hostsY, resourceY);
labelsToNodes.put(nodeLabelInfoY, nodeIDsInfoY);
NodeLabel labelZ = NodeLabel.newInstance("z", false);
NodeLabelInfo nodeLabelInfoZ = new NodeLabelInfo(labelZ);
ArrayList<String> hostsZ = new ArrayList<>(Arrays.asList("host3A", "host3B"));
Resource resourceZ = Resource.newInstance(80*1024, 40);
NodeIDsInfo nodeIDsInfoZ = new NodeIDsInfo(hostsZ, resourceZ);
labelsToNodes.put(nodeLabelInfoZ, nodeIDsInfoZ);
return new LabelsToNodesInfo(labelsToNodes);
}
@Override
public NodeLabelsInfo getClusterNodeLabels(HttpServletRequest hsr) throws IOException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
NodeLabel labelCpu = NodeLabel.newInstance("cpu", false);
NodeLabel labelGpu = NodeLabel.newInstance("gpu", false);
return new NodeLabelsInfo(Sets.newHashSet(labelCpu, labelGpu));
}
@Override
public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId) throws IOException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
if (StringUtils.equalsIgnoreCase(nodeId, "node1")) {
NodeLabel labelCpu = NodeLabel.newInstance("x", false);
NodeLabel labelGpu = NodeLabel.newInstance("y", false);
return new NodeLabelsInfo(Sets.newHashSet(labelCpu, labelGpu));
} else {
return null;
}
}
@Override
public ContainerInfo getContainer(HttpServletRequest req, HttpServletResponse res,
String appId, String appAttemptId, String containerId) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ContainerId newContainerId = ContainerId.fromString(containerId);
Resource allocatedResource = Resource.newInstance(1024, 2);
int subClusterId = Integer.valueOf(getSubClusterId().getId());
NodeId assignedNode = NodeId.newInstance("Node", subClusterId);
Priority priority = Priority.newInstance(subClusterId);
long creationTime = subClusterId;
long finishTime = subClusterId;
String diagnosticInfo = "Diagnostic " + subClusterId;
String logUrl = "Log " + subClusterId;
int containerExitStatus = subClusterId;
ContainerState containerState = ContainerState.COMPLETE;
String nodeHttpAddress = "HttpAddress " + subClusterId;
ContainerReport containerReport = ContainerReport.newInstance(
newContainerId, allocatedResource, assignedNode, priority,
creationTime, finishTime, diagnosticInfo, logUrl,
containerExitStatus, containerState, nodeHttpAddress);
return new ContainerInfo(containerReport);
}
@Override
public Response signalToContainer(String containerId, String command,
HttpServletRequest req) throws AuthorizationException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
if (!EnumUtils.isValidEnum(SignalContainerCommand.class, command.toUpperCase())) {
String errMsg = "Invalid command: " + command.toUpperCase() + ", valid commands are: "
+ Arrays.asList(SignalContainerCommand.values());
return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
}
return Response.status(Status.OK).build();
}
@Override
public AppAttemptInfo getAppAttempt(HttpServletRequest req, HttpServletResponse res,
String appId, String appAttemptId) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationAttemptId attemptId = ApplicationAttemptId.fromString(appAttemptId);
ApplicationReport newApplicationReport = ApplicationReport.newInstance(
applicationId, attemptId, "user", "queue", "appname", "host", 124, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 1, 2, 3, 4,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
ApplicationAttemptReport attempt = ApplicationAttemptReport.newInstance(
attemptId, "host", 124, "url", "oUrl", "diagnostics",
YarnApplicationAttemptState.FINISHED, ContainerId.newContainerId(
newApplicationReport.getCurrentApplicationAttemptId(), 1));
return new AppAttemptInfo(attempt);
}
@Override
public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
AppAttemptsInfo infos = new AppAttemptsInfo();
infos.add(TestRouterWebServiceUtil.generateAppAttemptInfo(0));
infos.add(TestRouterWebServiceUtil.generateAppAttemptInfo(1));
return infos;
}
@Override
public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr,
String appId, String type) throws AuthorizationException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationReport appReport = applicationMap.get(applicationId);
Map<ApplicationTimeoutType, ApplicationTimeout> timeouts = appReport.getApplicationTimeouts();
ApplicationTimeoutType paramType = ApplicationTimeoutType.valueOf(type);
if (paramType == null) {
throw new NotFoundException("application timeout type not found");
}
if (!timeouts.containsKey(paramType)) {
throw new NotFoundException("timeout with id: " + appId + " not found");
}
ApplicationTimeout applicationTimeout = timeouts.get(paramType);
AppTimeoutInfo timeoutInfo = new AppTimeoutInfo();
timeoutInfo.setExpiryTime(applicationTimeout.getExpiryTime());
timeoutInfo.setTimeoutType(applicationTimeout.getTimeoutType());
timeoutInfo.setRemainingTime(applicationTimeout.getRemainingTime());
return timeoutInfo;
}
@Override
public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
throws AuthorizationException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationReport appReport = applicationMap.get(applicationId);
Map<ApplicationTimeoutType, ApplicationTimeout> timeouts = appReport.getApplicationTimeouts();
AppTimeoutsInfo timeoutsInfo = new AppTimeoutsInfo();
for (ApplicationTimeout timeout : timeouts.values()) {
AppTimeoutInfo timeoutInfo = new AppTimeoutInfo();
timeoutInfo.setExpiryTime(timeout.getExpiryTime());
timeoutInfo.setTimeoutType(timeout.getTimeoutType());
timeoutInfo.setRemainingTime(timeout.getRemainingTime());
timeoutsInfo.add(timeoutInfo);
}
return timeoutsInfo;
}
@Override
public Response updateApplicationTimeout(AppTimeoutInfo appTimeout, HttpServletRequest hsr,
String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationReport appReport = applicationMap.get(applicationId);
Map<ApplicationTimeoutType, ApplicationTimeout> timeouts = appReport.getApplicationTimeouts();
ApplicationTimeoutType paramTimeoutType = appTimeout.getTimeoutType();
if (!timeouts.containsKey(paramTimeoutType)) {
throw new NotFoundException("TimeOutType with id: " + appId + " not found");
}
ApplicationTimeout applicationTimeout = timeouts.get(paramTimeoutType);
applicationTimeout.setTimeoutType(appTimeout.getTimeoutType());
applicationTimeout.setExpiryTime(appTimeout.getExpireTime());
applicationTimeout.setRemainingTime(appTimeout.getRemainingTimeInSec());
AppTimeoutInfo result = new AppTimeoutInfo(applicationTimeout);
return Response.status(Status.OK).entity(result).build();
}
@Override
public Response updateApplicationPriority(AppPriority targetPriority, HttpServletRequest hsr,
String appId) throws YarnException, InterruptedException, IOException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (targetPriority == null) {
return Response.status(Status.BAD_REQUEST).build();
}
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationReport appReport = applicationMap.get(applicationId);
Priority newPriority = Priority.newInstance(targetPriority.getPriority());
appReport.setPriority(newPriority);
return Response.status(Status.OK).entity(targetPriority).build();
}
@Override
public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
throws AuthorizationException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationReport appReport = applicationMap.get(applicationId);
Priority priority = appReport.getPriority();
return new AppPriority(priority.getPriority());
}
@Override
public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
throws AuthorizationException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
String queue = applicationMap.get(applicationId).getQueue();
return new AppQueue(queue);
}
@Override
public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, String appId)
throws AuthorizationException, YarnException, InterruptedException, IOException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
if (targetQueue == null || StringUtils.isBlank(targetQueue.getQueue())) {
return Response.status(Status.BAD_REQUEST).build();
}
ApplicationReport appReport = applicationMap.get(applicationId);
String originalQueue = appReport.getQueue();
appReport.setQueue(targetQueue.getQueue());
applicationMap.put(applicationId, appReport);
LOG.info("Update applicationId = {} from originalQueue = {} to targetQueue = {}.",
appId, originalQueue, targetQueue);
AppQueue targetAppQueue = new AppQueue(targetQueue.getQueue());
return Response.status(Status.OK).entity(targetAppQueue).build();
}
public void updateApplicationState(YarnApplicationState appState, String appId)
throws AuthorizationException, YarnException, InterruptedException, IOException {
validateRunning();
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
ApplicationReport appReport = applicationMap.get(applicationId);
appReport.setYarnApplicationState(appState);
}
@Override
public ApplicationStatisticsInfo getAppStatistics(
HttpServletRequest hsr, Set<String> stateQueries, Set<String> typeQueries) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
Map<String, StatisticsItemInfo> itemInfoMap = new HashMap<>();
for (ApplicationReport appReport : applicationMap.values()) {
YarnApplicationState appState = appReport.getYarnApplicationState();
String appType = appReport.getApplicationType();
if (stateQueries.contains(appState.name()) && typeQueries.contains(appType)) {
String itemInfoMapKey = appState.toString() + "_" + appType;
StatisticsItemInfo itemInfo = itemInfoMap.getOrDefault(itemInfoMapKey, null);
if (itemInfo == null) {
itemInfo = new StatisticsItemInfo(appState, appType, 1);
} else {
long newCount = itemInfo.getCount() + 1;
itemInfo.setCount(newCount);
}
itemInfoMap.put(itemInfoMapKey, itemInfo);
}
}
return new ApplicationStatisticsInfo(itemInfoMap.values());
}
@Override
public AppActivitiesInfo getAppActivities(
HttpServletRequest hsr, String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds, String groupBy, String limit, Set<String> actions,
boolean summarize) {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ApplicationId applicationId = ApplicationId.fromString(appId);
if (!applicationMap.containsKey(applicationId)) {
throw new NotFoundException("app with id: " + appId + " not found");
}
SchedulerNode schedulerNode = TestUtils.getMockNode("host0", "rack", 1, 10240);
RMContext rmContext = Mockito.mock(RMContext.class);
Mockito.when(rmContext.getYarnConfiguration()).thenReturn(this.getConf());
ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
Mockito.when(scheduler.getMinimumResourceCapability()).thenReturn(Resources.none());
Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);
LeafQueue mockQueue = Mockito.mock(LeafQueue.class);
Map<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>();
Mockito.doReturn(rmApps).when(rmContext).getRMApps();
FiCaSchedulerNode node = (FiCaSchedulerNode) schedulerNode;
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 0);
RMApp mockApp = Mockito.mock(RMApp.class);
Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp).getApplicationId();
Mockito.doReturn(FinalApplicationStatus.UNDEFINED).when(mockApp).getFinalApplicationStatus();
rmApps.put(appAttemptId.getApplicationId(), mockApp);
FiCaSchedulerApp app = new FiCaSchedulerApp(appAttemptId, "user", mockQueue,
mock(ActiveUsersManager.class), rmContext);
ActivitiesManager newActivitiesManager = new ActivitiesManager(rmContext);
newActivitiesManager.turnOnAppActivitiesRecording(app.getApplicationId(), 3);
int numActivities = 10;
for (int i = 0; i < numActivities; i++) {
ActivitiesLogger.APP.startAppAllocationRecording(newActivitiesManager, node,
SystemClock.getInstance().getTime(), app);
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(newActivitiesManager, node, app,
new SchedulerRequestKey(Priority.newInstance(0), 0, null),
ActivityDiagnosticConstant.NODE_IS_BLACKLISTED, ActivityState.REJECTED,
ActivityLevel.NODE);
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(newActivitiesManager,
app.getApplicationId(), ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
}
Set<Integer> prioritiesInt =
requestPriorities.stream().map(pri -> Integer.parseInt(pri)).collect(Collectors.toSet());
Set<Long> allocationReqIds =
allocationRequestIds.stream().map(id -> Long.parseLong(id)).collect(Collectors.toSet());
AppActivitiesInfo appActivitiesInfo = newActivitiesManager.
getAppActivitiesInfo(app.getApplicationId(), prioritiesInt, allocationReqIds, null,
Integer.parseInt(limit), summarize, 3);
return appActivitiesInfo;
}
@Override
public Response listReservation(String queue, String reservationId, long startTime, long endTime,
boolean includeResourceAllocations, HttpServletRequest hsr) throws Exception {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
if (!StringUtils.equals(queue, QUEUE_DEDICATED_FULL)) {
throw new RuntimeException("The specified queue: " + queue +
" is not managed by reservation system." +
" Please try again with a valid reservable queue.");
}
ReservationId reservationID =
ReservationId.parseReservationId(reservationId);
if (!reservationMap.containsKey(reservationID)) {
throw new NotFoundException("reservationId with id: " + reservationId + " not found");
}
ClientRMService clientService = mockRM.getClientRMService();
// listReservations
ReservationListRequest request = ReservationListRequest.newInstance(
queue, reservationId, startTime, endTime, includeResourceAllocations);
ReservationListResponse resRespInfo = clientService.listReservations(request);
ReservationListInfo resResponse =
new ReservationListInfo(resRespInfo, includeResourceAllocations);
return Response.status(Status.OK).entity(resResponse).build();
}
@Override
public Response createNewReservation(HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ReservationId resId = ReservationId.newInstance(Time.now(), resCounter.incrementAndGet());
LOG.info("Allocated new reservationId: {}.", resId);
NewReservation reservationId = new NewReservation(resId.toString());
return Response.status(Status.OK).entity(reservationId).build();
}
@Override
public Response submitReservation(ReservationSubmissionRequestInfo resContext,
HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
ReservationId reservationId = ReservationId.parseReservationId(resContext.getReservationId());
ReservationDefinitionInfo definitionInfo = resContext.getReservationDefinition();
ReservationDefinition definition =
RouterServerUtil.convertReservationDefinition(definitionInfo);
ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance(
definition, resContext.getQueue(), reservationId);
submitReservation(request);
LOG.info("Reservation submitted: {}.", reservationId);
SubClusterId subClusterId = getSubClusterId();
reservationMap.put(reservationId, subClusterId);
return Response.status(Status.ACCEPTED).build();
}
private void submitReservation(ReservationSubmissionRequest request) {
try {
// synchronize plan
ReservationSystem reservationSystem = mockRM.getReservationSystem();
reservationSystem.synchronizePlan(QUEUE_DEDICATED_FULL, true);
// Generate reserved resources
ClientRMService clientService = mockRM.getClientRMService();
clientService.submitReservation(request);
} catch (IOException | YarnException e) {
throw new RuntimeException(e);
}
}
@Override
public Response updateReservation(ReservationUpdateRequestInfo resContext,
HttpServletRequest hsr) throws AuthorizationException, IOException, InterruptedException {
if (resContext == null || resContext.getReservationId() == null ||
resContext.getReservationDefinition() == null) {
return Response.status(Status.BAD_REQUEST).build();
}
String resId = resContext.getReservationId();
ReservationId reservationId = ReservationId.parseReservationId(resId);
if (!reservationMap.containsKey(reservationId)) {
throw new NotFoundException("reservationId with id: " + reservationId + " not found");
}
// Generate reserved resources
updateReservation(resContext);
ReservationUpdateResponseInfo resRespInfo = new ReservationUpdateResponseInfo();
return Response.status(Status.OK).entity(resRespInfo).build();
}
private void updateReservation(ReservationUpdateRequestInfo resContext) throws IOException {
if (resContext == null) {
throw new BadRequestException("Input ReservationSubmissionContext should not be null");
}
ReservationDefinitionInfo resInfo = resContext.getReservationDefinition();
if (resInfo == null) {
throw new BadRequestException("Input ReservationDefinition should not be null");
}
ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
|| resReqsInfo.getReservationRequest().isEmpty()) {
throw new BadRequestException("The ReservationDefinition should " +
"contain at least one ReservationRequest");
}
if (resContext.getReservationId() == null) {
throw new BadRequestException("Update operations must specify an existing ReservationId");
}
ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values();
ReservationRequestInterpreter requestInterpreter =
values[resReqsInfo.getReservationRequestsInterpreter()];
List<ReservationRequest> list = new ArrayList<>();
for (ReservationRequestInfo resReqInfo : resReqsInfo.getReservationRequest()) {
ResourceInfo rInfo = resReqInfo.getCapability();
Resource capability = Resource.newInstance(rInfo.getMemorySize(), rInfo.getvCores());
int numContainers = resReqInfo.getNumContainers();
int minConcurrency = resReqInfo.getMinConcurrency();
long duration = resReqInfo.getDuration();
ReservationRequest rr = ReservationRequest.newInstance(
capability, numContainers, minConcurrency, duration);
list.add(rr);
}
ReservationRequests reqs = ReservationRequests.newInstance(list, requestInterpreter);
ReservationDefinition rDef = ReservationDefinition.newInstance(
resInfo.getArrival(), resInfo.getDeadline(), reqs,
resInfo.getReservationName(), resInfo.getRecurrenceExpression(),
Priority.newInstance(resInfo.getPriority()));
ReservationUpdateRequest request = ReservationUpdateRequest.newInstance(
rDef, ReservationId.parseReservationId(resContext.getReservationId()));
ClientRMService clientService = mockRM.getClientRMService();
try {
clientService.updateReservation(request);
} catch (YarnException ex) {
throw new RuntimeException(ex);
}
}
@Override
public Response deleteReservation(ReservationDeleteRequestInfo resContext, HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
if (!isRunning) {
throw new RuntimeException("RM is stopped");
}
try {
String resId = resContext.getReservationId();
ReservationId reservationId = ReservationId.parseReservationId(resId);
if (!reservationMap.containsKey(reservationId)) {
throw new NotFoundException("reservationId with id: " + reservationId + " not found");
}
ReservationDeleteRequest reservationDeleteRequest =
ReservationDeleteRequest.newInstance(reservationId);
ClientRMService clientService = mockRM.getClientRMService();
clientService.deleteReservation(reservationDeleteRequest);
ReservationDeleteResponseInfo resRespInfo = new ReservationDeleteResponseInfo();
reservationMap.remove(reservationId);
return Response.status(Status.OK).entity(resRespInfo).build();
} catch (YarnException e) {
throw new RuntimeException(e);
}
}
@VisibleForTesting
public MockRM getMockRM() {
return mockRM;
}
@VisibleForTesting
public void setMockRM(MockRM mockResourceManager) {
this.mockRM = mockResourceManager;
}
@Override
public NodeLabelsInfo getRMNodeLabels(HttpServletRequest hsr) {
NodeLabelInfo nodeLabelInfo = new NodeLabelInfo();
nodeLabelInfo.setExclusivity(true);
nodeLabelInfo.setName("Test-Label");
nodeLabelInfo.setActiveNMs(10);
PartitionInfo partitionInfo = new PartitionInfo();
NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
nodeLabelsInfo.getNodeLabelsInfo().add(nodeLabelInfo);
return nodeLabelsInfo;
}
private MockRM setupResourceManager() throws Exception {
DefaultMetricsSystem.setMiniClusterMode(true);
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
// Define default queue
conf.setCapacity(DEFAULT_QUEUE_PATH, 20);
// Define dedicated queues
conf.setQueues(ROOT_QUEUE_PATH,
new String[] {QUEUE_DEFAULT, QUEUE_DEDICATED});
conf.setCapacity(DEDICATED_QUEUE_PATH, 80);
conf.setReservable(DEDICATED_QUEUE_PATH, true);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
MockRM rm = new MockRM(conf);
rm.start();
rm.registerNode("127.0.0.1:5678", 100*1024, 100);
return rm;
}
@Override
public RMQueueAclInfo checkUserAccessToQueue(String queue, String username,
String queueAclType, HttpServletRequest hsr) throws AuthorizationException {
ResourceManager mockResourceManager = mock(ResourceManager.class);
Configuration conf = new YarnConfiguration();
ResourceScheduler mockScheduler = new CapacityScheduler() {
@Override
public synchronized boolean checkAccess(UserGroupInformation callerUGI,
QueueACL acl, String queueName) {
if (acl == QueueACL.ADMINISTER_QUEUE) {
if (callerUGI.getUserName().equals("admin")) {
return true;
}
} else {
if (ImmutableSet.of("admin", "yarn").contains(callerUGI.getUserName())) {
return true;
}
}
return false;
}
};
when(mockResourceManager.getResourceScheduler()).thenReturn(mockScheduler);
MockRMWebServices webSvc = new MockRMWebServices(mockResourceManager, conf,
mock(HttpServletResponse.class));
return webSvc.checkUserAccessToQueue(queue, username, queueAclType, hsr);
}
class MockRMWebServices {
@Context
private HttpServletResponse httpServletResponse;
private ResourceManager resourceManager;
private void initForReadableEndpoints() {
// clear content type
httpServletResponse.setContentType(null);
}
MockRMWebServices(ResourceManager rm, Configuration conf, HttpServletResponse response) {
this.resourceManager = rm;
this.httpServletResponse = response;
}
private UserGroupInformation getCallerUserGroupInformation(
HttpServletRequest hsr, boolean usePrincipal) {
String remoteUser = hsr.getRemoteUser();
if (usePrincipal) {
Principal princ = hsr.getUserPrincipal();
remoteUser = princ == null ? null : princ.getName();
}
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
return callerUGI;
}
public RMQueueAclInfo checkUserAccessToQueue(
String queue, String username, String queueAclType, HttpServletRequest hsr)
throws AuthorizationException {
initForReadableEndpoints();
// For the user who invokes this REST call, he/she should have admin access
// to the queue. Otherwise we will reject the call.
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
if (callerUGI != null && !this.resourceManager.getResourceScheduler().checkAccess(
callerUGI, QueueACL.ADMINISTER_QUEUE, queue)) {
throw new ForbiddenException(
"User=" + callerUGI.getUserName() + " doesn't haven access to queue="
+ queue + " so it cannot check ACLs for other users.");
}
// Create UGI for the to-be-checked user.
UserGroupInformation user = UserGroupInformation.createRemoteUser(username);
if (user == null) {
throw new ForbiddenException(
"Failed to retrieve UserGroupInformation for user=" + username);
}
// Check if the specified queue acl is valid.
QueueACL queueACL;
try {
queueACL = QueueACL.valueOf(queueAclType);
} catch (IllegalArgumentException e) {
throw new BadRequestException("Specified queueAclType=" + queueAclType
+ " is not a valid type, valid queue acl types={"
+ "SUBMIT_APPLICATIONS/ADMINISTER_QUEUE}");
}
if (!this.resourceManager.getResourceScheduler().checkAccess(user, queueACL, queue)) {
return new RMQueueAclInfo(false, user.getUserName(),
"User=" + username + " doesn't have access to queue=" + queue
+ " with acl-type=" + queueAclType);
}
return new RMQueueAclInfo(true, user.getUserName(), "");
}
public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
throws IOException {
int period = Integer.parseInt(time);
if (period <= 0) {
throw new BadRequestException("Period must be greater than 0");
}
return "Capacity scheduler logs are being created.";
}
}
@Override
public String dumpSchedulerLogs(String time, HttpServletRequest hsr) throws IOException {
ResourceManager mockResourceManager = mock(ResourceManager.class);
Configuration conf = new YarnConfiguration();
MockRMWebServices webSvc = new MockRMWebServices(mockResourceManager, conf,
mock(HttpServletResponse.class));
return webSvc.dumpSchedulerLogs(time, hsr);
}
public Response replaceLabelsOnNodes(NodeToLabelsEntryList newNodeToLabels,
HttpServletRequest hsr) throws IOException {
return Response.status(Status.OK).entity(
"subCluster-0:Success,subCluster-1:Success,subCluster-2:Success,subCluster-3:Success,")
.build();
}
@Override
public Response replaceLabelsOnNode(Set<String> newNodeLabelsName,
HttpServletRequest hsr, String nodeId) throws Exception {
return Response.status(Status.OK).entity("subCluster#3:Success;").build();
}
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, String groupBy) {
if (!EnumUtils.isValidEnum(RMWSConsts.ActivitiesGroupBy.class, groupBy.toUpperCase())) {
String errMessage = "Got invalid groupBy: " + groupBy + ", valid groupBy types: "
+ Arrays.asList(RMWSConsts.ActivitiesGroupBy.values());
throw new IllegalArgumentException(errMessage);
}
SubClusterId subClusterId = getSubClusterId();
ActivitiesInfo activitiesInfo = mock(ActivitiesInfo.class);
Mockito.when(activitiesInfo.getNodeId()).thenReturn(nodeId);
Mockito.when(activitiesInfo.getTimestamp()).thenReturn(1673081972L);
Mockito.when(activitiesInfo.getDiagnostic()).thenReturn("Diagnostic:" + subClusterId.getId());
List<NodeAllocationInfo> allocationInfos = new ArrayList<>();
NodeAllocationInfo nodeAllocationInfo = mock(NodeAllocationInfo.class);
Mockito.when(nodeAllocationInfo.getPartition()).thenReturn("p" + subClusterId.getId());
Mockito.when(nodeAllocationInfo.getFinalAllocationState()).thenReturn("ALLOCATED");
allocationInfos.add(nodeAllocationInfo);
Mockito.when(activitiesInfo.getAllocations()).thenReturn(allocationInfos);
return activitiesInfo;
}
@Override
public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
String groupBy, int activitiesCount) {
if (activitiesCount <= 0) {
throw new IllegalArgumentException("activitiesCount needs to be greater than 0.");
}
if (!EnumUtils.isValidEnum(RMWSConsts.ActivitiesGroupBy.class, groupBy.toUpperCase())) {
String errMessage = "Got invalid groupBy: " + groupBy + ", valid groupBy types: "
+ Arrays.asList(RMWSConsts.ActivitiesGroupBy.values());
throw new IllegalArgumentException(errMessage);
}
BulkActivitiesInfo bulkActivitiesInfo = new BulkActivitiesInfo();
for (int i = 0; i < activitiesCount; i++) {
SubClusterId subClusterId = getSubClusterId();
ActivitiesInfo activitiesInfo = mock(ActivitiesInfo.class);
Mockito.when(activitiesInfo.getNodeId()).thenReturn(subClusterId + "-nodeId-" + i);
Mockito.when(activitiesInfo.getTimestamp()).thenReturn(1673081972L);
Mockito.when(activitiesInfo.getDiagnostic()).thenReturn("Diagnostic:" + subClusterId.getId());
List<NodeAllocationInfo> allocationInfos = new ArrayList<>();
NodeAllocationInfo nodeAllocationInfo = mock(NodeAllocationInfo.class);
Mockito.when(nodeAllocationInfo.getPartition()).thenReturn("p" + subClusterId.getId());
Mockito.when(nodeAllocationInfo.getFinalAllocationState()).thenReturn("ALLOCATED");
allocationInfos.add(nodeAllocationInfo);
Mockito.when(activitiesInfo.getAllocations()).thenReturn(allocationInfos);
bulkActivitiesInfo.getActivities().add(activitiesInfo);
}
return bulkActivitiesInfo;
}
public SchedulerTypeInfo getSchedulerInfo() {
try {
ResourceManager resourceManager = CapacitySchedulerTestUtilities.createResourceManager();
CapacityScheduler cs = (CapacityScheduler) resourceManager.getResourceScheduler();
CSQueue root = cs.getRootQueue();
SchedulerInfo schedulerInfo = new CapacitySchedulerInfo(root, cs);
return new SchedulerTypeInfo(schedulerInfo);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels, HttpServletRequest hsr)
throws Exception {
List<NodeLabelInfo> nodeLabelInfoList = newNodeLabels.getNodeLabelsInfo();
NodeLabelInfo nodeLabelInfo = nodeLabelInfoList.get(0);
String nodeLabelName = nodeLabelInfo.getName();
// If nodeLabelName is ALL, we let all subclusters pass
if (StringUtils.equals("ALL", nodeLabelName)) {
return Response.status(Status.OK).build();
} else if (StringUtils.equals("A0", nodeLabelName)) {
SubClusterId subClusterId = getSubClusterId();
String id = subClusterId.getId();
if (StringUtils.contains("A0", id)) {
return Response.status(Status.OK).build();
} else {
return Response.status(Status.BAD_REQUEST).entity(null).build();
}
}
throw new YarnException("addToClusterNodeLabels Error");
}
@Override
public Response removeFromClusterNodeLabels(Set<String> oldNodeLabels, HttpServletRequest hsr)
throws Exception {
Response response = Mockito.mock(Response.class);
// If oldNodeLabels contains ALL, we let all subclusters pass
if (oldNodeLabels.contains("ALL")) {
return Response.status(Status.OK).build();
} else if (oldNodeLabels.contains("A0")) {
SubClusterId subClusterId = getSubClusterId();
String id = subClusterId.getId();
if (StringUtils.contains("A0", id)) {
Mockito.when(response.getStatus()).thenReturn(HttpServletResponse.SC_OK);
return response;
} else {
Mockito.when(response.getStatus()).thenReturn(HttpServletResponse.SC_BAD_REQUEST);
return response;
}
}
throw new YarnException("removeFromClusterNodeLabels Error");
}
@Override
public Response updateSchedulerConfiguration(SchedConfUpdateInfo mutationInfo,
HttpServletRequest req) throws AuthorizationException, InterruptedException {
RMContext rmContext = mockRM.getRMContext();
MutableCSConfigurationProvider provider = new MutableCSConfigurationProvider(rmContext);
try {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
YarnConfiguration.MEMORY_CONFIGURATION_STORE);
provider.init(conf);
provider.logAndApplyMutation(UserGroupInformation.getCurrentUser(), mutationInfo);
} catch (Exception e) {
throw new RuntimeException(e);
}
return Response.status(Status.OK).
entity("Configuration change successfully applied.").build();
}
@Override
public Response getSchedulerConfiguration(HttpServletRequest req) throws AuthorizationException {
return Response.status(Status.OK).entity(new ConfInfo(mockRM.getConfig()))
.build();
}
public ClusterInfo getClusterInfo() {
ClusterInfo clusterInfo = new ClusterInfo(mockRM);
return clusterInfo;
}
@Override
public ClusterUserInfo getClusterUserInfo(HttpServletRequest hsr) {
String remoteUser = hsr.getRemoteUser();
UserGroupInformation callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
return new ClusterUserInfo(mockRM, callerUGI);
}
}