RouterWebServiceUtil.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.webapp;
import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices.DELEGATION_TOKEN_HEADER;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Collection;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
import org.apache.hadoop.thirdparty.com.google.common.net.HttpHeaders;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ConflictException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.glassfish.jersey.client.ClientProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The Router webservice util class.
*/
public final class RouterWebServiceUtil {
private static String user = "YarnRouter";
private static final Logger LOG =
LoggerFactory.getLogger(RouterWebServiceUtil.class.getName());
private final static String PARTIAL_REPORT = "Partial Report ";
/** Disable constructor. */
private RouterWebServiceUtil() {
}
/**
* Creates and performs a REST call to a specific WebService.
*
* @param webApp the address of the remote webapp
* @param hsr the servlet request
* @param returnType the return type of the REST call
* @param <T> Type of return object.
* @param method the HTTP method of the REST call
* @param targetPath additional path to add to the webapp address
* @param formParam the form parameters as input for a specific REST call
* @param additionalParam the query parameters as input for a specific REST
* call in case the call has no servlet request
* @param conf configuration.
* @param client same client used to reduce number of clients created
* @return the retrieved entity from the REST call
*/
protected static <T> T genericForward(final String webApp,
final HttpServletRequest hsr, final Class<T> returnType,
final HTTPMethods method, final String targetPath, final Object formParam,
final Map<String, String[]> additionalParam, Configuration conf,
Client client) {
UserGroupInformation callerUGI;
if (hsr != null) {
callerUGI = RMWebAppUtil.getCallerUserGroupInformation(hsr, true);
} else {
// user not required
callerUGI = UserGroupInformation.createRemoteUser(user);
}
if (callerUGI == null) {
LOG.error("Unable to obtain user name, user not authenticated");
return null;
}
try {
return callerUGI.doAs((PrivilegedExceptionAction<T>) () -> {
Map<String, String[]> paramMap = null;
// We can have hsr or additionalParam. There are no case with both.
if (hsr != null) {
paramMap = hsr.getParameterMap();
} else if (additionalParam != null) {
paramMap = additionalParam;
}
Response response = RouterWebServiceUtil.invokeRMWebService(
webApp, targetPath, method, (hsr == null) ? null : hsr.getPathInfo(), paramMap,
formParam, getMediaTypeFromHttpServletRequest(hsr, returnType), conf, client);
try {
if(returnType == Response.class) {
return returnType.cast(response);
}
// YARN RM can answer with Status.OK or it throws an exception
if (response.getStatus() == SC_OK) {
T t = response.readEntity(returnType);
return t;
}
if (response.getStatus() == SC_NO_CONTENT) {
try {
return returnType.getConstructor().newInstance();
} catch (RuntimeException | ReflectiveOperationException e) {
LOG.error("Cannot create empty entity for {}", returnType, e);
}
}
RouterWebServiceUtil.retrieveException(response);
return null;
} finally {
if (response != null && returnType != Response.class) {
response.close();
}
}
});
} catch (InterruptedException e) {
return null;
} catch (IOException e) {
return null;
}
}
/**
* Performs an invocation of a REST call on a remote RMWebService.
* @param webApp the address of the remote webapp
* @param path to add to the webapp address
* @param method the HTTP method of the REST call
* @param additionalPath the servlet request path
* @param queryParams hsr of additional Param
* @param formParam the form parameters as input for a specific REST call
* @param mediaType Media type for Servlet request call
* @param conf to support http and https
* @param client same client used to reduce number of clients created
* @return Client response to REST call
*/
@SuppressWarnings("checkstyle:parameternumber")
private static Response invokeRMWebService(String webApp, String path,
HTTPMethods method, String additionalPath,
Map<String, String[]> queryParams, Object formParam, String mediaType,
Configuration conf, Client client) {
InetSocketAddress socketAddress = NetUtils
.getConnectAddress(NetUtils.createSocketAddr(webApp));
String scheme = YarnConfiguration.useHttps(conf) ? "https://" : "http://";
String webAddress = scheme + socketAddress.getHostName() + ":"
+ socketAddress.getPort();
Client client1 = ClientBuilder.newClient();
WebTarget webResource = client1.target(webAddress);
if (additionalPath != null && !additionalPath.isEmpty()) {
webResource = webResource.path(additionalPath);
} else {
webResource = webResource.path(path);
}
LOG.info("webApp:{}, path:{}, method:{}, additionalPath:{}, queryParams:{}, " +
"formParam:{}, mediaType:{}, conf:{}", webApp, path, method, additionalPath,
queryParams, formParam, mediaType, conf);
if (queryParams != null && !queryParams.isEmpty()) {
for (Entry<String, String[]> param : queryParams.entrySet()) {
String[] values = param.getValue();
for (int i = 0; i < values.length; i++) {
webResource = webResource.queryParam(param.getKey(), values[i]);
}
}
}
Builder builder = webResource.request(mediaType);
Response response = null;
try {
switch (method) {
case DELETE:
response = builder.delete(Response.class);
break;
case GET:
response = builder.get(Response.class);
break;
case POST:
response = builder.post(Entity.entity(formParam, mediaType));
break;
case PUT:
response = builder.put(Entity.entity(formParam, mediaType), Response.class);
break;
default:
break;
}
} finally {
client.close();
}
return response;
}
public static void retrieveException(Response response) {
String serverErrorMsg = response.readEntity(String.class);
int status = response.getStatus();
if (status == 400) {
throw new BadRequestException(serverErrorMsg);
}
if (status == 403) {
throw new ForbiddenException(serverErrorMsg);
}
if (status == 404) {
throw new NotFoundException(serverErrorMsg);
}
if (status == 409) {
throw new ConflictException(serverErrorMsg);
}
}
/**
* Merges a list of AppInfo grouping by ApplicationId. Our current policy is
* to merge the application reports from the reachable SubClusters. Via
* configuration parameter, we decide whether to return applications for which
* the primary AM is missing or to omit them.
*
* @param appsInfo a list of AppInfo to merge
* @param returnPartialResult if the merge AppsInfo should contain partial
* result or not
* @return the merged AppsInfo
*/
public static AppsInfo mergeAppsInfo(ArrayList<AppInfo> appsInfo,
boolean returnPartialResult) {
AppsInfo allApps = new AppsInfo();
Map<String, AppInfo> federationAM = new HashMap<>();
Map<String, AppInfo> federationUAMSum = new HashMap<>();
for (AppInfo a : appsInfo) {
// Check if this AppInfo is an AM
if (a.getAMHostHttpAddress() != null) {
// Insert in the list of AM
federationAM.put(a.getAppId(), a);
// Check if there are any UAM found before
if (federationUAMSum.containsKey(a.getAppId())) {
// Merge the current AM with the found UAM
mergeAMWithUAM(a, federationUAMSum.get(a.getAppId()));
// Remove the sum of the UAMs
federationUAMSum.remove(a.getAppId());
}
// This AppInfo is an UAM
} else {
if (federationAM.containsKey(a.getAppId())) {
// Merge the current UAM with its own AM
mergeAMWithUAM(federationAM.get(a.getAppId()), a);
} else if (federationUAMSum.containsKey(a.getAppId())) {
// Merge the current UAM with its own UAM and update the list of UAM
federationUAMSum.put(a.getAppId(),
mergeUAMWithUAM(federationUAMSum.get(a.getAppId()), a));
} else {
// Insert in the list of UAM
federationUAMSum.put(a.getAppId(), a);
}
}
}
// Check the remaining UAMs are depending or not from federation
for (AppInfo a : federationUAMSum.values()) {
if (returnPartialResult || (a.getName() != null
&& !(a.getName().startsWith(UnmanagedApplicationManager.APP_NAME)
|| a.getName().startsWith(PARTIAL_REPORT)))) {
federationAM.put(a.getAppId(), a);
}
}
allApps.addAll(new ArrayList<>(federationAM.values()));
return allApps;
}
/**
* Create a Jersey client instance.
* @param conf Configuration
* @return a jersey client
*/
protected static Client createJerseyClient(Configuration conf) {
Client client = ClientBuilder.newClient();
long checkConnectTimeOut = conf.getLong(YarnConfiguration.ROUTER_WEBAPP_CONNECT_TIMEOUT, 0);
int connectTimeOut = (int) conf.getTimeDuration(YarnConfiguration.ROUTER_WEBAPP_CONNECT_TIMEOUT,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS);
if (checkConnectTimeOut <= 0 || checkConnectTimeOut > Integer.MAX_VALUE) {
LOG.warn("Configuration {} = {} ms error. We will use the default value({} ms).",
YarnConfiguration.ROUTER_WEBAPP_CONNECT_TIMEOUT, connectTimeOut,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_CONNECT_TIMEOUT);
connectTimeOut = (int) TimeUnit.MILLISECONDS.convert(
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS);
}
client.property(ClientProperties.CONNECT_TIMEOUT, connectTimeOut);
long checkReadTimeout = conf.getLong(YarnConfiguration.ROUTER_WEBAPP_READ_TIMEOUT, 0);
int readTimeout = (int) conf.getTimeDuration(YarnConfiguration.ROUTER_WEBAPP_READ_TIMEOUT,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_READ_TIMEOUT, TimeUnit.MILLISECONDS);
if (checkReadTimeout < 0 || checkReadTimeout > Integer.MAX_VALUE) {
LOG.warn("Configuration {} = {} ms error. We will use the default value({} ms).",
YarnConfiguration.ROUTER_WEBAPP_CONNECT_TIMEOUT, connectTimeOut,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_CONNECT_TIMEOUT);
readTimeout = (int) TimeUnit.MILLISECONDS.convert(
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS);
}
client.property(ClientProperties.READ_TIMEOUT, readTimeout);
return client;
}
private static AppInfo mergeUAMWithUAM(AppInfo uam1, AppInfo uam2) {
AppInfo partialReport = new AppInfo();
partialReport.setAppId(uam1.getAppId());
partialReport.setName(PARTIAL_REPORT + uam1.getAppId());
// We pick the status of the first uam
partialReport.setState(uam1.getState());
// Merge the newly partial AM with UAM1 and then with UAM2
mergeAMWithUAM(partialReport, uam1);
mergeAMWithUAM(partialReport, uam2);
return partialReport;
}
private static void mergeAMWithUAM(AppInfo am, AppInfo uam) {
am.setPreemptedResourceMB(
am.getPreemptedResourceMB() + uam.getPreemptedResourceMB());
am.setPreemptedResourceVCores(
am.getPreemptedResourceVCores() + uam.getPreemptedResourceVCores());
am.setNumNonAMContainerPreempted(am.getNumNonAMContainerPreempted()
+ uam.getNumNonAMContainerPreempted());
am.setNumAMContainerPreempted(
am.getNumAMContainerPreempted() + uam.getNumAMContainerPreempted());
am.setPreemptedMemorySeconds(
am.getPreemptedMemorySeconds() + uam.getPreemptedMemorySeconds());
am.setPreemptedVcoreSeconds(
am.getPreemptedVcoreSeconds() + uam.getPreemptedVcoreSeconds());
if (am.getState() == YarnApplicationState.RUNNING
&& uam.getState() == am.getState()) {
am.getResourceRequests().addAll(uam.getResourceRequests());
am.setAllocatedMB(am.getAllocatedMB() + uam.getAllocatedMB());
am.setAllocatedVCores(am.getAllocatedVCores() + uam.getAllocatedVCores());
am.setReservedMB(am.getReservedMB() + uam.getReservedMB());
am.setReservedVCores(am.getReservedVCores() + uam.getReservedMB());
am.setRunningContainers(
am.getRunningContainers() + uam.getRunningContainers());
am.setMemorySeconds(am.getMemorySeconds() + uam.getMemorySeconds());
am.setVcoreSeconds(am.getVcoreSeconds() + uam.getVcoreSeconds());
}
}
/**
* Deletes all the duplicate NodeInfo by discarding the old instances.
*
* @param nodes a list of NodeInfo to check for duplicates
* @return a NodesInfo that contains a list of NodeInfos without duplicates
*/
public static NodesInfo deleteDuplicateNodesInfo(ArrayList<NodeInfo> nodes) {
NodesInfo nodesInfo = new NodesInfo();
Map<String, NodeInfo> nodesMap = new LinkedHashMap<>();
for (NodeInfo node : nodes) {
String nodeId = node.getNodeId();
// If the node already exists, it could be an old instance
if (nodesMap.containsKey(nodeId)) {
// Check if the node is an old instance
if (nodesMap.get(nodeId).getLastHealthUpdate() < node
.getLastHealthUpdate()) {
nodesMap.put(node.getNodeId(), node);
}
} else {
nodesMap.put(node.getNodeId(), node);
}
}
nodesInfo.addAll(new ArrayList<>(nodesMap.values()));
return nodesInfo;
}
/**
* Adds all the values from the second ClusterMetricsInfo to the first one.
*
* @param metrics the ClusterMetricsInfo we want to update
* @param metricsResponse the ClusterMetricsInfo we want to add to the first
* param
*/
public static void mergeMetrics(ClusterMetricsInfo metrics,
ClusterMetricsInfo metricsResponse) {
metrics.setAppsSubmitted(
metrics.getAppsSubmitted() + metricsResponse.getAppsSubmitted());
metrics.setAppsCompleted(
metrics.getAppsCompleted() + metricsResponse.getAppsCompleted());
metrics.setAppsPending(
metrics.getAppsPending() + metricsResponse.getAppsPending());
metrics.setAppsRunning(
metrics.getAppsRunning() + metricsResponse.getAppsRunning());
metrics.setAppsFailed(
metrics.getAppsFailed() + metricsResponse.getAppsFailed());
metrics.setAppsKilled(
metrics.getAppsKilled() + metricsResponse.getAppsKilled());
metrics.setReservedMB(
metrics.getReservedMB() + metricsResponse.getReservedMB());
metrics.setAvailableMB(
metrics.getAvailableMB() + metricsResponse.getAvailableMB());
metrics.setAllocatedMB(
metrics.getAllocatedMB() + metricsResponse.getAllocatedMB());
metrics.setReservedVirtualCores(metrics.getReservedVirtualCores()
+ metricsResponse.getReservedVirtualCores());
metrics.setAvailableVirtualCores(metrics.getAvailableVirtualCores()
+ metricsResponse.getAvailableVirtualCores());
metrics.setAllocatedVirtualCores(metrics.getAllocatedVirtualCores()
+ metricsResponse.getAllocatedVirtualCores());
metrics.setContainersAllocated(metrics.getContainersAllocated()
+ metricsResponse.getContainersAllocated());
metrics.setContainersReserved(metrics.getReservedContainers()
+ metricsResponse.getReservedContainers());
metrics.setContainersPending(metrics.getPendingContainers()
+ metricsResponse.getPendingContainers());
metrics.setTotalMB(metrics.getTotalMB()
+ metricsResponse.getTotalMB());
metrics.setUtilizedMB(metrics.getUtilizedMB()
+ metricsResponse.getUtilizedMB());
metrics.setTotalVirtualCores(metrics.getTotalVirtualCores()
+ metricsResponse.getTotalVirtualCores());
metrics.setTotalNodes(metrics.getTotalNodes()
+ metricsResponse.getTotalNodes());
metrics.setUtilizedVirtualCores(metrics.getUtilizedVirtualCores()
+ metricsResponse.getUtilizedVirtualCores());
metrics.setLostNodes(metrics.getLostNodes()
+ metricsResponse.getLostNodes());
metrics.setUnhealthyNodes(metrics.getUnhealthyNodes()
+ metricsResponse.getUnhealthyNodes());
metrics.setDecommissioningNodes(metrics.getDecommissioningNodes()
+ metricsResponse.getDecommissioningNodes());
metrics.setDecommissionedNodes(metrics.getDecommissionedNodes()
+ metricsResponse.getDecommissionedNodes());
metrics.setRebootedNodes(metrics.getRebootedNodes()
+ metricsResponse.getRebootedNodes());
metrics.setActiveNodes(metrics.getActiveNodes()
+ metricsResponse.getActiveNodes());
metrics.setShutdownNodes(metrics.getShutdownNodes()
+ metricsResponse.getShutdownNodes());
int utilizedVirtualCoresPercent = metrics.getTotalVirtualCores() <= 0 ? 0 :
(int) (metrics.getUtilizedVirtualCores() * 100 / metrics.getTotalVirtualCores());
metrics.setUtilizedVirtualCoresPercent(utilizedVirtualCoresPercent);
int utilizedMBPercent = metrics.getTotalMB() <= 0 ? 0 :
(int) (metrics.getUtilizedMB() * 100 / metrics.getTotalMB());
metrics.setUtilizedMBPercent(utilizedMBPercent);
}
/**
* Extract from HttpServletRequest the MediaType in output.
*
* @param request the servlet request.
* @param returnType the return type of the REST call.
* @param <T> Generic Type T.
* @return MediaType.
*/
protected static <T> String getMediaTypeFromHttpServletRequest(
HttpServletRequest request, final Class<T> returnType) {
if (request == null) {
// By default, we return XML for REST call without HttpServletRequest
return MediaType.APPLICATION_XML;
}
if (!returnType.equals(Response.class)) {
return MediaType.APPLICATION_XML;
}
String header = request.getHeader(HttpHeaders.ACCEPT);
if (header == null || header.equals("*")) {
// By default, we return JSON
return MediaType.APPLICATION_JSON;
}
return header;
}
public static NodeToLabelsInfo mergeNodeToLabels(
Map<SubClusterInfo, NodeToLabelsInfo> nodeToLabelsInfoMap) {
HashMap<String, NodeLabelsInfo> nodeToLabels = new HashMap<>();
Collection<NodeToLabelsInfo> nodeToLabelsInfos = nodeToLabelsInfoMap.values();
nodeToLabelsInfos.stream().forEach(nodeToLabelsInfo -> {
for (Map.Entry<String, NodeLabelsInfo> item : nodeToLabelsInfo.getNodeToLabels().entrySet()) {
String key = item.getKey();
NodeLabelsInfo itemValue = item.getValue();
NodeLabelsInfo nodeToLabelsValue = nodeToLabels.getOrDefault(item.getKey(), null);
Set<NodeLabel> hashSet = new HashSet<>();
if (itemValue != null) {
hashSet.addAll(itemValue.getNodeLabels());
}
if (nodeToLabelsValue != null) {
hashSet.addAll(nodeToLabelsValue.getNodeLabels());
}
nodeToLabels.put(key, new NodeLabelsInfo(hashSet));
}
});
return new NodeToLabelsInfo(nodeToLabels);
}
public static ApplicationStatisticsInfo mergeApplicationStatisticsInfo(
Collection<ApplicationStatisticsInfo> appStatistics) {
ApplicationStatisticsInfo result = new ApplicationStatisticsInfo();
Map<String, StatisticsItemInfo> statisticsItemMap = new HashMap<>();
appStatistics.stream().forEach(appStatistic -> {
List<StatisticsItemInfo> statisticsItemInfos = appStatistic.getStatItems();
for (StatisticsItemInfo statisticsItemInfo : statisticsItemInfos) {
String statisticsItemKey =
statisticsItemInfo.getType() + "_" + statisticsItemInfo.getState().toString();
StatisticsItemInfo statisticsItemValue;
if (statisticsItemMap.containsKey(statisticsItemKey)) {
statisticsItemValue = statisticsItemMap.get(statisticsItemKey);
long statisticsItemValueCount = statisticsItemValue.getCount();
long statisticsItemInfoCount = statisticsItemInfo.getCount();
long newCount = statisticsItemValueCount + statisticsItemInfoCount;
statisticsItemValue.setCount(newCount);
} else {
statisticsItemValue = new StatisticsItemInfo(statisticsItemInfo);
}
statisticsItemMap.put(statisticsItemKey, statisticsItemValue);
}
});
if (!statisticsItemMap.isEmpty()) {
result.getStatItems().addAll(statisticsItemMap.values());
}
return result;
}
public static NodeLabelsInfo mergeNodeLabelsInfo(Map<SubClusterInfo, NodeLabelsInfo> paramMap) {
Map<String, NodeLabelInfo> resultMap = new HashMap<>();
paramMap.values().stream()
.flatMap(nodeLabelsInfo -> nodeLabelsInfo.getNodeLabelsInfo().stream())
.forEach(nodeLabelInfo -> {
String keyLabelName = nodeLabelInfo.getName();
if (resultMap.containsKey(keyLabelName)) {
NodeLabelInfo mapNodeLabelInfo = resultMap.get(keyLabelName);
mapNodeLabelInfo = mergeNodeLabelInfo(mapNodeLabelInfo, nodeLabelInfo);
resultMap.put(keyLabelName, mapNodeLabelInfo);
} else {
resultMap.put(keyLabelName, nodeLabelInfo);
}
});
NodeLabelsInfo nodeLabelsInfo = new NodeLabelsInfo();
nodeLabelsInfo.getNodeLabelsInfo().addAll(resultMap.values());
return nodeLabelsInfo;
}
private static NodeLabelInfo mergeNodeLabelInfo(NodeLabelInfo left, NodeLabelInfo right) {
NodeLabelInfo resultNodeLabelInfo = new NodeLabelInfo();
resultNodeLabelInfo.setName(left.getName());
int newActiveNMs = left.getActiveNMs() + right.getActiveNMs();
resultNodeLabelInfo.setActiveNMs(newActiveNMs);
boolean newExclusivity = left.getExclusivity() && right.getExclusivity();
resultNodeLabelInfo.setExclusivity(newExclusivity);
PartitionInfo leftPartition = left.getPartitionInfo();
PartitionInfo rightPartition = right.getPartitionInfo();
PartitionInfo newPartitionInfo = PartitionInfo.addTo(leftPartition, rightPartition);
resultNodeLabelInfo.setPartitionInfo(newPartitionInfo);
return resultNodeLabelInfo;
}
/**
* initForWritableEndpoints does the init and acls verification for all
* writable REST end points.
*
* @param conf Configuration.
* @param callerUGI remote caller who initiated the request.
* @throws AuthorizationException in case of no access to perfom this op.
*/
public static void initForWritableEndpoints(Configuration conf, UserGroupInformation callerUGI)
throws AuthorizationException {
if (callerUGI == null) {
String msg = "Unable to obtain user name, user not authenticated";
throw new AuthorizationException(msg);
}
if (UserGroupInformation.isSecurityEnabled() && isStaticUser(conf, callerUGI)) {
String msg = "The default static user cannot carry out this operation.";
throw new ForbiddenException(msg);
}
}
/**
* Determine whether the user is a static user.
*
* @param conf Configuration.
* @param callerUGI remote caller who initiated the request.
* @return true, static user; false, not static user;
*/
private static boolean isStaticUser(Configuration conf, UserGroupInformation callerUGI) {
String staticUser = conf.get(CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER,
CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER);
return staticUser.equals(callerUGI.getUserName());
}
public static void createKerberosUserGroupInformation(HttpServletRequest hsr)
throws YarnException {
String authType = hsr.getAuthType();
if (!KerberosAuthenticationHandler.TYPE.equalsIgnoreCase(authType)) {
String msg = "Delegation token operations can only be carried out on a "
+ "Kerberos authenticated channel. Expected auth type is "
+ KerberosAuthenticationHandler.TYPE + ", got type " + authType;
throw new YarnException(msg);
}
Object ugiAttr =
hsr.getAttribute(DelegationTokenAuthenticationHandler.DELEGATION_TOKEN_UGI_ATTRIBUTE);
if (ugiAttr != null) {
String msg = "Delegation token operations cannot be carried out using "
+ "delegation token authentication.";
throw new YarnException(msg);
}
}
/**
* Parse Token data.
*
* @param encodedToken tokenData
* @return RMDelegationTokenIdentifier.
*/
public static Token<RMDelegationTokenIdentifier> extractToken(String encodedToken) {
Token<RMDelegationTokenIdentifier> token = new Token<>();
try {
token.decodeFromUrlString(encodedToken);
} catch (Exception ie) {
throw new BadRequestException("Could not decode encoded token");
}
return token;
}
public static Token<RMDelegationTokenIdentifier> extractToken(HttpServletRequest request) {
String encodedToken = request.getHeader(DELEGATION_TOKEN_HEADER);
if (encodedToken == null) {
String msg = "Header '" + DELEGATION_TOKEN_HEADER
+ "' containing encoded token not found";
throw new BadRequestException(msg);
}
return extractToken(encodedToken);
}
/**
* Get Kerberos UserGroupInformation.
*
* Parse ugi from hsr and set kerberos authentication attributes.
*
* @param conf Configuration.
* @param request the servlet request.
* @return UserGroupInformation.
* @throws AuthorizationException if Kerberos auth failed.
* @throws YarnException If Authentication Type verification fails.
*/
public static UserGroupInformation getKerberosUserGroupInformation(Configuration conf,
HttpServletRequest request) throws AuthorizationException, YarnException {
// Parse ugi from hsr And Check ugi as expected.
// If ugi is empty or user is a static user, an exception will be thrown.
UserGroupInformation callerUGI = RMWebAppUtil.getCallerUserGroupInformation(request, true);
initForWritableEndpoints(conf, callerUGI);
// Set AuthenticationMethod Kerberos for ugi.
createKerberosUserGroupInformation(request);
callerUGI.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS);
// return caller UGI
return callerUGI;
}
public static String generateWebTitle(String title, String msg) {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(title);
stringBuilder.append(" (");
stringBuilder.append(msg);
stringBuilder.append(")");
return stringBuilder.toString();
}
}