FederationQueueWeight.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import java.util.LinkedHashMap;
import java.util.Map;
/**
 * Record describing the Federation weights configured for a queue.
 *
 * <p>An instance carries five string values: the router weight, the AMRM weight,
 * the headroom alpha, the queue and the policy manager class name. The class also
 * provides static helpers to validate weight ratios and the headroom alpha.
 */
@Private
@Unstable
public abstract class FederationQueueWeight {

  /** Separator between sub-cluster entries, e.g. {@code SC-1:0.7,SC-2:0.3}. */
  private static final String COMMA = ",";

  /** Separator between a sub-cluster id and its weight, e.g. {@code SC-1:0.7}. */
  private static final String COLON = ":";

  /**
   * The FederationQueueWeight object consists of three parts:
   * routerWeight, amrmWeight, and headRoomAlpha.
   *
   * @param routerWeight Weight for routing applications to different subclusters.
   * We will route the application to different subclusters based on the configured weights.
   * Assuming we have two subclusters, SC-1 and SC-2,
   * with a weight of 0.7 for SC-1 and 0.3 for SC-2,
   * the application will be allocated in such a way
   * that 70% of the applications will be assigned to SC-1 and 30% to SC-2.
   *
   * @param amrmWeight Weight for resource request from ApplicationMaster (AM) to
   * different subclusters' Resource Manager (RM).
   * Assuming we have two subclusters, SC-1 and SC-2,
   * with a weight of 0.6 for SC-1 and 0.4 for SC-2,
   * when the AM requests resources,
   * 60% of the requests will be made to the Resource Manager (RM) of SC-1
   * and 40% to the RM of SC-2.
   *
   * @param headRoomAlpha
   * used by policies that balance weight-based and load-based considerations in their decisions.
   * For policies that use this parameter,
   * values close to 1 indicate that most of the decision
   * should be based on currently observed headroom from various sub-clusters,
   * values close to zero indicate that the decision should be
   * mostly based on weights and practically ignore current load.
   *
   * @return FederationQueueWeight
   */
  @Private
  @Unstable
  public static FederationQueueWeight newInstance(String routerWeight,
      String amrmWeight, String headRoomAlpha) {
    FederationQueueWeight record = Records.newRecord(FederationQueueWeight.class);
    record.setRouterWeight(routerWeight);
    record.setAmrmWeight(amrmWeight);
    record.setHeadRoomAlpha(headRoomAlpha);
    return record;
  }

  /**
   * Creates a FederationQueueWeight carrying the three weight values plus
   * the queue and the policy manager class name.
   *
   * @param routerWeight weight for routing applications to different subclusters.
   * @param amrmWeight weight for resource requests from the AM to the subclusters' RMs.
   * @param headRoomAlpha headroom alpha value.
   * @param queue the queue stored on the record.
   * @param policyManagerClassName the policy manager class name stored on the record.
   * @return FederationQueueWeight
   */
  @Private
  @Unstable
  public static FederationQueueWeight newInstance(String routerWeight,
      String amrmWeight, String headRoomAlpha, String queue, String policyManagerClassName) {
    // Same record creation and setter order as the three-argument factory,
    // followed by the two additional fields.
    FederationQueueWeight record = newInstance(routerWeight, amrmWeight, headRoomAlpha);
    record.setQueue(queue);
    record.setPolicyManagerClassName(policyManagerClassName);
    return record;
  }

  /**
   * Get the router weight.
   *
   * @return the router weight string.
   */
  @Public
  @Unstable
  public abstract String getRouterWeight();

  /**
   * Set the router weight.
   *
   * @param routerWeight the router weight string.
   */
  @Public
  @Unstable
  public abstract void setRouterWeight(String routerWeight);

  /**
   * Get the AMRM weight.
   *
   * @return the AMRM weight string.
   */
  @Public
  @Unstable
  public abstract String getAmrmWeight();

  /**
   * Set the AMRM weight.
   *
   * @param amrmWeight the AMRM weight string.
   */
  @Public
  @Unstable
  public abstract void setAmrmWeight(String amrmWeight);

  /**
   * Get the headroom alpha.
   *
   * @return the headroom alpha string.
   */
  @Public
  @Unstable
  public abstract String getHeadRoomAlpha();

  /**
   * Set the headroom alpha.
   *
   * @param headRoomAlpha the headroom alpha string.
   */
  @Public
  @Unstable
  public abstract void setHeadRoomAlpha(String headRoomAlpha);

  /**
   * Check if the subCluster Queue Weight Ratio are valid.
   *
   * <p>This method can be used to validate RouterPolicyWeight and AMRMPolicyWeight.
   * The expected format is a comma-separated list of {@code subClusterId:weight}
   * entries, e.g. {@code SC-1:0.7,SC-2:0.3}. Every weight must be parseable as a
   * double and the weights must add up to 1 (within a tolerance of 1e-6).
   * If the same sub-cluster id appears more than once, only its last weight is counted.
   *
   * @param subClusterWeight the weight ratios of subClusters.
   * @throws YarnException if the input is blank, an entry is not of the form
   * {@code id:weight}, a weight is not a number, or the weights do not sum to 1.
   */
  public static void checkSubClusterQueueWeightRatioValid(String subClusterWeight)
      throws YarnException {
    // The subClusterWeight cannot be empty.
    if (StringUtils.isBlank(subClusterWeight)) {
      throw new YarnException("subClusterWeight can't be empty!");
    }

    // SC-1:0.7,SC-2:0.3 -> [SC-1:0.7,SC-2:0.3]
    Map<String, Double> weightBySubCluster = new LinkedHashMap<>();
    for (String entry : subClusterWeight.split(COMMA)) {
      // SC-1:0.7 -> [SC-1,0.7]; exactly two parts are required.
      String[] parts = entry.split(COLON);
      if (parts.length != 2) {
        // Message kept verbatim, since callers may match on its text.
        throw new YarnException("The subClusterWeight cannot be empty," +
            " and the subClusterWeight size must be 2. (eg.SC-1,0.2)");
      }

      double weight;
      try {
        weight = Double.parseDouble(parts[1]);
      } catch (NumberFormatException e) {
        // Report malformed weights through the declared checked exception
        // instead of leaking an unchecked NumberFormatException to the caller.
        throw new YarnException("The weight of subCluster " + parts[0] +
            " must be a number, but got: " + parts[1], e);
      }
      weightBySubCluster.put(parts[0], weight);
    }

    // The sum of weight ratios for subClusters must be equal to 1.
    double sum = weightBySubCluster.values().stream().mapToDouble(Double::doubleValue).sum();
    boolean isValid = Math.abs(sum - 1.0) < 1e-6; // Comparing with a tolerance of 1e-6
    if (!isValid) {
      throw new YarnException("The sum of ratios for all subClusters must be equal to 1.");
    }
  }

  /**
   * Check if HeadRoomAlpha is a number and is between 0 and 1.
   *
   * @param headRoomAlpha headroomalpha.
   * @throws YarnException if the value is not a number that can be read as a double,
   * or if it lies outside the range [0, 1].
   */
  public static void checkHeadRoomAlphaValid(String headRoomAlpha) throws YarnException {
    if (!isNumeric(headRoomAlpha)) {
      throw new YarnException("HeadRoomAlpha must be a number.");
    }

    double alpha;
    try {
      alpha = Double.parseDouble(headRoomAlpha);
    } catch (NumberFormatException e) {
      // isNumeric() accepts some forms (e.g. "0x1", "1L") that
      // Double.parseDouble rejects; treat those as "not a number" as well.
      throw new YarnException("HeadRoomAlpha must be a number.", e);
    }

    // Written as a negated range check so that NaN is rejected too.
    if (!(alpha >= 0 && alpha <= 1)) {
      throw new YarnException("HeadRoomAlpha must be between 0-1.");
    }
  }

  /**
   * Determines whether the given value is a number.
   *
   * @param value given value.
   * @return true, is a number, false, not a number.
   */
  protected static boolean isNumeric(String value) {
    return NumberUtils.isCreatable(value);
  }

  /**
   * Get the queue.
   *
   * @return the queue.
   */
  @Public
  @Unstable
  public abstract String getQueue();

  /**
   * Set the queue.
   *
   * @param queue the queue.
   */
  @Public
  @Unstable
  public abstract void setQueue(String queue);

  /**
   * Get the policy manager class name.
   *
   * @return the policy manager class name.
   */
  @Public
  @Unstable
  public abstract String getPolicyManagerClassName();

  /**
   * Set the policy manager class name.
   *
   * @param policyManagerClassName the policy manager class name.
   */
  @Public
  @Unstable
  public abstract void setPolicyManagerClassName(String policyManagerClassName);

  /**
   * Renders the queue, router weight, AMRM weight and policy manager class name.
   * The headroom alpha is not part of the output.
   *
   * @return string form of this record.
   */
  @Override
  public String toString() {
    return new StringBuilder()
        .append("FederationQueueWeight { ")
        .append("Queue: ").append(getQueue()).append(", ")
        .append("RouterWeight: ").append(getRouterWeight()).append(", ")
        .append("AmrmWeight: ").append(getAmrmWeight()).append(", ")
        .append("PolicyManagerClassName: ").append(getPolicyManagerClassName())
        .append(" }")
        .toString();
  }
}