PolicyGenerator.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* The PolicyGenerator runs periodically and updates the policy configuration
* for each queue into the FederationStateStore. The policy update behavior is
* defined by the GlobalPolicy instance that is used.
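 * <p>
 * The concrete {@link GlobalPolicy} implementation is selected via
 * {@link YarnConfiguration#GPG_GLOBAL_POLICY_CLASS}, and individual
 * sub-clusters can be excluded from policy generation via
 * {@link YarnConfiguration#GPG_POLICY_GENERATOR_BLACKLIST}.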
*/
public class PolicyGenerator implements Runnable, Configurable {
private static final Logger LOG =
LoggerFactory.getLogger(PolicyGenerator.class);
private GPGContext gpgContext;
private Configuration conf;
  // Maps each requested DAO class to the RM REST endpoint path it is fetched
  // from, populated from GlobalPolicy#registerPaths()
private Map<Class, String> pathMap = new HashMap<>();
// Global policy instance
@VisibleForTesting
private GlobalPolicy policy;
/**
* The PolicyGenerator periodically reads SubCluster load and updates
* policies into the FederationStateStore.
*
* @param conf Configuration.
* @param context GPG Context.
*/
public PolicyGenerator(Configuration conf, GPGContext context) {
setConf(conf);
init(context);
}
private void init(GPGContext context) {
this.gpgContext = context;
LOG.info("Initialized PolicyGenerator");
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
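    // Reflectively instantiate the GlobalPolicy implementation configured via
    // GPG_GLOBAL_POLICY_CLASS (falling back to the default policy class)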
this.policy = FederationStateStoreFacade.createInstance(conf,
YarnConfiguration.GPG_GLOBAL_POLICY_CLASS,
YarnConfiguration.DEFAULT_GPG_GLOBAL_POLICY_CLASS, GlobalPolicy.class);
policy.setConf(conf);
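    // Cache the RM REST endpoints that the policy needs for gathering
    // sub-cluster information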
pathMap.putAll(policy.registerPaths());
}
@Override
public Configuration getConf() {
return this.conf;
}
@Override
public final void run() {
Map<SubClusterId, SubClusterInfo> activeSubClusters;
try {
activeSubClusters = gpgContext.getStateStoreFacade().getSubClusters(true);
} catch (YarnException e) {
LOG.error("Error retrieving active sub-clusters", e);
return;
}
    // Fetch the scheduler information from all the active sub-clusters
    Map<SubClusterId, SchedulerInfo> schedInfo = getSchedulerInfo(activeSubClusters);
    // Extract the union of queue names; only CapacityScheduler sub-clusters
    // are considered, others are skipped with a warning
    Set<String> queueNames = extractQueues(schedInfo);
    // Remove blacklisted SubClusters
    activeSubClusters.keySet().removeAll(getBlackList());
    LOG.info("Active non-blacklisted sub-clusters: {}",
        activeSubClusters.keySet());
    // Get cluster metrics information from the non-blacklisted RMs; it is used
    // later to evaluate SubCluster load
Map<SubClusterId, Map<Class, Object>> clusterInfo =
getInfos(activeSubClusters);
    // Update the policy of each queue into the FederationStateStore
for (String queueName : queueNames) {
// Retrieve the manager from the policy facade
FederationPolicyManager manager;
try {
manager = this.gpgContext.getPolicyFacade().getPolicyManager(queueName);
} catch (YarnException e) {
LOG.error("GetPolicy for queue {} failed.", queueName, e);
continue;
}
LOG.info("Updating policy for queue {}.", queueName);
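      // Let the GlobalPolicy update the policy for this queue based on the
      // gathered cluster information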
manager = policy.updatePolicy(queueName, clusterInfo, manager);
try {
this.gpgContext.getPolicyFacade().setPolicyManager(manager);
} catch (YarnException e) {
LOG.error("SetPolicy for queue {} failed.", queueName, e);
}
}
}
  /**
   * Helper to retrieve metrics from the RM REST endpoints.
   *
   * @param activeSubClusters a map of active SubClusterIds to their info.
   * @return a map from each SubClusterId to the information retrieved from its
   *         RM, keyed by the DAO class registered by the policy.
   */
@VisibleForTesting
protected Map<SubClusterId, Map<Class, Object>> getInfos(
Map<SubClusterId, SubClusterInfo> activeSubClusters) {
Map<SubClusterId, Map<Class, Object>> clusterInfo = new HashMap<>();
for (SubClusterInfo sci : activeSubClusters.values()) {
for (Map.Entry<Class, String> e : this.pathMap.entrySet()) {
if (!clusterInfo.containsKey(sci.getSubClusterId())) {
clusterInfo.put(sci.getSubClusterId(), new HashMap<>());
}
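        // One REST call per endpoint registered by the policy; the response is
        // stored under the DAO class it was parsed into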
Object ret = GPGUtils.invokeRMWebService(sci.getRMWebServiceAddress(),
e.getValue(), e.getKey(), conf);
clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret);
}
}
return clusterInfo;
}
  /**
   * Helper to retrieve SchedulerInfos.
   *
   * @param activeSubClusters a map of active SubClusterIds to their info.
   * @return a map from each SubClusterId to the SchedulerInfo reported by its RM.
   */
@VisibleForTesting
protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo(
Map<SubClusterId, SubClusterInfo> activeSubClusters) {
    Map<SubClusterId, SchedulerInfo> schedInfo = new HashMap<>();
for (SubClusterInfo sci : activeSubClusters.values()) {
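      // Query each active RM for its scheduler information via the scheduler
      // REST endpoint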
SchedulerTypeInfo sti = GPGUtils
.invokeRMWebService(sci.getRMWebServiceAddress(),
RMWSConsts.SCHEDULER, SchedulerTypeInfo.class, conf);
      if (sti != null) {
schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo());
} else {
LOG.warn("Skipped null scheduler info from SubCluster {}.", sci.getSubClusterId());
}
}
return schedInfo;
}
  /**
   * Helper to get the set of blacklisted SubClusterIds from configuration.
   *
   * @return the set of blacklisted SubClusterIds, empty if none are configured.
   */
private Set<SubClusterId> getBlackList() {
String blackListParam =
conf.get(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST);
    if (blackListParam == null) {
return Collections.emptySet();
}
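    // The blacklist is configured as a comma-separated list of SubCluster ids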
Set<SubClusterId> blackList = new HashSet<>();
for (String id : blackListParam.split(",")) {
blackList.add(SubClusterId.newInstance(id));
}
return blackList;
}
  /**
   * Given the scheduler information for all RMs, extract the union of queue
   * names; currently only CapacityScheduler instances are considered.
   *
   * @param schedInfo the scheduler information.
   * @return a set of queue names.
   */
private Set<String> extractQueues(Map<SubClusterId, SchedulerInfo> schedInfo) {
Set<String> queueNames = new HashSet<>();
for (Map.Entry<SubClusterId, SchedulerInfo> entry : schedInfo.entrySet()) {
if (entry.getValue() instanceof CapacitySchedulerInfo) {
// Flatten the queue structure and get only non leaf queues
queueNames.addAll(flattenQueue((CapacitySchedulerInfo) entry.getValue())
.get(CapacitySchedulerQueueInfo.class));
} else {
LOG.warn("Skipping SubCluster {}, not configured with capacity scheduler.",
entry.getKey());
}
}
return queueNames;
}
// Helpers to flatten the queue structure into a multimap of
// queue type to set of queue names
private Map<Class, Set<String>> flattenQueue(CapacitySchedulerInfo csi) {
Map<Class, Set<String>> flattened = new HashMap<>();
addOrAppend(flattened, csi.getClass(), csi.getQueueName());
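    // Recurse into the child queues to collect every queue name in the hierarchy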
for (CapacitySchedulerQueueInfo csqi : csi.getQueues().getQueueInfoList()) {
flattenQueue(csqi, flattened);
}
return flattened;
}
private void flattenQueue(CapacitySchedulerQueueInfo csi,
Map<Class, Set<String>> flattened) {
addOrAppend(flattened, csi.getClass(), csi.getQueueName());
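    // Leaf queues report no child queue list, so guard against null before recursing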
if (csi.getQueues() != null) {
for (CapacitySchedulerQueueInfo csqi : csi.getQueues().getQueueInfoList()) {
flattenQueue(csqi, flattened);
}
}
}
private <K, V> void addOrAppend(Map<K, Set<V>> multimap, K key, V value) {
if (!multimap.containsKey(key)) {
multimap.put(key, new HashSet<>());
}
multimap.get(key).add(value);
}
public GlobalPolicy getPolicy() {
return policy;
}
public void setPolicy(GlobalPolicy policy) {
this.policy = policy;
}
}