AppSchedulingInfo.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.RejectionReason;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PendingAskUpdateResult;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SingleConstraintAppPlacementAllocator;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
 * This class keeps track of all the consumption of an application. This also
 * keeps track of current running/completed containers for the application.
 */
@Private
@Unstable
public class AppSchedulingInfo {
  
  private static final Logger LOG =
      LoggerFactory.getLogger(AppSchedulingInfo.class);

  // Identity of the application attempt; the application id is derived from
  // the attempt id in the constructor.
  private final ApplicationId applicationId;
  private final ApplicationAttemptId applicationAttemptId;
  // Source of new container ids; seeded in the constructor from the epoch.
  private final AtomicLong containerIdCounter;
  private final String user;

  // Current queue and its users manager; both are replaced by move().
  private Queue queue;
  private AbstractUsersManager abstractUsersManager;
  // Starts as true and is flipped to false the first time a container is
  // allocated or recovered for this attempt.
  private volatile boolean pending = true;
  private ResourceUsage appResourceUsage;

  // Set when updatePlacesBlacklistedByApp actually changed the app blacklist;
  // read and cleared by getAndResetBlacklistChanged().
  private AtomicBoolean userBlacklistChanged = new AtomicBoolean(false);
  // Set of places (nodes / racks) blacklisted by the system. Today, this only
  // has places blacklisted for AM containers.
  private final Set<String> placesBlacklistedBySystem = new HashSet<>();
  // Places blacklisted by the application itself. Not final: it is replaced
  // wholesale by transferStateFromPreviousAppSchedulingInfo().
  private Set<String> placesBlacklistedByApp = new HashSet<>();

  // Partitions recorded through addRequestedPartition().
  private Set<String> requestedPartitions = new HashSet<>();

  // Keys that currently have a positive pending ask (maintained by
  // updatePendingResources), kept in sorted order.
  private final ConcurrentSkipListSet<SchedulerRequestKey>
      schedulerKeys = new ConcurrentSkipListSet<>();
  // Per-key allocator holding the outstanding request(s) for that key.
  private final Map<SchedulerRequestKey, AppPlacementAllocator<SchedulerNode>>
      schedulerKeyToAppPlacementAllocator = new ConcurrentHashMap<>();

  // Two views of one ReentrantReadWriteLock created in the constructor.
  private final ReentrantReadWriteLock.ReadLock readLock;
  private final ReentrantReadWriteLock.WriteLock writeLock;

  public final ContainerUpdateContext updateContext;
  // Private copy of the scheduling envs handed to the constructor.
  private final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
  private final RMContext rmContext;
  // Placement-attempt threshold read from
  // YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS; allocators at or
  // above it are reported by getRejectedRequest() and refused by
  // precheckNode().
  private final int retryAttempts;
  private boolean unmanagedAM;

  // Allocator class name used for plain ResourceRequests; resolved once in the
  // constructor and may be null.
  private final String defaultResourceRequestAppPlacementType;

  /**
   * Create the scheduling bookkeeping for one application attempt.
   *
   * @param appAttemptId attempt this object tracks; the application id is
   *          taken from it
   * @param user user the application runs as
   * @param queue queue the application currently belongs to
   * @param abstractUsersManager users manager that is notified when the
   *          application is activated or deactivated
   * @param epoch value shifted left by
   *          {@link ResourceManager#EPOCH_BIT_SHIFT} to seed the container id
   *          counter
   * @param appResourceUsage resource usage tracker whose pending amounts are
   *          updated by this class
   * @param applicationSchedulingEnvs scheduling envs; the entries are copied
   *          into an internal map, so the argument must not be null
   * @param rmContext RM context; its configuration is read here without a
   *          null check, so it must not be null
   * @param unmanagedAM whether the application uses an unmanaged AM
   */
  public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
      Queue queue, AbstractUsersManager abstractUsersManager, long epoch,
      ResourceUsage appResourceUsage,
      Map<String, String> applicationSchedulingEnvs, RMContext rmContext,
      boolean unmanagedAM) {
    this.applicationAttemptId = appAttemptId;
    this.applicationId = appAttemptId.getApplicationId();
    this.queue = queue;
    this.user = user;
    this.abstractUsersManager = abstractUsersManager;
    // The epoch occupies the bits above EPOCH_BIT_SHIFT of every container id
    // handed out by getNewContainerId().
    this.containerIdCounter = new AtomicLong(
        epoch << ResourceManager.EPOCH_BIT_SHIFT);
    this.appResourceUsage = appResourceUsage;
    this.applicationSchedulingEnvs.putAll(applicationSchedulingEnvs);
    this.rmContext = rmContext;
    this.retryAttempts = rmContext.getYarnConfiguration().getInt(
         YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS);
    this.unmanagedAM = unmanagedAM;

    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // NOTE(review): "this" is handed out before construction has finished;
    // presumably ContainerUpdateContext only stores the reference - confirm.
    updateContext = new ContainerUpdateContext(this);
    readLock = lock.readLock();
    writeLock = lock.writeLock();

    // Must run after rmContext and applicationSchedulingEnvs are populated,
    // because the lookup reads both.
    this.defaultResourceRequestAppPlacementType =
        getDefaultResourceRequestAppPlacementType();
  }

  /**
   * Resolve the class name of the default App Placement Allocator.
   *
   * <p>The application's scheduling envs are consulted first (key
   * {@link ApplicationSchedulingConfig#ENV_APPLICATION_PLACEMENT_TYPE_CLASS});
   * when no value is present there, the RM configuration entry
   * {@link YarnConfiguration#APPLICATION_PLACEMENT_TYPE_CLASS} is used.
   *
   * @return app placement class name, or null when neither source defines
   *         one or when the RM context / its configuration is unavailable.
   */
  public String getDefaultResourceRequestAppPlacementType() {
    if (this.rmContext == null
        || this.rmContext.getYarnConfiguration() == null) {
      return null;
    }

    // A per-application setting wins over the cluster-wide configuration.
    String perAppPlacementClass = applicationSchedulingEnvs.get(
        ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS);
    if (perAppPlacementClass != null) {
      return perAppPlacementClass;
    }

    Configuration conf = rmContext.getYarnConfiguration();
    return conf.get(YarnConfiguration.APPLICATION_PLACEMENT_TYPE_CLASS);
  }

  /**
   * @return id of the application, as derived from the attempt id given to
   *         the constructor.
   */
  public ApplicationId getApplicationId() {
    return this.applicationId;
  }

  /**
   * @return id of the application attempt this object was created for.
   */
  public ApplicationAttemptId getApplicationAttemptId() {
    return this.applicationAttemptId;
  }

  /**
   * @return the user name supplied when this object was constructed.
   */
  public String getUser() {
    return this.user;
  }

  /**
   * Atomically advance the container id counter.
   *
   * @return the counter value after the increment.
   */
  public long getNewContainerId() {
    return containerIdCounter.incrementAndGet();
  }

  /**
   * Read the name of the current queue under the read lock, so that a
   * concurrent {@code move} or {@code setQueue} is not observed half-way.
   *
   * @return name reported by the current queue.
   */
  public String getQueueName() {
    readLock.lock();
    try {
      final String currentQueueName = queue.getQueueName();
      return currentQueueName;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * @return true as long as no container has been allocated or recovered for
   *         this attempt; false afterwards.
   */
  public boolean isPending() {
    return this.pending;
  }

  /**
   * Overwrite the unmanaged-AM flag. No lock is taken.
   *
   * @param unmanagedAM new value of the flag
   */
  public void setUnmanagedAM(boolean unmanagedAM) {
    this.unmanagedAM = unmanagedAM;
  }

  /**
   * @return current value of the unmanaged-AM flag.
   */
  public boolean isUnmanagedAM() {
    return this.unmanagedAM;
  }

  /**
   * @return the internal set of requested partitions itself (not a copy), so
   *         later calls to {@code addRequestedPartition} are visible through
   *         the returned reference.
   */
  public Set<String> getRequestedPartitions() {
    return this.requestedPartitions;
  }

  /**
   * Drop every scheduler key and every placement allocator held for this
   * application, then log that the requests were cleared.
   */
  private void clearRequests() {
    this.schedulerKeys.clear();
    this.schedulerKeyToAppPlacementAllocator.clear();
    LOG.info("Application " + applicationId + " requests cleared");
  }

  /**
   * @return the container update context created together with this object.
   */
  public ContainerUpdateContext getUpdateContext() {
    return this.updateContext;
  }

  /**
   * The ApplicationMaster is updating resource requirements for the
   * application, by asking for more resources and releasing resources acquired
   * by the application.
   *
   * <p>The whole update runs under the write lock.
   *
   * @param resourceRequests resource requests to be allocated; a null or
   *          empty list results in no change
   * @param recoverPreemptedRequestForAContainer
   *          recover ResourceRequest/SchedulingRequest on preemption
   * @return true if any resource was updated, false otherwise
   */
  public boolean updateResourceRequests(List<ResourceRequest> resourceRequests,
      boolean recoverPreemptedRequestForAContainer) {
    writeLock.lock();
    try {
      // Group the requests and push them into the placement allocators.
      return internalAddResourceRequests(
          recoverPreemptedRequestForAContainer, resourceRequests);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * The ApplicationMaster is updating resource requirements for the
   * application, by asking for more resources and releasing resources acquired
   * by the application.
   *
   * <p>Variant for callers that already grouped the requests by scheduler key
   * and resource name. The whole update runs under the write lock.
   *
   * @param dedupRequests (dedup) resource requests to be allocated
   * @param recoverPreemptedRequestForAContainer
   *          recover ResourceRequest/SchedulingRequest on preemption
   * @return true if any resource was updated, false otherwise
   */
  public boolean updateResourceRequests(
      Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests,
      boolean recoverPreemptedRequestForAContainer) {
    writeLock.lock();
    try {
      // Push the grouped requests into the placement allocators.
      return internalAddResourceRequests(
          recoverPreemptedRequestForAContainer, dedupRequests);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * The ApplicationMaster is updating resource requirements for the
   * application, expressed as SchedulingRequests.
   *
   * <p>The whole update runs under the write lock.
   *
   * @param schedulingRequests scheduling requests to be allocated
   * @param recoverPreemptedRequestForAContainer
   *          recover ResourceRequest/SchedulingRequest on preemption
   * @return true if any resource was updated, false otherwise
   */
  public boolean updateSchedulingRequests(
      List<SchedulingRequest> schedulingRequests,
      boolean recoverPreemptedRequestForAContainer) {
    writeLock.lock();
    try {
      // Push every request into its placement allocator.
      return addSchedulingRequests(
          recoverPreemptedRequestForAContainer, schedulingRequests);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Forget the placement allocator registered for the given key. The set of
   * scheduler keys is not touched and no lock is taken here.
   *
   * @param schedulerRequestKey key whose allocator is removed
   */
  public void removeAppPlacement(SchedulerRequestKey schedulerRequestKey) {
    this.schedulerKeyToAppPlacementAllocator.remove(schedulerRequestKey);
  }

  /**
   * Feed each SchedulingRequest into the placement allocator of its scheduler
   * key (creating a SingleConstraintAppPlacementAllocator when the key has no
   * allocator yet) and propagate any resulting pending-ask change to the
   * queue / application accounting.
   *
   * @param recoverPreemptedRequestForAContainer
   *          recover ResourceRequest/SchedulingRequest on preemption
   * @param schedulingRequests requests to apply
   * @return true if at least one allocator reported a pending-ask change
   */
  private boolean addSchedulingRequests(
      boolean recoverPreemptedRequestForAContainer,
      List<SchedulingRequest> schedulingRequests) {
    // Becomes true once pending resources of app/queue had to be adjusted.
    boolean pendingChanged = false;

    for (SchedulingRequest schedulingRequest : schedulingRequests) {
      SchedulerRequestKey key = SchedulerRequestKey.create(schedulingRequest);

      AppPlacementAllocator<SchedulerNode> allocator =
          getAndAddAppPlacementAllocatorIfNotExist(key,
              SingleConstraintAppPlacementAllocator.class.getCanonicalName());

      // A null result means the allocator saw nothing to account for.
      PendingAskUpdateResult change = allocator.updatePendingAsk(key,
          schedulingRequest, recoverPreemptedRequestForAContainer);
      if (change == null) {
        continue;
      }

      updatePendingResources(change, key, queue.getMetrics());
      pendingChanged = true;
    }

    return pendingChanged;
  }

  /**
   * Look up the AppPlacementAllocator of a scheduler key, creating and
   * registering one through the factory when none exists yet. The lookup and
   * the insert are two separate steps, so callers must hold the write lock.
   *
   * @param schedulerRequestKey schedulerRequestKey
   * @param placementTypeClass class name handed to the factory when a new
   *          allocator has to be created
   * @return the existing or freshly registered AppPlacementAllocator
   */
  private AppPlacementAllocator<SchedulerNode> getAndAddAppPlacementAllocatorIfNotExist(
      SchedulerRequestKey schedulerRequestKey, String placementTypeClass) {
    AppPlacementAllocator<SchedulerNode> existing =
        schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey);
    if (existing != null) {
      return existing;
    }

    AppPlacementAllocator<SchedulerNode> created =
        ApplicationPlacementAllocatorFactory.getAppPlacementAllocator(
            placementTypeClass, this, schedulerRequestKey, rmContext);
    schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey, created);
    return created;
  }

  /**
   * Apply already-grouped ResourceRequests: for every scheduler key, hand the
   * key's requests to its placement allocator (created with the default
   * placement type when missing) and propagate any pending-ask change to the
   * queue / application accounting.
   *
   * @param recoverPreemptedRequestForAContainer
   *          recover ResourceRequest/SchedulingRequest on preemption
   * @param dedupRequests requests grouped by scheduler key, then resource name
   * @return true if at least one allocator reported a pending-ask change
   */
  private boolean internalAddResourceRequests(
      boolean recoverPreemptedRequestForAContainer,
      Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
    boolean anyUpdated = false;

    for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> grouped
        : dedupRequests.entrySet()) {
      SchedulerRequestKey key = grouped.getKey();
      AppPlacementAllocator<SchedulerNode> allocator =
          getAndAddAppPlacementAllocatorIfNotExist(key,
              defaultResourceRequestAppPlacementType);

      // A null result means the allocator saw nothing to account for.
      PendingAskUpdateResult change = allocator.updatePendingAsk(
          grouped.getValue().values(), recoverPreemptedRequestForAContainer);
      if (change == null) {
        continue;
      }

      updatePendingResources(change, key, queue.getMetrics());
      anyUpdated = true;
    }

    return anyUpdated;
  }

  /**
   * Group a flat list of ResourceRequests by scheduler key and resource name
   * and apply the grouped result. When two requests share the same key and
   * resource name, the one appearing later in the list replaces the earlier.
   *
   * @param recoverPreemptedRequestForAContainer
   *          recover ResourceRequest/SchedulingRequest on preemption
   * @param resourceRequests requests to apply; null or empty means no-op
   * @return true if any resource was updated, false otherwise
   */
  private boolean internalAddResourceRequests(boolean recoverPreemptedRequestForAContainer,
      List<ResourceRequest> resourceRequests) {
    if (resourceRequests == null || resourceRequests.isEmpty()) {
      return false;
    }

    // schedulerKey -> (resourceName -> request)
    Map<SchedulerRequestKey, Map<String, ResourceRequest>> grouped =
        new HashMap<>();
    for (ResourceRequest resourceRequest : resourceRequests) {
      grouped
          .computeIfAbsent(SchedulerRequestKey.create(resourceRequest),
              key -> new HashMap<>())
          .put(resourceRequest.getResourceName(), resourceRequest);
    }

    return internalAddResourceRequests(recoverPreemptedRequestForAContainer,
        grouped);
  }

  /**
   * Apply the outcome of a pending-ask update to this application's
   * bookkeeping: maintain the set of active scheduler keys, (de)activate the
   * application with the users manager, and replace the previously accounted
   * pending amount by the new one in the queue metrics, the queue and the
   * application's resource usage.
   *
   * @param updateResult previous and new pending ask with their partitions;
   *          the new pending ask is dereferenced unconditionally
   * @param schedulerKey key the update belongs to
   * @param metrics queue metrics to adjust
   */
  private void updatePendingResources(PendingAskUpdateResult updateResult,
      SchedulerRequestKey schedulerKey, QueueMetrics metrics) {

    PendingAsk lastPendingAsk = updateResult.getLastPendingAsk();
    PendingAsk newPendingAsk = updateResult.getNewPendingAsk();
    String lastNodePartition = updateResult.getLastNodePartition();
    String newNodePartition = updateResult.getNewNodePartition();

    // No previous ask is treated as a previous count of zero.
    int lastRequestContainers =
        (lastPendingAsk != null) ? lastPendingAsk.getCount() : 0;
    if (newPendingAsk.getCount() <= 0) {
      // Nothing is outstanding for this key any more: drop the key and its
      // allocator. The guard also passes when there was no previous ask
      // (count 0).
      if (lastRequestContainers >= 0) {
        schedulerKeys.remove(schedulerKey);
        schedulerKeyToAppPlacementAllocator.remove(schedulerKey);
      }
      LOG.info("checking for deactivate of application :"
          + this.applicationId);
      checkForDeactivation();
    } else {
      // Activate application. Metrics activation is done here.
      if (lastRequestContainers <= 0) {
        schedulerKeys.add(schedulerKey);
        abstractUsersManager.activateApplication(user, applicationId);
      }
    }

    if (lastPendingAsk != null) {
      // Deduct resources from metrics / pending resources of queue/app.
      metrics.decrPendingResources(lastNodePartition, user,
          lastPendingAsk.getCount(), lastPendingAsk.getPerAllocationResource());
      Resource decreasedResource = Resources.multiply(
          lastPendingAsk.getPerAllocationResource(), lastRequestContainers);
      queue.decPendingResource(lastNodePartition, decreasedResource);
      appResourceUsage.decPending(lastNodePartition, decreasedResource);
    }

    // Increase resources to metrics / pending resources of queue/app.
    // This runs even when the new count is <= 0; the amount added is then the
    // per-allocation resource multiplied by that count.
    metrics.incrPendingResources(newNodePartition, user,
        newPendingAsk.getCount(), newPendingAsk.getPerAllocationResource());
    Resource increasedResource = Resources.multiply(
        newPendingAsk.getPerAllocationResource(), newPendingAsk.getCount());
    queue.incPendingResource(newNodePartition, increasedResource);
    appResourceUsage.incPending(newNodePartition, increasedResource);
  }

  /**
   * Record a partition in the set returned by
   * {@code getRequestedPartitions()}. No lock is taken.
   *
   * @param partition partition name to remember
   */
  public void addRequestedPartition(String partition) {
    this.requestedPartitions.add(partition);
  }

  /**
   * Subtract an amount from the pending resources of both the current queue
   * and this application's resource usage, for one partition.
   *
   * @param partition partition whose pending amount is reduced
   * @param toDecrease amount to subtract
   */
  public void decPendingResource(String partition, Resource toDecrease) {
    this.queue.decPendingResource(partition, toDecrease);
    this.appResourceUsage.decPending(partition, toDecrease);
  }

  /**
   * The ApplicationMaster is updating the placesBlacklistedByApp used for
   * containers other than AMs. When the update actually modifies the set, the
   * "blacklist changed" flag is raised.
   *
   * @param blacklistAdditions
   *          resources to be added to the userBlacklist, may be null
   * @param blacklistRemovals
   *          resources to be removed from the userBlacklist, may be null
   */
  public void updatePlacesBlacklistedByApp(
      List<String> blacklistAdditions, List<String> blacklistRemovals) {
    boolean modified = updateBlacklistedPlaces(placesBlacklistedByApp,
        blacklistAdditions, blacklistRemovals);
    if (modified) {
      userBlacklistChanged.set(true);
    }
  }

  /**
   * Update the list of places that are blacklisted by the system. Today the
   * system only blacklists places when it sees that AMs failed there. Whether
   * the set changed is not tracked for this blacklist.
   *
   * @param blacklistAdditions
   *          resources to be added to placesBlacklistedBySystem, may be null
   * @param blacklistRemovals
   *          resources to be removed from placesBlacklistedBySystem, may be
   *          null
   */
  public void updatePlacesBlacklistedBySystem(
      List<String> blacklistAdditions, List<String> blacklistRemovals) {
    updateBlacklistedPlaces(this.placesBlacklistedBySystem,
        blacklistAdditions, blacklistRemovals);
  }

  /**
   * Add and then remove entries of a blacklist while holding the blacklist's
   * monitor. Additions are applied before removals, so a place present in
   * both lists ends up removed.
   *
   * @param blacklist set to modify; also used as the lock object
   * @param blacklistAdditions places to add, ignored when null
   * @param blacklistRemovals places to remove, ignored when null
   * @return true if either step modified the set
   */
  private static boolean updateBlacklistedPlaces(Set<String> blacklist,
      List<String> blacklistAdditions, List<String> blacklistRemovals) {
    synchronized (blacklist) {
      boolean added = blacklistAdditions != null
          && blacklist.addAll(blacklistAdditions);
      // Evaluated independently of "added" so removals always happen.
      boolean removed = blacklistRemovals != null
          && blacklist.removeAll(blacklistRemovals);
      return added || removed;
    }
  }

  /**
   * Atomically read the "application blacklist changed" flag and clear it.
   *
   * @return the value the flag had before it was cleared.
   */
  public boolean getAndResetBlacklistChanged() {
    return this.userBlacklistChanged.getAndSet(false);
  }

  /**
   * @return the internal, live set of scheduler keys (not a copy).
   */
  public Collection<SchedulerRequestKey> getSchedulerKeys() {
    return this.schedulerKeys;
  }

  /**
   * Used by REST API to fetch ResourceRequest. The requests of all placement
   * allocators are gathered into a new list while holding the read lock.
   *
   * @return All pending ResourceRequests.
   */
  public List<ResourceRequest> getAllResourceRequests() {
    List<ResourceRequest> collected = new ArrayList<>();
    readLock.lock();
    try {
      for (AppPlacementAllocator<SchedulerNode> allocator
          : schedulerKeyToAppPlacementAllocator.values()) {
        collected.addAll(allocator.getResourceRequests().values());
      }
    } finally {
      readLock.unlock();
    }
    return collected;
  }

  /**
   * Fetch SchedulingRequests. Allocators that do not hold a SchedulingRequest
   * are skipped; the result is a new list built under the read lock.
   *
   * @return All pending SchedulingRequests.
   */
  public List<SchedulingRequest> getAllSchedulingRequests() {
    List<SchedulingRequest> collected = new ArrayList<>();
    readLock.lock();
    try {
      for (AppPlacementAllocator<SchedulerNode> allocator
          : schedulerKeyToAppPlacementAllocator.values()) {
        if (allocator.getSchedulingRequest() != null) {
          collected.add(allocator.getSchedulingRequest());
        }
      }
    } finally {
      readLock.unlock();
    }
    return collected;
  }

  /**
   * Collect the requests whose allocator has reached the configured number
   * of placement attempts. Each one is reported with the reason
   * {@code COULD_NOT_SCHEDULE_ON_NODE}.
   *
   * @return newly built list of rejected scheduling requests, possibly empty.
   */
  public List<RejectedSchedulingRequest> getRejectedRequest() {
    readLock.lock();
    try {
      List<RejectedSchedulingRequest> rejected = new ArrayList<>();
      for (AppPlacementAllocator<SchedulerNode> allocator
          : schedulerKeyToAppPlacementAllocator.values()) {
        if (allocator.getPlacementAttempt() >= retryAttempts) {
          rejected.add(RejectedSchedulingRequest.newInstance(
              RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
              allocator.getSchedulingRequest()));
        }
      }
      return rejected;
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Look up the pending ask for {@code ResourceRequest.ANY} of the first
   * scheduler key in the sorted key set.
   *
   * @return that pending ask, or null when there is no scheduler key at all.
   */
  public PendingAsk getNextPendingAsk() {
    readLock.lock();
    try {
      if (schedulerKeys.isEmpty()) {
        return null;
      }
      SchedulerRequestKey headKey = schedulerKeys.first();
      return getPendingAsk(headKey, ResourceRequest.ANY);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Shorthand for the pending ask of a key on {@code ResourceRequest.ANY}.
   *
   * @param schedulerKey key to look up
   * @return the pending ask, or {@code PendingAsk.ZERO} for an unknown key.
   */
  public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey) {
    final String anyResource = ResourceRequest.ANY;
    return getPendingAsk(schedulerKey, anyResource);
  }

  /**
   * Ask the key's placement allocator, under the read lock, for the pending
   * ask on a given resource name.
   *
   * @param schedulerKey key to look up
   * @param resourceName resource name passed to the allocator
   * @return the allocator's answer, or {@code PendingAsk.ZERO} when no
   *         allocator is registered for the key.
   */
  public PendingAsk getPendingAsk(SchedulerRequestKey schedulerKey,
      String resourceName) {
    readLock.lock();
    try {
      AppPlacementAllocator<SchedulerNode> allocator =
          schedulerKeyToAppPlacementAllocator.get(schedulerKey);
      if (allocator == null) {
        return PendingAsk.ZERO;
      }
      return allocator.getPendingAsk(resourceName);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Check whether a place (node/rack today) is contained in one of the two
   * blacklists. Exactly one blacklist is consulted per call, selected by
   * {@code blacklistedBySystem}.
   *
   * @param resourceName
   *          the resourcename
   * @param blacklistedBySystem
   *          true to check the system blacklist, false to check the
   *          application's blacklist
   * @return true if its blacklisted
   */
  public boolean isPlaceBlacklisted(String resourceName,
      boolean blacklistedBySystem) {
    final Set<String> selectedBlacklist = blacklistedBySystem
        ? placesBlacklistedBySystem
        : placesBlacklistedByApp;
    // Same monitor that updateBlacklistedPlaces holds while modifying.
    synchronized (selectedBlacklist) {
      return selectedBlacklist.contains(resourceName);
    }
  }

  /**
   * Record an allocation for the given scheduler key. Runs under the write
   * lock: metrics are updated first (only when a container is supplied), then
   * the key's placement allocator is asked to allocate.
   *
   * <p>NOTE(review): no allocator being registered for the key results in a
   * NullPointerException after the metrics were already updated; callers
   * presumably guarantee the key exists - confirm.
   *
   * @param type locality type of the allocation
   * @param node node the container is placed on
   * @param schedulerKey key of the request being satisfied
   * @param containerAllocated allocated container, or null to skip the
   *          metrics update
   * @return the ContainerRequest produced by the placement allocator
   */
  public ContainerRequest allocate(NodeType type,
      SchedulerNode node, SchedulerRequestKey schedulerKey,
      RMContainer containerAllocated) {
    writeLock.lock();
    try {
      if (containerAllocated != null) {
        updateMetricsForAllocatedContainer(type, node, containerAllocated);
      }

      AppPlacementAllocator<SchedulerNode> allocator =
          schedulerKeyToAppPlacementAllocator.get(schedulerKey);
      return allocator.allocate(schedulerKey, type, node);
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Deactivate the application with the users manager when it no longer has
   * any scheduler key; otherwise do nothing.
   */
  public void checkForDeactivation() {
    if (!schedulerKeys.isEmpty()) {
      return;
    }
    abstractUsersManager.deactivateApplication(user, applicationId);
  }
  
  /**
   * Move this application to another queue. Under the write lock, every
   * positive pending ask on {@code ResourceRequest.ANY} is transferred from
   * the old queue (and its metrics) to the new one, the application is moved
   * between the two queue metrics, it is deactivated in the old users manager
   * and, if it still has scheduler keys, activated in the new one.
   *
   * @param newQueue queue that becomes the application's current queue
   */
  public void move(Queue newQueue) {
    this.writeLock.lock();
    try {
      QueueMetrics oldMetrics = queue.getMetrics();
      QueueMetrics newMetrics = newQueue.getMetrics();
      for (AppPlacementAllocator ap : schedulerKeyToAppPlacementAllocator
          .values()) {
        PendingAsk ask = ap.getPendingAsk(ResourceRequest.ANY);
        // Allocators without an outstanding ask contribute nothing to move.
        if (ask.getCount() > 0) {
          oldMetrics.decrPendingResources(
              ap.getPrimaryRequestedNodePartition(),
              user, ask.getCount(), ask.getPerAllocationResource());
          newMetrics.incrPendingResources(
              ap.getPrimaryRequestedNodePartition(),
              user, ask.getCount(), ask.getPerAllocationResource());

          // Total pending amount for this ask: per-allocation size x count.
          Resource delta = Resources.multiply(ask.getPerAllocationResource(),
              ask.getCount());
          // Update Queue
          queue.decPendingResource(
              ap.getPrimaryRequestedNodePartition(), delta);
          newQueue.incPendingResource(
              ap.getPrimaryRequestedNodePartition(), delta);
        }
      }

      oldMetrics.moveAppFrom(this, isUnmanagedAM());
      newMetrics.moveAppTo(this, isUnmanagedAM());

      // Leave the old queue's users manager before switching to the new one.
      abstractUsersManager.deactivateApplication(user, applicationId);
      abstractUsersManager = newQueue.getAbstractUsersManager();
      // Only applications that still have outstanding keys are re-activated.
      if (!schedulerKeys.isEmpty()) {
        abstractUsersManager.activateApplication(user, applicationId);
      }
      this.queue = newQueue;
    } finally {
      this.writeLock.unlock();
    }
  }

  /**
   * Stop tracking this application attempt. Under the write lock, every
   * positive pending ask on {@code ResourceRequest.ANY} is removed from the
   * queue metrics and from the queue's pending resources, the attempt is
   * reported as finished to the queue metrics, and all requests are cleared.
   */
  public void stop() {
    writeLock.lock();
    try {
      final QueueMetrics queueMetrics = queue.getMetrics();

      // Undo the pending-resource accounting of everything still outstanding.
      for (AppPlacementAllocator<SchedulerNode> allocator
          : schedulerKeyToAppPlacementAllocator.values()) {
        PendingAsk outstanding = allocator.getPendingAsk(ResourceRequest.ANY);
        if (outstanding.getCount() <= 0) {
          continue;
        }

        queueMetrics.decrPendingResources(
            allocator.getPrimaryRequestedNodePartition(), user,
            outstanding.getCount(), outstanding.getPerAllocationResource());

        // Same amount, expressed as a total, for the queue itself.
        queue.decPendingResource(
            allocator.getPrimaryRequestedNodePartition(),
            Resources.multiply(outstanding.getPerAllocationResource(),
                outstanding.getCount()));
      }

      queueMetrics.finishAppAttempt(applicationId, pending, user, unmanagedAM);

      // Finally forget the requests themselves.
      clearRequests();
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Replace the current queue reference under the write lock. Unlike
   * {@code move}, no metrics or pending resources are transferred.
   *
   * @param queue the new current queue
   */
  public void setQueue(Queue queue) {
    writeLock.lock();
    try {
      this.queue = queue;
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * @return the application blacklist set itself (shared, not copied).
   */
  private Set<String> getBlackList() {
    return placesBlacklistedByApp;
  }

  /**
   * Take a snapshot of the application blacklist while holding its monitor.
   *
   * @return a new, independent set with the current blacklist entries.
   */
  public Set<String> getBlackListCopy() {
    synchronized (placesBlacklistedByApp) {
      Set<String> snapshot = new HashSet<>(placesBlacklistedByApp);
      return snapshot;
    }
  }

  /**
   * Adopt the application blacklist of a previous attempt. The set object is
   * shared with {@code appInfo}, not copied.
   *
   * @param appInfo scheduling info of the previous attempt
   */
  public void transferStateFromPreviousAppSchedulingInfo(
      AppSchedulingInfo appInfo) {
    // This should not require locking the placesBlacklistedByApp since it will
    // not be used by this instance until after setCurrentAppAttempt.
    Set<String> previousBlacklist = appInfo.getBlackList();
    this.placesBlacklistedByApp = previousBlacklist;
  }

  /**
   * Re-establish metrics for a recovered container. Containers whose
   * execution type is not GUARANTEED are ignored. For the others, under the
   * write lock, the attempt is marked as running if it was still pending, and
   * the container's allocated resource is added to the queue metrics unless
   * the container is already in state COMPLETED.
   *
   * @param rmContainer container being recovered
   * @param partition partition the resource is accounted against
   */
  public void recoverContainer(RMContainer rmContainer, String partition) {
    if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) {
      return;
    }

    writeLock.lock();
    try {
      final QueueMetrics queueMetrics = queue.getMetrics();
      if (pending) {
        // If there was any container to recover, the application was
        // running from scheduler's POV.
        pending = false;
        queueMetrics.runAppAttempt(applicationId, user, isUnmanagedAM());
      }

      // A completed container holds no resources, so only live ones count.
      if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
        queueMetrics.allocateResources(partition, user, 1,
            rmContainer.getAllocatedResource(), false);
      }
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * In async environment, pending resource request could be updated during
   * scheduling, this method checks pending request before allocating.
   *
   * @param type locality type of the intended allocation
   * @param node node of the intended allocation
   * @param schedulerKey key of the request to check
   * @return false when no allocator is registered for the key, otherwise the
   *         allocator's own verdict.
   */
  public boolean checkAllocation(NodeType type, SchedulerNode node,
      SchedulerRequestKey schedulerKey) {
    readLock.lock();
    try {
      AppPlacementAllocator<SchedulerNode> allocator =
          schedulerKeyToAppPlacementAllocator.get(schedulerKey);
      return allocator != null && allocator.canAllocate(type, node);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Account for a freshly allocated container: mark the attempt as running in
   * the queue metrics if it was still pending, then update the allocation
   * metrics of the current queue.
   *
   * @param type locality type of the allocation
   * @param node node the container was placed on
   * @param containerAllocated the allocated container
   */
  private void updateMetricsForAllocatedContainer(NodeType type,
      SchedulerNode node, RMContainer containerAllocated) {
    final QueueMetrics queueMetrics = queue.getMetrics();
    if (pending) {
      // once an allocation is done we assume the application is
      // running from scheduler's POV.
      pending = false;
      queueMetrics.runAppAttempt(applicationId, user, isUnmanagedAM());
    }

    updateMetrics(this.applicationId, type, node, containerAllocated,
        this.user, this.queue);
  }

  /**
   * Update queue and cluster metrics for one allocated container. With a
   * non-null node, one allocation of the container's resource is added for
   * the node's partition and one pending allocation is removed for the
   * container's node label expression. The node-type aggregation and the
   * cluster-wide assigned-container counter are incremented in every case.
   *
   * @param applicationId application the container belongs to (logged only)
   * @param type locality type of the allocation
   * @param node node the container was placed on, may be null
   * @param containerAllocated the allocated container
   * @param user user the metrics are attributed to
   * @param queue queue whose metrics are updated
   */
  public static void updateMetrics(ApplicationId applicationId, NodeType type,
      SchedulerNode node, RMContainer containerAllocated, String user,
      Queue queue) {
    LOG.debug("allocate: applicationId={} container={} host={} user={}"
        + " resource={} type={}", applicationId,
        containerAllocated.getContainer().getId(),
        containerAllocated.getNodeId(), user,
        containerAllocated.getContainer().getResource(),
        type);

    if (node != null) {
      // Allocation is booked on the node's partition ...
      queue.getMetrics().allocateResources(node.getPartition(), user, 1,
          containerAllocated.getContainer().getResource(), false);
      // ... while the pending amount is released on the requested label.
      queue.getMetrics().decrPendingResources(
          containerAllocated.getNodeLabelExpression(), user, 1,
          containerAllocated.getContainer().getResource());
    }

    queue.getMetrics().incrNodeTypeAggregations(user, type);
    ClusterMetrics.getMetrics().incrNumContainerAssigned();
  }

  /**
   * Get AppPlacementAllocator by specified schedulerKey. No lock is taken.
   * The result is cast (unchecked) to the node type chosen by the caller.
   *
   * @param schedulerkey key to look up
   * @param <N> node type the caller expects the allocator to work with
   * @return the registered allocator, or null when the key is unknown.
   */
  public <N extends SchedulerNode> AppPlacementAllocator<N> getAppPlacementAllocator(
      SchedulerRequestKey schedulerkey) {
    AppPlacementAllocator<SchedulerNode> registered =
        schedulerKeyToAppPlacementAllocator.get(schedulerkey);
    return (AppPlacementAllocator<N>) registered;
  }

  /**
   * Can delay to next?.
   *
   * @param schedulerKey schedulerKey
   * @param resourceName resourceName
   *
   * @return true when no allocator is registered for the key; otherwise the
   *         answer of the key's allocator for the resource name.
   */
  public boolean canDelayTo(
      SchedulerRequestKey schedulerKey, String resourceName) {
    readLock.lock();
    try {
      AppPlacementAllocator<SchedulerNode> allocator =
          schedulerKeyToAppPlacementAllocator.get(schedulerKey);
      if (allocator == null) {
        return true;
      }
      return allocator.canDelayTo(resourceName);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Pre-check node to see if it satisfy the given schedulerKey and
   * scheduler mode. The node is rejected outright when the key has no
   * allocator or when the allocator has already reached the configured
   * number of placement attempts.
   *
   * @param schedulerKey schedulerKey
   * @param schedulerNode schedulerNode
   * @param schedulingMode schedulingMode
   * @param dcOpt optional diagnostics collector
   * @return can use the node or not.
   */
  public boolean precheckNode(SchedulerRequestKey schedulerKey,
      SchedulerNode schedulerNode, SchedulingMode schedulingMode,
      Optional<DiagnosticsCollector> dcOpt) {
    readLock.lock();
    try {
      AppPlacementAllocator<SchedulerNode> allocator =
          schedulerKeyToAppPlacementAllocator.get(schedulerKey);
      if (allocator == null) {
        return false;
      }
      if (allocator.getPlacementAttempt() >= retryAttempts) {
        return false;
      }
      return allocator.precheckNode(schedulerNode, schedulingMode, dcOpt);
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Get scheduling envs configured for this application.
   *
   * @return the internal map of applicationSchedulingEnvs (not a copy)
   */
  public Map<String, String> getApplicationSchedulingEnvs() {
    return this.applicationSchedulingEnvs;
  }

  /**
   * Get the defaultNodeLabelExpression for the application's current queue.
   * The queue is read under the read lock so that a concurrent queue change
   * is not observed half-way.
   *
   * @return defaultNodeLabelExpression
   */
  public String getDefaultNodeLabelExpression() {
    // Acquire before entering the try block: if lock() itself failed, the
    // finally clause must not attempt to release a lock that is not held.
    this.readLock.lock();
    try {
      return queue.getDefaultNodeLabelExpression();
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * @return the RM context supplied when this object was constructed.
   */
  public RMContext getRMContext() {
    return rmContext;
  }
}