RouterServerUtil.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.thirdparty.protobuf.GeneratedMessageV3;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.EnumSet;
import java.io.IOException;
/**
* Common utility methods used by the Router server.
*
*/
@Private
@Unstable
public final class RouterServerUtil {
private static final String APPLICATION_ID_PREFIX = "application_";
private static final String APP_ATTEMPT_ID_PREFIX = "appattempt_";
private static final String CONTAINER_PREFIX = "container_";
private static final String EPOCH_PREFIX = "e";
private static final String RESERVEIDSTR_PREFIX = "reservation_";
/** Disable constructor. */
private RouterServerUtil() {
}
public static final Logger LOG =
LoggerFactory.getLogger(RouterServerUtil.class);
/**
* Throws an exception due to an error.
*
* @param t the throwable raised in the called class.
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @throws YarnException on failure
*/
@Public
@Unstable
public static void logAndThrowException(Throwable t, String errMsgFormat, Object... args)
throws YarnException {
String msg = String.format(errMsgFormat, args);
if (t != null) {
String newErrMsg = getErrorMsg(msg, t);
LOG.error(newErrMsg, t);
throw new YarnException(newErrMsg, t);
} else {
LOG.error(msg);
throw new YarnException(msg);
}
}
/**
* Throws an exception due to an error.
*
* @param errMsg the error message
* @param t the throwable raised in the called class.
* @throws YarnException on failure
*/
@Public
@Unstable
public static void logAndThrowException(String errMsg, Throwable t)
throws YarnException {
if (t != null) {
String newErrMsg = getErrorMsg(errMsg, t);
LOG.error(newErrMsg, t);
throw new YarnException(newErrMsg, t);
} else {
LOG.error(errMsg);
throw new YarnException(errMsg);
}
}
/**
* Throws an exception due to an error.
*
* @param errMsg the error message
* @throws YarnException on failure
*/
@Public
@Unstable
public static void logAndThrowException(String errMsg) throws YarnException {
LOG.error(errMsg);
throw new YarnException(errMsg);
}
private static String getErrorMsg(String errMsg, Throwable t) {
if (t.getMessage() != null) {
return errMsg + "" + t.getMessage();
}
return errMsg;
}
public static <R> R createRequestInterceptorChain(Configuration conf, String pipeLineClassName,
String interceptorClassName, Class<R> clazz) {
List<String> interceptorClassNames = getInterceptorClassNames(conf,
pipeLineClassName, interceptorClassName);
R pipeline = null;
R current = null;
for (String className : interceptorClassNames) {
try {
Class<?> interceptorClass = conf.getClassByName(className);
if (clazz.isAssignableFrom(interceptorClass)) {
Object interceptorInstance = ReflectionUtils.newInstance(interceptorClass, conf);
if (pipeline == null) {
pipeline = clazz.cast(interceptorInstance);
current = clazz.cast(interceptorInstance);
continue;
} else {
Method method = clazz.getMethod("setNextInterceptor", clazz);
method.invoke(current, interceptorInstance);
current = clazz.cast(interceptorInstance);
}
} else {
LOG.error("Class: {} not instance of {}.", className, clazz.getCanonicalName());
throw new YarnRuntimeException("Class: " + className + " not instance of "
+ clazz.getCanonicalName());
}
} catch (ClassNotFoundException e) {
LOG.error("Could not instantiate RequestInterceptor: {}", className, e);
throw new YarnRuntimeException("Could not instantiate RequestInterceptor: " + className, e);
} catch (InvocationTargetException e) {
LOG.error("RequestInterceptor {} call setNextInterceptor error.", className, e);
throw new YarnRuntimeException("RequestInterceptor " + className
+ " call setNextInterceptor error.", e);
} catch (NoSuchMethodException e) {
LOG.error("RequestInterceptor {} does not contain the method setNextInterceptor.",
className);
throw new YarnRuntimeException("RequestInterceptor " + className +
" does not contain the method setNextInterceptor.", e);
} catch (IllegalAccessException e) {
LOG.error("RequestInterceptor {} call the method setNextInterceptor " +
"does not have access.", className);
throw new YarnRuntimeException("RequestInterceptor "
+ className + " call the method setNextInterceptor does not have access.", e);
}
}
if (pipeline == null) {
throw new YarnRuntimeException(
"RequestInterceptor pipeline is not configured in the system.");
}
return pipeline;
}
private static List<String> getInterceptorClassNames(Configuration conf,
String pipeLineClass, String interceptorClass) {
String configuredInterceptorClassNames = conf.get(pipeLineClass, interceptorClass);
List<String> interceptorClassNames = new ArrayList<>();
Collection<String> tempList =
StringUtils.getStringCollection(configuredInterceptorClassNames);
for (String item : tempList) {
interceptorClassNames.add(item.trim());
}
return interceptorClassNames;
}
/**
* Throws an IOException due to an error.
*
* @param errMsg the error message
* @param t the throwable raised in the called class.
* @throws IOException on failure
*/
@Public
@Unstable
public static void logAndThrowIOException(String errMsg, Throwable t)
throws IOException {
if (t != null) {
String newErrMsg = getErrorMsg(errMsg, t);
LOG.error(newErrMsg, t);
throw new IOException(newErrMsg, t);
} else {
LOG.error(errMsg);
throw new IOException(errMsg);
}
}
/**
* Throws an IOException due to an error.
*
* @param t the throwable raised in the called class.
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @throws IOException on failure
*/
@Public
@Unstable
public static void logAndThrowIOException(Throwable t, String errMsgFormat, Object... args)
throws IOException {
String msg = String.format(errMsgFormat, args);
if (t != null) {
String newErrMsg = getErrorMsg(msg, t);
LOG.error(newErrMsg, t);
throw new IOException(newErrMsg, t);
} else {
LOG.error(msg);
throw new IOException(msg);
}
}
/**
* Throws an RunTimeException due to an error.
*
* @param errMsg the error message
* @param t the throwable raised in the called class.
* @throws RuntimeException on failure
*/
@Public
@Unstable
public static void logAndThrowRunTimeException(String errMsg, Throwable t)
throws RuntimeException {
if (t != null) {
String newErrMsg = getErrorMsg(errMsg, t);
LOG.error(newErrMsg, t);
throw new RuntimeException(newErrMsg, t);
} else {
LOG.error(errMsg);
throw new RuntimeException(errMsg);
}
}
/**
* Throws an RunTimeException due to an error.
*
* @param t the throwable raised in the called class.
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @throws RuntimeException on failure
*/
@Public
@Unstable
public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat, Object... args)
throws RuntimeException {
String msg = String.format(errMsgFormat, args);
if (t != null) {
String newErrMsg = getErrorMsg(msg, t);
LOG.error(newErrMsg, t);
throw new RuntimeException(newErrMsg, t);
} else {
LOG.error(msg);
throw new RuntimeException(msg);
}
}
/**
* Throws an RunTimeException due to an error.
*
* @param t the throwable raised in the called class.
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @return RuntimeException
*/
@Public
@Unstable
public static RuntimeException logAndReturnRunTimeException(
Throwable t, String errMsgFormat, Object... args) {
String msg = String.format(errMsgFormat, args);
if (t != null) {
String newErrMsg = getErrorMsg(msg, t);
LOG.error(newErrMsg, t);
return new RuntimeException(newErrMsg, t);
} else {
LOG.error(msg);
return new RuntimeException(msg);
}
}
/**
* Throws an RunTimeException due to an error.
*
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @return RuntimeException
*/
@Public
@Unstable
public static RuntimeException logAndReturnRunTimeException(
String errMsgFormat, Object... args) {
return logAndReturnRunTimeException(null, errMsgFormat, args);
}
/**
* Throws an YarnRuntimeException due to an error.
*
* @param t the throwable raised in the called class.
* @param errMsgFormat the error message format string.
* @param args referenced by the format specifiers in the format string.
* @return YarnRuntimeException
*/
@Public
@Unstable
public static YarnRuntimeException logAndReturnYarnRunTimeException(
Throwable t, String errMsgFormat, Object... args) {
String msg = String.format(errMsgFormat, args);
if (t != null) {
String newErrMsg = getErrorMsg(msg, t);
LOG.error(newErrMsg, t);
return new YarnRuntimeException(newErrMsg, t);
} else {
LOG.error(msg);
return new YarnRuntimeException(msg);
}
}
/**
* Check applicationId is accurate.
*
* We need to ensure that applicationId cannot be empty and
* can be converted to ApplicationId object normally.
*
* @param applicationId applicationId of type string
* @throws IllegalArgumentException If the format of the applicationId is not accurate,
* an IllegalArgumentException needs to be thrown.
*/
@Public
@Unstable
public static void validateApplicationId(String applicationId)
throws IllegalArgumentException {
// Make Sure applicationId is not empty.
if (applicationId == null || applicationId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appId is empty or null.");
}
// Make sure the prefix information of applicationId is accurate.
if (!applicationId.startsWith(APPLICATION_ID_PREFIX)) {
throw new IllegalArgumentException("Invalid ApplicationId prefix: "
+ applicationId + ". The valid ApplicationId should start with prefix application");
}
// Check the split position of the string.
int pos1 = APPLICATION_ID_PREFIX.length() - 1;
int pos2 = applicationId.indexOf('_', pos1 + 1);
if (pos2 < 0) {
throw new IllegalArgumentException("Invalid ApplicationId: " + applicationId);
}
// Confirm that the parsed rmId and appId are numeric types.
String rmId = applicationId.substring(pos1 + 1, pos2);
String appId = applicationId.substring(pos2 + 1);
if(!NumberUtils.isDigits(rmId) || !NumberUtils.isDigits(appId)){
throw new IllegalArgumentException("Invalid ApplicationId: " + applicationId);
}
}
/**
* Check appAttemptId is accurate.
*
* We need to ensure that appAttemptId cannot be empty and
* can be converted to ApplicationAttemptId object normally.
*
* @param appAttemptId appAttemptId of type string.
* @throws IllegalArgumentException If the format of the appAttemptId is not accurate,
* an IllegalArgumentException needs to be thrown.
*/
@Public
@Unstable
public static void validateApplicationAttemptId(String appAttemptId)
throws IllegalArgumentException {
// Make Sure appAttemptId is not empty.
if (appAttemptId == null || appAttemptId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the appAttemptId is empty or null.");
}
// Make sure the prefix information of appAttemptId is accurate.
if (!appAttemptId.startsWith(APP_ATTEMPT_ID_PREFIX)) {
throw new IllegalArgumentException("Invalid AppAttemptId prefix: " + appAttemptId);
}
// Check the split position of the string.
int pos1 = APP_ATTEMPT_ID_PREFIX.length() - 1;
int pos2 = appAttemptId.indexOf('_', pos1 + 1);
if (pos2 < 0) {
throw new IllegalArgumentException("Invalid AppAttemptId: " + appAttemptId);
}
int pos3 = appAttemptId.indexOf('_', pos2 + 1);
if (pos3 < 0) {
throw new IllegalArgumentException("Invalid AppAttemptId: " + appAttemptId);
}
// Confirm that the parsed rmId and appId and attemptId are numeric types.
String rmId = appAttemptId.substring(pos1 + 1, pos2);
String appId = appAttemptId.substring(pos2 + 1, pos3);
String attemptId = appAttemptId.substring(pos3 + 1);
if (!NumberUtils.isDigits(rmId) || !NumberUtils.isDigits(appId)
|| !NumberUtils.isDigits(attemptId)) {
throw new IllegalArgumentException("Invalid AppAttemptId: " + appAttemptId);
}
}
/**
* Check containerId is accurate.
*
* We need to ensure that containerId cannot be empty and
* can be converted to ContainerId object normally.
*
* @param containerId containerId of type string.
* @throws IllegalArgumentException If the format of the appAttemptId is not accurate,
* an IllegalArgumentException needs to be thrown.
*/
@Public
@Unstable
public static void validateContainerId(String containerId)
throws IllegalArgumentException {
// Make Sure containerId is not empty.
if (containerId == null || containerId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the containerId is empty or null.");
}
// Make sure the prefix information of containerId is accurate.
if (!containerId.startsWith(CONTAINER_PREFIX)) {
throw new IllegalArgumentException("Invalid ContainerId prefix: " + containerId);
}
// Check the split position of the string.
int pos1 = CONTAINER_PREFIX.length() - 1;
String epoch = "0";
if (containerId.regionMatches(pos1 + 1, EPOCH_PREFIX, 0, EPOCH_PREFIX.length())) {
int pos2 = containerId.indexOf('_', pos1 + 1);
if (pos2 < 0) {
throw new IllegalArgumentException("Invalid ContainerId: " + containerId);
}
String epochStr = containerId.substring(pos1 + 1 + EPOCH_PREFIX.length(), pos2);
epoch = epochStr;
// rewind the current position
pos1 = pos2;
}
int pos2 = containerId.indexOf('_', pos1 + 1);
if (pos2 < 0) {
throw new IllegalArgumentException("Invalid ContainerId: " + containerId);
}
int pos3 = containerId.indexOf('_', pos2 + 1);
if (pos3 < 0) {
throw new IllegalArgumentException("Invalid ContainerId: " + containerId);
}
int pos4 = containerId.indexOf('_', pos3 + 1);
if (pos4 < 0) {
throw new IllegalArgumentException("Invalid ContainerId: " + containerId);
}
// Confirm that the parsed appId and clusterTimestamp and attemptId and cid and epoch
// are numeric types.
String appId = containerId.substring(pos2 + 1, pos3);
String clusterTimestamp = containerId.substring(pos1 + 1, pos2);
String attemptId = containerId.substring(pos3 + 1, pos4);
String cid = containerId.substring(pos4 + 1);
if (!NumberUtils.isDigits(appId) || !NumberUtils.isDigits(clusterTimestamp)
|| !NumberUtils.isDigits(attemptId) || !NumberUtils.isDigits(cid)
|| !NumberUtils.isDigits(epoch)) {
throw new IllegalArgumentException("Invalid ContainerId: " + containerId);
}
}
public static boolean isAllowedDelegationTokenOp() throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
return EnumSet.of(UserGroupInformation.AuthenticationMethod.KERBEROS,
UserGroupInformation.AuthenticationMethod.KERBEROS_SSL,
UserGroupInformation.AuthenticationMethod.CERTIFICATE)
.contains(UserGroupInformation.getCurrentUser()
.getRealAuthenticationMethod());
} else {
return true;
}
}
public static String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
throws IOException {
UserGroupInformation user = UserGroupInformation.getCurrentUser();
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
// we can always renew our own tokens
return loginUser.getUserName().equals(user.getUserName())
? token.decodeIdentifier().getRenewer().toString() : user.getShortUserName();
}
/**
* Set User information.
*
* If the username is empty, we will use the Yarn Router user directly.
* Do not create a proxy user if userName matches the userName on current UGI.
*
* @param userName userName.
* @return UserGroupInformation.
*/
public static UserGroupInformation setupUser(final String userName) {
UserGroupInformation user = null;
try {
// If userName is empty, we will return UserGroupInformation.getCurrentUser.
// Do not create a proxy user if user name matches the user name on
// current UGI
if (userName == null || userName.trim().isEmpty()) {
user = UserGroupInformation.getCurrentUser();
} else if (UserGroupInformation.isSecurityEnabled()) {
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
} else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,
UserGroupInformation.getCurrentUser());
}
return user;
} catch (IOException e) {
throw RouterServerUtil.logAndReturnYarnRunTimeException(e,
"Error while creating Router Service for user : %s.", user);
}
}
/**
* Check reservationId is accurate.
*
* We need to ensure that reservationId cannot be empty and
* can be converted to ReservationId object normally.
*
* @param reservationId reservationId.
* @throws IllegalArgumentException If the format of the reservationId is not accurate,
* an IllegalArgumentException needs to be thrown.
*/
@Public
@Unstable
public static void validateReservationId(String reservationId) throws IllegalArgumentException {
if (reservationId == null || reservationId.isEmpty()) {
throw new IllegalArgumentException("Parameter error, the reservationId is empty or null.");
}
if (!reservationId.startsWith(RESERVEIDSTR_PREFIX)) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}
String[] resFields = reservationId.split("_");
if (resFields.length != 3) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}
String clusterTimestamp = resFields[1];
String id = resFields[2];
if (!NumberUtils.isDigits(id) || !NumberUtils.isDigits(clusterTimestamp)) {
throw new IllegalArgumentException("Invalid ReservationId: " + reservationId);
}
}
/**
* Convert ReservationDefinitionInfo to ReservationDefinition.
*
* @param definitionInfo ReservationDefinitionInfo Object.
* @return ReservationDefinition.
*/
public static ReservationDefinition convertReservationDefinition(
ReservationDefinitionInfo definitionInfo) {
if (definitionInfo == null || definitionInfo.getReservationRequests() == null
|| definitionInfo.getReservationRequests().getReservationRequest() == null
|| definitionInfo.getReservationRequests().getReservationRequest().isEmpty()) {
throw new RuntimeException("definitionInfo Or ReservationRequests is Null.");
}
// basic variable
long arrival = definitionInfo.getArrival();
long deadline = definitionInfo.getDeadline();
// ReservationRequests reservationRequests
String name = definitionInfo.getReservationName();
String recurrenceExpression = definitionInfo.getRecurrenceExpression();
Priority priority = Priority.newInstance(definitionInfo.getPriority());
// reservation requests info
List<ReservationRequest> reservationRequestList = new ArrayList<>();
ReservationRequestsInfo reservationRequestsInfo = definitionInfo.getReservationRequests();
List<ReservationRequestInfo> reservationRequestInfos =
reservationRequestsInfo.getReservationRequest();
for (ReservationRequestInfo resRequestInfo : reservationRequestInfos) {
ResourceInfo resourceInfo = resRequestInfo.getCapability();
Resource capability =
Resource.newInstance(resourceInfo.getMemorySize(), resourceInfo.getvCores());
ReservationRequest reservationRequest = ReservationRequest.newInstance(capability,
resRequestInfo.getNumContainers(), resRequestInfo.getMinConcurrency(),
resRequestInfo.getDuration());
reservationRequestList.add(reservationRequest);
}
ReservationRequestInterpreter[] values = ReservationRequestInterpreter.values();
ReservationRequestInterpreter reservationRequestInterpreter =
values[reservationRequestsInfo.getReservationRequestsInterpreter()];
ReservationRequests reservationRequests = ReservationRequests.newInstance(
reservationRequestList, reservationRequestInterpreter);
ReservationDefinition definition = ReservationDefinition.newInstance(
arrival, deadline, reservationRequests, name, recurrenceExpression, priority);
return definition;
}
/**
* Checks if the ApplicationSubmissionContext submitted with the application
* is valid.
*
* Current checks:
* - if its size is within limits.
*
* @param appContext the app context to check.
* @param conf Configuration.
* @throws IOException if an IO error occurred.
* @throws YarnException yarn exception.
*/
@Public
@Unstable
public static void checkAppSubmissionContext(ApplicationSubmissionContextPBImpl appContext,
Configuration conf) throws IOException, YarnException {
// Prevents DoS over the ApplicationClientProtocol by checking the context
// the application was submitted with for any excessively large fields.
double bytesOfMaxAscSize = conf.getStorageSize(
YarnConfiguration.ROUTER_ASC_INTERCEPTOR_MAX_SIZE,
YarnConfiguration.DEFAULT_ROUTER_ASC_INTERCEPTOR_MAX_SIZE, StorageUnit.BYTES);
if (appContext != null) {
int bytesOfSerializedSize = appContext.getProto().getSerializedSize();
if (bytesOfSerializedSize >= bytesOfMaxAscSize) {
logContainerLaunchContext(appContext);
String applicationId = appContext.getApplicationId().toString();
String limit = StringUtils.byteDesc((long) bytesOfMaxAscSize);
String appContentSize = StringUtils.byteDesc(bytesOfSerializedSize);
String errMsg = String.format(
"The size of the ApplicationSubmissionContext of the application %s is " +
"above the limit %s, size = %s.", applicationId, limit, appContentSize);
LOG.error(errMsg);
throw new YarnException(errMsg);
}
}
}
/**
* Private helper for checkAppSubmissionContext that logs the fields in the
* context for debugging.
*
* @param appContext the app context.
* @throws IOException if an IO error occurred.
*/
@Private
@Unstable
private static void logContainerLaunchContext(ApplicationSubmissionContextPBImpl appContext)
throws IOException {
if (appContext == null || appContext.getAMContainerSpec() == null ||
!(appContext.getAMContainerSpec() instanceof ContainerLaunchContextPBImpl)) {
return;
}
ContainerLaunchContext launchContext = appContext.getAMContainerSpec();
ContainerLaunchContextPBImpl clc = (ContainerLaunchContextPBImpl) launchContext;
LOG.warn("ContainerLaunchContext size: {}.", clc.getProto().getSerializedSize());
// ContainerLaunchContext contains:
// 1) Map<String, LocalResource> localResources,
List<StringLocalResourceMapProto> lrs = clc.getProto().getLocalResourcesList();
logContainerLaunchContext("LocalResource size: {}. Length: {}.", lrs);
// 2) Map<String, String> environment, List<String> commands,
List<StringStringMapProto> envs = clc.getProto().getEnvironmentList();
logContainerLaunchContext("Environment size: {}. Length: {}.", envs);
List<String> cmds = clc.getCommands();
if (CollectionUtils.isNotEmpty(cmds)) {
LOG.warn("Commands size: {}. Length: {}.", cmds.size(), serialize(cmds).length);
}
// 3) Map<String, ByteBuffer> serviceData,
List<StringBytesMapProto> serviceData = clc.getProto().getServiceDataList();
logContainerLaunchContext("ServiceData size: {}. Length: {}.", serviceData);
// 4) Map<ApplicationAccessType, String> acls
List<ApplicationACLMapProto> acls = clc.getProto().getApplicationACLsList();
logContainerLaunchContext("ACLs size: {}. Length: {}.", acls);
}
/**
* Log ContainerLaunchContext Data SerializedSize.
*
* @param format format of logging.
* @param lists data list.
* @param <R> generic type R.
*/
private static <R extends GeneratedMessageV3> void logContainerLaunchContext(String format,
List<R> lists) {
if (CollectionUtils.isNotEmpty(lists)) {
int sumLength = 0;
for (R item : lists) {
sumLength += item.getSerializedSize();
}
LOG.warn(format, lists.size(), sumLength);
}
}
/**
* Serialize an object in ByteArray.
*
* @return obj ByteArray.
* @throws IOException if an IO error occurred.
*/
@Private
@Unstable
private static byte[] serialize(Object obj) throws IOException {
try (ByteArrayOutputStream b = new ByteArrayOutputStream()) {
try (ObjectOutputStream o = new ObjectOutputStream(b)) {
o.writeObject(obj);
}
return b.toByteArray();
}
}
/**
* Get trimmed version of ApplicationSubmissionContext to be saved to
* Federation State Store.
*
* @param actualContext actual ApplicationSubmissionContext.
* @return trimmed ApplicationSubmissionContext.
*/
@Private
@Unstable
public static ApplicationSubmissionContext getTrimmedAppSubmissionContext(
ApplicationSubmissionContext actualContext) {
if (actualContext == null) {
return null;
}
// Set Basic information
ApplicationSubmissionContext trimmedContext =
Records.newRecord(ApplicationSubmissionContext.class);
trimmedContext.setApplicationId(actualContext.getApplicationId());
trimmedContext.setApplicationName(actualContext.getApplicationName());
trimmedContext.setQueue(actualContext.getQueue());
trimmedContext.setPriority(actualContext.getPriority());
trimmedContext.setApplicationType(actualContext.getApplicationType());
trimmedContext.setNodeLabelExpression(actualContext.getNodeLabelExpression());
trimmedContext.setLogAggregationContext(actualContext.getLogAggregationContext());
trimmedContext.setApplicationTags(actualContext.getApplicationTags());
trimmedContext.setApplicationSchedulingPropertiesMap(
actualContext.getApplicationSchedulingPropertiesMap());
trimmedContext.setKeepContainersAcrossApplicationAttempts(
actualContext.getKeepContainersAcrossApplicationAttempts());
trimmedContext.setApplicationTimeouts(actualContext.getApplicationTimeouts());
return trimmedContext;
}
public static boolean isRouterWebProxyEnable(Configuration conf) {
return conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PROXY_ENABLE,
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE);
}
public static boolean checkPolicyManagerValid(String policyManager,
List<String> supportWeightList) throws YarnException {
if (supportWeightList.contains(policyManager)) {
return true;
}
return false;
}
}