WeightQueueCapacityCalculator.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;

import java.util.Collection;

import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType.WEIGHT;

public class WeightQueueCapacityCalculator extends AbstractQueueCapacityCalculator {

  @Override
  public void calculateResourcePrerequisites(ResourceCalculationDriver resourceCalculationDriver) {
    // Precalculate the summary of children's weight
    for (CSQueue childQueue : resourceCalculationDriver.getChildQueues()) {
      for (String label : childQueue.getConfiguredNodeLabels()) {
        for (String resourceName : childQueue.getConfiguredCapacityVector(label)
            .getResourceNamesByCapacityType(getCapacityType())) {
          resourceCalculationDriver.incrementWeight(label, resourceName, childQueue
              .getConfiguredCapacityVector(label).getResource(resourceName).getResourceValue());
        }
      }
    }
  }

  @Override
  public double calculateMinimumResource(ResourceCalculationDriver resourceCalculationDriver,
                                        CalculationContext context,
                                        String label) {
    String resourceName = context.getResourceName();
    double normalizedWeight = context.getCurrentMinimumCapacityEntry(label).getResourceValue() /
        resourceCalculationDriver.getSumWeightsByResource(label, resourceName);

    double remainingResource = resourceCalculationDriver.getBatchRemainingResource(label)
        .getValue(resourceName);

    // Due to rounding loss it is better to use all remaining resources if no other resource uses
    // weight
    if (normalizedWeight == 1) {
      return remainingResource;
    }

    double remainingResourceRatio = resourceCalculationDriver.getRemainingRatioOfResource(
        label, resourceName);
    double parentAbsoluteCapacity = resourceCalculationDriver.getParentAbsoluteMinCapacity(
        label, resourceName);
    double queueAbsoluteCapacity = parentAbsoluteCapacity * remainingResourceRatio
        * normalizedWeight;

    return resourceCalculationDriver.getUpdateContext()
        .getUpdatedClusterResource(label).getResourceValue(resourceName) * queueAbsoluteCapacity;
  }

  @Override
  public double calculateMaximumResource(ResourceCalculationDriver resourceCalculationDriver,
                                        CalculationContext context,
                                        String label) {
    throw new IllegalStateException("Resource " + context.getCurrentMinimumCapacityEntry(
        label).getResourceName() +
        " has " + "WEIGHT maximum capacity type, which is not supported");
  }

  @Override
  public ResourceUnitCapacityType getCapacityType() {
    return WEIGHT;
  }

  @Override
  public void updateCapacitiesAfterCalculation(
      ResourceCalculationDriver resourceCalculationDriver, CSQueue queue, String label) {
    double sumCapacityPerResource = 0f;

    Collection<String> resourceNames = getResourceNames(queue, label);
    for (String resourceName : resourceNames) {
      double sumBranchWeight = resourceCalculationDriver.getSumWeightsByResource(label,
          resourceName);
      double capacity =  queue.getConfiguredCapacityVector(
          label).getResource(resourceName).getResourceValue() / sumBranchWeight;
      sumCapacityPerResource += capacity;
    }

    queue.getQueueCapacities().setNormalizedWeight(label,
        (float) (sumCapacityPerResource / resourceNames.size()));
    ((AbstractCSQueue) queue).updateAbsoluteCapacities();
  }
}