AbstractParentQueue.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;

import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getACLsForFlexibleAutoCreatedParentQueue;

public abstract class AbstractParentQueue extends AbstractCSQueue {
  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractParentQueue.class);

  protected final List<CSQueue> childQueues;
  private final boolean rootQueue;
  private AtomicInteger numApplications = new AtomicInteger(0);

  private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);

  private QueueOrderingPolicy queueOrderingPolicy;

  private long lastSkipQueueDebugLoggingTimestamp = -1;

  private int runnableApps;

  private final boolean allowZeroCapacitySum;

  private AutoCreatedQueueTemplate autoCreatedQueueTemplate;

  // The ratio of the queue's effective minimum resource to the sum of its
  // children's configured minimum resources, grouped by node label and
  // calculated for each resource name separately.
  private final Map<String, Map<String, Float>> effectiveMinResourceRatio =
      new ConcurrentHashMap<>();

  public AbstractParentQueue(CapacitySchedulerQueueContext queueContext,
      String queueName, CSQueue parent, CSQueue old)
      throws IOException {
    this(queueContext, queueName, parent, old, false);
  }

  public AbstractParentQueue(CapacitySchedulerQueueContext queueContext,
      String queueName, CSQueue parent, CSQueue old, boolean isDynamic) throws
      IOException {

    super(queueContext, queueName, parent, old);
    setDynamicQueue(isDynamic);
    this.rootQueue = (parent == null);

    float rawCapacity = queueContext.getConfiguration()
          .getNonLabeledQueueCapacity(this.queuePath);

    if (rootQueue &&
          (rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
      throw new IllegalArgumentException("Illegal " +
            "capacity of " + rawCapacity + " for queue " + queueName +
            ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
    }

    this.childQueues = new ArrayList<>();
    this.allowZeroCapacitySum =
          queueContext.getConfiguration()
              .getAllowZeroCapacitySum(getQueuePathObject());

  }

  // Returns the config name of the configured queue ordering policy, or null
  // if no policy has been set.
  private String getQueueOrderingPolicyConfigName() {
    return queueOrderingPolicy == null ?
        null :
        queueOrderingPolicy.getConfigName();
  }

  protected void setupQueueConfigs(Resource clusterResource)
      throws IOException {
    writeLock.lock();
    try {
      CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
      autoCreatedQueueTemplate = new AutoCreatedQueueTemplate(
          configuration, this.queuePath);
      super.setupQueueConfigs(clusterResource);
      StringBuilder aclsString = new StringBuilder();
      for (Map.Entry<AccessType, AccessControlList> e : getACLs().entrySet()) {
        aclsString.append(e.getKey()).append(":")
            .append(e.getValue().getAclString());
      }

      StringBuilder labelStrBuilder = new StringBuilder();
      if (getAccessibleNodeLabels() != null) {
        for (String nodeLabel : getAccessibleNodeLabels()) {
          labelStrBuilder.append(nodeLabel).append(",");
        }
      }

      // Initialize queue ordering policy
      queueOrderingPolicy = configuration.getQueueOrderingPolicy(
          getQueuePathObject(), parent == null ?
              null :
              ((AbstractParentQueue) parent).getQueueOrderingPolicyConfigName());
      queueOrderingPolicy.setQueues(childQueues);

      LOG.info(getQueueName() + ", " + getCapacityOrWeightString()
          + ", absoluteCapacity=" + getAbsoluteCapacity()
          + ", maxCapacity=" + getMaximumCapacity()
          + ", absoluteMaxCapacity=" + getAbsoluteMaximumCapacity()
          + ", state=" + getState() + ", acls="
          + aclsString + ", labels=" + labelStrBuilder + "\n"
          + ", reservationsContinueLooking=" + isReservationsContinueLooking()
          + ", orderingPolicy=" + getQueueOrderingPolicyConfigName()
          + ", priority=" + getPriority()
          + ", allowZeroCapacitySum=" + allowZeroCapacitySum);
    } finally {
      writeLock.unlock();
    }
  }

  @Override
  protected void setDynamicQueueACLProperties() {
    super.setDynamicQueueACLProperties();

    if (parent instanceof AbstractParentQueue) {
      acls.putAll(getACLsForFlexibleAutoCreatedParentQueue(
          ((AbstractParentQueue) parent).getAutoCreatedQueueTemplate()));
    }
  }

  private static final float PRECISION = 0.0005f; // 0.05% precision

  // Determine the capacity configuration type (weight, percentage or absolute
  // resource) used by the given queues; throw an exception when the
  // configuration is invalid.
  public QueueCapacityType getCapacityConfigurationTypeForQueues(
      Collection<CSQueue> queues) throws IOException {
    // Is capacity (percentage) set for ANY queue in any label?
    boolean percentageIsSet = false;

    // Is weight set for ANY queue in any label?
    boolean weightIsSet = false;

    // Is absolute resource set for ANY queue in any label?
    boolean absoluteMinResSet = false;

    StringBuilder diagMsg = new StringBuilder();

    for (CSQueue queue : queues) {
      for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
        float capacityByLabel = queue.getQueueCapacities().getCapacity(nodeLabel);
        if (capacityByLabel > 0) {
          percentageIsSet = true;
        }
        float weightByLabel = queue.getQueueCapacities().getWeight(nodeLabel);
        // By default weight is set to -1, so >= 0 is enough.
        if (weightByLabel >= 0) {
          weightIsSet = true;
          diagMsg.append(
              "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel
                  + " uses weight mode}. ");
        }
        if (checkConfigTypeIsAbsoluteResource(queue.getQueuePathObject(), nodeLabel)) {
          absoluteMinResSet = true;
          // There's a special handling: when absolute resource is configured,
          // capacity will be calculated (and set) for UI/metrics purposes, so
          // when absoluteMinResource is set, unset percentage
          percentageIsSet = false;
          diagMsg.append(
              "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel
                  + " uses absolute mode}. ");
        }
        if (percentageIsSet) {
          diagMsg.append(
              "{Queue=" + queue.getQueuePath() + ", label=" + nodeLabel
                  + " uses percentage mode}. ");
        }
      }
    }
    // If capacity, weight and absolute resource modes are mixed (any two of
    // them), throw an exception.
    // The root queue is an exception here, because by default it returns 100
    // as capacity no matter what. We should look into this case in the
    // future. To avoid impacting too many code paths, we don't check the root
    // queue's config.
    if (queues.iterator().hasNext() &&
        !queues.iterator().next().getQueuePath().equals(
        CapacitySchedulerConfiguration.ROOT) &&
        (percentageIsSet ? 1 : 0) + (weightIsSet ? 1 : 0) + (absoluteMinResSet ?
            1 :
            0) > 1) {
      throw new IOException("Parent queue '" + getQueuePath()
          + "' has child queues that use a mix of weight, percentage and "
          + "absolute modes, which is not allowed. Please double check. "
          + "Details: " + diagMsg.toString());
    }

    if (weightIsSet || queues.isEmpty()) {
      return QueueCapacityType.WEIGHT;
    } else if (absoluteMinResSet) {
      return QueueCapacityType.ABSOLUTE_RESOURCE;
    } else {
      return QueueCapacityType.PERCENT;
    }
  }

  public enum QueueCapacityType {
    WEIGHT, ABSOLUTE_RESOURCE, PERCENT;
  }

  /**
   * Set the child queues and verify their capacities.
   * +--------------+---------------------------+-------------------------------------+------------------------+
   * |              | parent-weight             | parent-pct                          | parent-abs             |
   * +--------------+---------------------------+-------------------------------------+------------------------+
   * | child-weight | No specific check         | No specific check                   | X                      |
   * +--------------+---------------------------+-------------------------------------+------------------------+
   * | child-pct    | Sum(children.capacity) =  | When:                               | X                      |
   * |              |   0 OR 100                |   parent.capacity>0                 |                        |
   * |              |                           |     sum(children.capacity)=100 OR 0 |                        |
   * |              |                           |   parent.capacity=0                 |                        |
   * |              |                           |     sum(children.capacity)=0        |                        |
   * +--------------+---------------------------+-------------------------------------+------------------------+
   * | child-abs    | X                         | X                                   | Sum(children.minRes)<= |
   * |              |                           |                                     | parent.minRes          |
   * +--------------+---------------------------+-------------------------------------+------------------------+
   * @param childQueues the child queues of this queue
   * @throws IOException if the capacity configuration is invalid
   */
  void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
    writeLock.lock();
    try {
      boolean isLegacyQueueMode = queueContext.getConfiguration().isLegacyQueueMode();
      if (isLegacyQueueMode) {
        QueueCapacityType childrenCapacityType =
            getCapacityConfigurationTypeForQueues(childQueues);
        QueueCapacityType parentCapacityType =
            getCapacityConfigurationTypeForQueues(ImmutableList.of(this));

        if (childrenCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE
            || parentCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE) {
          // We don't allow any mixed absolute + {weight, percentage} between
          // children and parent
          if (childrenCapacityType != parentCapacityType && !this.getQueuePath()
              .equals(CapacitySchedulerConfiguration.ROOT)) {
            throw new IOException("Parent=" + this.getQueuePath()
                + ": When absolute minResource is used, both parent and "
                + "children must use absolute minResource");
          }

          // Ensure that for each parent queue: parent.min-resource >=
          // Σ(child.min-resource).
          for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
            Resource minRes = Resources.createResource(0, 0);
            for (CSQueue queue : childQueues) {
              // Accumulate all min/max resource configured for all child queues.
              Resources.addTo(minRes, queue.getQueueResourceQuotas()
                  .getConfiguredMinResource(nodeLabel));
            }
            Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel,
                queueContext.getClusterResource());
            Resource parentMinResource =
                usageTracker.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel);
            if (!parentMinResource.equals(Resources.none()) && Resources.lessThan(
                resourceCalculator, resourceByLabel, parentMinResource, minRes)) {
              throw new IOException(
                  "Parent queue capacity: " + parentMinResource
                      + " is less than the sum of its children's minimum"
                      + " resources: " + minRes + " for queue: " + getQueueName());
            }
          }
        }

        // When child uses percent
        if (childrenCapacityType == QueueCapacityType.PERCENT) {
          float childrenPctSum = 0;
          // check label capacities
          for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
            // check children's labels
            childrenPctSum = 0;
            for (CSQueue queue : childQueues) {
              childrenPctSum += queue.getQueueCapacities().getCapacity(nodeLabel);
            }

            if (Math.abs(1 - childrenPctSum) > PRECISION) {
              // When children's percent sum != 100%
              if (Math.abs(childrenPctSum) > PRECISION) {
                // It is wrong when percent sum != {0, 1}
                throw new IOException(
                    "Illegal" + " capacity sum of " + childrenPctSum
                        + " for children of queue " + getQueueName() + " for label="
                        + nodeLabel + ". It should be either 0 or 1.0");
              } else {
                // We also allow children's percent sum = 0 under the following
                // conditions
                // - Parent uses weight mode
                // - Parent uses percent mode, and parent has
                //   (capacity=0 OR allowZero)
                if (parentCapacityType == QueueCapacityType.PERCENT) {
                  if ((Math.abs(queueCapacities.getCapacity(nodeLabel))
                      > PRECISION) && (!allowZeroCapacitySum)) {
                    throw new IOException(
                        "Illegal" + " capacity sum of " + childrenPctSum
                            + " for children of queue " + getQueueName()
                            + " for label=" + nodeLabel
                            + ". It is set to 0, but parent percent != 0, and "
                            + "the parent does not allow children's capacity to be 0");
                  }
                }
              }
            } else {
              // Even if child pct sum == 1.0, we will make sure parent has
              // positive percent.
              if (parentCapacityType == QueueCapacityType.PERCENT && Math.abs(
                  queueCapacities.getCapacity(nodeLabel)) <= 0f
                  && !allowZeroCapacitySum) {
                throw new IOException(
                    "Illegal" + " capacity sum of " + childrenPctSum
                        + " for children of queue " + getQueueName() + " for label="
                        + nodeLabel + ". queue=" + getQueueName()
                        + " has zero capacity, but child "
                        + "queues have positive capacities");
              }
            }
          }
        }
      }

      this.childQueues.clear();
      this.childQueues.addAll(childQueues);
      if (LOG.isDebugEnabled()) {
        LOG.debug("setChildQueues: " + getChildQueuesToPrint());
      }
    } finally {
      writeLock.unlock();
    }
  }

  @Override
  public QueueInfo getQueueInfo(
      boolean includeChildQueues, boolean recursive) {
    readLock.lock();
    try {
      QueueInfo queueInfo = getQueueInfo();

      List<QueueInfo> childQueuesInfo = new ArrayList<>();
      if (includeChildQueues) {
        for (CSQueue child : childQueues) {
          // Include grandchild queues only when recursive is requested
          childQueuesInfo.add(child.getQueueInfo(recursive, recursive));
        }
      }
      queueInfo.setChildQueues(childQueuesInfo);

      return queueInfo;
    } finally {
      readLock.unlock();
    }

  }

  private QueueUserACLInfo getUserAclInfo(
      UserGroupInformation user) {
    readLock.lock();
    try {
      QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
          QueueUserACLInfo.class);
      List<QueueACL> operations = new ArrayList<QueueACL>();
      for (QueueACL operation : QueueACL.values()) {
        if (hasAccess(operation, user)) {
          operations.add(operation);
        }
      }

      userAclInfo.setQueueName(getQueuePath());
      userAclInfo.setUserAcls(operations);
      return userAclInfo;
    } finally {
      readLock.unlock();
    }

  }

  @Override
  public List<QueueUserACLInfo> getQueueUserAclInfo(
      UserGroupInformation user) {
    readLock.lock();
    try {
      List<QueueUserACLInfo> userAcls = new ArrayList<>();

      // Add parent queue acls
      userAcls.add(getUserAclInfo(user));

      // Add children queue acls
      for (CSQueue child : childQueues) {
        userAcls.addAll(child.getQueueUserAclInfo(user));
      }

      return userAcls;
    } finally {
      readLock.unlock();
    }

  }

  public String toString() {
    return getQueueName() + ": " +
        "numChildQueue= " + childQueues.size() + ", " +
        getCapacityOrWeightString() + ", " +
        "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " +
        "usedResources=" + usageTracker.getQueueUsage().getUsed() + ", " +
        "usedCapacity=" + getUsedCapacity() + ", " +
        "numApps=" + getNumApplications() + ", " +
        "numContainers=" + getNumContainers();
  }

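  // Creates a new dynamic child queue (leaf or parent) under this queue;
  // weight-related setup is deferred to the next updateClusterResource call.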
  public CSQueue createNewQueue(String childQueuePath, boolean isLeaf)
      throws SchedulerDynamicEditException {
    try {
      AbstractCSQueue childQueue;
      String queueShortName = childQueuePath.substring(
          childQueuePath.lastIndexOf(".") + 1);

      if (isLeaf) {
        childQueue = new LeafQueue(queueContext,
            queueShortName, this, null, true);
      } else {
        childQueue = new ParentQueue(queueContext, queueShortName, this, null, true);
      }
      childQueue.setDynamicQueue(true);
      // This is sufficient for now; weight-related setup will be handled in
      // updateClusterResource

      return childQueue;
    } catch (IOException e) {
      throw new SchedulerDynamicEditException(e.toString());
    }
  }

  // Removes a dynamically created child queue from this parent.
  public void removeChildQueue(CSQueue queue)
      throws SchedulerDynamicEditException {
    writeLock.lock();
    try {
      if (!(queue instanceof AbstractCSQueue) ||
          !((AbstractCSQueue) queue).isDynamicQueue()) {
        throw new SchedulerDynamicEditException("Queue " + getQueuePath()
            + " can not remove " + queue.getQueuePath() +
            " because it is not a dynamic queue");
      }

      // We need to check if the parent of the child queue is exactly this
      // ParentQueue object
      if (queue.getParent() != this) {
        throw new SchedulerDynamicEditException("Queue " + getQueuePath()
            + " can not remove " + queue.getQueuePath() +
            " because it has a different parent queue");
      }

      // Now we can do remove and update
      this.childQueues.remove(queue);
      queueContext.getQueueManager()
          .removeQueue(queue.getQueuePath());

      // Call updateClusterResource, which will recalculate all
      // effectiveMin/MaxResource values
      this.updateClusterResource(queueContext.getClusterResource(),
          new ResourceLimits(queueContext.getClusterResource()));

    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Check whether this queue supports adding additional child queues
   * dynamically.
   * @return true, if queue is eligible to create additional queues dynamically,
   * false otherwise
   */
  public boolean isEligibleForAutoQueueCreation() {
    return isDynamicQueue() || queueContext.getConfiguration().
        isAutoQueueCreationV2Enabled(getQueuePathObject());
  }

  /**
   * Check whether this queue supports legacy (v1) dynamic child queue creation.
   * @return true if queue is eligible to create child queues dynamically using
   * the legacy system, false otherwise
   */
  public boolean isEligibleForLegacyAutoQueueCreation() {
    return isDynamicQueue() || queueContext.getConfiguration().
        isAutoCreateChildQueueEnabled(getQueuePathObject());
  }

  @Override
  public void reinitialize(CSQueue newlyParsedQueue,
        Resource clusterResource) throws IOException {
    writeLock.lock();
    try {
      // We skip reinitialization for dynamic queues; when this is called and
      // the newly parsed queue is different from this queue, we turn this
      // queue into a static queue.
      if (newlyParsedQueue != this) {
        this.setDynamicQueue(false);
      }

      // Sanity check
      if (!(newlyParsedQueue instanceof AbstractParentQueue) || !newlyParsedQueue
          .getQueuePath().equals(getQueuePath())) {
        throw new IOException(
            "Trying to reinitialize " + getQueuePath() + " from "
                + newlyParsedQueue.getQueuePath());
      }

      AbstractParentQueue newlyParsedParentQueue = (AbstractParentQueue) newlyParsedQueue;

      // Set new configs
      setupQueueConfigs(clusterResource);

      // Re-configure existing child queues and add new ones
      // The CS has already checked to ensure all existing child queues are present!
      Map<String, CSQueue> currentChildQueues = getQueuesMap(childQueues);
      Map<String, CSQueue> newChildQueues = getQueuesMap(
          newlyParsedParentQueue.childQueues);

      // Reinitialize dynamic queues as well, because they are not present in
      // the newly parsed configuration
      for (String queue : Sets.difference(currentChildQueues.keySet(),
          newChildQueues.keySet())) {
        CSQueue candidate = currentChildQueues.get(queue);
        if (candidate instanceof AbstractCSQueue) {
          if (((AbstractCSQueue) candidate).isDynamicQueue()) {
            candidate.reinitialize(candidate, clusterResource);
          }
        }
      }

      for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
        String newChildQueueName = e.getKey();
        CSQueue newChildQueue = e.getValue();

        CSQueue childQueue = currentChildQueues.get(newChildQueueName);

        // Check if the child-queue already exists
        if (childQueue != null) {
          // Check if the child queue has been converted into a parent queue,
          // or a parent queue has been converted into a child queue. The CS
          // has already checked to ensure that this child queue is in STOPPED
          // state if it has been converted to a ParentQueue.
          if ((childQueue instanceof AbstractLeafQueue
              && newChildQueue instanceof AbstractParentQueue)
              || (childQueue instanceof AbstractParentQueue
                  && newChildQueue instanceof AbstractLeafQueue)) {
            // We would convert this LeafQueue to ParentQueue, or vice versa.
            // Consider this as the combination of DELETE then ADD.
            newChildQueue.setParent(this);
            currentChildQueues.put(newChildQueueName, newChildQueue);
            // inform CapacitySchedulerQueueManager
            CapacitySchedulerQueueManager queueManager =
                queueContext.getQueueManager();
            queueManager.addQueue(newChildQueueName, newChildQueue);
            continue;
          }
          // Re-init existing queues
          childQueue.reinitialize(newChildQueue, clusterResource);
          LOG.info(getQueuePath() + ": re-configured queue: " + childQueue);
        } else {
          // New child queue, do not re-init

          // Set parent to 'this'
          newChildQueue.setParent(this);

          // Save in list of current child queues
          currentChildQueues.put(newChildQueueName, newChildQueue);

          LOG.info(
              getQueuePath() + ": added new child queue: " + newChildQueue);
        }
      }

      // remove the deleted queue in the refreshed xml.
      for (Iterator<Map.Entry<String, CSQueue>> itr = currentChildQueues
          .entrySet().iterator(); itr.hasNext();) {
        Map.Entry<String, CSQueue> e = itr.next();
        String queueName = e.getKey();
        if (!newChildQueues.containsKey(queueName)) {
          if (((AbstractCSQueue)e.getValue()).isDynamicQueue()) {
            // Don't remove dynamic queue if we cannot find it in the config.
            continue;
          }
          itr.remove();
        }
      }

      // Re-sort all queues
      setChildQueues(currentChildQueues.values());

      // Make sure we notify the QueueOrderingPolicy
      queueOrderingPolicy.setQueues(childQueues);
    } finally {
      writeLock.unlock();
    }
  }

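  // Builds a lookup map keyed by queue path for the given list of queues.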
  private Map<String, CSQueue> getQueuesMap(List<CSQueue> queues) {
    Map<String, CSQueue> queuesMap = new HashMap<String, CSQueue>();
    for (CSQueue queue : queues) {
      queuesMap.put(queue.getQueuePath(), queue);
    }
    return queuesMap;
  }

  @Override
  public void submitApplication(ApplicationId applicationId, String user,
      String queue) throws AccessControlException {
    writeLock.lock();
    try {
      // Sanity check
      validateSubmitApplication(applicationId, user, queue);

      addApplication(applicationId, user);
    } finally {
      writeLock.unlock();
    }

    // Inform the parent queue
    if (parent != null) {
      try {
        parent.submitApplication(applicationId, user, queue);
      } catch (AccessControlException ace) {
        LOG.info("Failed to submit application to parent-queue: " +
            parent.getQueuePath(), ace);
        removeApplication(applicationId, user);
        throw ace;
      }
    }
  }

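  // Parent queues never accept submissions directly: reject if the target
  // queue is this parent itself, or if this queue is not in RUNNING state.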
  public void validateSubmitApplication(ApplicationId applicationId,
      String userName, String queue) throws AccessControlException {
    writeLock.lock();
    try {
      if (queue.equals(getQueueName())) {
        throw new AccessControlException(
            "Cannot submit application " + "to non-leaf queue: " + getQueueName());
      }

      if (getState() != QueueState.RUNNING) {
        throw new AccessControlException("Queue " + getQueuePath()
            + " is STOPPED. Cannot accept submission of application: "
            + applicationId);
      }
    } finally {
      writeLock.unlock();
    }
  }

  @Override
  public void submitApplicationAttempt(FiCaSchedulerApp application,
      String userName) {
    // No-op for parent queues; application attempts are only tracked by leaf queues.
  }

  @Override
  public void submitApplicationAttempt(FiCaSchedulerApp application,
      String userName, boolean isMoveApp) {
    throw new UnsupportedOperationException("Submission of application attempt"
        + " to parent queue is not supported");
  }

  @Override
  public void finishApplicationAttempt(FiCaSchedulerApp application,
      String queue) {
    // No-op for parent queues.
  }

  private void addApplication(ApplicationId applicationId,
      String user) {
    numApplications.incrementAndGet();

    LOG.info(
        "Application added -" + " appId: " + applicationId + " user: " + user
            + " leaf-queue of parent: " + getQueuePath() + " #applications: "
            + getNumApplications());
  }

  @Override
  public void finishApplication(ApplicationId application, String user) {

    removeApplication(application, user);

    appFinished();

    // Inform the parent queue
    if (parent != null) {
      parent.finishApplication(application, user);
    }
  }

  private void removeApplication(ApplicationId applicationId,
      String user) {
    numApplications.decrementAndGet();

    LOG.info("Application removed -" + " appId: " + applicationId + " user: "
        + user + " leaf-queue of parent: " + getQueuePath()
        + " #applications: " + getNumApplications());
  }

  private String getParentName() {
    return parent != null ? parent.getQueuePath() : "";
  }

  @Override
  public CSAssignment assignContainers(Resource clusterResource,
       CandidateNodeSet<FiCaSchedulerNode> candidates,
       ResourceLimits resourceLimits, SchedulingMode schedulingMode) {
    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);

    // if our queue cannot access this node, just return
    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
        && !queueNodeLabelsSettings.isAccessibleToPartition(candidates.getPartition())) {
      if (LOG.isDebugEnabled()) {
        long now = System.currentTimeMillis();
        // Do logging every 1 sec to avoid excessive logging.
        if (now - this.lastSkipQueueDebugLoggingTimestamp > 1000) {
          LOG.debug("Skip this queue=" + getQueuePath()
              + ", because it is not able to access partition=" + candidates
              .getPartition());
          this.lastSkipQueueDebugLoggingTimestamp = now;
        }
      }

      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
          getParentName(), getQueuePath(), ActivityState.REJECTED,
          ActivityDiagnosticConstant.QUEUE_NOT_ABLE_TO_ACCESS_PARTITION);
      if (rootQueue) {
        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
            node);
      }

      return CSAssignment.NULL_ASSIGNMENT;
    }

    // Check if this queue needs more resources; simply skip allocation if it
    // does not.
    if (!super.hasPendingResourceRequest(candidates.getPartition(),
        clusterResource, schedulingMode)) {
      if (LOG.isDebugEnabled()) {
        long now = System.currentTimeMillis();
        // Do logging every 1 sec to avoid excessive logging.
        if (now - this.lastSkipQueueDebugLoggingTimestamp > 1000) {
          LOG.debug("Skip this queue=" + getQueuePath()
              + ", because it doesn't need more resource, schedulingMode="
              + schedulingMode.name() + " node-partition=" + candidates
              .getPartition());
          this.lastSkipQueueDebugLoggingTimestamp = now;
        }
      }

      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
          getParentName(), getQueuePath(), ActivityState.SKIPPED,
          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
      if (rootQueue) {
        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
            node);
      }

      return CSAssignment.NULL_ASSIGNMENT;
    }

    CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0),
        NodeType.NODE_LOCAL);

    while (canAssign(clusterResource, node)) {
      LOG.debug("Trying to assign containers to child-queue of {}",
          getQueuePath());

      // Are we over maximum-capacity for this queue?
      // This also considers the parent's limits and
      // reservations-continue-looking
      if (!super.canAssignToThisQueue(clusterResource,
          candidates.getPartition(),
          resourceLimits, Resources
              .createResource(getMetrics().getReservedMB(),
                  getMetrics().getReservedVirtualCores()), schedulingMode)) {

        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
            getParentName(), getQueuePath(), ActivityState.REJECTED,
            ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
        if (rootQueue) {
          ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
              node);
        }

        break;
      }

      // Schedule
      CSAssignment assignedToChild = assignContainersToChildQueues(
          clusterResource, candidates, resourceLimits, schedulingMode);
      assignment.setType(assignedToChild.getType());
      assignment.setRequestLocalityType(
          assignedToChild.getRequestLocalityType());
      assignment.setExcessReservation(assignedToChild.getExcessReservation());
      assignment.setContainersToKill(assignedToChild.getContainersToKill());
      assignment.setFulfilledReservation(
          assignedToChild.isFulfilledReservation());
      assignment.setFulfilledReservedContainer(
          assignedToChild.getFulfilledReservedContainer());

      // If a child queue assigned something, accumulate it; otherwise we are
      // done
      if (Resources.greaterThan(resourceCalculator, clusterResource,
          assignedToChild.getResource(), Resources.none())) {

        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
            getParentName(), getQueuePath(), ActivityState.ACCEPTED,
            ActivityDiagnosticConstant.EMPTY);

        boolean isReserved =
            assignedToChild.getAssignmentInformation().getReservationDetails()
                != null && !assignedToChild.getAssignmentInformation()
                .getReservationDetails().isEmpty();
        if (rootQueue) {
          ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
              activitiesManager, node,
              assignedToChild.getAssignmentInformation()
                  .getFirstAllocatedOrReservedContainerId(),
              isReserved ?
                  AllocationState.RESERVED : AllocationState.ALLOCATED);
        }

        // Track resource utilization in this pass of the scheduler
        Resources.addTo(assignment.getResource(),
            assignedToChild.getResource());
        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
            assignedToChild.getAssignmentInformation().getAllocated());
        Resources.addTo(assignment.getAssignmentInformation().getReserved(),
            assignedToChild.getAssignmentInformation().getReserved());
        assignment.getAssignmentInformation().incrAllocations(
            assignedToChild.getAssignmentInformation().getNumAllocations());
        assignment.getAssignmentInformation().incrReservations(
            assignedToChild.getAssignmentInformation().getNumReservations());
        assignment.getAssignmentInformation().getAllocationDetails().addAll(
            assignedToChild.getAssignmentInformation()
                .getAllocationDetails());
        assignment.getAssignmentInformation().getReservationDetails().addAll(
            assignedToChild.getAssignmentInformation()
                .getReservationDetails());
        assignment.setIncreasedAllocation(
            assignedToChild.isIncreasedAllocation());

        if (LOG.isDebugEnabled()) {
          LOG.debug("assignedContainer reserved=" + isReserved + " queue="
              + getQueuePath() + " usedCapacity=" + getUsedCapacity()
              + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
              + usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource);

          LOG.debug(
              "ParentQ=" + getQueuePath() + " assignedSoFarInThisIteration="
                  + assignment.getResource() + " usedCapacity="
                  + getUsedCapacity() + " absoluteUsedCapacity="
                  + getAbsoluteUsedCapacity());
        }
      } else {
        assignment.setSkippedType(assignedToChild.getSkippedType());

        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
            getParentName(), getQueuePath(), ActivityState.SKIPPED,
            ActivityDiagnosticConstant.EMPTY);
        if (rootQueue) {
          ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
              node);
        }

        break;
      }

      /*
       * Previously we could allocate more than one container for each
       * allocation under the root queue. That logic is no longer appropriate
       * in the global scheduling world.
       *
       * So do not try to allocate more than one container per allocation here;
       * let the top-level scheduler make the decision.
       */
      break;
    }

    return assignment;
  }

  private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
    // node == null means global scheduling is enabled; always return true
    if (null == node) {
      return true;
    }

    // Two conditions need to be met when trying to allocate:
    // 1) The node doesn't have a reserved container
    // 2) The node's available-resource + killable-resource must be > 0
    boolean accept = node.getReservedContainer() == null &&
        Resources.fitsIn(resourceCalculator, queueAllocationSettings.getMinimumAllocation(),
            Resources.add(node.getUnallocatedResource(), node.getTotalKillableResources()));
    if (!accept) {
      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
          getParentName(), getQueuePath(), ActivityState.REJECTED,
          () -> node.getReservedContainer() != null ?
              ActivityDiagnosticConstant.
                  QUEUE_SKIPPED_BECAUSE_SINGLE_NODE_RESERVED :
              ActivityDiagnosticConstant.
                  QUEUE_SKIPPED_BECAUSE_SINGLE_NODE_RESOURCE_INSUFFICIENT);
      if (rootQueue) {
        ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
            node);
      }
    }
    return accept;
  }

  public ResourceLimits getResourceLimitsOfChild(CSQueue child,
      Resource clusterResource, ResourceLimits parentLimits,
      String nodePartition, boolean netLimit) {
    // Set resource-limit of a given child, child.limit =
    // min(my.limit - my.used + child.used, child.max)

    // First, cap parent limit by parent's max
    parentLimits.setLimit(Resources.min(resourceCalculator, clusterResource,
        parentLimits.getLimit(),
        usageTracker.getQueueResourceQuotas().getEffectiveMaxResource(nodePartition)));

    // Parent available resource = parent-limit - parent-used-resource
    Resource limit = parentLimits.getLimit();
    if (netLimit) {
      limit = parentLimits.getNetLimit();
    }
    Resource parentMaxAvailableResource = Resources.subtract(
        limit, usageTracker.getQueueUsage().getUsed(nodePartition));

    // Deduct killable from used
    Resources.addTo(parentMaxAvailableResource,
        getTotalKillableResource(nodePartition));

    // Child's limit = parent-available-resource + child-used
    Resource childLimit = Resources.add(parentMaxAvailableResource,
        child.getQueueResourceUsage().getUsed(nodePartition));

    // Normalize before return
    childLimit =
        Resources.roundDown(resourceCalculator, childLimit,
            queueAllocationSettings.getMinimumAllocation());

    return new ResourceLimits(childLimit);
  }

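  // Returns the child queues in the order chosen by the configured queue
  // ordering policy for the given partition.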
  private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(
      String partition) {
    return queueOrderingPolicy.getAssignmentIterator(partition);
  }

  private CSAssignment assignContainersToChildQueues(Resource cluster,
      CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits limits,
      SchedulingMode schedulingMode) {
    CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;

    printChildQueues();

    // Try to assign to most 'under-served' sub-queue
    for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(
        candidates.getPartition()); iter.hasNext(); ) {
      CSQueue childQueue = iter.next();
      LOG.debug("Trying to assign to queue: {} stats: {}",
          childQueue.getQueuePath(), childQueue);

      // Get ResourceLimits of child queue before assign containers
      ResourceLimits childLimits =
          getResourceLimitsOfChild(childQueue, cluster, limits,
              candidates.getPartition(), true);

      CSAssignment childAssignment = childQueue.assignContainers(cluster,
          candidates, childLimits, schedulingMode);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
            " stats: " + childQueue + " --> " +
            childAssignment.getResource() + ", " + childAssignment.getType());
      }

      if (Resources.greaterThan(
              resourceCalculator, cluster,
              childAssignment.getResource(), Resources.none())) {
        assignment = childAssignment;
        break;
      } else if (childAssignment.getSkippedType() ==
          CSAssignment.SkippedType.QUEUE_LIMIT) {
        if (assignment.getSkippedType() !=
            CSAssignment.SkippedType.QUEUE_LIMIT) {
          assignment = childAssignment;
        }
        Resource blockedHeadroom = null;
        if (childQueue instanceof AbstractLeafQueue) {
          blockedHeadroom = childLimits.getHeadroom();
        } else {
          blockedHeadroom = childLimits.getBlockedHeadroom();
        }
        Resource resourceToSubtract = Resources.max(resourceCalculator,
            cluster, blockedHeadroom, Resources.none());
        limits.addBlockedHeadroom(resourceToSubtract);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Decrease parentLimits " + limits.getLimit() +
              " for " + this.getQueuePath() + " by " +
              resourceToSubtract + " as childQueue=" +
              childQueue.getQueuePath() + " is blocked");
        }
      }
    }

    return assignment;
  }

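  // Builds a debug string listing each child queue's path, used capacity and
  // accessible node labels.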
  String getChildQueuesToPrint() {
    StringBuilder sb = new StringBuilder();
    for (CSQueue q : childQueues) {
      sb.append(q.getQueuePath() +
          " usedCapacity=(" + q.getUsedCapacity() + "), " +
          " label=("
          + StringUtils.join(q.getAccessibleNodeLabels().iterator(), ",")
          + ")");
    }
    return sb.toString();
  }

  private void printChildQueues() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("printChildQueues - queue: " + getQueuePath()
          + " child-queues: " + getChildQueuesToPrint());
    }
  }

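  // Releases the given resources for this queue on the node's partition while
  // holding the write lock.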
  private void internalReleaseResource(Resource clusterResource,
      FiCaSchedulerNode node, Resource releasedResource) {
    writeLock.lock();
    try {
      super.releaseResource(clusterResource, releasedResource,
          node.getPartition());

      LOG.debug("completedContainer {}, cluster={}", this, clusterResource);

    } finally {
      writeLock.unlock();
    }
  }

  @Override
  public void completedContainer(Resource clusterResource,
      FiCaSchedulerApp application, FiCaSchedulerNode node,
      RMContainer rmContainer, ContainerStatus containerStatus,
      RMContainerEventType event, CSQueue completedChildQueue,
      boolean sortQueues) {
    if (application != null) {
      internalReleaseResource(clusterResource, node,
          rmContainer.getContainer().getResource());

      // Inform the parent
      if (parent != null) {
        // complete my parent
        parent.completedContainer(clusterResource, application,
            node, rmContainer, null, event, this, sortQueues);
      }
    }
  }

  @Override
  public void refreshAfterResourceCalculation(Resource clusterResource,
      ResourceLimits resourceLimits) {
    CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
        this, labelManager, null);
    // Update configured capacity/max-capacity for default partition only
    CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
        labelManager.getResourceByLabel(null, clusterResource),
        RMNodeLabelsManager.NO_LABEL, this);

    LOG.info("Refresh after resource calculation (PARENT) {}\n"
            + "effectiveMinResource = {}\n"
            + "effectiveMaxResource = {}\n"
            + "capacity = {}\n"
            + "maxCapacity = {}\n"
            + "absoluteCapacity = {}\n"
            + "absoluteMaxCapacity = {}",
        queuePath,
        getEffectiveCapacity(NO_LABEL),
        getEffectiveMaxCapacity(NO_LABEL),
        getCapacity(),
        getMaximumCapacity(),
        getAbsoluteCapacity(),
        getAbsoluteMaximumCapacity());
  }

  @Override
  public void updateClusterResource(Resource clusterResource,
      ResourceLimits resourceLimits) {
    if (queueContext.getConfiguration().isLegacyQueueMode()) {
      updateClusterResourceLegacyMode(clusterResource, resourceLimits);
      return;
    }

    CapacitySchedulerQueueCapacityHandler handler =
        queueContext.getQueueManager().getQueueCapacityHandler();
    if (rootQueue) {
      handler.updateRoot(this, clusterResource);
      handler.updateChildren(clusterResource, this);
    } else {
      handler.updateChildren(clusterResource, getParent());
    }
  }

  public void updateClusterResourceLegacyMode(Resource clusterResource,
                                              ResourceLimits resourceLimits) {
    writeLock.lock();
    try {
      // Special handling for the root queue
      if (rootQueue) {
        for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
          if (queueCapacities.getWeight(nodeLabel) > 0) {
            queueCapacities.setNormalizedWeight(nodeLabel, 1f);
          }
        }
      }

      // Update absolute capacities of this queue; this needs to happen before
      // the effective capacity calculation below
      updateAbsoluteCapacities();

      // Normalize every dynamic queue's weight to 1 for all accessible node
      // labels. This is important because the existing node labels can keep
      // changing as new nodes are added or node label mappings change. We need
      // this to ensure auto-created queues can access all labels.
      for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
        for (CSQueue queue : childQueues) {
          // For a dynamic queue, we set the weight to 1 every time, because
          // new labels may have been added to the parent.
          if (((AbstractCSQueue) queue).isDynamicQueue()) {
            if (queue.getQueueCapacities().getWeight(nodeLabel) == -1f) {
              queue.getQueueCapacities().setWeight(nodeLabel, 1f);
            }
          }
        }
      }

      // Normalize weight of children
      if (getCapacityConfigurationTypeForQueues(childQueues)
          == QueueCapacityType.WEIGHT) {
        for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
          float sumOfWeight = 0;

          for (CSQueue queue : childQueues) {
            if (queue.getQueueCapacities().getExistingNodeLabels()
                .contains(nodeLabel)) {
              float weight = Math.max(0,
                  queue.getQueueCapacities().getWeight(nodeLabel));
              sumOfWeight += weight;
            }
          }
          // When sum of weight == 0, skip setting normalized_weight (so
          // normalized weight will be 0).
          if (Math.abs(sumOfWeight) > 1e-6) {
            for (CSQueue queue : childQueues) {
              if (queue.getQueueCapacities().getExistingNodeLabels()
                  .contains(nodeLabel)) {
                queue.getQueueCapacities().setNormalizedWeight(nodeLabel,
                    queue.getQueueCapacities().getWeight(nodeLabel) /
                        sumOfWeight);
              }
            }
          }
        }
      }

      // Update effective capacities of this parent queue for every configured
      // node label
      for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
        calculateEffectiveResourcesAndCapacity(label, clusterResource);
      }

      // Update all children
      for (CSQueue childQueue : childQueues) {
        // Get ResourceLimits of the child queue before updating it
        ResourceLimits childLimits = getResourceLimitsOfChild(childQueue,
            clusterResource, resourceLimits,
            RMNodeLabelsManager.NO_LABEL, false);
        childQueue.updateClusterResource(clusterResource, childLimits);
      }

      CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
          this, labelManager, null);
      // Update configured capacity/max-capacity for default partition only
      CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
          labelManager.getResourceByLabel(null, clusterResource),
          RMNodeLabelsManager.NO_LABEL, this);
    } catch (IOException e) {
      LOG.error("Error during updating cluster resource: ", e);
      throw new YarnRuntimeException("Fatal issue during scheduling", e);
    } finally {
      writeLock.unlock();
    }
  }

  @Override
  public boolean hasChildQueues() {
    return true;
  }

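  // For the root queue the effective min/max resources equal the partition's
  // total resource; other parent queues delegate to the common effective
  // resource update. The min-resource ratio used to scale children is then
  // refreshed.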
  private void calculateEffectiveResourcesAndCapacity(String label,
      Resource clusterResource) {
    // Update effective resources for this queue itself;
    if (rootQueue) {
      Resource resourceByLabel = labelManager.getResourceByLabel(label, clusterResource);
      usageTracker.getQueueResourceQuotas().setEffectiveMinResource(label, resourceByLabel);
      usageTracker.getQueueResourceQuotas().setEffectiveMaxResource(label, resourceByLabel);
    } else {
      super.updateEffectiveResources(clusterResource);
    }

    recalculateEffectiveMinRatio(label, clusterResource);
  }

  private void recalculateEffectiveMinRatio(String label, Resource clusterResource) {
    // For root queue, ensure that max/min resource is updated to latest
    // cluster resource.
    Resource resourceByLabel = labelManager.getResourceByLabel(label, clusterResource);

    // Total configured min resources of direct children of this given parent queue
    Resource configuredMinResources = Resource.newInstance(0L, 0);
    for (CSQueue childQueue : getChildQueues()) {
      Resources.addTo(configuredMinResources,
          childQueue.getQueueResourceQuotas().getConfiguredMinResource(label));
    }

    // Factor to scale down effective resources: when the cluster has
    // sufficient resources, effective_min_resources will be the same as the
    // configured min_resources.
    Resource numeratorForMinRatio = null;
    if (getQueuePath().equals("root")) {
      if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(resourceCalculator,
          clusterResource, resourceByLabel, configuredMinResources)) {
        numeratorForMinRatio = resourceByLabel;
      }
    } else {
      if (Resources.lessThan(resourceCalculator, clusterResource,
          usageTracker.getQueueResourceQuotas().getEffectiveMinResource(label),
          configuredMinResources)) {
        numeratorForMinRatio = usageTracker.getQueueResourceQuotas().getEffectiveMinResource(label);
      }
    }

    effectiveMinResourceRatio.put(label, getEffectiveMinRatio(
        configuredMinResources, numeratorForMinRatio));
  }

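  // For each countable resource type, compute the ratio of the numerator
  // (effective resource) to the children's configured minimum resource,
  // converting units where needed. Returns an empty map when no scaling is
  // required (numeratorForMinRatio == null).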
  private Map<String, Float> getEffectiveMinRatio(
      Resource configuredMinResources, Resource numeratorForMinRatio) {
    Map<String, Float> effectiveMinRatioPerResource = new HashMap<>();
    if (numeratorForMinRatio != null) {
      int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
      for (int i = 0; i < maxLength; i++) {
        ResourceInformation nResourceInformation = numeratorForMinRatio
            .getResourceInformation(i);
        ResourceInformation dResourceInformation = configuredMinResources
            .getResourceInformation(i);

        long nValue = nResourceInformation.getValue();
        long dValue = UnitsConversionUtil.convert(
            dResourceInformation.getUnits(), nResourceInformation.getUnits(),
            dResourceInformation.getValue());
        if (dValue != 0) {
          effectiveMinRatioPerResource.put(nResourceInformation.getName(),
              (float) nValue / dValue);
        }
      }
    }
    return ImmutableMap.copyOf(effectiveMinRatioPerResource);
  }

  @Override
  public List<CSQueue> getChildQueues() {
    readLock.lock();
    try {
      return new ArrayList<CSQueue>(childQueues);
    } finally {
      readLock.unlock();
    }

  }

  @Override
  public List<CSQueue> getChildQueuesByTryLock() {
    try {
      while (!readLock.tryLock()){
        LockSupport.parkNanos(10000);
      }
      return new ArrayList<>(childQueues);
    } finally {
      readLock.unlock();
    }
  }

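  // Re-account the resources of a recovered GUARANTEED container; completed
  // and non-GUARANTEED containers are skipped, and the recovery is propagated
  // to the parent queue.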
  @Override
  public void recoverContainer(Resource clusterResource,
      SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
      return;
    }
    if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) {
      return;
    }

    // Careful! Locking order is important!
    writeLock.lock();
    try {
      FiCaSchedulerNode node = queueContext.getNode(
          rmContainer.getContainer().getNodeId());
      allocateResource(clusterResource,
          rmContainer.getContainer().getResource(), node.getPartition());
    } finally {
      writeLock.unlock();
    }

    if (parent != null) {
      parent.recoverContainer(clusterResource, attempt, rmContainer);
    }
  }

  @Override
  public ActiveUsersManager getAbstractUsersManager() {
    // Should never be called since all applications are submitted to LeafQueues
    return null;
  }

  @Override
  public void collectSchedulerApplications(
      Collection<ApplicationAttemptId> apps) {
    readLock.lock();
    try {
      for (CSQueue queue : childQueues) {
        queue.collectSchedulerApplications(apps);
      }
    } finally {
      readLock.unlock();
    }

  }

  @Override
  public void attachContainer(Resource clusterResource,
      FiCaSchedulerApp application, RMContainer rmContainer) {
    if (application != null) {
      FiCaSchedulerNode node =
          queueContext.getNode(rmContainer.getContainer().getNodeId());
      allocateResource(clusterResource, rmContainer.getContainer()
          .getResource(), node.getPartition());
      LOG.info("movedContainer" + " queueMoveIn=" + getQueuePath()
          + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
          + getAbsoluteUsedCapacity() + " used=" + usageTracker.getQueueUsage().getUsed() +
          " cluster=" + clusterResource);
      // Inform the parent
      if (parent != null) {
        parent.attachContainer(clusterResource, application, rmContainer);
      }
    }
  }

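  /**
   * Accounts for a container that is being moved out of this queue: releases
   * its resource from this queue and then informs the parent queue.
   */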
  @Override
  public void detachContainer(Resource clusterResource,
      FiCaSchedulerApp application, RMContainer rmContainer) {
    if (application != null) {
      FiCaSchedulerNode node =
          queueContext.getNode(rmContainer.getContainer().getNodeId());
      super.releaseResource(clusterResource,
          rmContainer.getContainer().getResource(),
          node.getPartition());
      LOG.info("movedContainer" + " queueMoveOut=" + getQueuePath()
          + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
          + getAbsoluteUsedCapacity() + " used=" + usageTracker.getQueueUsage().getUsed() +
          " cluster=" + clusterResource);
      // Inform the parent
      if (parent != null) {
        parent.detachContainer(clusterResource, application, rmContainer);
      }
    }
  }

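  /** Returns the current number of applications under this queue. */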
  public int getNumApplications() {
    return numApplications.get();
  }

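  /**
   * Charges the given resource against this queue for the given node
   * partition and, if the queue's maximum capacity is now exceeded, triggers
   * preemption of killable containers to bring usage back under the limit.
   */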
  void allocateResource(Resource clusterResource,
      Resource resource, String nodePartition) {
    writeLock.lock();
    try {
      super.allocateResource(clusterResource, resource, nodePartition);

      /*
       * Check if we need to kill (killable) containers when the maximum
       * resource is violated. We do this because killable resources are
       * deducted from usage as we walk down from the root. For example:
       *
       *      Root
       *      /   \
       *     a     b
       *   /  \
       *  a1  a2
       *
       * a: max=10G, used=10G, killable=2G
       * a1: used=8G, killable=2G
       * a2: used=2G, pending=2G, killable=0G
       *
       * When queue-a is asked to allocate resources, even though it has
       * reached its max resource, its used resource is reduced by the
       * killable amount, so it can still allocate up to 2G. The
       * ResourceLimits passed down to a2 therefore has a headroom of 2G.
       *
       * If the scheduler finds 2G of available resource in the cluster and
       * assigns it to a2, then a2's used becomes 2G + 2G = 4G and a's used
       * becomes 8G + 4G = 12G > 10G.
       *
       * When this happens, we have to preempt killable containers (on the
       * same or different nodes) of the parent queue to avoid violating the
       * parent's max resource.
       */
      if (!usageTracker.getQueueResourceQuotas().getEffectiveMaxResource(nodePartition)
          .equals(Resources.none())) {
        if (Resources.lessThan(resourceCalculator, clusterResource,
            usageTracker.getQueueResourceQuotas().getEffectiveMaxResource(nodePartition),
            usageTracker.getQueueUsage().getUsed(nodePartition))) {
          killContainersToEnforceMaxQueueCapacity(nodePartition,
              clusterResource);
        }
      } else {
        if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition)
            < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) {
          killContainersToEnforceMaxQueueCapacity(nodePartition,
              clusterResource);
        }
      }
    } finally {
      writeLock.unlock();
    }
  }

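  /**
   * Kills killable containers belonging to applications under this queue, one
   * at a time, until the queue's usage for the given partition no longer
   * exceeds its effective maximum capacity or no killable containers remain.
   */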
  private void killContainersToEnforceMaxQueueCapacity(String partition,
      Resource clusterResource) {
    Iterator<RMContainer> killableContainerIter = getKillableContainers(
        partition);
    if (!killableContainerIter.hasNext()) {
      return;
    }

    Resource partitionResource = labelManager.getResourceByLabel(partition,
        null);
    Resource maxResource = getEffectiveMaxCapacity(partition);

    while (Resources.greaterThan(resourceCalculator, partitionResource,
        usageTracker.getQueueUsage().getUsed(partition), maxResource)) {
      RMContainer toKillContainer = killableContainerIter.next();
      FiCaSchedulerApp attempt = queueContext.getApplicationAttempt(
          toKillContainer.getContainerId().getApplicationAttemptId());
      FiCaSchedulerNode node = queueContext.getNode(
          toKillContainer.getAllocatedNode());
      if (null != attempt && null != node) {
        AbstractLeafQueue lq = attempt.getCSLeafQueue();
        lq.completedContainer(clusterResource, attempt, node, toKillContainer,
            SchedulerUtils.createPreemptedContainerStatus(
                toKillContainer.getContainerId(),
                SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL,
            null, false);
        LOG.info("Killed container=" + toKillContainer.getContainerId()
            + " from queue=" + lq.getQueuePath() + " to make queue=" + this
            .getQueuePath() + "'s max-capacity enforced");
      }

      if (!killableContainerIter.hasNext()) {
        break;
      }
    }
  }

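  /**
   * Applies an accepted allocation/reservation proposal to this queue's
   * book-keeping. Allocations served from a reserved container do not change
   * queue usage; in either case the request is propagated to the parent.
   */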
  public void apply(Resource cluster,
      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
    if (request.anythingAllocatedOrReserved()) {
      ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
          allocation = request.getFirstAllocatedOrReservedContainer();
      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
          schedulerContainer = allocation.getAllocatedOrReservedContainer();

      // Do not modify queue when allocation from reserved container
      if (allocation.getAllocateFromReservedContainer() == null) {
        writeLock.lock();
        try {
          // Book-keeping
          // Note: Update headroom to account for current allocation too...
          allocateResource(cluster, allocation.getAllocatedOrReservedResource(),
              schedulerContainer.getNodePartition());

          LOG.info("assignedContainer" + " queue=" + getQueuePath()
              + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
              + getAbsoluteUsedCapacity() + " used=" + usageTracker.getQueueUsage().getUsed()
              + " cluster=" + cluster);
        } finally {
          writeLock.unlock();
        }
      }
    }

    if (parent != null) {
      parent.apply(cluster, request);
    }
  }

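  /**
   * Stops this queue and, recursively, its children. The queue is put into
   * DRAINING state if it still has running applications, otherwise STOPPED.
   */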
  @Override
  public void stopQueue() {
    this.writeLock.lock();
    try {
      if (getNumApplications() > 0) {
        updateQueueState(QueueState.DRAINING);
      } else {
        updateQueueState(QueueState.STOPPED);
      }
      if (getChildQueues() != null) {
        for (CSQueue child : getChildQueues()) {
          child.stopQueue();
        }
      }
    } finally {
      this.writeLock.unlock();
    }
  }

  public QueueOrderingPolicy getQueueOrderingPolicy() {
    return queueOrderingPolicy;
  }

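  /** Returns the number of runnable applications under this queue. */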
  @Override
  int getNumRunnableApps() {
    readLock.lock();
    try {
      return runnableApps;
    } finally {
      readLock.unlock();
    }
  }

  void incrementRunnableApps() {
    writeLock.lock();
    try {
      runnableApps++;
    } finally {
      writeLock.unlock();
    }
  }

  void decrementRunnableApps() {
    writeLock.lock();
    try {
      runnableApps--;
    } finally {
      writeLock.unlock();
    }
  }

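  /**
   * Returns the per-resource effective minimum ratios previously computed for
   * the given node label, or null if none have been computed.
   */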
  Map<String, Float> getEffectiveMinRatio(String label) {
    return effectiveMinResourceRatio.get(label);
  }

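  /**
   * A dynamic queue with no children is eligible for automatic deletion when
   * auto expired deletion is enabled for it in the configuration.
   */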
  @Override
  public boolean isEligibleForAutoDeletion() {
    return isDynamicQueue() && getChildQueues().isEmpty()
        && queueContext.getConfiguration()
            .isAutoExpiredDeletionEnabled(this.getQueuePathObject());
  }

  public AutoCreatedQueueTemplate getAutoCreatedQueueTemplate() {
    return autoCreatedQueueTemplate;
  }

}