FSLeafQueue.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.TreeSet;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;

import static org.apache.hadoop.yarn.util.resource.Resources.none;

@Private
@Unstable
public class FSLeafQueue extends FSQueue {
  private static final Logger LOG = LoggerFactory.
      getLogger(FSLeafQueue.class.getName());
  private static final List<FSQueue> EMPTY_LIST = Collections.emptyList();

  private FSContext context;

  // apps that are runnable
  private final List<FSAppAttempt> runnableApps = new ArrayList<>();
  private final List<FSAppAttempt> nonRunnableApps = new ArrayList<>();
  // assignedApps keeps track of applications that have no appAttempts
  private final Set<ApplicationId> assignedApps = new HashSet<>();
  // get a lock with fair distribution for app list updates
  private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
  private final Lock readLock = rwl.readLock();
  private final Lock writeLock = rwl.writeLock();
  
  private Resource demand = Resources.createResource(0);
  
  // Variables used for preemption
  private long lastTimeAtMinShare;

  // Track the AM resource usage for this queue
  private Resource amResourceUsage;

  private final ActiveUsersManager activeUsersManager;

  public FSLeafQueue(String name, FairScheduler scheduler,
      FSParentQueue parent) {
    super(name, scheduler, parent);
    this.context = scheduler.getContext();
    this.lastTimeAtMinShare = scheduler.getClock().getTime();
    activeUsersManager = new ActiveUsersManager(getMetrics());
    amResourceUsage = Resource.newInstance(0, 0);
    getMetrics().setAMResourceUsage(amResourceUsage);
  }
  
  void addApp(FSAppAttempt app, boolean runnable) {
    writeLock.lock();
    try {
      if (runnable) {
        runnableApps.add(app);
      } else {
        nonRunnableApps.add(app);
      }
      // when an appAttempt is created for an application, we'd like to move
      // it over from assignedApps to either runnableApps or nonRunnableApps
      assignedApps.remove(app.getApplicationId());
      incUsedResource(app.getResourceUsage());
    } finally {
      writeLock.unlock();
    }
  }
  
  /**
   * Removes the given app from this queue.
   * @return whether or not the app was runnable
   */
  boolean removeApp(FSAppAttempt app) {
    boolean runnable = false;

    // Remove app from runnable/nonRunnable list while holding the write lock
    writeLock.lock();
    try {
      runnable = runnableApps.remove(app);
      if (!runnable) {
        // removeNonRunnableApp acquires the write lock again, which is fine
        if (!removeNonRunnableApp(app)) {
          throw new IllegalStateException("Given app to remove " + app +
              " does not exist in queue " + this);
        }
      }
    } finally {
      writeLock.unlock();
    }

    // Update AM resource usage if needed. If isAMRunning is true, we're not
    // running an unmanaged AM.
    if (runnable && app.isAmRunning()) {
      Resources.subtractFrom(amResourceUsage, app.getAMResource());
      getMetrics().setAMResourceUsage(amResourceUsage);
    }

    decUsedResource(app.getResourceUsage());
    return runnable;
  }

  /**
   * Removes the given app if it is non-runnable and belongs to this queue
   * @return true if the app is removed, false otherwise
   */
  boolean removeNonRunnableApp(FSAppAttempt app) {
    writeLock.lock();
    try {
      return nonRunnableApps.remove(app);
    } finally {
      writeLock.unlock();
    }
  }

  boolean isRunnableApp(FSAppAttempt attempt) {
    readLock.lock();
    try {
      return runnableApps.contains(attempt);
    } finally {
      readLock.unlock();
    }
  }

  boolean isNonRunnableApp(FSAppAttempt attempt) {
    readLock.lock();
    try {
      return nonRunnableApps.contains(attempt);
    } finally {
      readLock.unlock();
    }
  }

  List<FSAppAttempt> getCopyOfNonRunnableAppSchedulables() {
    List<FSAppAttempt> appsToReturn = new ArrayList<>();
    readLock.lock();
    try {
      appsToReturn.addAll(nonRunnableApps);
    } finally {
      readLock.unlock();
    }
    return appsToReturn;
  }

  @Override
  public void collectSchedulerApplications(
      Collection<ApplicationAttemptId> apps) {
    readLock.lock();
    try {
      for (FSAppAttempt appSched : runnableApps) {
        apps.add(appSched.getApplicationAttemptId());
      }
      for (FSAppAttempt appSched : nonRunnableApps) {
        apps.add(appSched.getApplicationAttemptId());
      }
    } finally {
      readLock.unlock();
    }
  }

  @Override
  void updateInternal() {
    readLock.lock();
    try {
      policy.computeShares(runnableApps, getFairShare());
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Compute the extent of fairshare starvation for a set of apps.
   *
   * @param appsWithDemand apps to compute fairshare starvation for
   * @return aggregate fairshare starvation for all apps
   */
  private Resource updateStarvedAppsFairshare(
      TreeSet<FSAppAttempt> appsWithDemand) {
    Resource fairShareStarvation = Resources.clone(none());
    // Fetch apps with unmet demand sorted by fairshare starvation
    for (FSAppAttempt app : appsWithDemand) {
      Resource appStarvation = app.fairShareStarvation();
      if (!Resources.isNone(appStarvation))  {
        context.getStarvedApps().addStarvedApp(app);
        Resources.addTo(fairShareStarvation, appStarvation);
      } else {
        break;
      }
    }
    return fairShareStarvation;
  }

  /**
   * Distribute minshare starvation to a set of apps
   * @param appsWithDemand set of apps
   * @param minShareStarvation minshare starvation to distribute
   */
  private void updateStarvedAppsMinshare(
      final TreeSet<FSAppAttempt> appsWithDemand,
      final Resource minShareStarvation) {
    Resource pending = Resources.clone(minShareStarvation);

    // Keep adding apps to the starved list until the unmet demand goes over
    // the remaining minshare
    for (FSAppAttempt app : appsWithDemand) {
      if (!Resources.isNone(pending)) {
        Resource appMinShare = app.getPendingDemand();
        Resources.subtractFromNonNegative(
            appMinShare, app.getFairshareStarvation());

        if (Resources.greaterThan(policy.getResourceCalculator(),
            scheduler.getClusterResource(), appMinShare, pending)) {
          Resources.subtractFromNonNegative(appMinShare, pending);
          pending = none();
        } else {
          Resources.subtractFromNonNegative(pending, appMinShare);
        }
        app.setMinshareStarvation(appMinShare);
        context.getStarvedApps().addStarvedApp(app);
      } else {
        // Reset minshare starvation in case we had set it in a previous
        // iteration
        app.resetMinshareStarvation();
      }
    }
  }

  /**
   * Helper method to identify starved applications. This needs to be called
   * ONLY from {@link #updateInternal}, after the application shares
   * are updated.
   *
   * A queue can be starving due to fairshare or minshare.
   *
   * Minshare is defined only on the queue and not the applications.
   * Fairshare is defined for both the queue and the applications.
   *
   * If this queue is starved due to minshare, we need to identify the most
   * deserving apps if they themselves are not starved due to fairshare.
   *
   * If this queue is starving due to fairshare, there must be at least
   * one application that is starved. And, even if the queue is not
   * starved due to fairshare, there might still be starved applications.
   *
   * Caller does not need read/write lock on the leaf queue.
   */
  void updateStarvedApps() {
    // Fetch apps with pending demand
    TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand(false);

    // Process apps with fairshare starvation
    Resource fairShareStarvation = updateStarvedAppsFairshare(appsWithDemand);

    // Compute extent of minshare starvation
    Resource minShareStarvation = minShareStarvation();

    // Compute minshare starvation that is not subsumed by fairshare starvation
    Resources.subtractFromNonNegative(minShareStarvation, fairShareStarvation);

    // Assign this minshare to apps with pending demand over fairshare
    updateStarvedAppsMinshare(appsWithDemand, minShareStarvation);
  }

  @Override
  public Resource getDemand() {
    return demand;
  }

  Resource getAmResourceUsage() {
    return amResourceUsage;
  }

  @Override
  public void updateDemand() {
    // Compute demand by iterating through apps in the queue
    // Limit demand to maxResources
    Resource tmpDemand = Resources.createResource(0);
    readLock.lock();
    try {
      for (FSAppAttempt sched : runnableApps) {
        sched.updateDemand();
        Resources.addTo(tmpDemand, sched.getDemand());
      }
      for (FSAppAttempt sched : nonRunnableApps) {
        sched.updateDemand();
        Resources.addTo(tmpDemand, sched.getDemand());
      }
    } finally {
      readLock.unlock();
    }
    // Cap demand to maxShare to limit allocation to maxShare
    demand = Resources.componentwiseMin(tmpDemand, getMaxShare());
    if (LOG.isDebugEnabled()) {
      LOG.debug("The updated demand for " + getName() + " is " + demand
          + "; the max is " + getMaxShare());
      LOG.debug("The updated fairshare for " + getName() + " is "
          + getFairShare());
    }
  }

  @Override
  public Resource assignContainer(FSSchedulerNode node) {
    Resource assigned = none();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
          getName() + " fairShare: " + getFairShare());
    }

    if (!assignContainerPreCheck(node)) {
      return assigned;
    }

    for (FSAppAttempt sched : fetchAppsWithDemand(true)) {
      if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
        continue;
      }
      assigned = sched.assignContainer(node);

      boolean isContainerAssignedOrReserved = !assigned.equals(none());
      boolean isContainerReserved =
                assigned.equals(FairScheduler.CONTAINER_RESERVED);

      // check if an assignment or a reservation was made.
      if (isContainerAssignedOrReserved) {
        // only log container assignment if there was an actual allocation,
        // not a reservation.
        if (!isContainerReserved && LOG.isDebugEnabled()) {
          LOG.debug("Assigned container in queue:{} container:{}",
              getName(), assigned);
        }
        break;
      }
    }
    return assigned;
  }

  /**
   * Fetch the subset of apps that have unmet demand. When used for
   * preemption-related code (as opposed to allocation), omits apps that
   * should not be checked for starvation.
   *
   * @param assignment whether the apps are for allocation containers, as
   *                   opposed to preemption calculations
   * @return Set of apps with unmet demand
   */
  private TreeSet<FSAppAttempt> fetchAppsWithDemand(boolean assignment) {
    TreeSet<FSAppAttempt> pendingForResourceApps =
        new TreeSet<>(policy.getComparator());
    readLock.lock();
    try {
      for (FSAppAttempt app : runnableApps) {
        if (!Resources.isNone(app.getPendingDemand()) &&
            (assignment || app.shouldCheckForStarvation())) {
          pendingForResourceApps.add(app);
        }
      }
    } finally {
      readLock.unlock();
    }
    return pendingForResourceApps;
  }

  @Override
  public List<FSQueue> getChildQueues() {
    return EMPTY_LIST;
  }
  
  @Override
  public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
    QueueUserACLInfo userAclInfo =
      recordFactory.newRecordInstance(QueueUserACLInfo.class);
    List<QueueACL> operations = new ArrayList<>();
    for (QueueACL operation : QueueACL.values()) {
      if (hasAccess(operation, user)) {
        operations.add(operation);
      }
    }

    userAclInfo.setQueueName(getQueueName());
    userAclInfo.setUserAcls(operations);
    return Collections.singletonList(userAclInfo);
  }
  
  private void setLastTimeAtMinShare(long lastTimeAtMinShare) {
    this.lastTimeAtMinShare = lastTimeAtMinShare;
  }

  @Override
  public int getNumRunnableApps() {
    readLock.lock();
    try {
      return runnableApps.size();
    } finally {
      readLock.unlock();
    }
  }

  int getNumNonRunnableApps() {
    readLock.lock();
    try {
      return nonRunnableApps.size();
    } finally {
      readLock.unlock();
    }
  }

  public int getNumPendingApps() {
    int numPendingApps = 0;
    readLock.lock();
    try {
      for (FSAppAttempt attempt : runnableApps) {
        if (attempt.isPending()) {
          numPendingApps++;
        }
      }
      numPendingApps += nonRunnableApps.size();
    } finally {
      readLock.unlock();
    }
    return numPendingApps;
  }

  public int getNumAssignedApps() {
    readLock.lock();
    try {
      return assignedApps.size();
    } finally {
      readLock.unlock();
    }
  }

  @Override
  public boolean isEmpty() {
    readLock.lock();
    try {
      if (runnableApps.size() > 0 || nonRunnableApps.size() > 0 ||
          assignedApps.size() > 0) {
        return false;
      }
    } finally {
      readLock.unlock();
    }
    return true;
  }

  /**
   * TODO: Based on how frequently this is called, we might want to club
   * counting pending and active apps in the same method.
   * @return active apps.
   */
  public int getNumActiveApps() {
    int numActiveApps = 0;
    readLock.lock();
    try {
      for (FSAppAttempt attempt : runnableApps) {
        if (!attempt.isPending()) {
          numActiveApps++;
        }
      }
    } finally {
      readLock.unlock();
    }
    return numActiveApps;
  }

  @Override
  public ActiveUsersManager getAbstractUsersManager() {
    return activeUsersManager;
  }

  /**
  * Compute the maximum resource AM can use. The value is the result of
  * multiplying FairShare and maxAMShare. If FairShare is zero, use
  * min(maxShare, available resource) instead to prevent zero value for
  * maximum AM resource since it forbids any job running in the queue.
  *
  * @return the maximum resource AM can use
  */
  private Resource computeMaxAMResource() {
    Resource maxResource = Resources.clone(getFairShare());
    Resource maxShare = getMaxShare();

    if (maxResource.getMemorySize() == 0) {
      maxResource.setMemorySize(
          Math.min(scheduler.getRootQueueMetrics().getAvailableMB(),
                   maxShare.getMemorySize()));
    }

    if (maxResource.getVirtualCores() == 0) {
      maxResource.setVirtualCores(Math.min(
          scheduler.getRootQueueMetrics().getAvailableVirtualCores(),
          maxShare.getVirtualCores()));
    }

    scheduler.getRootQueueMetrics()
        .fillInValuesFromAvailableResources(maxShare, maxResource);

    // Round up to allow AM to run when there is only one vcore on the cluster
    return Resources.multiplyAndRoundUp(maxResource, maxAMShare);
  }

  /**
   * Check whether this queue can run the Application Master under the
   * maxAMShare limit.
   *
   * @param amResource resources required to run the AM
   * @return true if this queue can run
   */
  public boolean canRunAppAM(Resource amResource) {
    if (Math.abs(maxAMShare - -1.0f) < 0.0001) {
      return true;
    }

    Resource maxAMResource = computeMaxAMResource();
    getMetrics().setMaxAMShare(maxAMResource);
    Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
    return Resources.fitsIn(ifRunAMResource, maxAMResource);
  }

  void addAMResourceUsage(Resource amResource) {
    if (amResource != null) {
      Resources.addTo(amResourceUsage, amResource);
      getMetrics().setAMResourceUsage(amResourceUsage);
    }
  }

  @Override
  public void recoverContainer(Resource clusterResource,
      SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
    // TODO Auto-generated method stub
  }

  /**
   * Allows setting weight for a dynamically created queue.
   * Currently only used for reservation based queues.
   * @param weight queue weight
   */
  public void setWeights(float weight) {
    this.weights = weight;
  }

  @Override
  public Resource getMaximumContainerAllocation() {
    if (maxContainerAllocation.equals(Resources.unbounded())
        && getParent() != null) {
      return getParent().getMaximumContainerAllocation();
    } else {
      return maxContainerAllocation;
    }
  }

  /**
   * Helper method to compute the amount of minshare starvation.
   *
   * @return the extent of minshare starvation
   */
  private Resource minShareStarvation() {
    // If demand < minshare, we should use demand to determine starvation
    Resource starvation =
        Resources.componentwiseMin(getMinShare(), getDemand());

    Resources.subtractFromNonNegative(starvation, getResourceUsage());

    boolean starved = !Resources.isNone(starvation);
    long now = scheduler.getClock().getTime();

    if (!starved) {
      // Record that the queue is not starved
      setLastTimeAtMinShare(now);
    }

    if (now - lastTimeAtMinShare < getMinSharePreemptionTimeout()) {
      // the queue is not starved for the preemption timeout
      starvation = Resources.clone(Resources.none());
    }

    return starvation;
  }

  /**
   * Helper method for tests to check if a queue is starved for minShare.
   * @return whether starved for minshare
   */
  @VisibleForTesting
  private boolean isStarvedForMinShare() {
    return !Resources.isNone(minShareStarvation());
  }

  /**
   * Helper method for tests to check if a queue is starved for fairshare.
   * @return whether starved for fairshare
   */
  @VisibleForTesting
  private boolean isStarvedForFairShare() {
    for (FSAppAttempt app : runnableApps) {
      if (app.isStarvedForFairShare()) {
        return true;
      }
    }
    return false;
  }

  /**
   * Helper method for tests to check if a queue is starved.
   * @return whether starved for either minshare or fairshare
   */
  @VisibleForTesting
  boolean isStarved() {
    return isStarvedForMinShare() || isStarvedForFairShare();
  }

  @Override
  protected void dumpStateInternal(StringBuilder sb) {
    sb.append("{Name: " + getName() +
        ", Weight: " + weights +
        ", Policy: " + policy.getName() +
        ", FairShare: " + getFairShare() +
        ", SteadyFairShare: " + getSteadyFairShare() +
        ", MaxShare: " + getMaxShare() +
        ", MinShare: " + minShare +
        ", ResourceUsage: " + getResourceUsage() +
        ", Demand: " + getDemand() +
        ", Runnable: " + getNumRunnableApps() +
        ", NumPendingApps: " + getNumPendingApps() +
        ", NonRunnable: " + getNumNonRunnableApps() +
        ", MaxAMShare: " + maxAMShare +
        ", MaxAMResource: " + computeMaxAMResource() +
        ", AMResourceUsage: " + getAmResourceUsage() +
        ", LastTimeAtMinShare: " + lastTimeAtMinShare +
        "}");
  }

  /**
   * This method is called when an application is assigned to this queue
   * for book-keeping purposes (to be able to determine if the queue is empty).
   * @param applicationId the application's id
   */
  public void addAssignedApp(ApplicationId applicationId) {
    writeLock.lock();
    try {
      assignedApps.add(applicationId);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * This method is called when an application is removed from this queue
   * during the submit process.
   * @param applicationId the application's id
   */
  public void removeAssignedApp(ApplicationId applicationId) {
    writeLock.lock();
    try {
      assignedApps.remove(applicationId);
    } finally {
      writeLock.unlock();
    }
  }
}