ResourceCalculationDriver.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueUpdateWarning.QueueUpdateWarningType;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;

/**
 * Drives the main logic of resource calculation for all children under a queue. Acts as a
 * bookkeeper of disposable update information that is used by all children under the common parent.
 */
public class ResourceCalculationDriver {
  private static final ResourceUnitCapacityType[] CALCULATOR_PRECEDENCE =
      new ResourceUnitCapacityType[] {
          ResourceUnitCapacityType.ABSOLUTE,
          ResourceUnitCapacityType.PERCENTAGE,
          ResourceUnitCapacityType.WEIGHT};
  static final String MB_UNIT = "Mi";

  protected final QueueResourceRoundingStrategy roundingStrategy =
      new DefaultQueueResourceRoundingStrategy(CALCULATOR_PRECEDENCE);
  protected final CSQueue queue;
  protected final QueueCapacityUpdateContext updateContext;
  protected final Map<ResourceUnitCapacityType, AbstractQueueCapacityCalculator> calculators;
  protected final Collection<String> definedResources;

  protected final Map<String, ResourceVector> overallRemainingResourcePerLabel = new HashMap<>();
  protected final Map<String, ResourceVector> batchRemainingResourcePerLabel = new HashMap<>();
  // Used by ABSOLUTE capacity types
  protected final Map<String, ResourceVector> normalizedResourceRatioPerLabel = new HashMap<>();
  // Used by WEIGHT capacity types
  protected final Map<String, Map<String, Double>> sumWeightsPerLabel = new HashMap<>();
  protected Map<String, Double> usedResourceByCurrentCalculatorPerLabel = new HashMap<>();

  public ResourceCalculationDriver(
      CSQueue queue, QueueCapacityUpdateContext updateContext,
      Map<ResourceUnitCapacityType, AbstractQueueCapacityCalculator> calculators,
      Collection<String> definedResources) {
    this.queue = queue;
    this.updateContext = updateContext;
    this.calculators = calculators;
    this.definedResources = definedResources;
  }


  /**
   * Returns the parent that is driving the calculation.
   *
   * @return a common parent queue
   */
  public CSQueue getQueue() {
    return queue;
  }

  /**
   * Returns all the children defined under the driver parent queue.
   *
   * @return child queues
   */
  public Collection<CSQueue> getChildQueues() {
    return queue.getChildQueues();
  }

  /**
   * Returns the context that is used throughout the whole update phase.
   *
   * @return update context
   */
  public QueueCapacityUpdateContext getUpdateContext() {
    return updateContext;
  }

  /**
   * Increments the aggregated weight.
   *
   * @param label        node label
   * @param resourceName resource unit name
   * @param value        weight value
   */
  public void incrementWeight(String label, String resourceName, double value) {
    sumWeightsPerLabel.putIfAbsent(label, new HashMap<>());
    sumWeightsPerLabel.get(label).put(resourceName,
        sumWeightsPerLabel.get(label).getOrDefault(resourceName, 0d) + value);
  }

  /**
   * Returns the aggregated children weights.
   *
   * @param label        node label
   * @param resourceName resource unit name
   * @return aggregated weights of children
   */
  public double getSumWeightsByResource(String label, String resourceName) {
    return sumWeightsPerLabel.get(label).get(resourceName);
  }

  /**
   * Returns the ratio of the summary of children absolute configured resources and the parent's
   * effective minimum resource.
   *
   * @return normalized resource ratio for all labels
   */
  public Map<String, ResourceVector> getNormalizedResourceRatios() {
    return normalizedResourceRatioPerLabel;
  }

  /**
   * Returns the remaining resource ratio under the parent queue. The remaining resource is only
   * decremented after a capacity type is fully evaluated.
   *
   * @param label node label
   * @param resourceName name of resource unit
   * @return resource ratio
   */
  public double getRemainingRatioOfResource(String label, String resourceName) {
    return batchRemainingResourcePerLabel.get(label).getValue(resourceName)
        / queue.getEffectiveCapacity(label).getResourceValue(resourceName);
  }

  /**
   * Returns the ratio of the parent queue's effective minimum resource relative to the full cluster
   * resource.
   *
   * @param label node label
   * @param resourceName name of resource unit
   * @return absolute minimum capacity
   */
  public double getParentAbsoluteMinCapacity(String label, String resourceName) {
    return (double) queue.getEffectiveCapacity(label).getResourceValue(resourceName)
        / getUpdateContext().getUpdatedClusterResource(label).getResourceValue(resourceName);
  }

  /**
   * Returns the ratio of the parent queue's effective maximum resource relative to the full cluster
   * resource.
   *
   * @param label node label
   * @param resourceName name of resource unit
   * @return absolute maximum capacity
   */
  public double getParentAbsoluteMaxCapacity(String label, String resourceName) {
    return (double) queue.getEffectiveMaxCapacity(label).getResourceValue(resourceName)
        / getUpdateContext().getUpdatedClusterResource(label).getResourceValue(resourceName);
  }

  /**
   * Returns the remaining resources of a parent that is still available for its
   * children. Decremented only after the calculator is finished its work on the corresponding
   * resources.
   *
   * @param label node label
   * @return remaining resources
   */
  public ResourceVector getBatchRemainingResource(String label) {
    batchRemainingResourcePerLabel.putIfAbsent(label, ResourceVector.newInstance());
    return batchRemainingResourcePerLabel.get(label);
  }

  /**
   * Calculates and sets the minimum and maximum effective resources for all children under the
   * parent queue with which this driver was initialized.
   */
  public void calculateResources() {
    // Reset both remaining resource storage to the parent's available resource
    for (String label : queue.getConfiguredNodeLabels()) {
      overallRemainingResourcePerLabel.put(label,
          ResourceVector.of(queue.getEffectiveCapacity(label)));
      batchRemainingResourcePerLabel.put(label,
          ResourceVector.of(queue.getEffectiveCapacity(label)));
    }

    for (AbstractQueueCapacityCalculator capacityCalculator : calculators.values()) {
      capacityCalculator.calculateResourcePrerequisites(this);
    }

    for (String resourceName : definedResources) {
      for (ResourceUnitCapacityType capacityType : CALCULATOR_PRECEDENCE) {
        for (CSQueue childQueue : getChildQueues()) {
          CalculationContext context = new CalculationContext(resourceName, capacityType,
              childQueue);
          calculateResourceOnChild(context);
        }

        // Flush aggregated used resource by labels at the end of a calculator phase
        for (Map.Entry<String, Double> entry : usedResourceByCurrentCalculatorPerLabel.entrySet()) {
          batchRemainingResourcePerLabel.get(entry.getKey()).decrement(resourceName,
              entry.getValue());
        }

        usedResourceByCurrentCalculatorPerLabel = new HashMap<>();
      }
    }

    validateRemainingResource();
  }

  private void calculateResourceOnChild(CalculationContext context) {
    context.getQueue().getWriteLock().lock();
    try {
      for (String label : context.getQueue().getConfiguredNodeLabels()) {
        if (!context.getQueue().getConfiguredCapacityVector(label).isResourceOfType(
            context.getResourceName(), context.getCapacityType())) {
          continue;
        }

        if (!overallRemainingResourcePerLabel.containsKey(label)) {
          continue;
        }

        double usedResourceByChild = setChildResources(context, label);
        double aggregatedUsedResource = usedResourceByCurrentCalculatorPerLabel.getOrDefault(label,
            0d);
        double resourceUsedByLabel = aggregatedUsedResource + usedResourceByChild;

        overallRemainingResourcePerLabel.get(label).decrement(context.getResourceName(),
            usedResourceByChild);
        usedResourceByCurrentCalculatorPerLabel.put(label, resourceUsedByLabel);
      }
    } finally {
      context.getQueue().getWriteLock().unlock();
    }
  }

  private double setChildResources(CalculationContext context, String label) {
    QueueCapacityVectorEntry capacityVectorEntry = context.getQueue().getConfiguredCapacityVector(
        label).getResource(context.getResourceName());
    QueueCapacityVectorEntry maximumCapacityVectorEntry = context.getQueue()
        .getConfiguredMaxCapacityVector(label).getResource(context.getResourceName());
    AbstractQueueCapacityCalculator maximumCapacityCalculator = calculators.get(
        maximumCapacityVectorEntry.getVectorResourceType());

    double minimumResource =
        calculators.get(context.getCapacityType()).calculateMinimumResource(this, context, label);
    double maximumResource = maximumCapacityCalculator.calculateMaximumResource(this, context,
        label);

    minimumResource = roundingStrategy.getRoundedResource(minimumResource, capacityVectorEntry);
    maximumResource = roundingStrategy.getRoundedResource(maximumResource,
        maximumCapacityVectorEntry);
    Pair<Double, Double> resources = validateCalculatedResources(context, label,
        new ImmutablePair<>(
        minimumResource, maximumResource));
    minimumResource = resources.getLeft();
    maximumResource = resources.getRight();

    context.getQueue().getQueueResourceQuotas().getEffectiveMinResource(label).setResourceValue(
        context.getResourceName(), (long) minimumResource);
    context.getQueue().getQueueResourceQuotas().getEffectiveMaxResource(label).setResourceValue(
        context.getResourceName(), (long) maximumResource);

    return minimumResource;
  }

  private Pair<Double, Double> validateCalculatedResources(CalculationContext context,
      String label, Pair<Double, Double> calculatedResources) {
    double minimumResource = calculatedResources.getLeft();
    long minimumMemoryResource =
        context.getQueue().getQueueResourceQuotas().getEffectiveMinResource(label).getMemorySize();

    double remainingResourceUnderParent = overallRemainingResourcePerLabel.get(label).getValue(
        context.getResourceName());

    long parentMaximumResource = queue.getEffectiveMaxCapacity(label).getResourceValue(
        context.getResourceName());
    double maximumResource = calculatedResources.getRight();

    // Memory is the primary resource, if its zero, all other resource units are zero as well.
    if (!context.getResourceName().equals(MEMORY_URI) && minimumMemoryResource == 0) {
      minimumResource = 0;
    }

    if (maximumResource != 0 && maximumResource > parentMaximumResource) {
      updateContext.addUpdateWarning(QueueUpdateWarningType.QUEUE_MAX_RESOURCE_EXCEEDS_PARENT
          .ofQueue(context.getQueue().getQueuePath()));
    }
    maximumResource = maximumResource == 0 ? parentMaximumResource : Math.min(maximumResource,
        parentMaximumResource);

    if (maximumResource < minimumResource) {
      updateContext.addUpdateWarning(QueueUpdateWarningType.QUEUE_EXCEEDS_MAX_RESOURCE.ofQueue(
          context.getQueue().getQueuePath()));
      minimumResource = maximumResource;
    }

    if (minimumResource > remainingResourceUnderParent) {
      // Legacy auto queues are assigned a zero resource if not enough resource is left
      if (queue instanceof ManagedParentQueue) {
        minimumResource = 0;
      } else {
        updateContext.addUpdateWarning(
            QueueUpdateWarningType.QUEUE_OVERUTILIZED.ofQueue(
                context.getQueue().getQueuePath()).withInfo(
                    "Resource name: " + context.getResourceName() +
                        " resource value: " + minimumResource));
        minimumResource = remainingResourceUnderParent;
      }
    }

    if (minimumResource == 0) {
      updateContext.addUpdateWarning(QueueUpdateWarningType.QUEUE_ZERO_RESOURCE.ofQueue(
          context.getQueue().getQueuePath())
          .withInfo("Resource name: " + context.getResourceName()));
    }

    return new ImmutablePair<>(minimumResource, maximumResource);
  }

  private void validateRemainingResource() {
    for (String label : queue.getConfiguredNodeLabels()) {
      if (!batchRemainingResourcePerLabel.get(label).equals(ResourceVector.newInstance())) {
        updateContext.addUpdateWarning(QueueUpdateWarningType.BRANCH_UNDERUTILIZED.ofQueue(
            queue.getQueuePath()).withInfo("Label: " + label));
      }
    }
  }
}