ManagedParentQueue.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getQueueCapacityConfigParser;
/**
* Parent queue with auto creation enabled. This queue initially has no
* children; all of its child leaf queues are auto created. Currently other
* pre-configured leaf or parent queues cannot co-exist with the auto-created
* leaf queues, and auto creation is limited to leaf queues.
*/
public class ManagedParentQueue extends AbstractManagedParentQueue {

  private static final Logger LOG = LoggerFactory.getLogger(
      ManagedParentQueue.class);

  /**
   * Literal (non-regex) pattern for the leaf queue template prefix. It is
   * quoted so that regex metacharacters in the prefix are matched verbatim.
   */
  private static final Pattern TEMPLATE_PREFIX_PATTERN =
      Pattern.compile(Pattern.quote(AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX));

  /**
   * Whether auto creation of a leaf queue must be rejected once the children's
   * guaranteed capacities would exceed this queue's guaranteed capacity.
   * Read from the configuration in the constructor and on every
   * {@link #reinitialize(CSQueue, Resource)}.
   */
  private boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded = false;

  /**
   * Creates the managed parent queue, sets up its own queue configuration,
   * builds the leaf queue template and initializes the configured queue
   * management policy.
   *
   * @param queueContext scheduler queue context supplying configuration and
   *     cluster resource
   * @param queueName name of this queue
   * @param parent parent queue
   * @param old passed through unchanged to the superclass constructor
   * @throws IOException if this queue or its leaf queue template cannot be
   *     configured
   */
  public ManagedParentQueue(final CapacitySchedulerQueueContext queueContext,
      final String queueName, final CSQueue parent, final CSQueue old)
      throws IOException {
    super(queueContext, queueName, parent, old);
    super.setupQueueConfigs(queueContext.getClusterResource());
    shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
        queueContext.getConfiguration()
            .getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
                getQueuePathObject());
    leafQueueTemplate = initializeLeafQueueConfigs().build();
    initializeQueueManagementPolicy();
  }

  /**
   * Reinitializes this queue from a freshly parsed configuration.
   * <p>
   * Validates {@code newlyParsedQueue}. When configured to fail on exceeded
   * guaranteed capacity, it also rejects the configuration if the children's
   * capacities already exceed this queue's capacity. It then rebuilds the
   * leaf queue template, reinitializes the superclass and every existing
   * child, and refreshes the queue management policy. Finally it applies the
   * capacity changes that the policy computes. All of this happens under the
   * write lock.
   *
   * @param newlyParsedQueue the queue parsed from the new configuration
   * @param clusterResource current cluster resource
   * @throws IOException if validation fails, if the child capacities exceed
   *     this queue's capacity while that is configured as an error, or if the
   *     policy changes cannot be computed or applied
   */
  @Override
  public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
      throws IOException {
    writeLock.lock();
    try {
      validate(newlyParsedQueue);

      shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
          queueContext.getConfiguration()
              .getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
                  getQueuePathObject());

      // Validate that the existing children do not already exceed this
      // queue's guaranteed capacity.
      if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) {
        float childCap = sumOfChildCapacities();
        if (getCapacity() < childCap) {
          throw new IOException(
              "Total of Auto Created leaf queues guaranteed capacity : "
                  + childCap + " exceeds Parent queue's " + getQueuePath()
                  + " guaranteed capacity " + getCapacity()
                  + ". Cannot enforce policy to auto"
                  + " create queues beyond parent queue's capacity");
        }
      }

      leafQueueTemplate = initializeLeafQueueConfigs().build();

      super.reinitialize(newlyParsedQueue, clusterResource);

      // Run reinitialize on each existing child queue to trigger the
      // recomputation of absolute capacities.
      for (CSQueue childQueue : this.getChildQueues()) {
        childQueue.reinitialize(childQueue, clusterResource);
      }

      // Clear the policy's state, or replace the policy if the configured
      // policy class changed.
      reinitializeQueueManagementPolicy();

      // Reassign capacities according to the policy.
      final List<QueueManagementChange> queueManagementChanges =
          queueManagementPolicy.computeQueueManagementChanges();
      validateAndApplyQueueManagementChanges(queueManagementChanges);

      LOG.info(
          "Reinitialized Managed Parent Queue: [{}] with capacity [{}]"
              + " with max capacity [{}]",
          getQueueName(), super.getCapacity(), super.getMaximumCapacity());
    } catch (YarnException ye) {
      // The trailing Throwable argument is logged with its stack trace.
      LOG.error("Exception while computing or applying policy changes for"
          + " leaf queues of managed parent queue : {}", getQueuePath(), ye);
      throw new IOException(ye);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Instantiates the queue management policy configured for this queue and
   * initializes it with this queue.
   */
  private void initializeQueueManagementPolicy() throws IOException {
    queueManagementPolicy =
        queueContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
            getQueuePathObject());
    queueManagementPolicy.init(this);
  }

  /**
   * Re-reads the configured queue management policy. If its class differs
   * from the current policy's class, the new instance replaces the current
   * one and is initialized. Otherwise the current policy is reinitialized in
   * place.
   */
  private void reinitializeQueueManagementPolicy() throws IOException {
    AutoCreatedQueueManagementPolicy managementPolicy =
        queueContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
            getQueuePathObject());
    if (managementPolicy.getClass().equals(
        this.queueManagementPolicy.getClass())) {
      queueManagementPolicy.reinitialize(this);
    } else {
      queueManagementPolicy = managementPolicy;
      queueManagementPolicy.init(this);
    }
  }

  /**
   * Builds the configuration of the leaf queue template: the template's own
   * configuration entries, its per-label minimum resources and its per-label
   * capacities. When this queue is configured in ABSOLUTE_RESOURCE mode, the
   * capacities are derived from the configured resources.
   *
   * @return a builder populated with configuration, capacities and resource
   *     quotas
   * @throws IOException if the template configuration cannot be loaded, or if
   *     this queue uses PERCENTAGE while the template configures absolute
   *     minimum resources
   */
  protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs()
      throws IOException {
    AutoCreatedLeafQueueConfig.Builder builder =
        new AutoCreatedLeafQueueConfig.Builder();
    CapacitySchedulerConfiguration configuration =
        queueContext.getConfiguration();

    // TODO load configs into CapacitySchedulerConfiguration instead of duplicating them
    String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix();
    // Load the template configuration into a CapacitySchedulerConfiguration.
    CapacitySchedulerConfiguration autoCreatedTemplateConfig =
        super.initializeLeafQueueConfigs(leafQueueTemplateConfPrefix);
    builder.configuration(autoCreatedTemplateConfig);

    // Resolve the template path and its configured labels once; both the
    // resource templates and the capacities below are keyed by them.
    QueuePath templateQueuePath = resolveTemplateQueuePath();
    Set<String> templateConfiguredNodeLabels =
        lookupTemplateConfiguredNodeLabels(templateQueuePath);

    QueueResourceQuotas queueResourceQuotas = new QueueResourceQuotas();
    setAbsoluteResourceTemplates(configuration, queueResourceQuotas,
        templateQueuePath, templateConfiguredNodeLabels);

    // Load template capacities.
    QueueCapacities queueCapacities = new QueueCapacities(false);
    CSQueueUtils.loadCapacitiesByLabelsFromConf(templateQueuePath,
        queueCapacities,
        configuration,
        templateConfiguredNodeLabels);

    // When this parent is configured in ABSOLUTE_RESOURCE format, replace the
    // template capacities with values derived from the configured resources.
    if (this.capacityConfigType.equals(CapacityConfigType.ABSOLUTE_RESOURCE)) {
      updateQueueCapacities(queueCapacities);
    }

    builder.capacities(queueCapacities);
    builder.resourceQuotas(queueResourceQuotas);
    return builder;
  }

  /**
   * Returns the configuration path of this queue's leaf queue template.
   */
  private QueuePath resolveTemplateQueuePath() {
    return QueuePrefixes.getAutoCreatedQueueObjectTemplateConfPrefix(
        getQueuePathObject());
  }

  /**
   * Returns the node labels configured for the given template queue path.
   */
  private Set<String> lookupTemplateConfiguredNodeLabels(
      QueuePath templateQueuePath) {
    return queueContext.getQueueManager()
        .getConfiguredNodeLabelsForAllQueues()
        .getLabelsByQueue(templateQueuePath.getFullPath());
  }

  /**
   * Copies the template's configured minimum resource for each template label
   * into {@code queueResourceQuotas}.
   *
   * @throws IOException if this queue uses PERCENTAGE capacities while the
   *     template configures a non-empty minimum resource for some label
   */
  private void setAbsoluteResourceTemplates(
      CapacitySchedulerConfiguration configuration,
      QueueResourceQuotas queueResourceQuotas, QueuePath templateQueuePath,
      Set<String> templateConfiguredNodeLabels) throws IOException {
    for (String nodeLabel : templateConfiguredNodeLabels) {
      Resource templateMinResource = configuration.getMinimumResourceRequirement(
          nodeLabel, templateQueuePath, resourceTypes);
      queueResourceQuotas.setConfiguredMinResource(nodeLabel,
          templateMinResource);

      if (this.capacityConfigType.equals(CapacityConfigType.PERCENTAGE)
          && !templateMinResource.equals(Resources.none())) {
        throw new IOException("Managed Parent Queue " + this.getQueuePath()
            + " config type is different from leaf queue template config type");
      }
    }
  }

  /**
   * Overwrites the capacities in {@code queueCapacities} for each existing
   * label with values derived from the template's and this queue's configured
   * resources.
   * <ul>
   *   <li>capacity: {@code resourceCalculator.divide} of the template's min
   *   resource by this queue's configured min resource</li>
   *   <li>maximum capacity: the same division, applied to the effective max.
   *   The effective max is the template's max (or this queue's max when the
   *   template's max is {@code Resources.none()}), capped at this queue's
   *   configured max resource.</li>
   *   <li>absolute (maximum) capacity: the value above multiplied by this
   *   queue's absolute (maximum) capacity</li>
   * </ul>
   */
  private void updateQueueCapacities(QueueCapacities queueCapacities) {
    CapacitySchedulerConfiguration configuration =
        queueContext.getConfiguration();
    // The template path does not depend on the label; resolve it once.
    QueuePath templateQueuePath = resolveTemplateQueuePath();

    for (String label : queueCapacities.getExistingNodeLabels()) {
      queueCapacities.setCapacity(label,
          resourceCalculator.divide(
              queueContext.getClusterResource(),
              configuration.getMinimumResourceRequirement(
                  label, templateQueuePath, resourceTypes),
              getQueueResourceQuotas().getConfiguredMinResource(label)));

      Resource childMaxResource = configuration.getMaximumResourceRequirement(
          label, templateQueuePath, resourceTypes);
      Resource parentMaxRes = getQueueResourceQuotas()
          .getConfiguredMaxResource(label);
      Resource effMaxResource = Resources.min(
          resourceCalculator,
          queueContext.getClusterResource(),
          childMaxResource.equals(Resources.none()) ? parentMaxRes
              : childMaxResource,
          parentMaxRes);
      queueCapacities.setMaximumCapacity(
          label, resourceCalculator.divide(
              queueContext.getClusterResource(),
              effMaxResource,
              parentMaxRes));

      queueCapacities.setAbsoluteCapacity(
          label, queueCapacities.getCapacity(label)
              * getQueueCapacities().getAbsoluteCapacity(label));
      queueCapacities.setAbsoluteMaximumCapacity(label,
          queueCapacities.getMaximumCapacity(label)
              * getQueueCapacities().getAbsoluteMaximumCapacity(label));
    }
  }

  /**
   * Checks that {@code newlyParsedQueue} is a {@link ManagedParentQueue} with
   * the same queue path as this queue.
   *
   * @param newlyParsedQueue the queue parsed from the new configuration
   * @throws IOException if the type or the queue path does not match
   */
  protected void validate(final CSQueue newlyParsedQueue) throws IOException {
    // Sanity check
    if (!(newlyParsedQueue instanceof ManagedParentQueue) || !newlyParsedQueue
        .getQueuePath().equals(getQueuePath())) {
      throw new IOException(
          "Trying to reinitialize " + getQueuePath() + " from "
              + newlyParsedQueue.getQueuePath());
    }
  }

  /**
   * Adds an auto-created leaf queue as a child of this queue.
   * <p>
   * The child is rejected in these cases:
   * <ul>
   *   <li>it is not an {@link AutoCreatedLeafQueue}</li>
   *   <li>its parent is missing or is not a {@link ManagedParentQueue}</li>
   *   <li>the parent already has the configured maximum number of child
   *   queues</li>
   *   <li>failing on exceeded guaranteed capacity is enabled, and adding the
   *   template's absolute capacity would exceed the parent's absolute
   *   capacity</li>
   * </ul>
   * Otherwise the child is registered and reinitialized from the policy's
   * initial template. Finally the cluster resource is re-applied so that
   * effective resources are refreshed.
   *
   * @param childQueue the auto-created leaf queue to add
   * @throws SchedulerDynamicEditException if the child cannot be added
   * @throws IOException propagated from the superclass or from
   *     reinitialization
   */
  @Override
  public void addChildQueue(CSQueue childQueue)
      throws SchedulerDynamicEditException, IOException {
    writeLock.lock();
    try {
      // instanceof is false for null, so this also rejects a null child.
      if (!(childQueue instanceof AutoCreatedLeafQueue)) {
        throw new SchedulerDynamicEditException(
            "Expected child queue to be an instance of AutoCreatedLeafQueue");
      }

      CapacitySchedulerConfiguration conf = queueContext.getConfiguration();
      CSQueue childParent = childQueue.getParent();
      if (childParent == null) {
        throw new SchedulerDynamicEditException(
            "Parent Queue is null, should not add child queue!");
      }
      // Report a wrong parent type through the declared exception rather
      // than an unchecked ClassCastException.
      if (!(childParent instanceof ManagedParentQueue)) {
        throw new SchedulerDynamicEditException(
            "Expected parent queue " + childParent.getQueuePath() + " of "
                + childQueue.getQueuePath()
                + " to be an instance of ManagedParentQueue");
      }
      ManagedParentQueue parentQueue = (ManagedParentQueue) childParent;

      String leafQueuePath = childQueue.getQueuePath();
      int maxQueues = conf.getAutoCreatedQueuesMaxChildQueuesLimit(
          parentQueue.getQueuePathObject());

      if (parentQueue.getChildQueues().size() >= maxQueues) {
        throw new SchedulerDynamicEditException(
            "Cannot auto create leaf queue " + leafQueuePath + ". Max Child "
                + "Queue limit exceeded which is configured as : " + maxQueues
                + " and number of child queues is : " + parentQueue
                .getChildQueues().size());
      }

      if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) {
        if (getLeafQueueTemplate().getQueueCapacities().getAbsoluteCapacity()
            + parentQueue.sumOfChildAbsCapacities() > parentQueue
            .getAbsoluteCapacity()) {
          throw new SchedulerDynamicEditException(
              "Cannot auto create leaf queue " + leafQueuePath + ". Child "
                  + "queues capacities have reached parent queue : "
                  + parentQueue.getQueuePath() + "'s guaranteed capacity");
        }
      }

      // NOTE(review): this cast assumes the configured management policy is a
      // GuaranteedOrZeroCapacityOverTimePolicy; the policy class is read from
      // configuration, so any other policy would fail here with a
      // ClassCastException. Confirm this restriction is intended.
      ((GuaranteedOrZeroCapacityOverTimePolicy) queueManagementPolicy)
          .updateTemplateAbsoluteCapacities(parentQueue.getQueueCapacities());

      AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
      super.addChildQueue(leafQueue);

      // Recompute the template capacities here to avoid setting a queue
      // capacity to NaN when the cluster resource is zero during RM startup
      // with DominantResourceCalculator.
      if (this.capacityConfigType.equals(
          CapacityConfigType.ABSOLUTE_RESOURCE)) {
        QueueCapacities queueCapacities =
            getLeafQueueTemplate().getQueueCapacities();
        updateQueueCapacities(queueCapacities);
      }

      setLeafQueuesCapacityVector(leafQueue);
      final AutoCreatedLeafQueueConfig initialLeafQueueTemplate =
          queueManagementPolicy.getInitialLeafQueueConfiguration(leafQueue);

      leafQueue.reinitializeFromTemplate(initialLeafQueueTemplate);

      // Do one update cluster resource call to make sure all absolute
      // resources and effective resources are updated.
      updateClusterResource(queueContext.getClusterResource(),
          new ResourceLimits(queueContext.getClusterResource()));
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Parses, for each label, the minimum and maximum capacity strings from the
   * leaf queue template configuration and sets them as capacity vectors on
   * {@code leafQueue}. Missing values default to "0" and "100".
   */
  private void setLeafQueuesCapacityVector(AutoCreatedLeafQueue leafQueue) {
    // Parse the capacity vectors specified in the leaf queue template.
    CapacitySchedulerConfiguration leafConfig =
        leafQueueTemplate.getLeafQueueConfigs();
    // NOTE(review): initializeLeafQueueConfigs() looks up the labels
    // configured on the template path, but this looks up the labels
    // configured on this queue's own path. Confirm this is intended.
    Set<String> templateConfiguredNodeLabels = queueContext
        .getQueueManager().getConfiguredNodeLabelsForAllQueues()
        .getLabelsByQueue(queuePath.getFullPath());
    // The template path does not depend on the label; resolve it once.
    QueuePath templateQueuePath = resolveTemplateQueuePath();
    for (String label : templateConfiguredNodeLabels) {
      final String leafConfigPath =
          QueuePrefixes.getNodeLabelPrefix(templateQueuePath, label);

      String capacityString = leafConfig.get(leafConfigPath + CAPACITY, "0");
      leafQueue.setConfiguredMinCapacityVector(label,
          getQueueCapacityConfigParser().parse(capacityString,
              leafQueue.getQueuePathObject()));

      String maxCapacityString =
          leafConfig.get(leafConfigPath + MAXIMUM_CAPACITY, "100");
      leafQueue.setConfiguredMaxCapacityVector(label,
          getQueueCapacityConfigParser().parse(maxCapacityString,
              leafQueue.getQueuePathObject()));
    }
  }

  /**
   * Returns the applications of all child queues, as reported by each child's
   * {@code getApplications()}. Every child is cast to {@code AbstractLeafQueue}.
   *
   * @return an unmodifiable list built under the read lock
   */
  public List<FiCaSchedulerApp> getScheduleableApplications() {
    readLock.lock();
    try {
      List<FiCaSchedulerApp> apps = new ArrayList<>();
      for (CSQueue childQueue : getChildQueues()) {
        apps.addAll(((AbstractLeafQueue) childQueue).getApplications());
      }
      return Collections.unmodifiableList(apps);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Returns the pending applications of all child queues, as reported by each
   * child's {@code getPendingApplications()}. Every child is cast to
   * {@code AbstractLeafQueue}.
   *
   * @return an unmodifiable list built under the read lock
   */
  public List<FiCaSchedulerApp> getPendingApplications() {
    readLock.lock();
    try {
      List<FiCaSchedulerApp> apps = new ArrayList<>();
      for (CSQueue childQueue : getChildQueues()) {
        apps.addAll(((AbstractLeafQueue) childQueue).getPendingApplications());
      }
      return Collections.unmodifiableList(apps);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Returns all applications of all child queues, as reported by each child's
   * {@code getAllApplications()}. Every child is cast to
   * {@code AbstractLeafQueue}.
   *
   * @return an unmodifiable list built under the read lock
   */
  public List<FiCaSchedulerApp> getAllApplications() {
    readLock.lock();
    try {
      List<FiCaSchedulerApp> apps = new ArrayList<>();
      for (CSQueue childQueue : getChildQueues()) {
        apps.addAll(((AbstractLeafQueue) childQueue).getAllApplications());
      }
      return Collections.unmodifiableList(apps);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Returns the full configuration key prefix of this queue's leaf queue
   * template: {@code CapacitySchedulerConfiguration.PREFIX} followed by the
   * auto-created queue template prefix for this queue's path.
   */
  public String getLeafQueueConfigPrefix() {
    return CapacitySchedulerConfiguration.PREFIX + QueuePrefixes
        .getAutoCreatedQueueTemplateConfPrefix(getQueuePathObject());
  }

  /**
   * Returns whether auto creation fails when the guaranteed capacity would be
   * exceeded, as last read from the configuration.
   */
  public boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded() {
    return shouldFailAutoCreationWhenGuaranteedCapacityExceeded;
  }

  /**
   * Asynchronously called from scheduler to apply queue management changes.
   * Validates all changes, applies them, and then commits them to the queue
   * management policy, all under this queue's write lock.
   *
   * @param queueManagementChanges QueueManagementChange List.
   * @throws IOException an I/O exception has occurred.
   * @throws SchedulerDynamicEditException when validate and apply QueueManagementChanges fails.
   */
  public void validateAndApplyQueueManagementChanges(
      List<QueueManagementChange> queueManagementChanges)
      throws IOException, SchedulerDynamicEditException {
    writeLock.lock();
    try {
      validateQueueManagementChanges(queueManagementChanges);

      applyQueueManagementChanges(queueManagementChanges);

      AutoCreatedQueueManagementPolicy policy =
          getAutoCreatedQueueManagementPolicy();

      // acquires write lock on policy
      policy.commitQueueManagementChanges(queueManagementChanges);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Validates every change:
   * <ul>
   *   <li>its queue must be an {@link AutoCreatedLeafQueue}</li>
   *   <li>that queue's parent must be an {@code AbstractManagedParentQueue}</li>
   *   <li>for {@code UPDATE_QUEUE} changes, the updated template must pass
   *   the leaf queue's {@code validateConfigurations}</li>
   * </ul>
   *
   * @param queueManagementChanges the changes to validate
   * @throws SchedulerDynamicEditException on the first invalid change
   */
  public void validateQueueManagementChanges(
      List<QueueManagementChange> queueManagementChanges)
      throws SchedulerDynamicEditException {
    for (QueueManagementChange queueManagementChange : queueManagementChanges) {
      CSQueue childQueue = queueManagementChange.getQueue();

      if (!(childQueue instanceof AutoCreatedLeafQueue)) {
        throw new SchedulerDynamicEditException(
            "queue should be " + "AutoCreatedLeafQueue. Found " + childQueue
                .getClass());
      }

      if (!(AbstractManagedParentQueue.class.
          isAssignableFrom(childQueue.getParent().getClass()))) {
        LOG.error("Queue {} is not an instance of PlanQueue or"
            + " ManagedParentQueue. Ignoring update {}",
            getQueuePath(), queueManagementChanges);
        throw new SchedulerDynamicEditException(
            "Queue " + getQueuePath() + " is not a AutoEnabledParentQueue."
                + " Ignoring update " + queueManagementChanges);
      }

      if (queueManagementChange.getQueueAction() ==
          QueueManagementChange.QueueAction.UPDATE_QUEUE) {
        AutoCreatedLeafQueueConfig template =
            queueManagementChange.getUpdatedQueueTemplate();
        ((AutoCreatedLeafQueue) childQueue).validateConfigurations(template);
      }
    }
  }

  /**
   * Applies every {@code UPDATE_QUEUE} change: refreshes the leaf queue's
   * capacity vectors and reinitializes it from the updated template. Other
   * actions are ignored here.
   */
  private void applyQueueManagementChanges(
      List<QueueManagementChange> queueManagementChanges)
      throws SchedulerDynamicEditException, IOException {
    for (QueueManagementChange queueManagementChange : queueManagementChanges) {
      if (queueManagementChange.getQueueAction() ==
          QueueManagementChange.QueueAction.UPDATE_QUEUE) {
        AutoCreatedLeafQueue childQueueToBeUpdated =
            (AutoCreatedLeafQueue) queueManagementChange.getQueue();
        setLeafQueuesCapacityVector(childQueueToBeUpdated);
        // acquires write lock on leaf queue
        childQueueToBeUpdated.reinitializeFromTemplate(
            queueManagementChange.getUpdatedQueueTemplate());
      }
    }
  }

  /**
   * Copies every entry of the leaf queue template configuration into the
   * queue context. The first occurrence of the template prefix in each key is
   * replaced with {@code leafQueueName}.
   * <p>
   * Both the prefix and the queue name are treated literally. A queue name
   * containing {@code $} or {@code \} is therefore copied verbatim instead of
   * being interpreted as a regex group reference.
   *
   * @param leafQueueName the text that replaces the template prefix in each key
   */
  public void setLeafQueueConfigs(String leafQueueName) {
    CapacitySchedulerConfiguration templateConfig =
        leafQueueTemplate.getLeafQueueConfigs();
    String literalQueueName = Matcher.quoteReplacement(leafQueueName);
    for (Map.Entry<String, String> confKeyValuePair : templateConfig) {
      final String name = TEMPLATE_PREFIX_PATTERN
          .matcher(confKeyValuePair.getKey())
          .replaceFirst(literalQueueName);
      queueContext.setConfigurationEntry(name, confKeyValuePair.getValue());
    }
  }
}