RouterCLI.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.cli;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ha.HAAdmin.UsageInfo;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.util.FormattingCLIUtils;
import org.apache.hadoop.yarn.client.util.MemoryPageUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight;
import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.FederationSubCluster;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetSubClustersRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetSubClustersResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.stream.Collectors;
import static org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight.checkHeadRoomAlphaValid;
import static org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight.checkSubClusterQueueWeightRatioValid;
public class RouterCLI extends Configured implements Tool {
// Logger shared by the command handlers of this CLI.
private static final Logger LOG = LoggerFactory.getLogger(RouterCLI.class);
// Common Constant
// Separator between the fields of a single policy string (see parsePolicy).
private static final String SEMICOLON = ";";
// Command Constant
// Empty subClusterId, passed when no specific subCluster was given on the command line.
private static final String CMD_EMPTY = "";
// Exit code returned by the handlers when a command completes.
private static final int EXIT_SUCCESS = 0;
// Exit code returned by the handlers on failure or invalid usage.
private static final int EXIT_ERROR = -1;
// Help command token. NOTE(review): its use is not visible in this part of the file.
private static final String CMD_HELP = "-help";
// Command1: subCluster (deregisterSubCluster / getSubClusters)
// Title of the table printed by the deregisterSubCluster command.
private static final String DEREGISTER_SUBCLUSTER_TITLE =
    "Yarn Federation Deregister SubCluster";
// Column headers of the deregisterSubCluster result table.
private static final List<String> DEREGISTER_SUBCLUSTER_HEADER = Arrays.asList(
    "SubCluster Id", "Deregister State", "Last HeartBeatTime", "Information", "SubCluster State");
// Option names (without leading dashes) and command tokens (with leading dash).
private static final String OPTION_SC = "sc";
private static final String OPTION_SUBCLUSTERID = "subClusterId";
private static final String OPTION_GET_SUBCLUSTERS = "getSubClusters";
private static final String OPTION_DEREGISTER_SUBCLUSTER = "deregisterSubCluster";
private static final String CMD_SUBCLUSTER = "-subCluster";
private static final String CMD_DEREGISTER_SUBCLUSTER = "-deregisterSubCluster";
// DeregisterSubCluster Command Parameters.
// The help text is concatenated from several literals; each piece must end with a
// separating space so that words do not run together in the rendered message.
protected final static UsageInfo DEREGISTER_SUBCLUSTER_USAGE = new UsageInfo(
    "-deregisterSubCluster <-sc|--subClusterId>",
    "This command is used to deregister subCluster, " +
    "If the interval between the heartbeat time of the subCluster and " +
    "the current time exceeds the timeout period, set the state of the subCluster to SC_LOST.");
// DeregisterSubCluster Command Examples (short and long option form).
protected final static String DEREGISTER_SUBCLUSTER_EXAMPLE_1 =
    "yarn routeradmin -subCluster -deregisterSubCluster -sc SC-1";
protected final static String DEREGISTER_SUBCLUSTER_EXAMPLE_2 =
    "yarn routeradmin -subCluster -deregisterSubCluster --subClusterId SC-1";
// DeregisterSubCluster Command Help Information
protected final static String DEREGISTER_SUBCLUSTER_HELP_INFO =
    "deregister subCluster, If the interval between the heartbeat time of the subCluster and " +
    "the current time exceeds the timeout period, set the state of the subCluster to SC_LOST.";
// GetSubClusters Command Parameters
protected final static UsageInfo GET_SUBCLUSTER_USAGE = new UsageInfo("-getSubClusters",
    "This command is used to get information about all subclusters.");
// Title of the table printed by the getSubClusters command.
private static final String GET_SUBCLUSTER_TITLE = "Yarn Federation SubCluster";
// Column headers of the getSubClusters result table.
private static final List<String> GET_SUBCLUSTER_HEADER = Arrays.asList(
    "SubCluster Id", "SubCluster State", "Last HeartBeatTime");
// GetSubCluster Command Examples
protected final static String GET_SUBCLUSTER_EXAMPLE =
    "yarn routeradmin -subCluster -getSubClusters";
// Usage, example descriptions and examples of the subCluster command,
// keyed by the args string of each usage entry.
protected final static RouterCmdUsageInfos SUBCLUSTER_USAGEINFOS =
    new RouterCmdUsageInfos()
    // deregisterSubCluster
    .addUsageInfo(DEREGISTER_SUBCLUSTER_USAGE)
    .addExampleDescs(DEREGISTER_SUBCLUSTER_USAGE.args, "If we want to deregisterSubCluster SC-1")
    .addExample(DEREGISTER_SUBCLUSTER_USAGE.args, DEREGISTER_SUBCLUSTER_EXAMPLE_1)
    .addExample(DEREGISTER_SUBCLUSTER_USAGE.args, DEREGISTER_SUBCLUSTER_EXAMPLE_2)
    // getSubCluster
    .addUsageInfo(GET_SUBCLUSTER_USAGE)
    .addExampleDescs(GET_SUBCLUSTER_USAGE.args,
        "If we want to get information about all subClusters in Federation")
    .addExample(GET_SUBCLUSTER_USAGE.args, GET_SUBCLUSTER_EXAMPLE);
// Command2: policy
// Top-level command token that selects the policy command family.
private static final String CMD_POLICY = "-policy";
// save policy: short and long option names (without leading dashes).
private static final String OPTION_S = "s";
private static final String OPTION_SAVE = "save";
// batch save policy: action option, file format option and input file option.
private static final String OPTION_BATCH_S = "bs";
private static final String OPTION_BATCH_SAVE = "batch-save";
private static final String OPTION_FORMAT = "format";
// The only file format value recognized for batch save.
private static final String FORMAT_XML = "xml";
private static final String OPTION_FILE = "f";
private static final String OPTION_INPUT_FILE = "input-file";
// list policy: action option plus paging and queue filter options.
private static final String OPTION_L = "l";
private static final String OPTION_LIST = "list";
private static final String OPTION_PAGE_SIZE = "pageSize";
private static final String OPTION_CURRENT_PAGE = "currentPage";
private static final String OPTION_QUEUE = "queue";
private static final String OPTION_QUEUES = "queues";
// delete policy: short and long option names.
private static final String OPTION_D = "d";
private static final String OPTION_DELETE = "delete";
// Element/attribute names looked up in the batch-save XML file.
// NOTE(review): only the federationWeights and queue tags are read in the visible
// part of parsePoliciesByXml; the remaining tags are presumably used further down.
private static final String XML_TAG_SUBCLUSTERIDINFO = "subClusterIdInfo";
private static final String XML_TAG_AMRMPOLICYWEIGHTS = "amrmPolicyWeights";
private static final String XML_TAG_ROUTERPOLICYWEIGHTS = "routerPolicyWeights";
private static final String XML_TAG_HEADROOMALPHA = "headroomAlpha";
private static final String XML_TAG_FEDERATION_WEIGHTS = "federationWeights";
private static final String XML_TAG_QUEUE = "queue";
private static final String XML_TAG_NAME = "name";
// Title for the policy listing output.
private static final String LIST_POLICIES_TITLE =
"Yarn Federation Queue Policies";
// Columns information
private static final List<String> LIST_POLICIES_HEADER = Arrays.asList(
"Queue Name", "AMRM Weight", "Router Weight");
// Policy Commands
// In the *_EXAMPLE_DESC strings a backslash marks a line break: buildHelpMsg splits
// each description on '\' and prints every part on its own line.

// Policy Save
protected final static UsageInfo POLICY_SAVE_USAGE = new UsageInfo(
    "-s|--save (<queue;router weight;amrm weight;headroomalpha>)",
    "This command is used to save the policy information of the queue, " +
    "including queue and weight information.");
// NOTE(review): the description mentions a headroomalpha of 0.1 while the examples
// below pass 1.0 - confirm which value is intended before changing either text.
protected final static String POLICY_SAVE_USAGE_EXAMPLE_DESC =
    "We have two sub-clusters, SC-1 and SC-2. \\" +
    "We want to configure a weight policy for the 'root.a' queue. \\" +
    "The Router Weight is set to SC-1 with a weight of 0.7 and SC-2 with a weight of 0.3. \\" +
    "The AMRM Weight is set SC-1 to 0.6 and SC-2 to 0.4. \\" +
    "We are using the default value of 0.1 for headroomalpha.";
protected final static String POLICY_SAVE_USAGE_EXAMPLE_1 =
    "yarn routeradmin -policy -s root.a;SC-1:0.7,SC-2:0.3;SC-1:0.6,SC-2:0.4;1.0";
protected final static String POLICY_SAVE_USAGE_EXAMPLE_2 =
    "yarn routeradmin -policy --save root.a;SC-1:0.7,SC-2:0.3;SC-1:0.6,SC-2:0.4;1.0";

// Policy Batch Save
protected final static UsageInfo POLICY_BATCH_SAVE_USAGE = new UsageInfo(
    "-bs|--batch-save (--format <xml>) (-f|--input-file <fileName>)",
    "This command can batch load weight information for queues " +
    "based on the provided `federation-weights.xml` file.");
protected final static String POLICY_BATCH_SAVE_USAGE_EXAMPLE_DESC =
    "We have two sub-clusters, SC-1 and SC-2. \\" +
    "We would like to configure weights for 'root.a' and 'root.b' queues. \\" +
    "We can set the weights for 'root.a' and 'root.b' in the 'federation-weights.xml' file. \\" +
    "and then use the batch-save command to save the configurations in bulk.";
protected final static String POLICY_BATCH_SAVE_USAGE_EXAMPLE_1 =
    "yarn routeradmin -policy -bs --format xml -f federation-weights.xml";
protected final static String POLICY_BATCH_SAVE_USAGE_EXAMPLE_2 =
    "yarn routeradmin -policy --batch-save --format xml -f federation-weights.xml";

// Policy List
protected final static UsageInfo POLICY_LIST_USAGE = new UsageInfo(
    "-l|--list [--pageSize][--currentPage][--queue][--queues]",
    "This command is used to display the configured queue weight information.");
protected final static String POLICY_LIST_USAGE_EXAMPLE_DESC =
    "We can display the list of already configured queue weight information. \\" +
    "We can use the --queue option to query the weight information for a specific queue \\" +
    " or use the --queues option to query the weight information for multiple queues. \\";
protected final static String POLICY_LIST_USAGE_EXAMPLE_1 =
    "yarn routeradmin -policy -l --pageSize 20 --currentPage 1 --queue root.a";
// The long-form example uses "--list", matching the usage string above and the
// long-form examples of the other policy commands.
protected final static String POLICY_LIST_USAGE_EXAMPLE_2 =
    "yarn routeradmin -policy --list --pageSize 20 --currentPage 1 --queues root.a,root.b";

// Policy Delete
protected final static UsageInfo POLICY_DELETE_USAGE = new UsageInfo(
    "-d|--delete [--queue]",
    "This command is used to delete the policy of the queue.");
protected final static String POLICY_DELETE_USAGE_EXAMPLE_DESC =
    "We delete the weight information of root.a. \\" +
    "We can use --queue to specify the name of the queue.";
protected final static String POLICY_DELETE_USAGE_EXAMPLE1 =
    "yarn routeradmin -policy -d --queue root.a";
protected final static String POLICY_DELETE_USAGE_EXAMPLE2 =
    "yarn routeradmin -policy --delete --queue root.a";

// Usage, example descriptions and examples of the policy command,
// keyed by the args string of each usage entry.
protected final static RouterCmdUsageInfos POLICY_USAGEINFOS = new RouterCmdUsageInfos()
    // Policy Save
    .addUsageInfo(POLICY_SAVE_USAGE)
    .addExampleDescs(POLICY_SAVE_USAGE.args, POLICY_SAVE_USAGE_EXAMPLE_DESC)
    .addExample(POLICY_SAVE_USAGE.args, POLICY_SAVE_USAGE_EXAMPLE_1)
    .addExample(POLICY_SAVE_USAGE.args, POLICY_SAVE_USAGE_EXAMPLE_2)
    // Policy Batch Save
    .addUsageInfo(POLICY_BATCH_SAVE_USAGE)
    .addExampleDescs(POLICY_BATCH_SAVE_USAGE.args, POLICY_BATCH_SAVE_USAGE_EXAMPLE_DESC)
    .addExample(POLICY_BATCH_SAVE_USAGE.args, POLICY_BATCH_SAVE_USAGE_EXAMPLE_1)
    .addExample(POLICY_BATCH_SAVE_USAGE.args, POLICY_BATCH_SAVE_USAGE_EXAMPLE_2)
    // Policy List
    .addUsageInfo(POLICY_LIST_USAGE)
    .addExampleDescs(POLICY_LIST_USAGE.args, POLICY_LIST_USAGE_EXAMPLE_DESC)
    .addExample(POLICY_LIST_USAGE.args, POLICY_LIST_USAGE_EXAMPLE_1)
    .addExample(POLICY_LIST_USAGE.args, POLICY_LIST_USAGE_EXAMPLE_2)
    // Policy Delete
    .addUsageInfo(POLICY_DELETE_USAGE)
    .addExampleDescs(POLICY_DELETE_USAGE.args, POLICY_DELETE_USAGE_EXAMPLE_DESC)
    .addExample(POLICY_DELETE_USAGE.args, POLICY_DELETE_USAGE_EXAMPLE1)
    .addExample(POLICY_DELETE_USAGE.args, POLICY_DELETE_USAGE_EXAMPLE2);
// Command3: application
// Top-level command token that selects the application command family.
private static final String CMD_APPLICATION = "-application";
// Application Delete: usage line, example description and example command.
protected final static UsageInfo APPLICATION_DELETE_USAGE = new UsageInfo(
"--delete <application_id>",
"This command is used to delete the specified application.");
protected final static String APPLICATION_DELETE_USAGE_EXAMPLE_DESC =
"If we want to delete application_1440536969523_0001.";
protected final static String APPLICATION_DELETE_USAGE_EXAMPLE_1 =
"yarn routeradmin -application --delete application_1440536969523_0001";
// Usage, example description and example of the application command,
// keyed by the args string of the usage entry.
protected final static RouterCmdUsageInfos APPLICATION_USAGEINFOS = new RouterCmdUsageInfos()
// application delete
.addUsageInfo(APPLICATION_DELETE_USAGE)
.addExampleDescs(APPLICATION_DELETE_USAGE.args, APPLICATION_DELETE_USAGE_EXAMPLE_DESC)
.addExample(APPLICATION_DELETE_USAGE.args, APPLICATION_DELETE_USAGE_EXAMPLE_1);
// delete application: option name (same value as the policy delete option).
private static final String OPTION_DELETE_APP = "delete";
// Maps each top-level command token to its usage information; buildHelpMsg,
// printHelp, buildUsageMsg and printUsage all look commands up here.
protected final static Map<String, RouterCmdUsageInfos> ADMIN_USAGE =
ImmutableMap.<String, RouterCmdUsageInfos>builder()
// Command1: subCluster
.put(CMD_SUBCLUSTER, SUBCLUSTER_USAGEINFOS)
// Command2: policy
.put(CMD_POLICY, POLICY_USAGEINFOS)
// Command3: application
.put(CMD_APPLICATION, APPLICATION_USAGEINFOS)
.build();
/**
 * Creates a RouterCLI without supplying a configuration; delegates to the
 * no-argument superclass constructor.
 */
public RouterCLI() {
super();
}
/**
 * Creates a RouterCLI with the given configuration, which is passed to the
 * superclass constructor.
 *
 * @param conf configuration handed to the superclass.
 */
public RouterCLI(Configuration conf) {
super(conf);
}
/**
 * Appends the help text of one top-level command to {@code builder}.
 *
 * <p>The text consists of the command name in brackets followed by up to three
 * sections - descriptions, usage lines and numbered examples - each of which is
 * emitted only when the command has entries for it. Nothing is appended when
 * the command is not registered in ADMIN_USAGE.
 *
 * @param cmd command token used as the lookup key in ADMIN_USAGE.
 * @param builder target the help text is appended to.
 */
private static void buildHelpMsg(String cmd, StringBuilder builder) {
  final RouterCmdUsageInfos infos = ADMIN_USAGE.get(cmd);
  if (infos == null) {
    // Unknown command: leave the builder untouched.
    return;
  }

  builder.append("[").append(cmd).append("]\n");

  // Section 1: free-form descriptions.
  if (!infos.helpInfos.isEmpty()) {
    builder.append("\t Description: \n");
    for (String description : infos.helpInfos) {
      builder.append("\t\t").append(description).append("\n\n");
    }
  }

  // Section 2: one "args: help" pair per usage entry.
  if (!infos.usageInfos.isEmpty()) {
    builder.append("\t UsageInfos: \n");
    for (UsageInfo usage : infos.usageInfos) {
      builder.append("\t\t").append(usage.args);
      builder.append(": ");
      builder.append("\n\t\t");
      builder.append(usage.help).append("\n\n");
    }
  }

  // Section 3: examples, numbered from 1 in map iteration order.
  if (MapUtils.isEmpty(infos.examples)) {
    return;
  }
  builder.append("\t Examples: \n");
  int number = 0;
  for (Map.Entry<String, List<String>> entry : infos.examples.entrySet()) {
    number++;
    final String usageArgs = entry.getKey();
    builder.append("\t\t");
    builder.append("Cmd:").append(number);
    builder.append(". ").append(usageArgs);
    builder.append(": \n\n");

    // Requirement description: a backslash inside a description marks a line break.
    final List<String> descriptions = infos.exampleDescs.get(usageArgs);
    if (CollectionUtils.isNotEmpty(descriptions)) {
      builder.append("\t\t").append("Cmd Requirement Description:\n");
      for (String description : descriptions) {
        for (String line : StringUtils.split(description, "\\")) {
          builder.append("\t\t").append(line).append("\n");
        }
      }
    }
    builder.append("\n");

    // Concrete command lines for this usage entry.
    final List<String> commands = entry.getValue();
    if (CollectionUtils.isNotEmpty(commands)) {
      builder.append("\t\t").append("Cmd Examples:\n");
      for (String command : commands) {
        builder.append("\t\t").append(command).append("\n");
      }
    }
    builder.append("\n");
  }
}
/**
 * Prints the full help to standard output: a short summary, the help text of
 * every command registered in ADMIN_USAGE, the description of {@code -help},
 * and finally the generic command usage.
 */
private static void printHelp() {
  // The summary goes out first, before the per-command help is assembled.
  final String summary = "routeradmin is the command to execute "
      + "YARN Federation administrative commands.\n"
      + "The full syntax is: \n\n"
      + "routeradmin\n";
  System.out.println(summary);

  final StringBuilder commandsHelp = new StringBuilder();
  ADMIN_USAGE.keySet().forEach(command -> {
    buildHelpMsg(command, commandsHelp);
    commandsHelp.append("\n");
  });
  commandsHelp.append(" -help [cmd]: Displays help for the given command or all commands");
  commandsHelp.append(" if none is specified.");
  System.out.println(commandsHelp);

  System.out.println();
  ToolRunner.printGenericCommandUsage(System.out);
}
/**
 * Creates the proxy used to send administration requests.
 *
 * <p>The proxy is built from a fresh YarnConfiguration that wraps the
 * configuration currently held by this tool.
 *
 * @return a ResourceManagerAdministrationProtocol proxy.
 * @throws IOException raised on errors performing I/O.
 */
protected ResourceManagerAdministrationProtocol createAdminProtocol()
    throws IOException {
  return ClientRMProxy.createRMProxy(
      new YarnConfiguration(getConf()), ResourceManagerAdministrationProtocol.class);
}
/**
 * Appends the overall usage message: a two-line header, the help text of every
 * command registered in ADMIN_USAGE, and the {@code -help} line.
 *
 * @param builder target the usage text is appended to.
 */
private static void buildUsageMsg(StringBuilder builder) {
  builder.append("routeradmin is only used in Yarn Federation Mode.\n")
      .append("Usage: routeradmin\n");
  ADMIN_USAGE.keySet().forEach(command -> {
    buildHelpMsg(command, builder);
    builder.append("\n");
  });
  builder.append(" -help [cmd]\n");
}
/**
 * Prints usage information to standard error.
 *
 * <p>For a command registered in ADMIN_USAGE only that command's help is
 * printed; for anything else the overall usage message is printed. The generic
 * command usage follows in both cases.
 *
 * @param cmd command token the usage is requested for.
 */
private static void printUsage(String cmd) {
  final StringBuilder text = new StringBuilder();
  if (!ADMIN_USAGE.containsKey(cmd)) {
    buildUsageMsg(text);
  } else {
    buildHelpMsg(cmd, text);
  }
  System.err.println(text);
  ToolRunner.printGenericCommandUsage(System.err);
}
/**
 * Handles the {@code -subCluster} command: deregisters subClusters or lists
 * all subClusters, depending on the options present.
 *
 * <p>{@code -deregisterSubCluster} takes precedence over
 * {@code -getSubClusters}. When neither is present the usage for
 * {@code args[0]} is printed and EXIT_ERROR is returned.
 *
 * @param args command-line arguments; {@code args[0]} is used as the command
 *     key when printing usage.
 * @return the exit code of the selected action, or EXIT_ERROR when an option
 *     argument is missing or no action was selected.
 * @throws ParseException if the command line cannot be parsed (other than a
 *     missing option argument, which is reported and mapped to EXIT_ERROR).
 * @throws IOException raised on errors performing I/O.
 * @throws YarnException exceptions from yarn servers.
 */
private int handleSubCluster(String[] args) throws ParseException, IOException, YarnException {
  // Prepare Options. Option names come from the shared OPTION_* constants so that
  // registration and the hasOption checks below cannot drift apart.
  Options opts = new Options();
  opts.addOption("subCluster", false,
      "We provide a set of commands for SubCluster Include deregisterSubCluster, " +
      "get SubClusters.");
  opts.addOption(OPTION_DEREGISTER_SUBCLUSTER, false,
      "Deregister YARN subCluster, if subCluster Heartbeat Timeout.");
  opts.addOption(OPTION_GET_SUBCLUSTERS, false,
      "Get information about all subClusters of Federation.");
  Option subClusterOpt = new Option(OPTION_SC, OPTION_SUBCLUSTERID, true,
      "The subCluster can be specified using either the '-sc' or '--subClusterId' option. " +
      " If the subCluster's Heartbeat Timeout, it will be marked as 'SC_LOST'.");
  // The id is optional: without it the deregister request carries an empty id.
  subClusterOpt.setOptionalArg(true);
  opts.addOption(subClusterOpt);

  // Parse command line arguments.
  CommandLine cliParser;
  try {
    cliParser = new GnuParser().parse(opts, args);
  } catch (MissingArgumentException ex) {
    System.out.println("Missing argument for options");
    printUsage(args[0]);
    return EXIT_ERROR;
  }

  // deregister subCluster
  if (cliParser.hasOption(OPTION_DEREGISTER_SUBCLUSTER)) {
    String subClusterId = null;
    if (cliParser.hasOption(OPTION_SC) || cliParser.hasOption(OPTION_SUBCLUSTERID)) {
      subClusterId = cliParser.getOptionValue(OPTION_SC);
      if (subClusterId == null) {
        subClusterId = cliParser.getOptionValue(OPTION_SUBCLUSTERID);
      }
    }
    return handleDeregisterSubCluster(subClusterId);
  }

  // get subClusters
  if (cliParser.hasOption(OPTION_GET_SUBCLUSTERS)) {
    return handleGetSubClusters();
  }

  // No recognized action: show the usage of this command.
  printUsage(args[0]);
  return EXIT_ERROR;
}
/**
 * Requests the federation subClusters through the admin protocol and prints
 * them to standard output as a table with one row per subCluster (id, state,
 * last heartbeat time).
 *
 * @return EXIT_SUCCESS once the table has been printed.
 * @throws IOException raised on errors performing I/O.
 * @throws YarnException exceptions from yarn servers.
 */
private int handleGetSubClusters() throws IOException, YarnException {
  GetSubClustersResponse response =
      createAdminProtocol().getFederationSubClusters(GetSubClustersRequest.newInstance());

  FormattingCLIUtils table =
      new FormattingCLIUtils(GET_SUBCLUSTER_TITLE).addHeaders(GET_SUBCLUSTER_HEADER);
  for (FederationSubCluster subCluster : response.getFederationSubClusters()) {
    table.addLine(
        subCluster.getSubClusterId(),
        subCluster.getSubClusterState(),
        subCluster.getLastHeartBeatTime());
  }

  // The writer wraps System.out, so it is flushed but deliberately not closed.
  PrintWriter out = new PrintWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
  out.print(table.render());
  out.flush();
  return EXIT_SUCCESS;
}
/**
 * Deregisters subClusters according to the given id.
 *
 * <p>A non-blank id is forwarded as is; a null or blank id selects the
 * no-argument variant, which sends an empty id.
 *
 * @param subClusterId subClusterId, may be null or blank.
 * @return the exit code of the deregister call, which is 0 when the request
 *     completes.
 *
 * @throws IOException raised on errors performing I/O.
 * @throws YarnException exceptions from yarn servers.
 * @throws ParseException Exceptions thrown during parsing of a command-line.
 */
private int handleDeregisterSubCluster(String subClusterId)
    throws IOException, YarnException, ParseException {
  return StringUtils.isBlank(subClusterId)
      ? deregisterSubCluster()
      : deregisterSubCluster(subClusterId);
}
/**
 * Sends a deregister request for the given subClusterId and prints the result
 * to standard output as a table with one row per returned entry.
 *
 * @param subClusterId id placed in the request.
 * @return EXIT_SUCCESS once the table has been printed.
 * @throws IOException raised on errors performing I/O.
 * @throws YarnException exceptions from yarn servers.
 */
private int deregisterSubCluster(String subClusterId)
    throws IOException, YarnException {
  ResourceManagerAdministrationProtocol admin = createAdminProtocol();
  DeregisterSubClusterResponse response =
      admin.deregisterSubCluster(DeregisterSubClusterRequest.newInstance(subClusterId));

  FormattingCLIUtils table = new FormattingCLIUtils(DEREGISTER_SUBCLUSTER_TITLE)
      .addHeaders(DEREGISTER_SUBCLUSTER_HEADER);
  for (DeregisterSubClusters entry : response.getDeregisterSubClusters()) {
    // Column order follows DEREGISTER_SUBCLUSTER_HEADER.
    table.addLine(
        entry.getSubClusterId(),
        entry.getDeregisterState(),
        entry.getLastHeartBeatTime(),
        entry.getInformation(),
        entry.getSubClusterState());
  }

  // The writer wraps System.out, so it is flushed but deliberately not closed.
  PrintWriter out = new PrintWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8));
  out.print(table.render());
  out.flush();
  return EXIT_SUCCESS;
}
/**
 * Sends a deregister request with an empty subClusterId by delegating to
 * {@code deregisterSubCluster(String)}.
 *
 * @return the exit code of the delegate, which is EXIT_SUCCESS when the
 *     request completes.
 * @throws IOException raised on errors performing I/O.
 * @throws YarnException exceptions from yarn servers.
 */
private int deregisterSubCluster() throws IOException, YarnException {
  return deregisterSubCluster(CMD_EMPTY);
}
/**
 * Handles the {@code -policy} command: saves a single queue policy, batch-saves
 * policies from a file, lists configured policies, or deletes the policy of a
 * queue.
 *
 * <p>The first action found wins, checked in the order save, batch-save, list,
 * delete. When no action is present the usage for {@code args[0]} is printed.
 *
 * @param args command-line arguments; {@code args[0]} is used as the command
 *     key when printing usage.
 * @return the exit code of the selected action, or EXIT_ERROR when the
 *     arguments are invalid or no action was selected.
 * @throws IOException raised on errors performing I/O.
 * @throws YarnException exceptions from yarn servers.
 * @throws ParseException if the command line cannot be parsed (other than a
 *     missing option argument, which is reported and mapped to EXIT_ERROR).
 */
private int handlePolicy(String[] args)
    throws IOException, YarnException, ParseException {
  // Prepare Options. Option names come from the shared OPTION_* constants so that
  // registration and the hasOption/getOptionValue lookups below cannot drift apart.
  Options opts = new Options();
  opts.addOption("policy", false,
      "We provide a set of commands for Policy Include list policies, " +
      "save policies, batch save policies.");
  Option saveOpt = new Option(OPTION_S, OPTION_SAVE, true,
      "We will save the policy information of the queue, " +
      "including queue and weight information");
  saveOpt.setOptionalArg(true);
  Option batchSaveOpt = new Option(OPTION_BATCH_S, OPTION_BATCH_SAVE, false,
      "We will save queue policies in bulk, " +
      "where users can provide XML files containing the policies. " +
      "This command will parse the file contents and store the results " +
      "in the FederationStateStore.");
  Option formatOpt = new Option(null, OPTION_FORMAT, true,
      "Users can specify the file format using this option. " +
      "Currently, there are one supported file formats: XML. " +
      "These files contain the policy information for storing queue policies.");
  formatOpt.setOptionalArg(true);
  Option fileOpt = new Option(OPTION_FILE, OPTION_INPUT_FILE, true,
      "The location of the input configuration file. ");
  Option listOpt = new Option(OPTION_L, OPTION_LIST, false,
      "We can display the configured queue strategy according to the parameters.");
  Option pageSizeOpt = new Option(null, OPTION_PAGE_SIZE, true,
      "The number of policies displayed per page.");
  Option currentPageOpt = new Option(null, OPTION_CURRENT_PAGE, true,
      "Since users may configure numerous policies, we will choose to display them in pages. " +
      "This parameter represents the page number to be displayed.");
  Option queueOpt = new Option(null, OPTION_QUEUE, true,
      "the queue we need to filter. example: root.a");
  Option queuesOpt = new Option(null, OPTION_QUEUES, true,
      "list of queues to filter. example: root.a,root.b,root.c");
  Option deleteOpt = new Option(OPTION_D, OPTION_DELETE, false, "");
  opts.addOption(saveOpt);
  opts.addOption(batchSaveOpt);
  opts.addOption(formatOpt);
  opts.addOption(fileOpt);
  opts.addOption(listOpt);
  opts.addOption(pageSizeOpt);
  opts.addOption(currentPageOpt);
  opts.addOption(queueOpt);
  opts.addOption(queuesOpt);
  opts.addOption(deleteOpt);

  // Parse command line arguments.
  CommandLine cliParser;
  try {
    cliParser = new GnuParser().parse(opts, args);
  } catch (MissingArgumentException ex) {
    System.out.println("Missing argument for options");
    printUsage(args[0]);
    return EXIT_ERROR;
  }

  if (cliParser.hasOption(OPTION_S) || cliParser.hasOption(OPTION_SAVE)) {
    // Save a single queue policy.
    String policy = cliParser.getOptionValue(OPTION_S);
    if (StringUtils.isBlank(policy)) {
      policy = cliParser.getOptionValue(OPTION_SAVE);
    }
    return handleSavePolicy(policy);
  } else if (cliParser.hasOption(OPTION_BATCH_S) || cliParser.hasOption(OPTION_BATCH_SAVE)) {
    // Save queue policies in batches.
    // Only the XML format is accepted (case-insensitively); any other value
    // is rejected right away with an error message.
    String format = null;
    if (cliParser.hasOption(OPTION_FORMAT)) {
      format = cliParser.getOptionValue(OPTION_FORMAT);
      if (StringUtils.isBlank(format) ||
          !StringUtils.equalsAnyIgnoreCase(format, FORMAT_XML)) {
        System.out.println("We currently only support policy configuration files " +
            "in XML formats.");
        return EXIT_ERROR;
      }
    }
    // Parse configuration file path.
    String filePath = null;
    if (cliParser.hasOption(OPTION_FILE) || cliParser.hasOption(OPTION_INPUT_FILE)) {
      filePath = cliParser.getOptionValue(OPTION_FILE);
      if (StringUtils.isBlank(filePath)) {
        filePath = cliParser.getOptionValue(OPTION_INPUT_FILE);
      }
    }
    // Batch SavePolicies.
    return handBatchSavePolicies(format, filePath);
  } else if (cliParser.hasOption(OPTION_L) || cliParser.hasOption(OPTION_LIST)) {
    // Paging defaults: 10 policies per page, starting at page 1.
    int pageSize = 10;
    int currentPage = 1;
    try {
      if (cliParser.hasOption(OPTION_PAGE_SIZE)) {
        pageSize = Integer.parseInt(cliParser.getOptionValue(OPTION_PAGE_SIZE));
      }
      if (cliParser.hasOption(OPTION_CURRENT_PAGE)) {
        currentPage = Integer.parseInt(cliParser.getOptionValue(OPTION_CURRENT_PAGE));
      }
    } catch (NumberFormatException ex) {
      // Report non-numeric paging values like any other invalid argument
      // instead of letting the unchecked exception escape.
      System.out.println("The values of --pageSize and --currentPage must be integers.");
      printUsage(args[0]);
      return EXIT_ERROR;
    }
    String queue = null;
    if (cliParser.hasOption(OPTION_QUEUE)) {
      queue = cliParser.getOptionValue(OPTION_QUEUE);
    }
    List<String> queues = null;
    if (cliParser.hasOption(OPTION_QUEUES)) {
      String tmpQueues = cliParser.getOptionValue(OPTION_QUEUES);
      queues = Arrays.stream(tmpQueues.split(",")).collect(Collectors.toList());
    }
    // List Policies.
    return handListPolicies(pageSize, currentPage, queue, queues);
  } else if (cliParser.hasOption(OPTION_D) || cliParser.hasOption(OPTION_DELETE)) {
    String queue = cliParser.getOptionValue(OPTION_QUEUE);
    // Delete Policy.
    return handDeletePolicy(queue);
  }

  // No recognized action: show the usage of this command.
  printUsage(args[0]);
  return EXIT_ERROR;
}
/**
 * Parses the given policy string, sends it through the admin protocol and
 * prints the message of the response to standard output.
 *
 * @param policy policy string as accepted by {@code parsePolicy}.
 * @return EXIT_SUCCESS when the policy was sent and the response printed;
 *     EXIT_ERROR when a YarnException or IOException occurred (it is logged).
 */
private int handleSavePolicy(String policy) {
  LOG.info("Save Federation Policy = {}.", policy);
  int exitCode;
  try {
    // Parse first so that an invalid policy fails before a proxy is created.
    SaveFederationQueuePolicyRequest saveRequest = parsePolicy(policy);
    SaveFederationQueuePolicyResponse saveResponse =
        createAdminProtocol().saveFederationQueuePolicy(saveRequest);
    System.out.println(saveResponse.getMessage());
    exitCode = EXIT_SUCCESS;
  } catch (IOException | YarnException e) {
    LOG.error("handleSavePolicy error.", e);
    exitCode = EXIT_ERROR;
  }
  return exitCode;
}
/**
 * Validates the batch-save arguments and dispatches on the file format.
 *
 * <p>The format is compared case-insensitively, in line with the validation
 * performed when the {@code --format} option is read, so that for example
 * {@code XML} and {@code xml} are treated the same.
 *
 * @param format file format of the policy file; only XML is supported.
 * @param policyFile path of the policy file.
 * @return the result of the XML batch save, or EXIT_ERROR when the format or
 *     the file is blank or the format is not supported.
 */
private int handBatchSavePolicies(String format, String policyFile) {
  if (StringUtils.isBlank(format)) {
    LOG.error("Batch Save Federation Policies. Format is Empty.");
    return EXIT_ERROR;
  }
  if (StringUtils.isBlank(policyFile)) {
    LOG.error("Batch Save Federation Policies. policyFile is Empty.");
    return EXIT_ERROR;
  }
  LOG.info("Batch Save Federation Policies. Format = {}, PolicyFile = {}.",
      format, policyFile);
  if (FORMAT_XML.equalsIgnoreCase(format)) {
    return parseXml2PoliciesAndBatchSavePolicies(policyFile);
  }
  System.out.println("We currently only support XML formats.");
  return EXIT_ERROR;
}
/**
 * We will parse the policy, and it has specific formatting requirements.
 *
 * 1. The policy consists of exactly four fields separated by semicolons:
 *    queue;router weight;amrm weight;headroomalpha {@link FederationQueueWeight}.
 * 2. The router weight, amrm weight and headroomalpha fields are validated with
 *    the FederationQueueWeight check methods before the request is built.
 *
 * @param policy queue weight, e.g.
 *     {@code root.a;SC-1:0.7,SC-2:0.3;SC-1:0.7,SC-2:0.3;1.0}.
 * @return If the conversion is correct, we will get the
 *     SaveFederationQueuePolicyRequest, otherwise an exception will be thrown.
 * @throws YarnException if the policy is null, blank, does not consist of
 *     exactly four fields, or fails validation.
 */
protected SaveFederationQueuePolicyRequest parsePolicy(String policy) throws YarnException {
  // A null policy is possible when the save option is given without a value;
  // report it like any other malformed policy instead of failing on split().
  String[] policyItems = StringUtils.isBlank(policy) ? null : policy.split(SEMICOLON);
  if (policyItems == null || policyItems.length != 4) {
    throw new YarnException("The policy cannot be empty or the policy is incorrect. \n" +
        " Required information to provide: queue,router weight,amrm weight,headroomalpha \n" +
        " eg. root.a;SC-1:0.7,SC-2:0.3;SC-1:0.7,SC-2:0.3;1.0");
  }

  String queue = policyItems[0];
  String routerWeight = policyItems[1];
  String amrmWeight = policyItems[2];
  String headroomalpha = policyItems[3];
  LOG.info("Policy: [Queue = {}, RouterWeight = {}, AmRmWeight = {}, Headroomalpha = {}]",
      queue, routerWeight, amrmWeight, headroomalpha);

  // Validate every weight field before building the request.
  checkSubClusterQueueWeightRatioValid(routerWeight);
  checkSubClusterQueueWeightRatioValid(amrmWeight);
  checkHeadRoomAlphaValid(headroomalpha);

  FederationQueueWeight federationQueueWeight =
      FederationQueueWeight.newInstance(routerWeight, amrmWeight, headroomalpha);
  // The policy manager comes from the configuration, with the YARN default as fallback.
  String policyManager = getConf().get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
      YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
  return SaveFederationQueuePolicyRequest.newInstance(
      queue, federationQueueWeight, policyManager);
}
/**
 * Parse Policies from XML and save them in batches to FederationStateStore.
 *
 * We save 20 policies in one batch.
 * If the user needs to save 1000 policies, it will cycle 50 times.
 *
 * Every time a page is saved, we will print the message returned for that page.
 *
 * @param policiesXml Policies Xml Path.
 * @return 0 (EXIT_SUCCESS) when the file was parsed and every page was sent
 *     without an exception; -1 (EXIT_ERROR) when any exception occurred, in
 *     which case the exception is logged.
 */
protected int parseXml2PoliciesAndBatchSavePolicies(String policiesXml) {
  try {
    List<FederationQueueWeight> federationQueueWeightsList = parsePoliciesByXml(policiesXml);
    // Split the parsed policies into pages of 20 entries each.
    MemoryPageUtils<FederationQueueWeight> memoryPageUtils = new MemoryPageUtils<>(20);
    federationQueueWeightsList.forEach(memoryPageUtils::addToMemory);
    int pages = memoryPageUtils.getPages();
    for (int i = 0; i < pages; i++) {
      List<FederationQueueWeight> federationQueueWeights =
          memoryPageUtils.readFromMemory(i);
      BatchSaveFederationQueuePoliciesRequest request =
          BatchSaveFederationQueuePoliciesRequest.newInstance(federationQueueWeights);
      ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
      BatchSaveFederationQueuePoliciesResponse response =
          adminProtocol.batchSaveFederationQueuePolicies(request);
      // Pages are reported 1-based to the user.
      System.out.println("page <" + (i + 1) + "> : " + response.getMessage());
    }
    // Every page went through without an exception.
    return EXIT_SUCCESS;
  } catch (Exception e) {
    LOG.error("BatchSaveFederationQueuePolicies error", e);
    return EXIT_ERROR;
  }
}
/**
 * Parse FederationQueueWeight from the xml configuration file.
 * <p>
 * We allow users to provide an xml configuration file,
 * which stores the weight information of the queue.
 * <p>
 * Every queue element found under a federation-weights element is converted into one
 * FederationQueueWeight. The policy manager is read from the configuration
 * ({@code YarnConfiguration.FEDERATION_POLICY_MANAGER}) with the default as fallback.
 *
 * @param policiesXml Policies Xml Path.
 * @return FederationQueueWeight List.
 * @throws IOException an I/O exception of some sort has occurred.
 * @throws SAXException Encapsulate a general SAX error or warning.
 * @throws ParserConfigurationException a serious configuration error.
 * @throws IllegalArgumentException if a queue element lacks its name or headroomAlpha child.
 */
protected List<FederationQueueWeight> parsePoliciesByXml(String policiesXml)
    throws IOException, SAXException, ParserConfigurationException {
  List<FederationQueueWeight> weights = new ArrayList<>();
  File xmlFile = new File(policiesXml);
  DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
  // Ask the JAXP implementation to process the document securely
  // (e.g. apply its limits on entity expansion) before parsing a user-supplied file.
  factory.setFeature(javax.xml.XMLConstants.FEATURE_SECURE_PROCESSING, true);
  DocumentBuilder builder = factory.newDocumentBuilder();
  Document document = builder.parse(xmlFile);
  NodeList federationsList = document.getElementsByTagName(XML_TAG_FEDERATION_WEIGHTS);
  for (int i = 0; i < federationsList.getLength(); i++) {
    Node federationNode = federationsList.item(i);
    if (federationNode.getNodeType() == Node.ELEMENT_NODE) {
      Element federationElement = (Element) federationNode;
      NodeList queueList = federationElement.getElementsByTagName(XML_TAG_QUEUE);
      for (int j = 0; j < queueList.getLength(); j++) {
        Node queueNode = queueList.item(j);
        if (queueNode.getNodeType() == Node.ELEMENT_NODE) {
          Element queueElement = (Element) queueNode;
          // parse queueName.
          String queueName = requiredChildText(queueElement, XML_TAG_NAME);
          // parse amrmPolicyWeights / routerPolicyWeights.
          String amrmWeight = parsePolicyWeightsNode(queueElement, XML_TAG_AMRMPOLICYWEIGHTS);
          String routerWeight = parsePolicyWeightsNode(queueElement, XML_TAG_ROUTERPOLICYWEIGHTS);
          // parse headroomAlpha.
          String headroomAlpha = requiredChildText(queueElement, XML_TAG_HEADROOMALPHA);
          String policyManager = getConf().get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
              YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
          LOG.debug("Queue: {}, AmrmPolicyWeights: {}, RouterWeight: {}, HeadroomAlpha: {}.",
              queueName, amrmWeight, routerWeight, headroomAlpha);
          FederationQueueWeight weight = FederationQueueWeight.newInstance(routerWeight,
              amrmWeight, headroomAlpha, queueName, policyManager);
          weights.add(weight);
        }
      }
    }
  }
  return weights;
}

/**
 * Returns the text content of the first descendant of {@code parent} with the given tag,
 * failing with a descriptive message instead of a bare NullPointerException when absent.
 *
 * @param parent element to search in.
 * @param tagName tag of the required descendant element.
 * @return text content of the first matching element.
 * @throws IllegalArgumentException if no element with that tag exists under {@code parent}.
 */
private static String requiredChildText(Element parent, String tagName) {
  // NodeList.item returns null when the index is out of range, i.e. the tag is missing.
  Node child = parent.getElementsByTagName(tagName).item(0);
  if (child == null) {
    throw new IllegalArgumentException("Missing required element <" + tagName
        + "> inside <" + parent.getTagName() + ">.");
  }
  return child.getTextContent();
}
/**
 * We will parse the policyWeight information.
 * <p>
 * Only the first element named {@code weightType} under the queue is read. Each of its
 * sub-cluster entries contributes one {@code id:weight} token; tokens are joined with ",".
 *
 * @param queueElement xml Element.
 * @param weightType weightType, including 2 types, AmrmPolicyWeight and RouterPolicyWeight.
 * @return concatenated string of sub-cluster weights.
 * @throws IllegalArgumentException if the {@code weightType} element is missing, or a
 *         sub-cluster entry lacks its {@code id} or {@code weight} child.
 */
private String parsePolicyWeightsNode(Element queueElement, String weightType) {
  NodeList weightsSections = queueElement.getElementsByTagName(weightType);
  // NodeList.item returns null when the section is absent; fail with a clear message
  // rather than a NullPointerException on the dereference below.
  Node weightsNode = weightsSections.item(0);
  if (weightsNode == null) {
    throw new IllegalArgumentException(
        "Missing required element <" + weightType + "> in queue policy definition.");
  }
  List<String> subClusterWeights = new ArrayList<>();
  if (weightsNode.getNodeType() == Node.ELEMENT_NODE) {
    Element weightsElement = (Element) weightsNode;
    NodeList subClusterIdInfoList =
        weightsElement.getElementsByTagName(XML_TAG_SUBCLUSTERIDINFO);
    for (int i = 0; i < subClusterIdInfoList.getLength(); i++) {
      Node subClusterIdInfoNode = subClusterIdInfoList.item(i);
      if (subClusterIdInfoNode.getNodeType() == Node.ELEMENT_NODE) {
        Element subClusterIdInfoElement = (Element) subClusterIdInfoNode;
        Node idNode = subClusterIdInfoElement.getElementsByTagName("id").item(0);
        Node weightNode = subClusterIdInfoElement.getElementsByTagName("weight").item(0);
        if (idNode == null || weightNode == null) {
          throw new IllegalArgumentException("Each sub-cluster entry under <" + weightType
              + "> must contain both <id> and <weight>.");
        }
        String subClusterId = idNode.getTextContent();
        String weight = weightNode.getTextContent();
        LOG.debug("WeightType[{}] - SubCluster ID: {}, Weight: {}.",
            weightType, subClusterId, weight);
        subClusterWeights.add(subClusterId + ":" + weight);
      }
    }
  }
  return StringUtils.join(subClusterWeights, ",");
}
/**
 * Handles the list federation policies based on the specified parameters.
 * <p>
 * The total page count is printed first, followed by a rendered table with one row
 * (queue, amrm weight, router weight) per policy returned for the requested page.
 *
 * @param pageSize Records displayed per page.
 * @param currentPage The current page number.
 * @param queue The name of the queue to be filtered.
 * @param queues list of queues to filter.
 * @return {@code EXIT_SUCCESS} on success; {@code EXIT_ERROR} if a YarnException or
 *         IOException occurred.
 */
protected int handListPolicies(int pageSize, int currentPage, String queue, List<String> queues) {
  LOG.info("List Federation Policies, pageSize = {}, currentPage = {}, queue = {}, queues = {}",
      pageSize, currentPage, queue, queues);
  try {
    // Deliberately not closed: closing the writer would close System.out itself.
    PrintWriter writer = new PrintWriter(new OutputStreamWriter(
        System.out, StandardCharsets.UTF_8));
    QueryFederationQueuePoliciesRequest request =
        QueryFederationQueuePoliciesRequest.newInstance(pageSize, currentPage, queue, queues);
    ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
    QueryFederationQueuePoliciesResponse response =
        adminProtocol.listFederationQueuePolicies(request);
    System.out.println("TotalPage = " + response.getTotalPage());
    FormattingCLIUtils formattingCLIUtils = new FormattingCLIUtils(LIST_POLICIES_TITLE)
        .addHeaders(LIST_POLICIES_HEADER);
    List<FederationQueueWeight> federationQueueWeights = response.getFederationQueueWeights();
    federationQueueWeights.forEach(federationQueueWeight -> {
      String queueName = federationQueueWeight.getQueue();
      String amrmWeight = federationQueueWeight.getAmrmWeight();
      String routerWeight = federationQueueWeight.getRouterWeight();
      formattingCLIUtils.addLine(queueName, amrmWeight, routerWeight);
    });
    writer.print(formattingCLIUtils.render());
    writer.flush();
    return EXIT_SUCCESS;
  } catch (YarnException | IOException e) {
    // The message previously named a different handler ("handleSavePolicy").
    LOG.error("handListPolicies error.", e);
    return EXIT_ERROR;
  }
}
/**
 * Sends a delete request for the given federation application and prints the
 * message carried by the response.
 *
 * @param application the application value passed to the delete request.
 * @return {@code EXIT_SUCCESS} when the request completed; {@code EXIT_ERROR} if any
 *         exception occurred.
 */
private int handleDeleteApplication(String application) {
  LOG.info("Delete Application = {}.", application);
  try {
    DeleteFederationApplicationRequest request =
        DeleteFederationApplicationRequest.newInstance(application);
    ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
    DeleteFederationApplicationResponse response =
        adminProtocol.deleteFederationApplication(request);
    System.out.println(response.getMessage());
    return EXIT_SUCCESS;
  } catch (Exception e) {
    // The message previously named a different handler ("handleSavePolicy").
    LOG.error("handleDeleteApplication application = {} error.", application, e);
    return EXIT_ERROR;
  }
}
/**
 * Parses the arguments of the application command and dispatches to the matching handler.
 * <p>
 * A missing option argument prints the usage for {@code args[0]} and fails; any other
 * ParseException is propagated. When the delete option is absent nothing is done and 0
 * is returned.
 *
 * @param args command line arguments, including the command itself at index 0.
 * @return result of the delete handler, {@code EXIT_ERROR} on a missing argument,
 *         otherwise 0.
 * @throws IOException declared for callers; not raised directly by this method.
 * @throws YarnException declared for callers; not raised directly by this method.
 * @throws ParseException if the arguments cannot be parsed for a reason other than a
 *         missing option argument.
 */
private int handleApplication(String[] args)
    throws IOException, YarnException, ParseException {
  // Register the options understood by this command.
  Options options = new Options();
  options.addOption("application", false,
      "We provide a set of commands to query and clean applications.");
  options.addOption(new Option(null, OPTION_DELETE_APP, true,
      "We will clean up the provided application."));

  // Parse; only a missing option argument is handled locally.
  final CommandLine commandLine;
  try {
    commandLine = new DefaultParser().parse(options, args);
  } catch (MissingArgumentException ex) {
    System.out.println("Missing argument for options");
    printUsage(args[0]);
    return EXIT_ERROR;
  }

  if (!commandLine.hasOption(OPTION_DELETE_APP)) {
    return 0;
  }
  return handleDeleteApplication(commandLine.getOptionValue(OPTION_DELETE_APP));
}
/**
 * Delete queue weight information.
 * <p>
 * A blank queue name is rejected locally: an error is printed and no request is sent.
 *
 * @param queue Queue whose policy needs to be deleted.
 * @return {@code EXIT_SUCCESS} when the delete request completed; {@code EXIT_ERROR} if the
 *         queue is blank or any exception occurred.
 */
protected int handDeletePolicy(String queue) {
  LOG.info("Delete {} Policy.", queue);
  if (StringUtils.isBlank(queue)) {
    // Previously this only printed the error and still sent the request for a blank queue.
    System.err.println("Queue cannot be empty.");
    return EXIT_ERROR;
  }
  try {
    // The delete request takes a list of queues; wrap the single queue.
    List<String> queues = new ArrayList<>();
    queues.add(queue);
    DeleteFederationQueuePoliciesRequest request =
        DeleteFederationQueuePoliciesRequest.newInstance(queues);
    ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
    DeleteFederationQueuePoliciesResponse response =
        adminProtocol.deleteFederationPoliciesByQueues(request);
    System.out.println(response.getMessage());
    return EXIT_SUCCESS;
  } catch (Exception e) {
    LOG.error("handDeletePolicy queue = {} error.", queue, e);
    return EXIT_ERROR;
  }
}
/**
 * Entry point invoked with the command line arguments: dispatches on {@code args[0]}.
 * <p>
 * Usage is printed and {@code EXIT_ERROR} returned when no command is given or federation
 * is not enabled in the configuration. An unrecognised command prints a notice plus the
 * help text and returns {@code EXIT_SUCCESS}.
 *
 * @param args command line arguments; the command is at index 0.
 * @return exit code of the selected handler, or as described above.
 * @throws Exception propagated from the selected handler.
 */
@Override
public int run(String[] args) throws Exception {
  // Build a YarnConfiguration on top of the configured one, if any.
  final Configuration baseConf = getConf();
  final YarnConfiguration yarnConf;
  if (baseConf == null) {
    yarnConf = new YarnConfiguration();
  } else {
    yarnConf = new YarnConfiguration(baseConf);
  }
  final boolean federationEnabled = yarnConf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
      YarnConfiguration.DEFAULT_FEDERATION_ENABLED);

  if (!(args.length >= 1 && federationEnabled)) {
    printUsage(CMD_EMPTY);
    return EXIT_ERROR;
  }

  final String command = args[0];
  if (CMD_HELP.equals(command)) {
    // "help <cmd>" prints that command's usage; bare "help" prints everything.
    if (args.length > 1) {
      printUsage(args[1]);
    } else {
      printHelp();
    }
    return EXIT_SUCCESS;
  }
  if (CMD_SUBCLUSTER.equals(command)) {
    return handleSubCluster(args);
  }
  if (CMD_POLICY.equals(command)) {
    return handlePolicy(args);
  }
  if (CMD_APPLICATION.equals(command)) {
    return handleApplication(args);
  }

  System.out.println("No related commands found.");
  printHelp();
  return EXIT_SUCCESS;
}
/**
 * Returns the {@code POLICY_BATCH_SAVE_USAGE} usage descriptor.
 *
 * @return the {@code POLICY_BATCH_SAVE_USAGE} constant.
 */
public static UsageInfo getPolicyBatchSaveUsage() {
return POLICY_BATCH_SAVE_USAGE;
}
/**
 * Holder for the usage entries, help lines, examples and example descriptions of a
 * command. Examples and their descriptions are grouped per command key and keep
 * insertion order. The adders return {@code this} so calls can be chained.
 */
static class RouterCmdUsageInfos {
  private List<UsageInfo> usageInfos = new ArrayList<>();
  private List<String> helpInfos = new ArrayList<>();
  private Map<String, List<String>> examples = new LinkedHashMap<>();
  protected Map<String, List<String>> exampleDescs = new LinkedHashMap<>();

  RouterCmdUsageInfos() {
    // All collections are created by the field initialisers above.
  }

  /** Appends a usage entry. */
  public RouterCmdUsageInfos addUsageInfo(UsageInfo usageInfo) {
    usageInfos.add(usageInfo);
    return this;
  }

  /** Appends a help line. */
  public RouterCmdUsageInfos addHelpInfo(String helpInfo) {
    helpInfos.add(helpInfo);
    return this;
  }

  /** Appends an example under the given command key. */
  private RouterCmdUsageInfos addExample(String cmd, String example) {
    appendTo(examples, cmd, example);
    return this;
  }

  /** Appends an example description under the given command key. */
  private RouterCmdUsageInfos addExampleDescs(String cmd, String exampleDesc) {
    appendTo(exampleDescs, cmd, exampleDesc);
    return this;
  }

  /** Returns the examples grouped by command key (the internal map, not a copy). */
  public Map<String, List<String>> getExamples() {
    return examples;
  }

  /** Adds {@code value} to the list stored under {@code key}, creating the list on first use. */
  private static void appendTo(Map<String, List<String>> target, String key, String value) {
    target.computeIfAbsent(key, ignored -> new ArrayList<>()).add(value);
  }
}
/**
 * Command-line entry point: runs a new RouterCLI through ToolRunner and terminates the
 * JVM with the returned exit code.
 *
 * @param args command line arguments.
 * @throws Exception propagated from ToolRunner.
 */
public static void main(String[] args) throws Exception {
  System.exit(ToolRunner.run(new RouterCLI(), args));
}
/**
 * Returns the {@code ADMIN_USAGE} map; exposed for tests.
 *
 * @return the {@code ADMIN_USAGE} map itself (not a copy).
 */
@VisibleForTesting
public Map<String, RouterCmdUsageInfos> getAdminUsage(){
return ADMIN_USAGE;
}
}