AbsoluteResourceCapacityCalculator.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import java.util.Map;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueUpdateWarning.QueueUpdateWarningType.BRANCH_DOWNSCALED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ResourceCalculationDriver.MB_UNIT;
public class AbsoluteResourceCapacityCalculator extends AbstractQueueCapacityCalculator {
@Override
public void calculateResourcePrerequisites(ResourceCalculationDriver resourceCalculationDriver) {
setNormalizedResourceRatio(resourceCalculationDriver);
}
@Override
public double calculateMinimumResource(
ResourceCalculationDriver resourceCalculationDriver, CalculationContext context,
String label) {
String resourceName = context.getResourceName();
double normalizedRatio = resourceCalculationDriver.getNormalizedResourceRatios().getOrDefault(
label, ResourceVector.of(1)).getValue(resourceName);
double remainingResourceRatio = resourceCalculationDriver.getRemainingRatioOfResource(
label, resourceName);
return normalizedRatio * remainingResourceRatio * context.getCurrentMinimumCapacityEntry(
label).getResourceValue();
}
@Override
public double calculateMaximumResource(
ResourceCalculationDriver resourceCalculationDriver, CalculationContext context,
String label) {
return context.getCurrentMaximumCapacityEntry(label).getResourceValue();
}
@Override
public void updateCapacitiesAfterCalculation(
ResourceCalculationDriver resourceCalculationDriver, CSQueue queue, String label) {
CapacitySchedulerQueueCapacityHandler.setQueueCapacities(
resourceCalculationDriver.getUpdateContext()
.getUpdatedClusterResource(label), queue, label);
}
@Override
public ResourceUnitCapacityType getCapacityType() {
return ResourceUnitCapacityType.ABSOLUTE;
}
/**
* Calculates the normalized resource ratio of a parent queue, under which children are defined
* with absolute capacity type. If the effective resource of the parent is less, than the
* aggregated configured absolute resource of its children, the resource ratio will be less,
* than 1.
*
* @param calculationDriver the driver, which contains the parent queue that will form the base
* of the normalization calculation
*/
public static void setNormalizedResourceRatio(ResourceCalculationDriver calculationDriver) {
CSQueue queue = calculationDriver.getQueue();
for (String label : queue.getConfiguredNodeLabels()) {
// ManagedParents assign zero capacity to queues in case of overutilization, downscaling is
// turned off for their children
if (queue instanceof ManagedParentQueue) {
return;
}
for (String resourceName : queue.getConfiguredCapacityVector(label).getResourceNames()) {
long childrenConfiguredResource = 0;
long effectiveMinResource = queue.getQueueResourceQuotas().getEffectiveMinResource(
label).getResourceValue(resourceName);
// Total configured min resources of direct children of the queue
for (CSQueue childQueue : queue.getChildQueues()) {
if (!childQueue.getConfiguredNodeLabels().contains(label)) {
continue;
}
QueueCapacityVector capacityVector = childQueue.getConfiguredCapacityVector(label);
if (capacityVector.isResourceOfType(resourceName, ResourceUnitCapacityType.ABSOLUTE)) {
childrenConfiguredResource += capacityVector.getResource(resourceName)
.getResourceValue();
}
}
// If no children is using ABSOLUTE capacity type, normalization is not needed
if (childrenConfiguredResource == 0) {
continue;
}
// Factor to scale down effective resource: When cluster has sufficient
// resources, effective_min_resources will be same as configured
// min_resources.
float numeratorForMinRatio = childrenConfiguredResource;
if (effectiveMinResource < childrenConfiguredResource) {
numeratorForMinRatio = queue.getQueueResourceQuotas().getEffectiveMinResource(label)
.getResourceValue(resourceName);
calculationDriver.getUpdateContext().addUpdateWarning(BRANCH_DOWNSCALED.ofQueue(
queue.getQueuePath()));
}
String unit = resourceName.equals(MEMORY_URI) ? MB_UNIT : "";
long convertedValue = UnitsConversionUtil.convert(unit, calculationDriver.getUpdateContext()
.getUpdatedClusterResource(label).getResourceInformation(resourceName).getUnits(),
childrenConfiguredResource);
if (convertedValue != 0) {
Map<String, ResourceVector> normalizedResourceRatios =
calculationDriver.getNormalizedResourceRatios();
normalizedResourceRatios.putIfAbsent(label, ResourceVector.newInstance());
normalizedResourceRatios.get(label).setValue(resourceName, numeratorForMinRatio /
convertedValue);
}
}
}
}
}