// CapacitySchedulerQueueCapacityHandler.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
/**
* Controls how capacity and resource values are set and calculated for a queue.
* Effective minimum and maximum resource values are set for each label and resource separately.
*/
public class CapacitySchedulerQueueCapacityHandler {
  private static final Logger LOG =
      LoggerFactory.getLogger(CapacitySchedulerQueueCapacityHandler.class);

  /** Calculator registered for each capacity type; populated once in the constructor. */
  private final Map<ResourceUnitCapacityType, AbstractQueueCapacityCalculator>
      calculators;
  /** Calculator handed to the root calculation driver in {@link #updateRoot}. */
  private final AbstractQueueCapacityCalculator rootCalculator =
      new RootQueueCapacityCalculator();
  private final RMNodeLabelsManager labelsManager;
  /**
   * Names of the known resource types in a stable order: memory first, then vcores
   * (each only if registered), followed by every remaining resource type.
   */
  private final Collection<String> definedResources = new LinkedHashSet<>();
  private final boolean isLegacyQueueMode;

  /**
   * Creates a handler, registers one calculator for each of the ABSOLUTE, PERCENTAGE and
   * WEIGHT capacity types and loads the names of the known resource types.
   *
   * @param labelsManager node labels manager passed to every update context created here
   * @param configuration configuration from which the legacy queue mode flag is read
   */
  public CapacitySchedulerQueueCapacityHandler(RMNodeLabelsManager labelsManager,
      CapacitySchedulerConfiguration configuration) {
    this.calculators = new HashMap<>();
    this.labelsManager = labelsManager;

    this.calculators.put(ResourceUnitCapacityType.ABSOLUTE,
        new AbsoluteResourceCapacityCalculator());
    this.calculators.put(ResourceUnitCapacityType.PERCENTAGE,
        new PercentageQueueCapacityCalculator());
    this.calculators.put(ResourceUnitCapacityType.WEIGHT,
        new WeightQueueCapacityCalculator());

    this.isLegacyQueueMode = configuration.isLegacyQueueMode();
    loadResourceNames();
  }

  /**
   * Updates the resource and metrics values of all children under a specific queue.
   * These values are calculated at runtime.
   *
   * @param clusterResource resource of the cluster
   * @param queue parent queue whose children will be updated
   * @return update context that contains information about the update phase
   */
  public QueueCapacityUpdateContext updateChildren(Resource clusterResource, CSQueue queue) {
    ResourceLimits resourceLimits = new ResourceLimits(clusterResource);
    QueueCapacityUpdateContext updateContext =
        new QueueCapacityUpdateContext(clusterResource, labelsManager);

    update(queue, updateContext, resourceLimits);
    return updateContext;
  }

  /**
   * Updates the resource and metrics value of the root queue. Root queue always has percentage
   * capacity type and is assigned the cluster resource as its minimum and maximum effective
   * resource.
   *
   * @param rootQueue root queue
   * @param clusterResource cluster resource
   */
  public void updateRoot(CSQueue rootQueue, Resource clusterResource) {
    ResourceLimits resourceLimits = new ResourceLimits(clusterResource);
    QueueCapacityUpdateContext updateContext =
        new QueueCapacityUpdateContext(clusterResource, labelsManager);

    RootCalculationDriver rootCalculationDriver = new RootCalculationDriver(rootQueue,
        updateContext, rootCalculator, definedResources);
    rootCalculationDriver.calculateResources();
    rootQueue.refreshAfterResourceCalculation(updateContext.getUpdatedClusterResource(),
        resourceLimits);
  }

  /**
   * Calculates the resources of the children of the given queue and then refreshes every child,
   * descending recursively into the children. Queues that are {@code null} or have no child
   * queues are skipped.
   *
   * @param queue queue whose children are updated
   * @param updateContext context shared by the whole update phase
   * @param resourceLimits resource limits that apply to {@code queue}
   */
  private void update(
      CSQueue queue, QueueCapacityUpdateContext updateContext, ResourceLimits resourceLimits) {
    if (queue == null || CollectionUtils.isEmpty(queue.getChildQueues())) {
      return;
    }

    ResourceCalculationDriver resourceCalculationDriver = new ResourceCalculationDriver(
        queue, updateContext, calculators, definedResources);
    resourceCalculationDriver.calculateResources();
    updateChildrenAfterCalculation(resourceCalculationDriver, resourceLimits);
  }

  /**
   * Applies the calculated values to each child of the driver's queue: sets the capacities,
   * derives the child's resource limits from the parent, refreshes the child and finally
   * continues the update with the child's own children.
   *
   * @param resourceCalculationDriver driver that has already calculated the resources
   * @param resourceLimits resource limits of the parent queue
   */
  private void updateChildrenAfterCalculation(
      ResourceCalculationDriver resourceCalculationDriver, ResourceLimits resourceLimits) {
    AbstractParentQueue parentQueue = (AbstractParentQueue) resourceCalculationDriver.getQueue();
    QueueCapacityUpdateContext updateContext = resourceCalculationDriver.getUpdateContext();

    for (CSQueue childQueue : parentQueue.getChildQueues()) {
      updateQueueCapacities(resourceCalculationDriver, childQueue);

      // The limits of the child are always requested for the default partition.
      ResourceLimits childLimit = parentQueue.getResourceLimitsOfChild(childQueue,
          updateContext.getUpdatedClusterResource(), resourceLimits, NO_LABEL, false);
      childQueue.refreshAfterResourceCalculation(
          updateContext.getUpdatedClusterResource(), childLimit);

      update(childQueue, updateContext, childLimit);
    }
  }

  /**
   * Updates the capacity values of the currently evaluated child for each of its configured
   * node labels. The write lock of the queue is held for the whole update.
   *
   * @param resourceCalculationDriver driver providing the update context
   * @param queue queue on which the capacities are set
   */
  private void updateQueueCapacities(
      ResourceCalculationDriver resourceCalculationDriver, CSQueue queue) {
    queue.getWriteLock().lock();
    try {
      for (String label : queue.getConfiguredNodeLabels()) {
        if (!isLegacyQueueMode) {
          // Post update capacities based on the calculated effective resource values
          setQueueCapacities(
              resourceCalculationDriver.getUpdateContext().getUpdatedClusterResource(label),
              queue, label);
        } else {
          // Update capacities according to the legacy logic
          for (ResourceUnitCapacityType capacityType :
              queue.getConfiguredCapacityVector(label).getDefinedCapacityTypes()) {
            AbstractQueueCapacityCalculator calculator = calculators.get(capacityType);
            calculator.updateCapacitiesAfterCalculation(resourceCalculationDriver, queue, label);
          }
        }
      }
    } finally {
      queue.getWriteLock().unlock();
    }
  }

  /**
   * Sets capacity and absolute capacity values of a queue based on minimum and
   * maximum effective resources.
   *
   * <p>Nothing is changed when the queue is not an {@code AbstractCSQueue}, when it has no
   * parent, or when it is a reservation or plan queue and no resource of the cluster has a
   * positive value. A ratio that is not a finite number (infinity or NaN) is stored as 0.
   *
   * @param clusterResource overall cluster resource
   * @param queue child queue for which the capacities are set
   * @param label node label
   */
  public static void setQueueCapacities(Resource clusterResource, CSQueue queue, String label) {
    if (!(queue instanceof AbstractCSQueue)) {
      return;
    }

    AbstractCSQueue csQueue = (AbstractCSQueue) queue;
    // Do not override reservations when there are no cluster resources yet
    boolean isReservationRelated =
        csQueue instanceof ReservationQueue || csQueue instanceof PlanQueue;
    if (isReservationRelated && hasNoPositiveResource(clusterResource)) {
      return;
    }

    ResourceCalculator resourceCalculator = csQueue.resourceCalculator;

    CSQueue parent = queue.getParent();
    if (parent == null) {
      return;
    }

    // Update capacity with a ratio calculated from the parent's minResources
    // and the recently changed queue minResources.
    // capacity = effectiveMinResource / {parent's effectiveMinResource}
    float capacity = resourceCalculator.divide(clusterResource,
        queue.getQueueResourceQuotas().getEffectiveMinResource(label),
        parent.getQueueResourceQuotas().getEffectiveMinResource(label));
    queue.getQueueCapacities().setCapacity(label, finiteOrZero(capacity));

    // Update maxCapacity with a ratio calculated from the parent's maxResources
    // and the recently changed queue maxResources.
    // maxCapacity = effectiveMaxResource / parent's effectiveMaxResource
    float maximumCapacity = resourceCalculator.divide(clusterResource,
        queue.getQueueResourceQuotas().getEffectiveMaxResource(label),
        parent.getQueueResourceQuotas().getEffectiveMaxResource(label));
    queue.getQueueCapacities().setMaximumCapacity(label, finiteOrZero(maximumCapacity));

    csQueue.updateAbsoluteCapacities();
  }

  /**
   * Tells whether none of the resources of the given cluster resource has a positive value.
   *
   * @param clusterResource cluster resource to inspect
   * @return {@code true} if every resource value is zero or negative
   */
  private static boolean hasNoPositiveResource(Resource clusterResource) {
    return Stream.of(clusterResource.getResources())
        .map(ResourceInformation::getValue)
        .noneMatch(value -> value > 0);
  }

  /**
   * Replaces a ratio that is not a finite number with 0 so that neither infinity nor NaN is
   * stored as a queue capacity.
   * NOTE(review): such ratios presumably come from a zero parent resource (x/0 or 0/0) -
   * confirm against the ResourceCalculator implementation in use.
   *
   * @param ratio calculated capacity ratio
   * @return {@code ratio} if it is finite, 0 otherwise
   */
  private static float finiteOrZero(float ratio) {
    return Float.isFinite(ratio) ? ratio : 0f;
  }

  /**
   * Fills {@code definedResources} with the names of the known resource types. Memory and
   * vcores are added first (when they are registered) so that they precede every other
   * resource type in iteration order.
   */
  private void loadResourceNames() {
    Set<String> remaining = new HashSet<>(ResourceUtils.getResourceTypes().keySet());

    // Set#remove reports whether the name was present, so no separate contains() is needed.
    if (remaining.remove(MEMORY_URI)) {
      definedResources.add(MEMORY_URI);
    }
    if (remaining.remove(VCORES_URI)) {
      definedResources.add(VCORES_URI);
    }

    definedResources.addAll(remaining);
  }
}