RMAppImpl.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.rmapp;

import java.net.InetAddress;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement
    .ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppStartAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.resource.Resources;

import org.apache.hadoop.classification.VisibleForTesting;

@SuppressWarnings({ "rawtypes", "unchecked" })
public class RMAppImpl implements RMApp, Recoverable {

  private static final Logger LOG =
      LoggerFactory.getLogger(RMAppImpl.class);
  private static final String UNAVAILABLE = "N/A";
  private static final String UNLIMITED = "UNLIMITED";
  private static final long UNKNOWN = -1L;
  private static final EnumSet<RMAppState> COMPLETED_APP_STATES =
      EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
          RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
  private static final String STATE_CHANGE_MESSAGE =
      "%s State change from %s to %s on event = %s";
  private static final String RECOVERY_MESSAGE =
      "Recovering app: %s with %d attempts and final state = %s";

  // Immutable fields
  private final ApplicationId applicationId;
  private final RMContext rmContext;
  private final Configuration conf;
  private final String user;
  private final UserGroupInformation userUgi;
  private final String name;
  private final ApplicationSubmissionContext submissionContext;
  private final Dispatcher dispatcher;
  private final YarnScheduler scheduler;
  private final ApplicationMasterService masterService;
  private final StringBuilder diagnostics = new StringBuilder();
  private final int maxAppAttempts;
  private final ReadLock readLock;
  private final WriteLock writeLock;
  private final Map<ApplicationAttemptId, RMAppAttempt> attempts
      = new LinkedHashMap<ApplicationAttemptId, RMAppAttempt>();
  private final long submitTime;
  private final Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
  private final String applicationType;
  private final Set<String> applicationTags;
  private Map<String, String> applicationSchedulingEnvs = new HashMap<>();

  private final long attemptFailuresValidityInterval;
  private boolean amBlacklistingEnabled = false;
  private float blacklistDisableThreshold;

  private Clock systemClock;

  private boolean isNumAttemptsBeyondThreshold = false;



  // Mutable fields
  private long startTime;
  private long launchTime = 0;
  private long finishTime = 0;
  private long storedFinishTime = 0;
  private int firstAttemptIdInStateStore = 1;
  private int nextAttemptId = 1;
  private AppCollectorData collectorData;
  private CollectorInfo collectorInfo;
  // This field isn't protected by readlock now.
  private volatile RMAppAttempt currentAttempt;
  private String queue;
  private EventHandler handler;
  private static final AppFinishedTransition FINISHED_TRANSITION =
      new AppFinishedTransition();
  private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();

  private final RMAppLogAggregation logAggregation;
  private Map<ApplicationTimeoutType, Long> applicationTimeouts =
      new HashMap<ApplicationTimeoutType, Long>();

  // These states stored are only valid when app is at killing or final_saving.
  private RMAppState stateBeforeKilling;
  private RMAppState stateBeforeFinalSaving;
  private RMAppEvent eventCausingFinalSaving;
  private RMAppState targetedFinalState;
  private RMAppState recoveredFinalState;
  private List<ResourceRequest> amReqs;
  
  private CallerContext callerContext;

  private ApplicationPlacementContext placementContext;

  Object transitionTodo;

  private Priority applicationPriority;

  private static final StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent> stateMachineFactory
                               = new StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent>(RMAppState.NEW)


     // Transitions from NEW state
    .addTransition(RMAppState.NEW, RMAppState.NEW,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
    .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
            RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
            RMAppState.KILLED, RMAppState.FINAL_SAVING),
        RMAppEventType.RECOVER, new RMAppRecoveredTransition())
    .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
        new AppKilledTransition())
    .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
        new FinalSavingTransition(new AppRejectedTransition(),
          RMAppState.FAILED))

    // Transitions from NEW_SAVING state
    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
    .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
        RMAppEventType.KILL,
        new FinalSavingTransition(
          new AppKilledTransition(), RMAppState.KILLED))
    .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
          new FinalSavingTransition(new AppRejectedTransition(),
            RMAppState.FAILED))
      .addTransition(RMAppState.NEW_SAVING, RMAppState.FAILED,
          RMAppEventType.APP_SAVE_FAILED, new AppRejectedTransition())

     // Transitions from SUBMITTED state
    .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
        new FinalSavingTransition(
          new AppRejectedTransition(), RMAppState.FAILED))
    .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
    .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
        RMAppEventType.KILL,
        new FinalSavingTransition(
          new AppKilledTransition(), RMAppState.KILLED))

     // Transitions from ACCEPTED state
    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
        RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
            YarnApplicationState.RUNNING))
    .addTransition(RMAppState.ACCEPTED,
        EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
        // ACCEPTED state is possible to receive ATTEMPT_FAILED/ATTEMPT_FINISHED
        // event because RMAppRecoveredTransition is returning ACCEPTED state
        // directly and waiting for the previous AM to exit.
        RMAppEventType.ATTEMPT_FAILED,
        new AttemptFailedTransition(RMAppState.ACCEPTED))
    .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_FINISHED,
        new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
    .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
        RMAppEventType.KILL, new KillAttemptTransition())
    .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_KILLED,
        new FinalSavingTransition(new AppKilledTransition(), RMAppState.KILLED))
    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
      // Handle AppAttemptLaunch to update the launchTime and publish to ATS
      .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
        RMAppEventType.ATTEMPT_LAUNCHED,
        new AttemptLaunchedTransition())

     // Transitions from RUNNING state
    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_UNREGISTERED,
        new FinalSavingTransition(
          new AttemptUnregisteredTransition(),
          RMAppState.FINISHING, RMAppState.FINISHED))
    .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
      // UnManagedAM directly jumps to finished
        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(RMAppState.RUNNING,
        EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
        RMAppEventType.ATTEMPT_FAILED,
        new AttemptFailedTransition(RMAppState.ACCEPTED))
    .addTransition(RMAppState.RUNNING, RMAppState.KILLING,
        RMAppEventType.KILL, new KillAttemptTransition())

     // Transitions from FINAL_SAVING state
    .addTransition(RMAppState.FINAL_SAVING,
      EnumSet.of(RMAppState.FINISHING, RMAppState.FAILED,
        RMAppState.KILLED, RMAppState.FINISHED), RMAppEventType.APP_UPDATE_SAVED,
        new FinalStateSavedTransition())
    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_FINISHED,
        new AttemptFinishedAtFinalSavingTransition())
    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    // ignorable transitions
    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
        EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
          RMAppEventType.APP_NEW_SAVED))

     // Transitions from FINISHING state
    .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
    .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    // ignorable transitions
    .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
      EnumSet.of(RMAppEventType.NODE_UPDATE,
        // ignore Kill/Move as we have already saved the final Finished state
        // in state store.
        RMAppEventType.KILL))

     // Transitions from KILLING state
    .addTransition(RMAppState.KILLING, RMAppState.KILLING, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_KILLED,
        new FinalSavingTransition(
          new AppKilledTransition(), RMAppState.KILLED))
    .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_UNREGISTERED,
        new FinalSavingTransition(
          new AttemptUnregisteredTransition(),
          RMAppState.FINISHING, RMAppState.FINISHED))
    .addTransition(RMAppState.KILLING, RMAppState.FINISHED,
      // UnManagedAM directly jumps to finished
        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
    .addTransition(RMAppState.KILLING,
        EnumSet.of(RMAppState.FINAL_SAVING),
        RMAppEventType.ATTEMPT_FAILED,
        new AttemptFailedTransition(RMAppState.KILLING))

    .addTransition(RMAppState.KILLING, RMAppState.KILLING,
        EnumSet.of(
            RMAppEventType.NODE_UPDATE,
            RMAppEventType.ATTEMPT_REGISTERED,
            RMAppEventType.APP_UPDATE_SAVED,
            RMAppEventType.KILL))

     // Transitions from FINISHED state
     // ignorable transitions
    .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
        EnumSet.of(
            RMAppEventType.NODE_UPDATE,
            RMAppEventType.ATTEMPT_UNREGISTERED,
            RMAppEventType.ATTEMPT_FINISHED,
            RMAppEventType.KILL))

     // Transitions from FAILED state
     // ignorable transitions
    .addTransition(RMAppState.FAILED, RMAppState.FAILED, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(RMAppState.FAILED, RMAppState.FAILED,
        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))

     // Transitions from KILLED state
     // ignorable transitions
    .addTransition(RMAppState.KILLED, RMAppState.KILLED, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(
        RMAppState.KILLED,
        RMAppState.KILLED,
        EnumSet.of(RMAppEventType.APP_ACCEPTED,
            RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
            RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
            RMAppEventType.NODE_UPDATE, RMAppEventType.START))

     .installTopology();

  private final StateMachine<RMAppState, RMAppEventType, RMAppEvent>
                                                                 stateMachine;

  private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
  private static final float MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE = 0.0f;
  private static final float MAXIMUM_AM_BLACKLIST_THRESHOLD_VALUE = 1.0f;

  public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
      Configuration config, String name, String user, String queue,
      ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
      ApplicationMasterService masterService, long submitTime,
      String applicationType, Set<String> applicationTags,
      List<ResourceRequest> amReqs) {
    this(applicationId, rmContext, config, name, user, queue, submissionContext,
      scheduler, masterService, submitTime, applicationType, applicationTags,
      amReqs, null, -1);
  }

  public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
      Configuration config, String name, String user, String queue,
      ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
      ApplicationMasterService masterService, long submitTime,
      String applicationType, Set<String> applicationTags,
      List<ResourceRequest> amReqs, ApplicationPlacementContext
      placementContext, long startTime) {
    this(applicationId, rmContext, config, name,
        (user != null ? UserGroupInformation.createRemoteUser(user) : null),
        queue, submissionContext, scheduler, masterService, submitTime,
        applicationType, applicationTags, amReqs, placementContext, startTime);
  }

  public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
      Configuration config, String name, UserGroupInformation userUgi,
      String queue, ApplicationSubmissionContext submissionContext,
      YarnScheduler scheduler, ApplicationMasterService masterService,
      long submitTime, String applicationType, Set<String> applicationTags,
      List<ResourceRequest> amReqs, ApplicationPlacementContext
      placementContext, long startTime) {
    this.systemClock = SystemClock.getInstance();

    this.applicationId = applicationId;
    this.name = StringInterner.weakIntern(name);
    this.rmContext = rmContext;
    this.dispatcher = rmContext.getDispatcher();
    this.handler = dispatcher.getEventHandler();
    this.conf = config;
    this.user = StringInterner.weakIntern(
        (userUgi != null) ? userUgi.getShortUserName() : null);
    this.userUgi = userUgi;
    this.queue = queue;
    this.submissionContext = submissionContext;
    this.scheduler = scheduler;
    this.masterService = masterService;
    this.submitTime = submitTime;
    if (startTime <= 0) {
      this.startTime = this.systemClock.getTime();
    } else {
      this.startTime = startTime;
    }
    this.applicationType = StringInterner.weakIntern(applicationType);
    this.applicationTags = applicationTags;
    this.amReqs = amReqs;
    if (submissionContext.getPriority() != null) {
      this.applicationPriority = Priority
          .newInstance(submissionContext.getPriority().getPriority());
    } else {
      // If incoming app does not have priority configured in submission
      // context, system could be assume that its a 0 priority app and could be
      // considered as normal.
      this.applicationPriority = Priority.newInstance(0);
    }

    int globalMaxAppAttempts = conf.getInt(
        YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS,
        conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
    int rmMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
    int individualMaxAppAttempts = submissionContext.getMaxAppAttempts();
    if (individualMaxAppAttempts <= 0) {
      this.maxAppAttempts = rmMaxAppAttempts;
      LOG.warn("The specific max attempts: " + individualMaxAppAttempts
          + " for application: " + applicationId.getId()
          + " is invalid, because it is less than or equal to zero."
          + " Use the rm max attempts instead.");
    } else if (individualMaxAppAttempts > globalMaxAppAttempts) {
      this.maxAppAttempts = globalMaxAppAttempts;
      LOG.warn("The specific max attempts: " + individualMaxAppAttempts
          + " for application: " + applicationId.getId()
          + " is invalid, because it is out of the range [1, "
          + globalMaxAppAttempts + "]. Use the global max attempts instead.");
    } else {
      this.maxAppAttempts = individualMaxAppAttempts;
    }

    this.attemptFailuresValidityInterval =
        submissionContext.getAttemptFailuresValidityInterval();
    if (this.attemptFailuresValidityInterval > 0) {
      LOG.info("The attemptFailuresValidityInterval for the application: "
          + this.applicationId + " is " + this.attemptFailuresValidityInterval
          + ".");
    }

    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.readLock = lock.readLock();
    this.writeLock = lock.writeLock();

    this.stateMachine = stateMachineFactory.make(this);

    this.callerContext = CallerContext.getCurrent();

    this.placementContext = placementContext;

    // If applications are not explicitly specifying envs, try to pull from
    // AM container environment lists.
    if(submissionContext.getAMContainerSpec() != null) {
      applicationSchedulingEnvs
          .putAll(submissionContext.getAMContainerSpec().getEnvironment());
    }
    applicationSchedulingEnvs
        .putAll(submissionContext.getApplicationSchedulingPropertiesMap());

    this.logAggregation = new RMAppLogAggregation(conf, readLock, writeLock);

    // amBlacklistingEnabled can be configured globally
    // Just use the global values
    amBlacklistingEnabled =
        conf.getBoolean(
          YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
          YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_ENABLED);
    if (amBlacklistingEnabled) {
      blacklistDisableThreshold = conf.getFloat(
          YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
          YarnConfiguration.
          DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD);
      // Verify whether blacklistDisableThreshold is valid. And for invalid
      // threshold, reset to global level blacklistDisableThreshold
      // configured.
      if (blacklistDisableThreshold < MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE ||
          blacklistDisableThreshold > MAXIMUM_AM_BLACKLIST_THRESHOLD_VALUE) {
        blacklistDisableThreshold = YarnConfiguration.
            DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD;
      }
    }
  }

  /**
   * Starts the application level timeline collector for this app. This should
   * be used only if the timeline service v.2 is enabled.
   */
  public void startTimelineCollector() {
    AppLevelTimelineCollector collector =
        new AppLevelTimelineCollector(applicationId, user);
    rmContext.getRMTimelineCollectorManager().putIfAbsent(
        applicationId, collector);
  }

  /**
   * Stops the application level timeline collector for this app. This should be
   * used only if the timeline service v.2 is enabled.
   */
  public void stopTimelineCollector() {
    rmContext.getRMTimelineCollectorManager().remove(applicationId);
  }

  @Override
  public ApplicationId getApplicationId() {
    return this.applicationId;
  }
  
  @Override
  public ApplicationSubmissionContext getApplicationSubmissionContext() {
    return this.submissionContext;
  }

  @Override
  public FinalApplicationStatus getFinalApplicationStatus() {
    // finish state is obtained based on the state machine's current state
    // as a fall-back in case the application has not been unregistered
    // ( or if the app never unregistered itself )
    // when the report is requested
    if (currentAttempt != null
        && currentAttempt.getFinalApplicationStatus() != null) {
      return currentAttempt.getFinalApplicationStatus();
    }
    return createFinalApplicationStatus(this.stateMachine.getCurrentState());
  }

  @Override
  public RMAppState getState() {
    this.readLock.lock();
    try {
        return this.stateMachine.getCurrentState();
    } finally {
      this.readLock.unlock();
    }
  }

  @Override
  public String getUser() {
    return this.user;
  }

  @Override
  public float getProgress() {
    RMAppAttempt attempt = this.currentAttempt;
    if (attempt != null) {
      return attempt.getProgress();
    }
    return 0;
  }

  @Override
  public RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId) {
    this.readLock.lock();

    try {
      return this.attempts.get(appAttemptId);
    } finally {
      this.readLock.unlock();
    }
  }

  @Override
  public String getQueue() {
    return this.queue;
  }
  
  @Override
  public void setQueue(String queue) {
    this.queue = queue;
  }

  @Override
  public AppCollectorData getCollectorData() {
    return this.collectorData;
  }

  public void setCollectorData(AppCollectorData incomingData) {
    this.collectorData = incomingData;
    this.collectorInfo = CollectorInfo.newInstance(
        incomingData.getCollectorAddr(), incomingData.getCollectorToken());
  }

  public CollectorInfo getCollectorInfo() {
    return this.collectorInfo;
  }

  public void removeCollectorData() {
    this.collectorData = null;
  }

  @Override
  public String getName() {
    return this.name;
  }

  @Override
  public RMAppAttempt getCurrentAppAttempt() {
    return this.currentAttempt;
  }

  @Override
  public Map<ApplicationAttemptId, RMAppAttempt> getAppAttempts() {
    this.readLock.lock();

    try {
      return Collections.unmodifiableMap(this.attempts);
    } finally {
      this.readLock.unlock();
    }
  }

  private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) {
    switch(state) {
    case NEW:
    case NEW_SAVING:
    case SUBMITTED:
    case ACCEPTED:
    case RUNNING:
    case FINAL_SAVING:
    case KILLING:
      return FinalApplicationStatus.UNDEFINED;    
    // finished without a proper final state is the same as failed  
    case FINISHING:
    case FINISHED:
    case FAILED:
      return FinalApplicationStatus.FAILED;
    case KILLED:
      return FinalApplicationStatus.KILLED;
    }
    throw new YarnRuntimeException("Unknown state passed!");
  }

  @Override
  public int pullRMNodeUpdates(Map<RMNode, NodeUpdateType> upNodes) {
    this.writeLock.lock();
    try {
      int updatedNodeCount = this.updatedNodes.size();
      upNodes.putAll(this.updatedNodes);
      this.updatedNodes.clear();
      return updatedNodeCount;
    } finally {
      this.writeLock.unlock();
    }
  }
  
  @Override
  public ApplicationReport createAndGetApplicationReport(String clientUserName,
      boolean allowAccess) {
    this.readLock.lock();

    try {
      ApplicationAttemptId currentApplicationAttemptId = null;
      org.apache.hadoop.yarn.api.records.Token clientToAMToken = null;
      String trackingUrl = UNAVAILABLE;
      String host = UNAVAILABLE;
      String origTrackingUrl = UNAVAILABLE;
      LogAggregationStatus logAggregationStatus = null;
      int rpcPort = -1;
      ApplicationResourceUsageReport appUsageReport =
          RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
      FinalApplicationStatus finishState = getFinalApplicationStatus();
      String diags = UNAVAILABLE;
      float progress = 0.0f;
      org.apache.hadoop.yarn.api.records.Token amrmToken = null;
      if (allowAccess) {
        trackingUrl = rmContext.getAppProxyUrl(conf, applicationId);
        if (this.currentAttempt != null) {
          currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
          trackingUrl = this.currentAttempt.getTrackingUrl();
          origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
          if (UserGroupInformation.isSecurityEnabled()) {
            // get a token so the client can communicate with the app attempt
            // NOTE: token may be unavailable if the attempt is not running
            Token<ClientToAMTokenIdentifier> attemptClientToAMToken =
                this.currentAttempt.createClientToken(clientUserName);
            if (attemptClientToAMToken != null) {
              clientToAMToken = BuilderUtils.newClientToAMToken(
                  attemptClientToAMToken.getIdentifier(),
                  attemptClientToAMToken.getKind().toString(),
                  attemptClientToAMToken.getPassword(),
                  attemptClientToAMToken.getService().toString());
            }
          }
          host = this.currentAttempt.getHost();
          rpcPort = this.currentAttempt.getRpcPort();
          appUsageReport = currentAttempt.getApplicationResourceUsageReport();
          progress = currentAttempt.getProgress();
          logAggregationStatus = this.getLogAggregationStatusForAppReport();
        }
        //if the diagnostics is not already set get it from attempt
        diags = getDiagnostics().toString();

        if (currentAttempt != null && 
            currentAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
          if (getApplicationSubmissionContext().getUnmanagedAM() &&
              clientUserName != null && getUser().equals(clientUserName)) {
            Token<AMRMTokenIdentifier> token = currentAttempt.getAMRMToken();
            if (token != null) {
              amrmToken = BuilderUtils.newAMRMToken(token.getIdentifier(),
                  token.getKind().toString(), token.getPassword(),
                  token.getService().toString());
            }
          }
        }

        RMAppMetrics rmAppMetrics = getRMAppMetrics();
        appUsageReport
            .setResourceSecondsMap(rmAppMetrics.getResourceSecondsMap());
        appUsageReport.setPreemptedResourceSecondsMap(
            rmAppMetrics.getPreemptedResourceSecondsMap());
      }

      if (currentApplicationAttemptId == null) {
        currentApplicationAttemptId = 
            BuilderUtils.newApplicationAttemptId(this.applicationId, 
                DUMMY_APPLICATION_ATTEMPT_NUMBER);
      }

      ApplicationReport report = BuilderUtils.newApplicationReport(
          this.applicationId, currentApplicationAttemptId, this.user,
          this.queue, this.name, host, rpcPort, clientToAMToken,
          createApplicationState(), diags, trackingUrl, this.startTime,
          this.launchTime, this.finishTime, finishState, appUsageReport,
          origTrackingUrl, progress, this.applicationType, amrmToken,
          applicationTags, this.getApplicationPriority());
      report.setLogAggregationStatus(logAggregationStatus);
      report.setUnmanagedApp(submissionContext.getUnmanagedAM());
      report.setAppNodeLabelExpression(getAppNodeLabelExpression());
      report.setAmNodeLabelExpression(getAmNodeLabelExpression());
      if (HAUtil.isFederationEnabled(conf)) {
        report.setRMClusterId(YarnConfiguration.getClusterId(conf));
      }

      ApplicationTimeout timeout = ApplicationTimeout
          .newInstance(ApplicationTimeoutType.LIFETIME, UNLIMITED, UNKNOWN);
      // Currently timeout type supported is LIFETIME. When more timeout types
      // are supported in YARN-5692, the below logic need to be changed.
      if (!this.applicationTimeouts.isEmpty()) {
        long timeoutInMillis = applicationTimeouts
            .get(ApplicationTimeoutType.LIFETIME).longValue();
        timeout.setExpiryTime(Times.formatISO8601(timeoutInMillis));
        if (isAppInCompletedStates()) {
          // if application configured with timeout and finished before timeout
          // happens then remaining time should not be calculated.
          timeout.setRemainingTime(0);
        } else {
          timeout.setRemainingTime(
              Math.max((timeoutInMillis - systemClock.getTime()) / 1000, 0));
        }
      }
      report.setApplicationTimeouts(
          Collections.singletonMap(timeout.getTimeoutType(), timeout));
      return report;
    } finally {
      this.readLock.unlock();
    }
  }

  @Override
  public long getFinishTime() {
    this.readLock.lock();

    try {
      return this.finishTime;
    } finally {
      this.readLock.unlock();
    }
  }

  @Override
  public long getStartTime() {
    this.readLock.lock();

    try {
      return this.startTime;
    } finally {
      this.readLock.unlock();
    }
  }

  @Override
  public long getLaunchTime() {
    this.readLock.lock();

    try {
      return this.launchTime;
    } finally {
      this.readLock.unlock();
    }
  }

  @Override
  public long getSubmitTime() {
    return this.submitTime;
  }

  @Override
  public String getTrackingUrl() {
    RMAppAttempt attempt = this.currentAttempt;
    if (attempt != null) {
      return attempt.getTrackingUrl();
    }
    return null;
  }

  @Override
  public String getOriginalTrackingUrl() {
    RMAppAttempt attempt = this.currentAttempt;
    if (attempt != null) {
      return attempt.getOriginalTrackingUrl();
    }
    return null;
  }

  @Override
  public StringBuilder getDiagnostics() {
    this.readLock.lock();
    try {
      if (diagnostics.length() == 0 && getCurrentAppAttempt() != null) {
        String appAttemptDiagnostics = getCurrentAppAttempt().getDiagnostics();
        if (appAttemptDiagnostics != null) {
          return new StringBuilder(appAttemptDiagnostics);
        }
      }
      return this.diagnostics;
    } finally {
      this.readLock.unlock();
    }
  }

  @Override
  public int getMaxAppAttempts() {
    return this.maxAppAttempts;
  }

  @Override
  public void handle(RMAppEvent event) {
    this.writeLock.lock();

    try {
      ApplicationId appID = event.getApplicationId();
      LOG.debug("Processing event for {} of type {}",
          appID, event.getType());

      final RMAppState oldState = getState();
      try {
        /* keep the master in sync with the state machine */
        this.stateMachine.doTransition(event.getType(), event);
      } catch (InvalidStateTransitionException e) {
        LOG.error("App: " + appID
            + " can't handle this event at current state", e);
        onInvalidStateTransition(event.getType(), oldState);
      }

      // Log at INFO if we're not recovering or not in a terminal state.
      // Log at DEBUG otherwise.
      if ((oldState != getState()) &&
          (((recoveredFinalState == null)) ||
            (event.getType() != RMAppEventType.RECOVER))) {
        LOG.info(String.format(STATE_CHANGE_MESSAGE, appID, oldState,
            getState(), event.getType()));
      } else if ((oldState != getState()) && LOG.isDebugEnabled()) {
        LOG.debug(String.format(STATE_CHANGE_MESSAGE, appID, oldState,
            getState(), event.getType()));
      }
    } finally {
      this.writeLock.unlock();
    }
  }

  @Override
  public void recover(RMState state) {
    ApplicationStateData appState =
        state.getApplicationState().get(getApplicationId());
    this.recoveredFinalState = appState.getState();

    if (recoveredFinalState == null) {
      LOG.info(String.format(RECOVERY_MESSAGE, getApplicationId(),
          appState.getAttemptCount(), "NONE"));
    } else if (LOG.isDebugEnabled()) {
      LOG.debug(String.format(RECOVERY_MESSAGE, getApplicationId(),
          appState.getAttemptCount(), recoveredFinalState));
    }

    this.diagnostics.append(null == appState.getDiagnostics() ? "" : appState
        .getDiagnostics());
    this.storedFinishTime = appState.getFinishTime();
    this.startTime = appState.getStartTime();
    this.launchTime = appState.getLaunchTime();
    this.callerContext = appState.getCallerContext();
    this.applicationTimeouts = appState.getApplicationTimeouts();
    // If interval > 0, some attempts might have been deleted.
    if (this.attemptFailuresValidityInterval > 0) {
      this.firstAttemptIdInStateStore = appState.getFirstAttemptId();
      this.nextAttemptId = firstAttemptIdInStateStore;
    }
    //TODO recover collector address.
    //this.collectorAddr = appState.getCollectorAddr();

    // send the ATS create Event during RM recovery.
    // NOTE: it could be duplicated with events sent before RM get restarted.
    sendATSCreateEvent();
    RMAppAttemptImpl preAttempt = null;
    for (ApplicationAttemptId attemptId :
        new TreeSet<>(appState.attempts.keySet())) {
      // create attempt
      createNewAttempt(attemptId);
      ((RMAppAttemptImpl)this.currentAttempt).recover(state);
      // If previous attempt is not in final state, it means we failed to store
      // its final state. We set it to FAILED now because we could not make sure
      // about its final state.
      if (preAttempt != null && preAttempt.getRecoveredFinalState() == null) {
        preAttempt.setRecoveredFinalState(RMAppAttemptState.FAILED);
      }
      preAttempt = (RMAppAttemptImpl)currentAttempt;
    }
    if (currentAttempt != null) {
      nextAttemptId = currentAttempt.getAppAttemptId().getAttemptId() + 1;
    }
  }

  private void createNewAttempt() {
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(applicationId, nextAttemptId++);
    createNewAttempt(appAttemptId);
  }

  private void createNewAttempt(ApplicationAttemptId appAttemptId) {
    BlacklistManager currentAMBlacklistManager;
    if (currentAttempt != null) {
      // Transfer over the blacklist from the previous app-attempt.
      currentAMBlacklistManager = currentAttempt.getAMBlacklistManager();
    } else {
      if (amBlacklistingEnabled && !submissionContext.getUnmanagedAM()) {
        currentAMBlacklistManager = new SimpleBlacklistManager(
            RMServerUtils.getApplicableNodeCountForAM(rmContext, conf,
                getAMResourceRequests()),
            blacklistDisableThreshold);
      } else {
        currentAMBlacklistManager = new DisabledBlacklistManager();
      }
    }
    RMAppAttempt attempt =
        new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
          submissionContext, conf, amReqs, this, currentAMBlacklistManager);
    attempts.put(appAttemptId, attempt);
    currentAttempt = attempt;
  }

  private void
      createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
    createNewAttempt();
    handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
      transferStateFromPreviousAttempt));
  }

  private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
    NodeState nodeState = node.getState();
    updatedNodes.put(node, RMAppNodeUpdateType.convertToNodeUpdateType(type));
    LOG.debug("Received node update event:{} for node:{} with state:{}",
        type, node, nodeState);
  }

  private static class RMAppTransition implements
      SingleArcTransition<RMAppImpl, RMAppEvent> {
    public void transition(RMAppImpl app, RMAppEvent event) {
    };
  }

  private static final class RMAppNodeUpdateTransition extends RMAppTransition {
    public void transition(RMAppImpl app, RMAppEvent event) {
      RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;
      app.processNodeUpdate(nodeUpdateEvent.getUpdateType(),
          nodeUpdateEvent.getNode());
    };
  }

  private static final class RMAppStateUpdateTransition
      extends RMAppTransition {
    private YarnApplicationState stateToATS;

    public RMAppStateUpdateTransition(YarnApplicationState state) {
      stateToATS = state;
    }

    public void transition(RMAppImpl app, RMAppEvent event) {
      app.rmContext.getSystemMetricsPublisher().appStateUpdated(
          app, stateToATS, app.systemClock.getTime());
    };
  }

  private static final class AttemptLaunchedTransition
      extends  RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      if(app.launchTime == 0) {
        LOG.info("update the launch time for applicationId: "+
                app.getApplicationId()+", attemptId: "+
                app.getCurrentAppAttempt().getAppAttemptId()+
                "launchTime: "+event.getTimestamp());
        ApplicationStateData appState = ApplicationStateData.newInstance(
            app.submitTime, app.startTime, app.submissionContext, app.user,
            app.callerContext);
        appState.setApplicationTimeouts(app.getApplicationTimeouts());
        appState.setLaunchTime(event.getTimestamp());
        app.rmContext.getStateStore().updateApplicationState(appState, false);
        app.launchTime = event.getTimestamp();
        app.rmContext.getSystemMetricsPublisher().appLaunched(
            app, app.launchTime);
      }
    }
  }

  private static final class AppRunningOnNodeTransition extends RMAppTransition {
    public void transition(RMAppImpl app, RMAppEvent event) {
      RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event;
      
      // if final state already stored, notify RMNode
      if (isAppInFinalState(app)) {
        app.handler.handle(
            new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), nodeAddedEvent
                .getApplicationId()));
        return;
      }
      
      // otherwise, add it to ranNodes for further process
      app.ranNodes.add(nodeAddedEvent.getNodeId());

      if (!nodeAddedEvent.isCreatedFromAcquiredState()) {
        app.logAggregation.addReportIfNecessary(
            nodeAddedEvent.getNodeId(), app.getApplicationId());
      } else {
        LOG.debug("Not considering node for log aggregation yet. nodeId: {}, appId: {}",
            nodeAddedEvent.getNodeId(), app.getApplicationId());
      }
    }
  }

  // synchronously recover attempt to ensure any incoming external events
  // to be processed after the attempt processes the recover event.
  private void recoverAppAttempts() {
    for (RMAppAttempt attempt : getAppAttempts().values()) {
      attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(),
        RMAppAttemptEventType.RECOVER));
    }
  }

  private static final class RMAppRecoveredTransition implements
      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {

    @Override
    public RMAppState transition(RMAppImpl app, RMAppEvent event) {

      RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
      app.recover(recoverEvent.getRMState());
      // The app has completed.
      if (app.recoveredFinalState != null) {
        app.recoverAppAttempts();
        new FinalTransition(app.recoveredFinalState).transition(app, event);
        return app.recoveredFinalState;
      }

      if (UserGroupInformation.isSecurityEnabled()) {
        // asynchronously renew delegation token on recovery.
        try {
          app.rmContext.getDelegationTokenRenewer()
              .addApplicationAsyncDuringRecovery(app.getApplicationId(),
                  BuilderUtils.parseCredentials(app.submissionContext),
                  app.submissionContext.getCancelTokensWhenComplete(),
                  app.getUser(),
                  BuilderUtils.parseTokensConf(app.submissionContext));
        } catch (Exception e) {
          String msg = "Failed to fetch user credentials from application:" + e
              .getMessage();
          app.diagnostics.append(msg);
          LOG.error(msg, e);
        }
      }

      for (Map.Entry<ApplicationTimeoutType, Long> timeout : app.applicationTimeouts
          .entrySet()) {
        app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
            timeout.getKey(), timeout.getValue());
        if (LOG.isDebugEnabled()) {
          long remainingTime = timeout.getValue() - app.systemClock.getTime();
          LOG.debug("Application " + app.applicationId
              + " is registered for timeout monitor, type=" + timeout.getKey()
              + " remaining timeout=" + (remainingTime > 0 ?
              remainingTime / 1000 :
              0) + " seconds");
        }
      }

      // No existent attempts means the attempt associated with this app was not
      // started or started but not yet saved.
      if (app.attempts.isEmpty()) {
        app.scheduler.handle(
            new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
                app.applicationPriority, app.placementContext));
        return RMAppState.SUBMITTED;
      }

      // Add application to scheduler synchronously to guarantee scheduler
      // knows applications before AM or NM re-registers.
      app.scheduler.handle(
          new AppAddedSchedulerEvent(app.user, app.submissionContext, true,
              app.applicationPriority, app.placementContext));

      // recover attempts
      app.recoverAppAttempts();

      // YARN-1507 is saving the application state after the application is
      // accepted. So after YARN-1507, an app is saved meaning it is accepted.
      // Thus we return ACCECPTED state on recovery.
      return RMAppState.ACCEPTED;
    }
  }

  private static final class AddApplicationToSchedulerTransition extends
      RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.handler.handle(
          new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
              app.applicationPriority, app.placementContext));
      // send the ATS create Event
      app.sendATSCreateEvent();
    }
  }

  private static final class StartAppAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.createAndStartNewAttempt(false);
    };
  }

  private static final class FinalStateSavedTransition implements
      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {

    @Override
    public RMAppState transition(RMAppImpl app, RMAppEvent event) {
      Map<ApplicationTimeoutType, Long> timeouts =
          app.submissionContext.getApplicationTimeouts();
      if (timeouts != null && timeouts.size() > 0) {
        app.rmContext.getRMAppLifetimeMonitor()
            .unregisterApp(app.getApplicationId(), timeouts.keySet());
      }

      if (app.transitionTodo instanceof SingleArcTransition) {
        ((SingleArcTransition) app.transitionTodo).transition(app,
          app.eventCausingFinalSaving);
      } else if (app.transitionTodo instanceof MultipleArcTransition) {
        ((MultipleArcTransition) app.transitionTodo).transition(app,
          app.eventCausingFinalSaving);
      }
      return app.targetedFinalState;
    }
  }

  private static class AttemptFailedFinalStateSavedTransition extends
      RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      String msg = null;
      if (event instanceof RMAppFailedAttemptEvent) {
        msg = app.getAppAttemptFailedDiagnostics(event);
      }
      LOG.info(msg);
      app.diagnostics.append(msg);
      // Inform the node for app-finish
      new FinalTransition(RMAppState.FAILED).transition(app, event);
    }
  }

  private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
    String msg = null;
    RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
    if (this.submissionContext.getUnmanagedAM()) {
      // RM does not manage the AM. Do not retry
      msg = "Unmanaged application " + this.getApplicationId()
              + " failed due to " + failedEvent.getDiagnosticMsg()
              + ". Failing the application.";
    } else if (this.isNumAttemptsBeyondThreshold) {
      int globalLimit = conf.getInt(YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS,
          conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
              YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
      msg = String.format(
        "Application %s failed %d times%s%s due to %s. Failing the application.",
          getApplicationId(),
          maxAppAttempts,
          (attemptFailuresValidityInterval <= 0 ? ""
               : (" in previous " + attemptFailuresValidityInterval
                  + " milliseconds")),
          (globalLimit == maxAppAttempts) ? ""
              : (" (global limit =" + globalLimit
                 + "; local limit is =" + maxAppAttempts + ")"),
          failedEvent.getDiagnosticMsg());
    }
    return msg;
  }

  private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      long applicationLifetime =
          app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME);
      applicationLifetime = app.scheduler
          .checkAndGetApplicationLifetime(app.queue, applicationLifetime, app);
      if (applicationLifetime > 0) {
        // calculate next timeout value
        Long newTimeout =
            Long.valueOf(app.submitTime + (applicationLifetime * 1000));
        app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
            ApplicationTimeoutType.LIFETIME, newTimeout);

        // update applicationTimeouts with new absolute value.
        app.applicationTimeouts.put(ApplicationTimeoutType.LIFETIME,
            newTimeout);

        LOG.info("Application " + app.applicationId
            + " is registered for timeout monitor, type="
            + ApplicationTimeoutType.LIFETIME + " value=" + applicationLifetime
            + " seconds");
      }

      // If recovery is enabled then store the application information in a
      // non-blocking call so make sure that RM has stored the information
      // needed to restart the AM after RM restart without further client
      // communication
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }

  private void rememberTargetTransitions(RMAppEvent event,
      Object transitionToDo, RMAppState targetFinalState) {
    transitionTodo = transitionToDo;
    targetedFinalState = targetFinalState;
    eventCausingFinalSaving = event;
  }

  private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
      Object transitionToDo, RMAppState targetFinalState,
      RMAppState stateToBeStored) {
    rememberTargetTransitions(event, transitionToDo, targetFinalState);
    this.stateBeforeFinalSaving = getState();
    this.storedFinishTime = this.systemClock.getTime();

    LOG.info("Updating application " + this.applicationId
        + " with final state: " + this.targetedFinalState);
    // we lost attempt_finished diagnostics in app, because attempt_finished
    // diagnostics is sent after app final state is saved. Later on, we will
    // create GetApplicationAttemptReport specifically for getting per attempt
    // info.
    String diags = null;
    switch (event.getType()) {
    case APP_REJECTED:
    case ATTEMPT_FINISHED:
    case ATTEMPT_KILLED:
      diags = event.getDiagnosticMsg();
      break;
    case ATTEMPT_FAILED:
      RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
      diags = getAppAttemptFailedDiagnostics(failedEvent);
      break;
    default:
      break;
    }

    ApplicationStateData appState =
        ApplicationStateData.newInstance(this.submitTime, this.startTime,
            this.getUser(), this.getRealUser(), this.submissionContext,
            stateToBeStored, diags, this.launchTime, this.storedFinishTime,
            this.callerContext);
    appState.setApplicationTimeouts(this.applicationTimeouts);
    this.rmContext.getStateStore().updateApplicationState(appState);
  }

  private static final class FinalSavingTransition extends RMAppTransition {
    Object transitionToDo;
    RMAppState targetedFinalState;
    RMAppState stateToBeStored;

    public FinalSavingTransition(Object transitionToDo,
        RMAppState targetedFinalState) {
      this(transitionToDo, targetedFinalState, targetedFinalState);
    }

    public FinalSavingTransition(Object transitionToDo,
        RMAppState targetedFinalState, RMAppState stateToBeStored) {
      this.transitionToDo = transitionToDo;
      this.targetedFinalState = targetedFinalState;
      this.stateToBeStored = stateToBeStored;
    }

    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.rememberTargetTransitionsAndStoreState(event, transitionToDo,
          targetedFinalState, stateToBeStored);
    }
  }

  private static class AttemptUnregisteredTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.finishTime = app.storedFinishTime;
    }
  }

  private static class AppFinishedTransition extends FinalTransition {
    public AppFinishedTransition() {
      super(RMAppState.FINISHED);
    }

    public void transition(RMAppImpl app, RMAppEvent event) {
      app.diagnostics.append(event.getDiagnosticMsg());
      super.transition(app, event);
    };
  }

  private static class AttemptFinishedAtFinalSavingTransition extends
      RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      if (app.targetedFinalState.equals(RMAppState.FAILED)
          || app.targetedFinalState.equals(RMAppState.KILLED)) {
        // Ignore Attempt_Finished event if we were supposed to reach FAILED
        // FINISHED state
        return;
      }

      // pass in the earlier attempt_unregistered event, as it is needed in
      // AppFinishedFinalStateSavedTransition later on
      app.rememberTargetTransitions(event,
        new AppFinishedFinalStateSavedTransition(app.eventCausingFinalSaving),
        RMAppState.FINISHED);
    };
  }

  private static class AppFinishedFinalStateSavedTransition extends
      RMAppTransition {
    RMAppEvent attemptUnregistered;

    public AppFinishedFinalStateSavedTransition(RMAppEvent attemptUnregistered) {
      this.attemptUnregistered = attemptUnregistered;
    }
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      new AttemptUnregisteredTransition().transition(app, attemptUnregistered);
      FINISHED_TRANSITION.transition(app, event);
    };
  }

  /**
   * Log the audit event for kill by client.
   *
   * @param event
   *          The {@link RMAppEvent} to be logged
   */
  static void auditLogKillEvent(RMAppEvent event) {
    if (event instanceof RMAppKillByClientEvent) {
      RMAppKillByClientEvent killEvent = (RMAppKillByClientEvent) event;
      UserGroupInformation callerUGI = killEvent.getCallerUGI();
      String userName = null;
      if (callerUGI != null) {
        userName = callerUGI.getShortUserName();
      }
      InetAddress remoteIP = killEvent.getIp();
      RMAuditLogger.logSuccess(userName, AuditConstants.KILL_APP_REQUEST,
          "RMAppImpl", event.getApplicationId(), remoteIP);
    }
  }

  private static class AppKilledTransition extends FinalTransition {
    public AppKilledTransition() {
      super(RMAppState.KILLED);
    }

    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.diagnostics.append(event.getDiagnosticMsg());
      super.transition(app, event);
      RMAppImpl.auditLogKillEvent(event);
    };
  }

  private static class KillAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.stateBeforeKilling = app.getState();
      // Forward app kill diagnostics in the event to kill app attempt.
      // These diagnostics will be returned back in ATTEMPT_KILLED event sent by
      // RMAppAttemptImpl.
      app.handler.handle(
          new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
              RMAppAttemptEventType.KILL, event.getDiagnosticMsg()));
      RMAppImpl.auditLogKillEvent(event);
    }
  }

  private static final class AppRejectedTransition extends FinalTransition {
    public AppRejectedTransition() {
      super(RMAppState.FAILED);
    }

    public void transition(RMAppImpl app, RMAppEvent event) {
      app.diagnostics.append(event.getDiagnosticMsg());
      super.transition(app, event);
    };
  }

  /**
   * Attempt to perform a type-specific cleanup after application has completed.
   *
   * @param app application to clean up
   */
  static void appAdminClientCleanUp(RMAppImpl app) {
    try {
      AppAdminClient client = AppAdminClient.createAppAdminClient(app
          .applicationType, app.conf);
      int result = client.actionCleanUp(app.name, app.user);
      if (result == 0) {
        LOG.info("Type-specific cleanup of application " + app.applicationId
            + " of type " + app.applicationType + " succeeded");
      } else {
        LOG.warn("Type-specific cleanup of application " + app.applicationId
            + " of type " + app.applicationType + " did not succeed with exit"
            + " code " + result);
      }
    } catch (IllegalArgumentException e) {
      // no AppAdminClient class has been specified for the application type,
      // so this does not need to be logged
    } catch (Exception e) {
      LOG.warn("Could not run type-specific cleanup on application " +
          app.applicationId + " of type " + app.applicationType, e);
    }
  }

  private static class FinalTransition extends RMAppTransition {

    private final RMAppState finalState;

    FinalTransition(RMAppState finalState) {
      this.finalState = finalState;
    }

    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      completeAndCleanupApp(app);
      handleAppFinished(app);
      app.clearUnusedFields();
      appAdminClientCleanUp(app);
    }

    private void completeAndCleanupApp(RMAppImpl app) {
      //cleanup app in RM Nodes
      for (NodeId nodeId : app.getRanNodes()) {
        app.handler.handle(
                new RMNodeCleanAppEvent(nodeId, app.applicationId));
      }
      app.ranNodes.clear();
      // Recovered apps that are completed were not added to scheduler, so no
      // need to remove them from scheduler.
      if (app.recoveredFinalState == null) {
        app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
            finalState));
      }

      app.handler.handle(new RMAppManagerEvent(app.applicationId,
              RMAppManagerEventType.APP_COMPLETED));
    }

    private void handleAppFinished(RMAppImpl app) {
      app.logAggregation
          .recordLogAggregationStartTime(app.systemClock.getTime());
      // record finish time
      app.finishTime = app.storedFinishTime;
      if (app.finishTime == 0) {
        app.finishTime = app.systemClock.getTime();
      }

      //record finish in history and metrics
      app.rmContext.getRMApplicationHistoryWriter()
          .applicationFinished(app, finalState);
      app.rmContext.getSystemMetricsPublisher()
          .appFinished(app, finalState, app.finishTime);
    }
  }

  public int getNumFailedAppAttempts() {
    int completedAttempts = 0;
    // Do not count AM preemption, hardware failures or NM resync
    // as attempt failure.
    for (RMAppAttempt attempt : attempts.values()) {
      if (attempt.shouldCountTowardsMaxAttemptRetry()) {
        completedAttempts++;
      }
    }
    return completedAttempts;
  }

  private static final class AttemptFailedTransition implements
        MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {

    private final RMAppState initialState;

    public AttemptFailedTransition(RMAppState initialState) {
      this.initialState = initialState;
    }

    @Override
    public RMAppState transition(RMAppImpl app, RMAppEvent event) {
      int numberOfFailure = app.getNumFailedAppAttempts();
      if (app.maxAppAttempts == 1) {
        // If the user explicitly set the attempts to 1 then there are likely
        // correctness issues if the AM restarts for any reason.
        LOG.info("Max app attempts is 1 for " + app.applicationId
            + ", preventing further attempts.");
        numberOfFailure = app.maxAppAttempts;
      } else {
        LOG.info("The number of failed attempts"
            + (app.attemptFailuresValidityInterval > 0 ? " in previous "
            + app.attemptFailuresValidityInterval + " milliseconds " : " ")
            + "is " + numberOfFailure + ". The max attempts is "
            + app.maxAppAttempts);

        if (app.attemptFailuresValidityInterval > 0) {
          removeExcessAttempts(app);
        }
      }

      if (!app.submissionContext.getUnmanagedAM()
          && numberOfFailure < app.maxAppAttempts) {
        if (initialState.equals(RMAppState.KILLING)) {
          // If this is not last attempt, app should be killed instead of
          // launching a new attempt
          app.rememberTargetTransitionsAndStoreState(event,
            new AppKilledTransition(), RMAppState.KILLED, RMAppState.KILLED);
          return RMAppState.FINAL_SAVING;
        }

        boolean transferStateFromPreviousAttempt;
        RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
        transferStateFromPreviousAttempt =
            failedEvent.getTransferStateFromPreviousAttempt();

        RMAppAttempt oldAttempt = app.currentAttempt;
        app.createAndStartNewAttempt(transferStateFromPreviousAttempt);
        // Transfer the state from the previous attempt to the current attempt.
        // Note that the previous failed attempt may still be collecting the
        // container events from the scheduler and update its data structures
        // before the new attempt is created. We always transferState for
        // finished containers so that they can be acked to NM,
        // but when pulling finished container we will check this flag again.
        ((RMAppAttemptImpl) app.currentAttempt)
          .transferStateFromAttempt(oldAttempt);
        return initialState;
      } else {
        if (numberOfFailure >= app.maxAppAttempts) {
          app.isNumAttemptsBeyondThreshold = true;
        }
        app.rememberTargetTransitionsAndStoreState(event,
          new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
          RMAppState.FAILED);
        return RMAppState.FINAL_SAVING;
      }
    }

    private void removeExcessAttempts(RMAppImpl app) {
      while (app.nextAttemptId
          - app.firstAttemptIdInStateStore > app.maxAppAttempts) {
        // attempts' first element is oldest attempt because it is a
        // LinkedHashMap
        ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
            app.getApplicationId(), app.firstAttemptIdInStateStore);
        RMAppAttempt rmAppAttempt = app.getRMAppAttempt(attemptId);
        long endTime = app.systemClock.getTime();
        if (rmAppAttempt.getFinishTime() < (endTime
            - app.attemptFailuresValidityInterval)) {
          app.firstAttemptIdInStateStore++;
          LOG.info("Remove attempt from state store : " + attemptId);
          app.rmContext.getStateStore().removeApplicationAttempt(attemptId);
        } else {
          break;
        }
      }
    }
  }

  @Override
  public String getApplicationType() {
    return this.applicationType;
  }

  @Override
  public Set<String> getApplicationTags() {
    return this.applicationTags;
  }

  @Override
  public boolean isAppFinalStateStored() {
    RMAppState state = getState();
    return state.equals(RMAppState.FINISHING)
        || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
        || state.equals(RMAppState.KILLED);
  }

  @Override
  public YarnApplicationState createApplicationState() {
    RMAppState rmAppState = getState();
    // If App is in FINAL_SAVING state, return its previous state.
    if (rmAppState.equals(RMAppState.FINAL_SAVING)) {
      rmAppState = stateBeforeFinalSaving;
    }
    if (rmAppState.equals(RMAppState.KILLING)) {
      rmAppState = stateBeforeKilling;
    }
    return RMServerUtils.createApplicationState(rmAppState);
  }
  
  public static boolean isAppInFinalState(RMApp rmApp) {
    RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState();
    if (appState == null) {
      appState = rmApp.getState();
    }
    return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
        || appState == RMAppState.KILLED;
  }

  @Override
  public boolean isAppInCompletedStates() {
    RMAppState appState = getState();
    return appState == RMAppState.FINISHED || appState == RMAppState.FINISHING
        || appState == RMAppState.FAILED || appState == RMAppState.KILLED
        || appState == RMAppState.FINAL_SAVING
        || appState == RMAppState.KILLING;
  }

  @Override
  public ApplicationPlacementContext getApplicationPlacementContext() {
    return placementContext;
  }

  public RMAppState getRecoveredFinalState() {
    return this.recoveredFinalState;
  }

  @Override
  public Set<NodeId> getRanNodes() {
    return ranNodes;
  }
  
  @Override
  public RMAppMetrics getRMAppMetrics() {
    Resource resourcePreempted = Resource.newInstance(0, 0);
    int numAMContainerPreempted = 0;
    int numNonAMContainerPreempted = 0;
    Map<String, Long> resourceSecondsMap = new HashMap<>();
    Map<String, Long> preemptedSecondsMap = new HashMap<>();
    int totalAllocatedContainers = 0;
    this.readLock.lock();
    try {
      for (RMAppAttempt attempt : attempts.values()) {
        if (null != attempt) {
          RMAppAttemptMetrics attemptMetrics =
              attempt.getRMAppAttemptMetrics();
          Resources.addTo(resourcePreempted,
              attemptMetrics.getResourcePreempted());
          numAMContainerPreempted += attemptMetrics.getIsPreempted() ? 1 : 0;
          numNonAMContainerPreempted +=
              attemptMetrics.getNumNonAMContainersPreempted();
          // getAggregateAppResourceUsage() will calculate resource usage stats
          // for both running and finished containers.
          AggregateAppResourceUsage resUsage =
              attempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
          for (Map.Entry<String, Long> entry : resUsage
              .getResourceUsageSecondsMap().entrySet()) {
            long value = RMServerUtils
                .getOrDefault(resourceSecondsMap, entry.getKey(), 0L);
            value += entry.getValue();
            resourceSecondsMap.put(entry.getKey(), value);
          }
          for (Map.Entry<String, Long> entry : attemptMetrics
              .getPreemptedResourceSecondsMap().entrySet()) {
            long value = RMServerUtils
                .getOrDefault(preemptedSecondsMap, entry.getKey(), 0L);
            value += entry.getValue();
            preemptedSecondsMap.put(entry.getKey(), value);
          }
          totalAllocatedContainers +=
              attemptMetrics.getTotalAllocatedContainers();
        }
      }
    } finally {
      this.readLock.unlock();
    }

    return new RMAppMetrics(resourcePreempted, numNonAMContainerPreempted,
        numAMContainerPreempted, resourceSecondsMap, preemptedSecondsMap,
        totalAllocatedContainers);
  }

  @Private
  @VisibleForTesting
  public void setSystemClock(Clock clock) {
    this.systemClock = clock;
  }

  @Override
  public ReservationId getReservationId() {
    return submissionContext.getReservationID();
  }
  
  @Override
  public List<ResourceRequest> getAMResourceRequests() {
    return this.amReqs;
  }

  @Override
  public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
    return logAggregation.getLogAggregationReportsForApp(this);
  }

  public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
    logAggregation.aggregateLogReport(nodeId, report, this);
  }

  public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
    return logAggregation.getLogAggregationFailureMessagesForNM(nodeId);
  }

  @Override
  public LogAggregationStatus getLogAggregationStatusForAppReport() {
    return logAggregation
        .getLogAggregationStatusForAppReport(this);
  }

  @Override
  public String getAppNodeLabelExpression() {
    String appNodeLabelExpression =
        getApplicationSubmissionContext().getNodeLabelExpression();
    appNodeLabelExpression = (appNodeLabelExpression == null)
        ? NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET : appNodeLabelExpression;
    appNodeLabelExpression = (appNodeLabelExpression.trim().isEmpty())
        ? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : appNodeLabelExpression;
    return appNodeLabelExpression;
  }

  @Override
  public String getAmNodeLabelExpression() {
    String amNodeLabelExpression = null;
    if (!getApplicationSubmissionContext().getUnmanagedAM()) {
      amNodeLabelExpression =
          getAMResourceRequests() != null && !getAMResourceRequests().isEmpty()
              ? getAMResourceRequests().get(0).getNodeLabelExpression() : null;
      amNodeLabelExpression = (amNodeLabelExpression == null)
          ? NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET : amNodeLabelExpression;
      amNodeLabelExpression = (amNodeLabelExpression.trim().isEmpty())
          ? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : amNodeLabelExpression;
    }
    return amNodeLabelExpression;
  }

  @Override
  public CallerContext getCallerContext() {
    return callerContext;
  }

  private void sendATSCreateEvent() {
    rmContext.getRMApplicationHistoryWriter().applicationStarted(this);
    rmContext.getSystemMetricsPublisher().appCreated(this, this.startTime);
    String appViewACLs = submissionContext.getAMContainerSpec()
        .getApplicationACLs().get(ApplicationAccessType.VIEW_APP);
    rmContext.getSystemMetricsPublisher().appACLsUpdated(
        this, appViewACLs, systemClock.getTime());
  }

  @Private
  @VisibleForTesting
  public int getNextAttemptId() {
    return nextAttemptId;
  }

  private long getApplicationLifetime(ApplicationTimeoutType type) {
    Map<ApplicationTimeoutType, Long> timeouts =
        this.submissionContext.getApplicationTimeouts();
    long applicationLifetime = -1;
    if (timeouts != null && timeouts.containsKey(type)) {
      applicationLifetime = timeouts.get(type);
    }
    return applicationLifetime;
  }

  @Override
  public Map<ApplicationTimeoutType, Long> getApplicationTimeouts() {
    this.readLock.lock();
    try {
      return new HashMap(this.applicationTimeouts);
    } finally {
      this.readLock.unlock();
    }
  }

  public void updateApplicationTimeout(
      Map<ApplicationTimeoutType, Long> updateTimeout) {
    this.writeLock.lock();
    try {
      if (COMPLETED_APP_STATES.contains(getState())) {
        return;
      }
      // update monitoring service
      this.rmContext.getRMAppLifetimeMonitor()
          .updateApplicationTimeouts(getApplicationId(), updateTimeout);
      this.applicationTimeouts.putAll(updateTimeout);

    } finally {
      this.writeLock.unlock();
    }
  }

  @Override
  public Priority getApplicationPriority() {
    return applicationPriority;
  }

  public void setApplicationPriority(Priority applicationPriority) {
    this.applicationPriority = applicationPriority;
  }

  /**
     * Clear Unused fields to free memory.
     */
  private void clearUnusedFields() {
    this.submissionContext.setAMContainerSpec(null);
    this.submissionContext.setLogAggregationContext(null);
  }

  @Override
  public Map<String, String> getApplicationSchedulingEnvs() {
    return this.applicationSchedulingEnvs;
  }

  /**
   * catch the InvalidStateTransition.
   *
   * @param state RMAppState.
   * @param rmAppEventType RMAppEventType.
   */
  protected void onInvalidStateTransition(RMAppEventType rmAppEventType,
              RMAppState state){
      /* TODO fail the application on the failed transition */
  }

  @VisibleForTesting
  long getLogAggregationStartTime() {
    return logAggregation.getLogAggregationStartTime();
  }

  Clock getSystemClock() {
    return systemClock;
  }

  @Override
  public String getRealUser() {
    UserGroupInformation realUserUgi = this.userUgi.getRealUser();
    return (realUserUgi != null) ? realUserUgi.getShortUserName() : null;
  }
}