ContainerImpl.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.apache.hadoop.yarn.api.records.LocalizationStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;

public class ContainerImpl implements Container {
  /**
   * Names the per-container localization statistics. The number of constants
   * determines the length of the {@code localizationCounts} array.
   * NOTE(review): the array is presumably indexed by ordinal - confirm before
   * reordering or inserting constants.
   */
  private enum LocalizationCounter {
    // 1-to-1 correspondence with MR TaskCounter.LOCALIZED_*
    BYTES_MISSED,
    BYTES_CACHED,
    FILES_MISSED,
    FILES_CACHED,
    MILLIS;
  }

  /**
   * Holds the launch context and resource set a container is being
   * re-initialized to, together with the previous pair that a rollback would
   * restore.
   */
  private static final class ReInitializationContext {
    // Target of the re-initialization
    private final ContainerLaunchContext newLaunchContext;
    private final ResourceSet newResourceSet;

    // Previous values, kept so that the re-initialization can be rolled back
    private final ContainerLaunchContext oldLaunchContext;
    private final ResourceSet oldResourceSet;

    // Set only on contexts produced by createContextForRollback()
    private boolean isRollback = false;

    private ReInitializationContext(
        ContainerLaunchContext targetLaunchContext,
        ResourceSet targetResourceSet,
        ContainerLaunchContext previousLaunchContext,
        ResourceSet previousResourceSet) {
      this.oldLaunchContext = previousLaunchContext;
      this.oldResourceSet = previousResourceSet;
      this.newLaunchContext = targetLaunchContext;
      this.newResourceSet = targetResourceSet;
    }

    /**
     * @return true when a previous launch context was retained, i.e. there is
     *         something to roll back to
     */
    private boolean canRollback() {
      return oldLaunchContext != null;
    }

    /**
     * Computes the resource set the container should use after this
     * re-initialization.
     *
     * @param current the resource set the container holds right now
     * @return {@code newResourceSet} as-is for a rollback or when
     *         {@code current} already is that very instance (a restart);
     *         otherwise the merge of {@code current} and
     *         {@code newResourceSet}
     */
    private ResourceSet mergedResourceSet(ResourceSet current) {
      boolean alreadyApplied = current == newResourceSet;
      if (isRollback || alreadyApplied) {
        // Rollbacks never merge, and on a restart both references are the
        // same object, so there is nothing to merge either.
        return newResourceSet;
      }
      return ResourceSet.merge(current, newResourceSet);
    }

    /**
     * Builds the context that undoes this re-initialization: the old launch
     * context and resources become the new target and no further rollback
     * state is retained.
     */
    private ReInitializationContext createContextForRollback() {
      ReInitializationContext rollbackContext =
          new ReInitializationContext(
              oldLaunchContext, oldResourceSet, null, null);
      rollbackContext.isRollback = true;
      return rollbackContext;
    }
  }

  // Formatter using the pattern "yyyy-MM-dd HH:mm:ss.SSS".
  // NOTE(review): SimpleDateFormat is not thread-safe - confirm that every
  // use of this per-container instance is properly guarded.
  private final SimpleDateFormat dateFormat =
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  // Read and write halves of one ReentrantReadWriteLock created in the
  // constructor.
  private final Lock readLock;
  private final Lock writeLock;
  private final Dispatcher dispatcher;
  // NM state store obtained from the Context at construction time.
  private final NMStateStoreService stateStore;
  private final Credentials credentials;
  private final NodeManagerMetrics metrics;
  // One slot per LocalizationCounter constant.
  private final long[] localizationCounts =
      new long[LocalizationCounter.values().length];

  private volatile ContainerLaunchContext launchContext;
  private volatile ContainerTokenIdentifier containerTokenIdentifier;
  // Container id and submitting user, both read from the token identifier
  // passed to the constructor.
  private final ContainerId containerId;
  private final String user;
  // Initialized from the token identifier; the recovery constructor
  // overwrites it with the recovered version.
  private int version;
  // Defaults to ContainerExitStatus.INVALID; the recovery constructor
  // overwrites it with the recovered exit code.
  private int exitCode = ContainerExitStatus.INVALID;
  private final StringBuilder diagnostics;
  // Read from YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE.
  private final int diagnosticsMaxSize;
  private boolean wasLaunched;
  private boolean wasPaused;
  private long containerLocalizationStartTime;
  private long containerLaunchStartTime;
  // Only created when YarnConfiguration.NM_CONTAINER_METRICS_ENABLE is true;
  // stays null otherwise.
  private ContainerMetrics containerMetrics;
  // Static and non-final, hence shared by every container. The constructor
  // uses it to stamp the metrics start time and hands it to the retry policy.
  private static Clock clock = SystemClock.getInstance();

  // Retry settings resolved by configureRetryContext(), plus the
  // sliding-window bookkeeping and policy built from them in the constructor.
  private ContainerRetryContext containerRetryContext;
  private SlidingWindowRetryPolicy.RetryContext windowRetryContext;
  private SlidingWindowRetryPolicy retryPolicy;

  private String csiVolumesRootDir;
  // workDir and logDir are restored from the recovered state by the recovery
  // constructor.
  private String workDir;
  private String logDir;
  private String host;
  private String ips;
  private String exposedPorts;
  private volatile ReInitializationContext reInitContext;
  private volatile boolean isReInitializing = false;
  private volatile boolean isMarkeForKilling = false;
  private Object containerRuntimeData;

  /** The NM-wide configuration - not specific to this container */
  private final Configuration daemonConf;
  // Start timestamp handed to the constructor: the current SystemClock time
  // for a new container, the recovered start time for a recovered one.
  private final long startTime;

  private static final Logger LOG =
       LoggerFactory.getLogger(ContainerImpl.class);

  // whether container has been recovered after a restart
  private RecoveredContainerStatus recoveredStatus =
      RecoveredContainerStatus.REQUESTED;
  // whether container was marked as killed after recovery
  private boolean recoveredAsKilled = false;
  private Context context;
  // Both start out empty; the recovery constructor replaces resourceMappings
  // with the recovered mappings.
  private ResourceSet resourceSet;
  private ResourceMappings resourceMappings;

  /**
   * Creates a new container whose start time is the current
   * {@link SystemClock} time. All other arguments are forwarded unchanged to
   * the constructor that takes an explicit start timestamp.
   */
  public ContainerImpl(
      Configuration conf,
      Dispatcher dispatcher,
      ContainerLaunchContext launchContext,
      Credentials creds,
      NodeManagerMetrics metrics,
      ContainerTokenIdentifier containerTokenIdentifier,
      Context context) {
    this(
        conf, dispatcher, launchContext, creds, metrics,
        containerTokenIdentifier, context,
        SystemClock.getInstance().getTime());
  }

  /**
   * Creates a container with an explicit start timestamp and puts its state
   * machine into {@link ContainerState#NEW}.
   *
   * @param conf NM-wide configuration; supplies the diagnostics size limit,
   *          the container-metrics settings and the minimum retry interval
   * @param dispatcher event dispatcher kept for later use
   * @param launchContext launch context of the container; may be null, in
   *          which case the container is never retried
   * @param creds credentials associated with the container
   * @param metrics NodeManager metrics sink
   * @param containerTokenIdentifier source of the container id, version and
   *          submitting user; must not be null
   * @param context NM context providing the state store and the state
   *          transition listener; must not be null
   * @param startTs start timestamp recorded for this container
   */
  public ContainerImpl(Configuration conf, Dispatcher dispatcher,
      ContainerLaunchContext launchContext, Credentials creds,
      NodeManagerMetrics metrics,
      ContainerTokenIdentifier containerTokenIdentifier, Context context,
      long startTs) {
    this.startTime = startTs;
    this.daemonConf = conf;
    this.dispatcher = dispatcher;
    this.stateStore = context.getNMStateStore();
    this.version = containerTokenIdentifier.getVersion();
    this.launchContext = launchContext;

    this.diagnosticsMaxSize = conf.getInt(
        YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE,
        YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
    this.containerTokenIdentifier = containerTokenIdentifier;
    this.containerId = containerTokenIdentifier.getContainerID();
    this.diagnostics = new StringBuilder();
    this.credentials = creds;
    this.metrics = metrics;
    this.user = containerTokenIdentifier.getApplicationSubmitter();
    // Both lock views come from the same lock instance.
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    this.readLock = readWriteLock.readLock();
    this.writeLock = readWriteLock.writeLock();
    // Assigned exactly once; nothing below modifies it.
    this.context = context;
    boolean containerMetricsEnabled =
        conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
            YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE);

    // containerMetrics stays null when per-container metrics are disabled.
    if (containerMetricsEnabled) {
      long flushPeriod =
          conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
              YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
      long unregisterDelay = conf.getLong(
          YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS,
          YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
      containerMetrics = ContainerMetrics
          .forContainer(containerId, flushPeriod, unregisterDelay);
      containerMetrics.recordStartTime(clock.getTime());
    }

    // Configure the Retry Context
    this.containerRetryContext = configureRetryContext(
        conf, launchContext, this.containerId);
    this.windowRetryContext = new SlidingWindowRetryPolicy
        .RetryContext(containerRetryContext);
    this.retryPolicy = new SlidingWindowRetryPolicy(clock);

    stateMachine = stateMachineFactory.make(this, ContainerState.NEW,
        context.getContainerStateTransitionListener());
    this.resourceSet = new ResourceSet();
    this.resourceMappings = new ResourceMappings();
  }

  /**
   * Resolves the retry context to use for a container and enforces the
   * NM-wide minimum retry interval on it.
   *
   * <p>When the resolved context allows retries and its interval is below
   * {@link YarnConfiguration#NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS}, the
   * interval is raised to that minimum on the returned object itself (which
   * is the instance obtained from the launch context).
   *
   * @param conf NM configuration supplying the minimum retry interval
   * @param launchContext launch context of the container; may be null
   * @param containerId id of the container, used only in the log message
   * @return the launch context's retry context, or
   *         {@link ContainerRetryContext#NEVER_RETRY_CONTEXT} when the launch
   *         context is null or carries none
   */
  private static ContainerRetryContext configureRetryContext(
      Configuration conf, ContainerLaunchContext launchContext,
      ContainerId containerId) {
    // Fetch the retry context once and fall back to "never retry".
    ContainerRetryContext context = launchContext == null
        ? null : launchContext.getContainerRetryContext();
    if (context == null) {
      context = ContainerRetryContext.NEVER_RETRY_CONTEXT;
    }
    int minimumRestartInterval = conf.getInt(
        YarnConfiguration.NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS,
        YarnConfiguration.DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS);
    // The shared NEVER_RETRY_CONTEXT has policy NEVER_RETRY, so it is never
    // modified here.
    if (context.getRetryPolicy() != ContainerRetryPolicy.NEVER_RETRY
        && context.getRetryInterval() < minimumRestartInterval) {
      LOG.info("Set restart interval to minimum value {}ms for container {}",
          minimumRestartInterval, containerId);
      context.setRetryInterval(minimumRestartInterval);
    }
    return context;
  }

  /**
   * Constructor for a recovered container. Builds the container with the
   * recovered start time and then overlays the remaining recovered state:
   * status, exit code, kill flag, diagnostics, version, retry bookkeeping,
   * working/log directories and resource mappings.
   *
   * @param rcs the recovered state to restore from
   */
  public ContainerImpl(
      Configuration conf,
      Dispatcher dispatcher,
      ContainerLaunchContext launchContext,
      Credentials creds,
      NodeManagerMetrics metrics,
      ContainerTokenIdentifier containerTokenIdentifier,
      Context context,
      RecoveredContainerState rcs) {
    this(conf, dispatcher, launchContext, creds, metrics,
        containerTokenIdentifier, context, rcs.getStartTime());

    // Status and outcome as recorded in the recovered state
    recoveredStatus = rcs.getStatus();
    exitCode = rcs.getExitCode();
    recoveredAsKilled = rcs.getKilled();
    diagnostics.append(rcs.getDiagnostics());
    version = rcs.getVersion();

    // Retry bookkeeping
    windowRetryContext.setRemainingRetries(rcs.getRemainingRetryAttempts());
    windowRetryContext.setRestartTimes(rcs.getRestartTimes());

    // Directories and resource mappings
    workDir = rcs.getWorkDir();
    logDir = rcs.getLogDir();
    resourceMappings = rcs.getResourceMappings();
  }

  // Single shared transition instance, registered in the state machine
  // factory for the UPDATE_DIAGNOSTICS_MSG self-transitions instead of
  // allocating a new transition object per state.
  private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
      new ContainerDiagnosticsUpdateTransition();

  // State Machine for each container.
  private static StateMachineFactory
           <ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>
        stateMachineFactory =
      new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
    // From NEW State
    .addTransition(ContainerState.NEW,
        EnumSet.of(ContainerState.LOCALIZING,
            ContainerState.SCHEDULED,
            ContainerState.LOCALIZATION_FAILED,
            ContainerState.DONE),
        ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
    .addTransition(ContainerState.NEW, ContainerState.NEW,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.NEW, ContainerState.DONE,
        ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
    .addTransition(ContainerState.NEW, ContainerState.NEW,
        ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())

    // From LOCALIZING State
    .addTransition(ContainerState.LOCALIZING,
        EnumSet.of(ContainerState.LOCALIZING, ContainerState.SCHEDULED),
        ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition())
    .addTransition(ContainerState.LOCALIZING,
        ContainerState.LOCALIZATION_FAILED,
        ContainerEventType.RESOURCE_FAILED,
        new ResourceFailedTransition())
    .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER,
        new KillBeforeRunningTransition())
    .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())


    // From LOCALIZATION_FAILED State
    .addTransition(ContainerState.LOCALIZATION_FAILED,
        ContainerState.DONE,
        ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
        new LocalizationFailedToDoneTransition())
    .addTransition(ContainerState.LOCALIZATION_FAILED,
        ContainerState.LOCALIZATION_FAILED,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    // container not launched so kill is a no-op
    .addTransition(ContainerState.LOCALIZATION_FAILED,
        ContainerState.LOCALIZATION_FAILED,
        EnumSet.of(ContainerEventType.KILL_CONTAINER,
            ContainerEventType.PAUSE_CONTAINER))
    // container cleanup triggers a release of all resources
    // regardless of whether they were localized or not
    // LocalizedResource handles release event in all states
    .addTransition(ContainerState.LOCALIZATION_FAILED,
        ContainerState.LOCALIZATION_FAILED,
        ContainerEventType.RESOURCE_LOCALIZED)
    .addTransition(ContainerState.LOCALIZATION_FAILED,
        ContainerState.LOCALIZATION_FAILED,
        ContainerEventType.RESOURCE_FAILED)
    .addTransition(ContainerState.LOCALIZATION_FAILED,
        ContainerState.LOCALIZATION_FAILED,
        ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())

    // From SCHEDULED State
    .addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING,
        ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
    .addTransition(ContainerState.SCHEDULED, ContainerState.PAUSED,
        ContainerEventType.RECOVER_PAUSED_CONTAINER,
        new RecoveredContainerTransition())
    .addTransition(ContainerState.SCHEDULED, ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
       ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
       UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER,
        new KillTransition())
    .addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())

    // From RUNNING State
    .addTransition(ContainerState.RUNNING,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
        new ExitedWithSuccessTransition(true))
    .addTransition(ContainerState.RUNNING,
        EnumSet.of(ContainerState.RELAUNCHING,
            ContainerState.SCHEDULED,
            ContainerState.EXITED_WITH_FAILURE),
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new RetryFailureTransition())
    .addTransition(ContainerState.RUNNING,
        EnumSet.of(ContainerState.RUNNING,
            ContainerState.REINITIALIZING,
            ContainerState.REINITIALIZING_AWAITING_KILL),
        ContainerEventType.REINITIALIZE_CONTAINER,
        new ReInitializeContainerTransition())
    .addTransition(ContainerState.RUNNING,
        EnumSet.of(ContainerState.RUNNING,
            ContainerState.REINITIALIZING,
            ContainerState.REINITIALIZING_AWAITING_KILL),
        ContainerEventType.ROLLBACK_REINIT,
        new RollbackContainerTransition())
    .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
        ContainerEventType.RESOURCE_LOCALIZED,
        new ResourceLocalizedWhileRunningTransition())
    .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
        ContainerEventType.RESOURCE_FAILED,
        new ResourceLocalizationFailedWhileRunningTransition())
    .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
       ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
       UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.RUNNING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.RUNNING,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        new KilledExternallyTransition())
    .addTransition(ContainerState.RUNNING, ContainerState.PAUSING,
        ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
    .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())


    // From PAUSING State
    .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
        ContainerEventType.RESOURCE_LOCALIZED,
        new ResourceLocalizedWhileRunningTransition())
    .addTransition(ContainerState.PAUSING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.PAUSING, ContainerState.PAUSED,
        ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition())
    // In case something goes wrong then container will exit from the
    // PAUSING state
    .addTransition(ContainerState.PAUSING,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)
    .addTransition(ContainerState.PAUSING,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        new KilledExternallyTransition())
    .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
        ContainerEventType.RESOURCE_LOCALIZED,
        new ResourceLocalizedWhileRunningTransition())
    .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())

    // From PAUSED State
    .addTransition(ContainerState.PAUSED, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
        ContainerEventType.PAUSE_CONTAINER)
    // This can happen during re-initialization.
    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
        ContainerEventType.RESOURCE_LOCALIZED,
        new ResourceLocalizedWhileRunningTransition())
    .addTransition(ContainerState.PAUSED, ContainerState.RESUMING,
        ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
    // In case something goes wrong then container will exit from the
    // PAUSED state
    .addTransition(ContainerState.PAUSED,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.PAUSED, ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        new KilledExternallyTransition())
    .addTransition(ContainerState.PAUSED,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
        new ExitedWithSuccessTransition(true))
    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())

    // From RESUMING State
    .addTransition(ContainerState.RESUMING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.RESUMING, ContainerState.RUNNING,
        ContainerEventType.CONTAINER_RESUMED)
    .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    // This can happen during re-initialization
    .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
        ContainerEventType.RESOURCE_LOCALIZED,
        new ResourceLocalizedWhileRunningTransition())
    // In case something goes wrong then container will exit from the
    // RESUMING state
    .addTransition(ContainerState.RESUMING,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.RESUMING,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        new KilledExternallyTransition())
    .addTransition(ContainerState.RESUMING,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
        new ExitedWithSuccessTransition(true))
    .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())
    // NOTE - We cannot get a PAUSE_CONTAINER while in RESUMING state.

    // From REINITIALIZING State
    .addTransition(ContainerState.REINITIALIZING,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
        new ExitedWithSuccessTransition(true))
    .addTransition(ContainerState.REINITIALIZING,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.REINITIALIZING,
        EnumSet.of(ContainerState.REINITIALIZING,
            ContainerState.REINITIALIZING_AWAITING_KILL),
        ContainerEventType.RESOURCE_LOCALIZED,
        new ResourceLocalizedWhileReInitTransition())
    .addTransition(ContainerState.REINITIALIZING, ContainerState.RUNNING,
        ContainerEventType.RESOURCE_FAILED,
        new ResourceLocalizationFailedWhileReInitTransition())
    .addTransition(ContainerState.REINITIALIZING,
        ContainerState.REINITIALIZING,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.REINITIALIZING, ContainerState.PAUSING,
        ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
    .addTransition(ContainerState.REINITIALIZING,
        ContainerState.REINITIALIZING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())

    // from REINITIALIZING_AWAITING_KILL
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
        new ExitedWithSuccessTransition(true))
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.SCHEDULED, ContainerEventType.PAUSE_CONTAINER)
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.SCHEDULED,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        new KilledForReInitializationTransition())
    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerState.REINITIALIZING_AWAITING_KILL,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())

    // From RELAUNCHING State
    .addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING,
        ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
    .addTransition(ContainerState.RELAUNCHING,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(true))
    .addTransition(ContainerState.RELAUNCHING, ContainerState.RELAUNCHING,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER, new KillTransition())
    .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
        ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
    .addTransition(ContainerState.RELAUNCHING, ContainerState.RELAUNCHING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN,
        new NotifyContainerSchedulerOfUpdateTransition())


    // From EXITED_WITH_SUCCESS State
    .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
        ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
        new ExitedWithSuccessToDoneTransition())
    .addTransition(ContainerState.EXITED_WITH_SUCCESS,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.EXITED_WITH_SUCCESS,
        ContainerState.EXITED_WITH_SUCCESS,
        EnumSet.of(ContainerEventType.KILL_CONTAINER,
            ContainerEventType.PAUSE_CONTAINER))
    // No transition - assuming container is on its way to completion
    .addTransition(ContainerState.EXITED_WITH_SUCCESS,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.UPDATE_CONTAINER_TOKEN)
    .addTransition(ContainerState.EXITED_WITH_SUCCESS,
        ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST)

    // From EXITED_WITH_FAILURE State
    .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
            ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
            new ExitedWithFailureToDoneTransition())
    .addTransition(ContainerState.EXITED_WITH_FAILURE,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.EXITED_WITH_FAILURE,
                   ContainerState.EXITED_WITH_FAILURE,
        EnumSet.of(ContainerEventType.KILL_CONTAINER,
            ContainerEventType.PAUSE_CONTAINER))
    // No transition - assuming container is on its way to completion
    .addTransition(ContainerState.EXITED_WITH_FAILURE,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.UPDATE_CONTAINER_TOKEN)
    .addTransition(ContainerState.EXITED_WITH_FAILURE,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST)

    // From KILLING State.
    .addTransition(ContainerState.KILLING,
        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        new ContainerKilledTransition())
    .addTransition(ContainerState.KILLING,
        ContainerState.KILLING,
        ContainerEventType.RESOURCE_LOCALIZED,
        new LocalizedResourceDuringKillTransition())
    .addTransition(ContainerState.KILLING, 
        ContainerState.KILLING, 
        ContainerEventType.RESOURCE_FAILED)
    .addTransition(ContainerState.KILLING, ContainerState.KILLING,
       ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
       UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.KILLING, ContainerState.KILLING,
        ContainerEventType.KILL_CONTAINER)
    .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS,
        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
        new ExitedWithSuccessTransition(false))
    .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        new ExitedWithFailureTransition(false))
    .addTransition(ContainerState.KILLING,
            ContainerState.DONE,
            ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
            new KillingToDoneTransition())
    // Handle a launched container during killing stage is a no-op
    // as cleanup container is always handled after launch container event
    // in the container launcher
    .addTransition(ContainerState.KILLING,
        ContainerState.KILLING,
        EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED,
            ContainerEventType.PAUSE_CONTAINER))
    // No transition - assuming container is on its way to completion
    .addTransition(ContainerState.KILLING, ContainerState.KILLING,
        ContainerEventType.UPDATE_CONTAINER_TOKEN)

    // From CONTAINER_CLEANEDUP_AFTER_KILL State.
    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
            ContainerState.DONE,
            ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
            new ContainerCleanedupAfterKillToDoneTransition())
    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        EnumSet.of(ContainerEventType.KILL_CONTAINER,
            ContainerEventType.RESOURCE_FAILED,
            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
            ContainerEventType.PAUSE_CONTAINER))
    // No transition - assuming container is on its way to completion
    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerEventType.UPDATE_CONTAINER_TOKEN)
    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST)

    // From DONE
    .addTransition(ContainerState.DONE, ContainerState.DONE,
        EnumSet.of(ContainerEventType.KILL_CONTAINER,
            ContainerEventType.PAUSE_CONTAINER))
    .addTransition(ContainerState.DONE, ContainerState.DONE,
        ContainerEventType.INIT_CONTAINER)
    .addTransition(ContainerState.DONE, ContainerState.DONE,
       ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
       UPDATE_DIAGNOSTICS_TRANSITION)
    // This transition may result when
    // we notify container of failed localization if localizer thread (for
    // that container) fails for some reason
    .addTransition(ContainerState.DONE, ContainerState.DONE,
        EnumSet.of(ContainerEventType.RESOURCE_FAILED,
            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
    // No transition - assuming container is on its way to completion
    .addTransition(ContainerState.DONE, ContainerState.DONE,
        ContainerEventType.UPDATE_CONTAINER_TOKEN)
    .addTransition(ContainerState.DONE, ContainerState.DONE,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST)

    // create the topology tables
    .installTopology();

  // The state machine instance whose current state is reported by
  // getCurrentState(), getContainerSubState() and getContainerState().
  private final StateMachine<ContainerState, ContainerEventType, ContainerEvent>
    stateMachine;

  /**
   * Maps the internal container state held by the state machine onto the
   * public YARN API container state.
   *
   * @return {@code COMPLETE} when the state machine is in {@code DONE} (or in
   *         a state not listed in the switch), {@code RUNNING} for every other
   *         listed internal state
   */
  public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
    final ContainerState internalState = stateMachine.getCurrentState();
    switch (internalState) {
    case DONE:
    default:
      return org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE;
    case NEW:
    case LOCALIZING:
    case LOCALIZATION_FAILED:
    case SCHEDULED:
    case PAUSING:
    case PAUSED:
    case RESUMING:
    case RUNNING:
    case RELAUNCHING:
    case REINITIALIZING:
    case REINITIALIZING_AWAITING_KILL:
    case EXITED_WITH_SUCCESS:
    case EXITED_WITH_FAILURE:
    case KILLING:
    case CONTAINER_CLEANEDUP_AFTER_KILL:
    case CONTAINER_RESOURCES_CLEANINGUP:
      return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING;
    }
  }

  // NOTE: Please update the doc in the ContainerSubState class as
  //       well as the yarn_protos.proto file if this mapping is ever modified.
  /**
   * Maps the internal container state held by the state machine onto the
   * coarser {@link ContainerSubState} reported in container statuses.
   *
   * @return the sub-state for the current internal state; {@code DONE} is
   *         also the fallback for any state not listed in the switch
   */
  private ContainerSubState getContainerSubState() {
    final ContainerSubState subState;
    switch (stateMachine.getCurrentState()) {
    case NEW:
    case LOCALIZING:
    case SCHEDULED:
    case REINITIALIZING_AWAITING_KILL:
    case RELAUNCHING:
      subState = ContainerSubState.SCHEDULED;
      break;
    case REINITIALIZING:
    case PAUSING:
    case KILLING:
    case RUNNING:
      subState = ContainerSubState.RUNNING;
      break;
    case PAUSED:
    case RESUMING:
      subState = ContainerSubState.PAUSED;
      break;
    case LOCALIZATION_FAILED:
    case EXITED_WITH_SUCCESS:
    case EXITED_WITH_FAILURE:
    case CONTAINER_CLEANEDUP_AFTER_KILL:
    case CONTAINER_RESOURCES_CLEANINGUP:
      subState = ContainerSubState.COMPLETING;
      break;
    case DONE:
    default:
      subState = ContainerSubState.DONE;
      break;
    }
    return subState;
  }

  /**
   * Looks up the timeline publisher through this container's {@code context}.
   *
   * @return whatever {@code context.getNMTimelinePublisher()} returns
   */
  public NMTimelinePublisher getNMTimelinePublisher() {
    final NMTimelinePublisher publisher = context.getNMTimelinePublisher();
    return publisher;
  }

  /**
   * Reads the {@code user} field while holding the read lock.
   *
   * @return the value of the {@code user} field
   */
  @Override
  public String getUser() {
    final Lock guard = this.readLock;
    guard.lock();
    try {
      return user;
    } finally {
      guard.unlock();
    }
  }

  /**
   * Returns the localized resources of this container's resource set, but
   * only while the container is in the SCHEDULED or RELAUNCHING state. The
   * check and the lookup both happen under the read lock.
   *
   * @return the resource set's localized resources, or {@code null} when the
   *         container is in any other state
   */
  @Override
  public Map<Path, List<String>> getLocalizedResources() {
    this.readLock.lock();
    try {
      final boolean scheduledOrRelaunching =
          ContainerState.SCHEDULED == getContainerState()
              || ContainerState.RELAUNCHING == getContainerState();
      if (!scheduledOrRelaunching) {
        return null;
      }
      return resourceSet.getLocalizedResources();
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Reads the {@code credentials} field while holding the read lock.
   *
   * @return the value of the {@code credentials} field
   */
  @Override
  public Credentials getCredentials() {
    final Lock guard = this.readLock;
    guard.lock();
    try {
      return this.credentials;
    } finally {
      guard.unlock();
    }
  }

  /**
   * Reads the state machine's current internal state while holding the read
   * lock.
   *
   * @return the current internal {@code ContainerState}
   */
  @Override
  public ContainerState getContainerState() {
    final Lock guard = this.readLock;
    guard.lock();
    try {
      final ContainerState current = stateMachine.getCurrentState();
      return current;
    } finally {
      guard.unlock();
    }
  }

  /**
   * Reads the {@code launchContext} field while holding the read lock.
   *
   * @return the value of the {@code launchContext} field
   */
  @Override
  public ContainerLaunchContext getLaunchContext() {
    final Lock guard = this.readLock;
    guard.lock();
    try {
      return this.launchContext;
    } finally {
      guard.unlock();
    }
  }

  /**
   * Builds a fresh {@code ContainerStatus} from this container's current
   * fields while holding the read lock.
   *
   * The IP list is produced by splitting the {@code ips} field on commas; it
   * is {@code null} when {@code ips} is empty according to
   * {@code StringUtils.isEmpty}.
   *
   * @return a newly built status carrying the id, API state, diagnostics,
   *         exit code, resource, execution type, IPs, host, sub-state and
   *         exposed ports
   */
  @Override
  public ContainerStatus cloneAndGetContainerStatus() {
    this.readLock.lock();
    try {
      final ContainerStatus snapshot = BuilderUtils.newContainerStatus(
          this.containerId,
          getCurrentState(),
          diagnostics.toString(),
          exitCode,
          getResource(),
          this.containerTokenIdentifier.getExecutionType());

      List<String> ipList = null;
      if (!StringUtils.isEmpty(ips)) {
        ipList = Arrays.asList(ips.split(","));
      }
      snapshot.setIPs(ipList);
      snapshot.setHost(host);
      snapshot.setContainerSubState(getContainerSubState());
      snapshot.setExposedPorts(exposedPorts);
      return snapshot;
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Builds an {@code NMContainerStatus} from this container's current fields
   * and its token identifier while holding the read lock, then attaches the
   * allocation tags carried by the token identifier.
   *
   * @return the newly created status
   */
  @Override
  public NMContainerStatus getNMContainerStatus() {
    final Lock guard = this.readLock;
    guard.lock();
    try {
      final NMContainerStatus report = NMContainerStatus.newInstance(
          this.containerId,
          this.version,
          getCurrentState(),
          getResource(),
          diagnostics.toString(),
          exitCode,
          containerTokenIdentifier.getPriority(),
          containerTokenIdentifier.getCreationTime(),
          containerTokenIdentifier.getNodeLabelExpression(),
          containerTokenIdentifier.getExecutionType(),
          containerTokenIdentifier.getAllocationRequestId());
      report.setAllocationTags(containerTokenIdentifier.getAllcationTags());
      return report;
    } finally {
      guard.unlock();
    }
  }

  /**
   * @return the value of the {@code containerId} field
   */
  @Override
  public ContainerId getContainerId() {
    return containerId;
  }

  /**
   * @return the value of the {@code startTime} field
   */
  @Override
  public long getContainerStartTime() {
    return startTime;
  }

  /**
   * @return the value of the {@code containerLaunchStartTime} field, which
   *         {@code sendLaunchEvent()} sets just before dispatching a launcher
   *         event
   */
  @Override
  public long getContainerLaunchTime() {
    return containerLaunchStartTime;
  }

  /**
   * Returns a copy of the resource recorded in the container token
   * identifier.
   *
   * @return the result of {@code Resources.clone} applied to the token
   *         identifier's resource
   */
  @Override
  public Resource getResource() {
    final Resource fromToken = this.containerTokenIdentifier.getResource();
    return Resources.clone(fromToken);
  }

  /**
   * Reads the {@code containerTokenIdentifier} field while holding the read
   * lock.
   *
   * @return the current container token identifier
   */
  @Override
  public ContainerTokenIdentifier getContainerTokenIdentifier() {
    final Lock guard = this.readLock;
    guard.lock();
    try {
      return containerTokenIdentifier;
    } finally {
      guard.unlock();
    }
  }

  /**
   * Replaces the {@code containerTokenIdentifier} field while holding the
   * write lock.
   *
   * @param token the token identifier to store
   */
  @Override
  public void setContainerTokenIdentifier(ContainerTokenIdentifier token) {
    final Lock guard = this.writeLock;
    guard.lock();
    try {
      containerTokenIdentifier = token;
    } finally {
      guard.unlock();
    }
  }

  /**
   * @return the value of the {@code workDir} field
   */
  @Override
  public String getWorkDir() {
    return this.workDir;
  }

  /**
   * Stores the given value in the {@code workDir} field.
   *
   * @param workDir the working directory to record
   */
  @Override
  public void setWorkDir(String workDir) {
    this.workDir = workDir;
  }

  /**
   * @return the value of the {@code csiVolumesRootDir} field
   */
  @Override
  public String getCsiVolumesRootDir() {
    return this.csiVolumesRootDir;
  }

  /**
   * Stores the given value in the {@code csiVolumesRootDir} field.
   *
   * @param volumesRootDir the CSI volumes root directory to record
   */
  @Override
  public void setCsiVolumesRootDir(String volumesRootDir) {
    csiVolumesRootDir = volumesRootDir;
  }

  /**
   * Logs the reset and then sets both the {@code ips} and {@code host}
   * fields to {@code null}. No lock is taken here.
   */
  private void clearIpAndHost() {
    LOG.info("{} clearing ip and host", containerId);
    ips = null;
    host = null;
  }

  /**
   * Records the container's IPs and host while holding the write lock.
   *
   * @param ipAndHost array whose element 0 is stored in {@code ips} and
   *                  whose element 1 is stored in {@code host}
   */
  @Override
  public void setIpAndHost(String[] ipAndHost) {
    final Lock guard = this.writeLock;
    guard.lock();
    try {
      // Assign in index order so a too-short array fails after ips is set,
      // exactly as before.
      ips = ipAndHost[0];
      host = ipAndHost[1];
    } finally {
      guard.unlock();
    }
  }

  /**
   * @return the value of the {@code logDir} field
   */
  @Override
  public String getLogDir() {
    return this.logDir;
  }

  /**
   * Stores the given value in the {@code logDir} field.
   *
   * @param logDir the log directory to record
   */
  @Override
  public void setLogDir(String logDir) {
    this.logDir = logDir;
  }

  /**
   * @return the {@code resourceSet} field itself (not a copy)
   */
  @Override
  public ResourceSet getResourceSet() {
    return resourceSet;
  }

  /**
   * Dispatches the four events that announce this container has finished,
   * in this order: to the application, to the container scheduler, to the
   * resource monitor and to the log handler.
   */
  @SuppressWarnings("unchecked")
  private void sendFinishedEvents() {
    @SuppressWarnings("rawtypes")
    final EventHandler handler = dispatcher.getEventHandler();

    // Inform the application, handing it a freshly built status.
    final ContainerStatus finalStatus = cloneAndGetContainerStatus();
    handler.handle(
        new ApplicationContainerFinishedEvent(finalStatus, startTime));

    // Tell the scheduler the container is Done
    handler.handle(new ContainerSchedulerEvent(
        this, ContainerSchedulerEventType.CONTAINER_COMPLETED));

    // Remove the container from the resource-monitor
    handler.handle(new ContainerStopMonitoringEvent(containerId));

    // Tell the logService too
    handler.handle(new LogHandlerContainerFinishedEvent(
        containerId, containerTokenIdentifier.getContainerType(), exitCode));
  }

  /**
   * Asks for this container to be started.
   *
   * A container currently in the PAUSED state gets a
   * {@code ContainerResumeEvent}. Otherwise a launcher event is dispatched
   * whose type depends on {@code recoveredStatus}: RECOVER_CONTAINER for
   * LAUNCHED, RECOVER_PAUSED_CONTAINER for PAUSED, LAUNCH_CONTAINER for
   * anything else. {@code containerLaunchStartTime} is stamped just before
   * that dispatch.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  @Override
  public void sendLaunchEvent() {
    if (ContainerState.PAUSED == getContainerState()) {
      dispatcher.getEventHandler().handle(
          new ContainerResumeEvent(containerId,
              "Container Resumed as some resources freed up"));
      return;
    }

    final ContainersLauncherEventType launcherEvent;
    if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
      // try to recover a container that was previously launched
      launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
    } else if (recoveredStatus == RecoveredContainerStatus.PAUSED) {
      launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
    } else {
      launcherEvent = ContainersLauncherEventType.LAUNCH_CONTAINER;
    }

    containerLaunchStartTime = clock.getTime();
    dispatcher.getEventHandler().handle(
        new ContainersLauncherEvent(this, launcherEvent));
  }

  /**
   * Dispatches a SCHEDULE_CONTAINER event to the container scheduler,
   * unless the container was recovered as PAUSED, in which case a
   * RECOVER_PAUSED_CONTAINER launcher event is dispatched instead.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  private void sendScheduleEvent() {
    if (recoveredStatus != RecoveredContainerStatus.PAUSED) {
      dispatcher.getEventHandler().handle(new ContainerSchedulerEvent(this,
          ContainerSchedulerEventType.SCHEDULE_CONTAINER));
      return;
    }
    final ContainersLauncherEventType recoverPaused =
        ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
    dispatcher.getEventHandler()
        .handle(new ContainersLauncherEvent(this, recoverPaused));
  }

  /**
   * Flags this container as marked for killing and dispatches a
   * {@code ContainerKillEvent} for it.
   *
   * @param exitStatus  exit status passed on to the kill event
   * @param description description passed on to the kill event
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  @Override
  public void sendKillEvent(int exitStatus, String description) {
    isMarkeForKilling = true;
    @SuppressWarnings("rawtypes")
    final EventHandler handler = dispatcher.getEventHandler();
    handler.handle(
        new ContainerKillEvent(containerId, exitStatus, description));
  }

  /**
   * Dispatches a {@code ContainerPauseEvent} for this container.
   *
   * @param description description passed on to the pause event
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  @Override
  public void sendPauseEvent(String description) {
    @SuppressWarnings("rawtypes")
    final EventHandler handler = dispatcher.getEventHandler();
    handler.handle(new ContainerPauseEvent(containerId, description));
  }

  /**
   * Dispatches a RELAUNCH_CONTAINER launcher event for this container.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  private void sendRelaunchEvent() {
    dispatcher.getEventHandler().handle(
        new ContainersLauncherEvent(this,
            ContainersLauncherEventType.RELAUNCH_CONTAINER));
  }

  /**
   * Informs the ContainersMonitor to start monitoring the container's
   * resource usage.
   *
   * Also records the launch duration (now minus
   * {@code containerLaunchStartTime}) in the metrics. The physical memory
   * limit is the container's memory size multiplied by 1024 * 1024; the
   * virtual memory limit is that value scaled by the configured
   * {@code NM_VMEM_PMEM_RATIO}.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  private void sendContainerMonitorStartEvent() {
    final long timeToLaunch = clock.getTime() - containerLaunchStartTime;
    metrics.addContainerLaunchDuration(timeToLaunch);

    final long physicalLimit = getResource().getMemorySize() * 1024 * 1024L;
    final float vmemPerPmem = daemonConf.getFloat(
        YarnConfiguration.NM_VMEM_PMEM_RATIO,
        YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
    final long virtualLimit = (long) (vmemPerPmem * physicalLimit);
    final int vcores = getResource().getVirtualCores();
    final long timeToLocalize =
        containerLaunchStartTime - containerLocalizationStartTime;

    dispatcher.getEventHandler().handle(
        new ContainerStartMonitoringEvent(
            containerId, virtualLimit, physicalLimit, vcores,
            timeToLaunch, timeToLocalize));
  }

  /**
   * Appends each message to the diagnostics buffer, prefixed with the
   * current time in square brackets, trims the buffer from the front so it
   * holds at most {@code diagnosticsMaxSize} characters, and then writes the
   * buffer to the state store. A state-store failure is logged as a warning
   * and not rethrown.
   *
   * @param diags the messages to append, in order
   */
  private void addDiagnostics(String... diags) {
    for (int i = 0; i < diags.length; i++) {
      final String stamp = dateFormat.format(new Date());
      diagnostics.append("[").append(stamp).append("]").append(diags[i]);
    }

    final int currentLength = diagnostics.length();
    if (currentLength > diagnosticsMaxSize) {
      // Keep only the newest diagnosticsMaxSize characters.
      diagnostics.delete(0, currentLength - diagnosticsMaxSize);
    }

    try {
      stateStore.storeContainerDiagnostics(containerId, diagnostics);
    } catch (IOException e) {
      LOG.warn("Unable to update diagnostics in state store for "
          + containerId, e);
    }
  }

  /**
   * Dispatches a {@code ContainerLocalizationCleanupEvent} carrying every
   * resource of this container's resource set, grouped by visibility.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  public void cleanup() {
    final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
        byVisibility = resourceSet.getAllResourcesByVisibility();
    @SuppressWarnings("rawtypes")
    final EventHandler handler = dispatcher.getEventHandler();
    handler.handle(new ContainerLocalizationCleanupEvent(this, byVisibility));
  }

  /**
   * Base single-arc transition whose {@code transition} does nothing; the
   * other transition classes in this file extend it and override that
   * method.
   */
  static class ContainerTransition
      implements SingleArcTransition<ContainerImpl, ContainerEvent> {

    /**
     * Intentionally empty: the event is only drained and the state changed.
     *
     * @param container the container the event is for (unused here)
     * @param event     the event being drained (unused here)
     */
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      // Just drain the event and change the state.
    }
  }

  /**
   * Transition that installs the updated container token carried by an
   * {@code UpdateContainerTokenEvent} and persists it in the state store.
   */
  static class UpdateTransition extends ContainerTransition {

    /**
     * Replaces the container's token identifier with the one from the event,
     * then stores it. A failure to store is logged as a warning and not
     * rethrown.
     *
     * @param container the container being updated
     * @param event     must be an {@code UpdateContainerTokenEvent}
     */
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      final UpdateContainerTokenEvent tokenUpdate =
          (UpdateContainerTokenEvent) event;

      // Update the container token
      container.setContainerTokenIdentifier(tokenUpdate.getUpdatedToken());

      // Persist change in the state store.
      try {
        container.context.getNMStateStore().storeContainerUpdateToken(
            container.containerId, container.getContainerTokenIdentifier());
      } catch (IOException e) {
        LOG.warn("Could not store container [" + container.containerId
            + "] update..", e);
      }
    }
  }

  /**
   * Same as {@link UpdateTransition}, and additionally tells the container
   * scheduler about the update, passing along the token identifier that was
   * in place before the update.
   */
  static class NotifyContainerSchedulerOfUpdateTransition
      extends UpdateTransition {

    /**
     * @param container the container being updated
     * @param event     must be an {@code UpdateContainerTokenEvent}
     */
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      final UpdateContainerTokenEvent tokenUpdate =
          (UpdateContainerTokenEvent) event;

      // Capture the token before the parent transition overwrites it.
      final ContainerTokenIdentifier previousToken =
          container.containerTokenIdentifier;
      super.transition(container, tokenUpdate);

      container.dispatcher.getEventHandler().handle(
          new UpdateContainerSchedulerEvent(
              container, previousToken, tokenUpdate));
    }
  }

  /**
   * State transition when a NEW container receives the INIT_CONTAINER
   * message.
   *
   * Recovered containers are handled first: a container recovered as
   * COMPLETED, or recovered as killed while still REQUESTED or QUEUED, sends
   * its finished events and goes straight to DONE; any other container
   * recovered as QUEUED goes to SCHEDULED without further work here.
   *
   * Otherwise, if the container executor reports resources to localize,
   * sends a ContainerLocalizationRequestEvent and enters LOCALIZING state.
   *
   * If there are no resources to localize, calls sendScheduleEvent()
   * and enters SCHEDULED state directly.
   *
   * If resolving the resources throws URISyntaxException or IOException,
   * requests cleanup and enters LOCALIZATION_FAILED directly.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  static class RequestResourcesTransition implements
      MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
    /**
     * @param container the container being initialized
     * @param event the triggering event (not inspected here)
     * @return DONE, SCHEDULED, LOCALIZING or LOCALIZATION_FAILED as described
     *         on the class
     */
    @Override
    public ContainerState transition(ContainerImpl container,
        ContainerEvent event) {
      // Recovery short-circuits: none of these paths touch AuxServices or
      // start localization.
      if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
        container.sendFinishedEvents();
        return ContainerState.DONE;
      } else if (isContainerRecoveredAsKilled(container)) {
        // container was killed but never launched
        container.metrics.killedContainer();
        NMAuditLogger.logSuccess(container.user,
            AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
            container.containerId.getApplicationAttemptId().getApplicationId(),
            container.containerId);
        container.metrics.releaseContainer(
            container.containerTokenIdentifier.getResource());
        container.sendFinishedEvents();
        return ContainerState.DONE;
      } else if (container.recoveredStatus == RecoveredContainerStatus.QUEUED) {
        // No schedule event is sent on this path.
        return ContainerState.SCHEDULED;
      }

      final ContainerLaunchContext ctxt = container.launchContext;
      container.metrics.initingContainer();

      // Announce the container to the AuxServices.
      container.dispatcher.getEventHandler().handle(new AuxServicesEvent
          (AuxServicesEventType.CONTAINER_INIT, container));

      // Inform the AuxServices about the opaque serviceData
      Map<String,ByteBuffer> csd = ctxt.getServiceData();
      if (csd != null) {
        // This can happen more than once per Application as each container may
        // have distinct service data
        for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
          container.dispatcher.getEventHandler().handle(
              new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
                  container.user, container.containerId
                      .getApplicationAttemptId().getApplicationId(),
                  service.getKey().toString(), service.getValue()));
        }
      }

      container.containerLocalizationStartTime = clock.getTime();
      // duration = end - start;
      // record in RequestResourcesTransition: -start
      // add in LocalizedTransition: +end
      //
      container.localizationCounts[LocalizationCounter.MILLIS.ordinal()]
          = -Time.monotonicNow();

      // Send requests for public, private resources
      Map<String, LocalResource> cntrRsrc;
      try {
        // The executor's view decides whether anything needs localizing ...
        cntrRsrc = container.context
            .getContainerExecutor().getLocalResources(container);
        if (!cntrRsrc.isEmpty()) {
          // ... but the requests themselves are built from the launch
          // context's local resources.
          Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
              container.resourceSet.addResources(ctxt.getLocalResources());
          container.dispatcher.getEventHandler().handle(
              new ContainerLocalizationRequestEvent(container, req));
          // Get list of resources for logging
          List<String> resourcePaths = new ArrayList<>();
          for (Collection<LocalResourceRequest> rsrcReqList : req.values()) {
            for (LocalResourceRequest rsrc : rsrcReqList) {
              resourcePaths.add(rsrc.getPath().toString());
            }
          }
          LOG.info("Container " + container.getContainerId()
              + " is localizing: " + resourcePaths);
          return ContainerState.LOCALIZING;
        } else {
          // Nothing to localize: go straight to scheduling.
          container.sendScheduleEvent();
          container.metrics.endInitingContainer();
          return ContainerState.SCHEDULED;
        }
      } catch (URISyntaxException | IOException e) {
        // malformed resource; abort container launch
        LOG.warn("Failed to parse resource-request", e);
        container.cleanup();
        container.metrics.endInitingContainer();
        return ContainerState.LOCALIZATION_FAILED;
      }
    }

    /**
     * Tells whether the container was recovered as killed before it was
     * ever launched.
     *
     * @param container the container to inspect
     * @return true only when recoveredAsKilled is set and the recovered
     *         status is REQUESTED or QUEUED
     */
    static boolean isContainerRecoveredAsKilled(ContainerImpl container) {
      if (!container.recoveredAsKilled) {
        return false;
      }
      // container was killed but never launched
      RecoveredContainerStatus containerStatus = container.recoveredStatus;
      return containerStatus == RecoveredContainerStatus.REQUESTED
          || containerStatus == RecoveredContainerStatus.QUEUED;
    }
  }

  /**
   * Transition when one of the requested resources for this container
   * has been successfully localized.
   *
   * Stays in LOCALIZING while the resource set still reports pending
   * resources (or when {@code resourceLocalized} returns {@code null});
   * moves to SCHEDULED once nothing is pending.
   */
  static class LocalizedTransition implements
      MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {

    /**
     * @param container the container whose resource was localized
     * @param event     must be a {@code ContainerResourceLocalizedEvent}
     * @return LOCALIZING while more resources are awaited, otherwise
     *         SCHEDULED
     */
    @SuppressWarnings("unchecked")
    @Override
    public ContainerState transition(ContainerImpl container,
        ContainerEvent event) {
      final ContainerResourceLocalizedEvent localized =
          (ContainerResourceLocalizedEvent) event;
      final LocalResourceRequest request = localized.getResource();
      final Path localPath = localized.getLocation();

      final Set<String> links =
          container.resourceSet.resourceLocalized(request, localPath);
      if (links == null) {
        LOG.info("Localized resource " + request +
            " for container " + container.containerId);
        return ContainerState.LOCALIZING;
      }

      // A positive size is counted as a miss, a negative one as a cache hit;
      // a size of zero touches neither pair of counters.
      final long size = localized.getSize();
      if (size < 0) {
        // cached: recorded negative, restore the sign
        final int cachedBytes = LocalizationCounter.BYTES_CACHED.ordinal();
        final int cachedFiles = LocalizationCounter.FILES_CACHED.ordinal();
        container.localizationCounts[cachedBytes] -= size;
        container.localizationCounts[cachedFiles]++;
      } else if (size > 0) {
        final int missedBytes = LocalizationCounter.BYTES_MISSED.ordinal();
        final int missedFiles = LocalizationCounter.FILES_MISSED.ordinal();
        container.localizationCounts[missedBytes] += size;
        container.localizationCounts[missedFiles]++;
      }
      container.metrics.localizationCacheHitMiss(size);

      // check to see if this resource should be uploaded to the shared cache
      // as well
      if (shouldBeUploadedToSharedCache(container, request)) {
        container.resourceSet.getResourcesToBeUploaded()
            .put(request, localPath);
      }

      final boolean allLocalized =
          container.resourceSet.getPendingResources().isEmpty();
      if (!allLocalized) {
        return ContainerState.LOCALIZING;
      }

      // duration = end - start;
      // record in RequestResourcesTransition: -start
      // add in LocalizedTransition: +end
      //
      final int millis = LocalizationCounter.MILLIS.ordinal();
      container.localizationCounts[millis] += Time.monotonicNow();
      container.metrics.localizationComplete(
          container.localizationCounts[millis]);
      container.dispatcher.getEventHandler().handle(
          new ContainerLocalizationEvent(
              LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED,
              container));

      container.sendScheduleEvent();
      container.metrics.endInitingContainer();

      // If this is a recovered container that has already launched, skip
      // uploading resources to the shared cache. We do this to avoid uploading
      // the same resources multiple times. The tradeoff is that in the case of
      // a recovered container, there is a chance that resources don't get
      // uploaded into the shared cache. This is OK because resources are not
      // acknowledged by the SCM until they have been uploaded by the node
      // manager.
      final boolean alreadyLaunchedOrDone =
          container.recoveredStatus == RecoveredContainerStatus.LAUNCHED
              || container.recoveredStatus
                  == RecoveredContainerStatus.COMPLETED;
      if (!alreadyLaunchedOrDone) {
        // kick off uploads to the shared cache
        container.dispatcher.getEventHandler().handle(
            new SharedCacheUploadEvent(
                container.resourceSet.getResourcesToBeUploaded(),
                container.getLaunchContext(),
                container.getUser(),
                SharedCacheUploadEventType.UPLOAD));
      }

      return ContainerState.SCHEDULED;
    }
  }

  /**
   * Transition to start the Re-Initialization process.
   *
   * Installs a new re-initialization context on the container and then
   * either requests localization of the still-pending resources
   * (REINITIALIZING) or, when nothing is pending, asks the launcher to
   * clean up the running container (REINITIALIZING_AWAITING_KILL). Any
   * exception while doing so is logged, added to the diagnostics, and the
   * container stays RUNNING.
   */
  static class ReInitializeContainerTransition implements
      MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {

    /**
     * @param container the container to re-initialize
     * @param event     the event handed to {@code createReInitContext}
     * @return REINITIALIZING, REINITIALIZING_AWAITING_KILL or RUNNING as
     *         described on the class
     */
    @SuppressWarnings("unchecked")
    @Override
    public ContainerState transition(
        ContainerImpl container, ContainerEvent event) {
      container.reInitContext = createReInitContext(container, event);
      boolean awaitingLocalization = true;
      try {
        // 'reInitContext.newResourceSet' can be
        // a) current container resourceSet (In case of Restart)
        // b) previous resourceSet (In case of RollBack)
        // c) An actual NEW resourceSet (In case of Upgrade/ReInit)
        //
        // In cases a) and b) Container can immediately be cleaned up since
        // we are sure the resources are already available (we check the
        // pendingResources to verify that nothing more is needed). So we can
        // kill the container immediately
        final ResourceSet target = container.reInitContext.newResourceSet;
        if (target.getPendingResources().isEmpty()) {
          // We are not waiting on any resources, so...
          // Kill the current container.
          container.dispatcher.getEventHandler().handle(
              new ContainersLauncherEvent(container,
                  ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
          awaitingLocalization = false;
        } else {
          container.dispatcher.getEventHandler().handle(
              new ContainerLocalizationRequestEvent(
                  container, target.getAllResourcesByVisibility()));
        }
        container.metrics.reInitingContainer();
        NMAuditLogger.logSuccess(container.user,
            AuditConstants.START_CONTAINER_REINIT, "ContainerImpl",
            container.containerId.getApplicationAttemptId().getApplicationId(),
            container.containerId);
      } catch (Exception e) {
        LOG.error("Container [" + container.getContainerId() + "]" +
            " re-initialization failure..", e);
        container.addDiagnostics("Error re-initializing due to" +
            "[" + e.getMessage() + "]");
        return ContainerState.RUNNING;
      }

      if (awaitingLocalization) {
        return ContainerState.REINITIALIZING;
      }
      return ContainerState.REINITIALIZING_AWAITING_KILL;
    }

    /**
     * Builds the context describing what to re-initialize to and what, if
     * anything, to keep for a later rollback.
     *
     * @param container the container to re-initialize
     * @param event     must be a {@code ContainerReInitEvent}
     * @return the new re-initialization context
     */
    protected ReInitializationContext createReInitContext(
        ContainerImpl container, ContainerEvent event) {
      final ContainerReInitEvent reInitEvent = (ContainerReInitEvent) event;

      if (reInitEvent.getReInitLaunchContext() != null) {
        container.addDiagnostics("Container will be Re-initialized.\n");
        return new ReInitializationContext(
            reInitEvent.getReInitLaunchContext(),
            reInitEvent.getResourceSet(),
            // If AutoCommit is turned on, then no rollback can happen...
            // So don't need to store the previous context.
            (reInitEvent.isAutoCommit() ? null : container.launchContext),
            (reInitEvent.isAutoCommit() ? null : container.resourceSet));
      }

      // This is a Restart...
      // We also need to make sure that if Rollback is possible, the
      // rollback state should be retained in the
      // oldLaunchContext and oldResourceSet
      container.addDiagnostics("Container will be Restarted.\n");
      return new ReInitializationContext(
          container.launchContext, container.resourceSet,
          container.canRollback() ?
              container.reInitContext.oldLaunchContext : null,
          container.canRollback() ?
              container.reInitContext.oldResourceSet : null);
    }
  }

  /**
   * Transition to start the Rollback process. Behaves like
   * {@link ReInitializeContainerTransition} except that the context comes
   * from the existing re-initialization context's
   * {@code createContextForRollback()}.
   */
  static class RollbackContainerTransition
      extends ReInitializeContainerTransition {

    /**
     * @param container the container to roll back
     * @param event     the triggering event (not inspected here)
     * @return the rollback context derived from the container's current
     *         re-initialization context
     */
    @Override
    protected ReInitializationContext createReInitContext(
        ContainerImpl container, ContainerEvent event) {
      container.addDiagnostics("Container upgrade will be Rolled-back.\n");
      LOG.warn("Container [" + container.getContainerId() + "]" +
          " about to be explicitly Rolledback !!");
      final ReInitializationContext rollback =
          container.reInitContext.createContextForRollback();
      return rollback;
    }
  }

  /**
   * Resource requested for Container Re-initialization has been localized.
   * Once no resource of the new resource set is pending any more, asks the
   * launcher to clean up the current container so that it can be restarted
   * with the new bits.
   */
  static class ResourceLocalizedWhileReInitTransition
      implements
      MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {

    /**
     * @param container the container being re-initialized
     * @param event     must be a {@code ContainerResourceLocalizedEvent}
     * @return REINITIALIZING while resources are still pending, otherwise
     *         REINITIALIZING_AWAITING_KILL
     */
    @SuppressWarnings("unchecked")
    @Override
    public ContainerState transition(
        ContainerImpl container, ContainerEvent event) {
      final ContainerResourceLocalizedEvent localized =
          (ContainerResourceLocalizedEvent) event;
      container.reInitContext.newResourceSet.resourceLocalized(
          localized.getResource(), localized.getLocation());

      // Check if all ResourceLocalization has completed
      final boolean stillPending = !container.reInitContext.newResourceSet
          .getPendingResources().isEmpty();
      if (stillPending) {
        return ContainerState.REINITIALIZING;
      }

      // Kill the current container.
      container.dispatcher.getEventHandler().handle(
          new ContainersLauncherEvent(container,
              ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
      return ContainerState.REINITIALIZING_AWAITING_KILL;
    }
  }

  /**
   * Handles a resource that finished localizing while the container is
   * running: the resource is recorded in the container's resource set and a
   * symlink is created in the container work directory for every link name
   * returned for it.
   */
  static class ResourceLocalizedWhileRunningTransition
      extends ContainerTransition {

    @SuppressWarnings("unchecked")
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      ContainerResourceLocalizedEvent localizedEvent =
          (ContainerResourceLocalizedEvent) event;
      Set<String> linkNames = container.resourceSet.resourceLocalized(
          localizedEvent.getResource(), localizedEvent.getLocation());
      // A null result means there is nothing to link for this resource.
      if (linkNames != null) {
        for (String linkName : linkNames) {
          linkIntoWorkDir(container, localizedEvent, linkName);
        }
      }
    }

    /**
     * Creates a single symlink named {@code linkName} inside the container
     * work directory, pointing at the localized location carried by the event.
     * An already existing link file is left untouched. An IOException is
     * logged and not propagated, so the caller can continue with other links.
     */
    private static void linkIntoWorkDir(ContainerImpl container,
        ContainerResourceLocalizedEvent localizedEvent, String linkName) {
      try {
        String linkPath = new Path(container.workDir, linkName).toString();
        if (!new File(linkPath).exists()) {
          container.context.getContainerExecutor()
              .symLink(localizedEvent.getLocation().toString(), linkPath);
          LOG.info("Created symlink: " + linkPath + " -> " + localizedEvent
              .getLocation());
        } else {
          LOG.info("Symlink file already exists: " + linkPath);
        }
      } catch (IOException e) {
        String message = String
            .format("Error when creating symlink %s -> %s", linkName,
                localizedEvent.getLocation());
        LOG.error(message, e);
      }
    }
  }

  /**
   * Handles a resource localization failure that arrives while the container
   * is running: the failure is recorded in the container's resource set and
   * its diagnostic message is appended to the container diagnostics.
   */
  static class ResourceLocalizationFailedWhileRunningTransition
      extends ContainerTransition {

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      ContainerResourceFailedEvent localizationFailure =
          (ContainerResourceFailedEvent) event;
      String failureDiagnostics = localizationFailure.getDiagnosticMessage();
      container.resourceSet.resourceLocalizationFailed(
          localizationFailure.getResource(), failureDiagnostics);
      container.addDiagnostics(failureDiagnostics);
    }
  }

  /**
   * Resource localization failed while the container is reinitializing.
   */
  static class ResourceLocalizationFailedWhileReInitTransition
      extends ContainerTransition {

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      ContainerResourceFailedEvent failedEvent =
          (ContainerResourceFailedEvent) event;
      container.resourceSet.resourceLocalizationFailed(
          failedEvent.getResource(), failedEvent.getDiagnosticMessage());
      container.addDiagnostics("Container aborting re-initialization.. "
          + failedEvent.getDiagnosticMessage());
      LOG.error("Container [" + container.getContainerId() + "] Re-init" +
          " failed !! Resource [" + failedEvent.getResource() + "] could" +
          " not be localized !!");
      container.reInitContext = null;
    }
  }

  /**
   * Transition from SCHEDULED state to RUNNING state upon receiving
   * a CONTAINER_LAUNCHED event. Starts monitoring, updates the running
   * metric, closes any in-flight re-initialization and, if the container
   * was recovered as killed, immediately requests its cleanup.
   */
  static class LaunchTransition extends ContainerTransition {
    @SuppressWarnings("unchecked")
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.sendContainerMonitorStartEvent();
      container.metrics.runningContainer();
      container.wasLaunched = true;

      // A launch that happens while re-initializing completes the re-init.
      final boolean completesReInit = container.isReInitializing();
      if (completesReInit) {
        NMAuditLogger.logSuccess(container.user,
            AuditConstants.FINISH_CONTAINER_REINIT, "ContainerImpl",
            container.containerId.getApplicationAttemptId().getApplicationId(),
            container.containerId);
      }
      container.setIsReInitializing(false);

      // If this launch was due to a re-initialization that cannot be rolled
      // back (autocommit), wipe the re-init context so that subsequent
      // failures do not trigger a rollback.
      if (container.reInitContext != null) {
        if (!container.reInitContext.canRollback()) {
          container.reInitContext = null;
        }
      }

      if (!container.recoveredAsKilled) {
        return;
      }
      LOG.info("Killing " + container.containerId
          + " due to recovered as killed");
      container.addDiagnostics("Container recovered as killed.\n");
      ContainersLauncherEvent cleanupRequest =
          new ContainersLauncherEvent(container,
              ContainersLauncherEventType.CLEANUP_CONTAINER);
      container.dispatcher.getEventHandler().handle(cleanupRequest);
    }
  }

  /**
   * Transition from SCHEDULED state to PAUSED state on recovery.
   * Sends the container-monitor start event, marks the container as
   * launched and flags it as paused.
   */
  static class RecoveredContainerTransition extends ContainerTransition {
    @SuppressWarnings("unchecked")
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.sendContainerMonitorStartEvent();
      container.wasLaunched = true;
      // setIsPaused(true) only records the paused flag; the paused metric is
      // touched only when a container leaves the paused state.
      container.setIsPaused(true);
    }
  }

  /**
   * Transition from RUNNING or KILLING state to
   * EXITED_WITH_SUCCESS state upon EXITED_WITH_SUCCESS message.
   * Clears the re-initializing and paused flags, records exit code 0,
   * optionally asks the launcher to clean up, then cleans up the container.
   */
  @SuppressWarnings("unchecked")  // dispatcher not typed
  static class ExitedWithSuccessTransition extends ContainerTransition {

    /** Whether a CLEANUP_CONTAINER event must be sent to the launcher. */
    boolean clCleanupRequired;

    public ExitedWithSuccessTransition(boolean clCleanupRequired) {
      this.clCleanupRequired = clCleanupRequired;
    }

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.setIsReInitializing(false);
      container.setIsPaused(false);
      // A successful exit is always recorded with exit code 0.
      container.exitCode = 0;

      // TODO: Add containerWorkDir to the deletion service.

      if (this.clCleanupRequired) {
        ContainersLauncherEvent cleanupRequest =
            new ContainersLauncherEvent(container,
                ContainersLauncherEventType.CLEANUP_CONTAINER);
        container.dispatcher.getEventHandler().handle(cleanupRequest);
      }

      container.cleanup();
    }
  }

  /**
   * Transition to EXITED_WITH_FAILURE state upon
   * CONTAINER_EXITED_WITH_FAILURE state. Clears the paused and
   * re-initializing flags, records the exit code and diagnostics from the
   * exit event, optionally asks the launcher to clean up, then cleans up
   * the container.
   **/
  @SuppressWarnings("unchecked")  // dispatcher not typed
  static class ExitedWithFailureTransition extends ContainerTransition {

    /** Whether a CLEANUP_CONTAINER event must be sent to the launcher. */
    boolean clCleanupRequired;

    public ExitedWithFailureTransition(boolean clCleanupRequired) {
      this.clCleanupRequired = clCleanupRequired;
    }

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.setIsPaused(false);
      container.setIsReInitializing(false);

      ContainerExitEvent failedExit = (ContainerExitEvent) event;
      container.exitCode = failedExit.getExitCode();
      String exitDiagnostics = failedExit.getDiagnosticInfo();
      if (exitDiagnostics != null) {
        container.addDiagnostics(exitDiagnostics + "\n");
      }

      // TODO: Add containerWorkDir to the deletion service.
      // TODO: Add containerOutputDir to the deletion service.

      if (this.clCleanupRequired) {
        ContainersLauncherEvent cleanupRequest =
            new ContainersLauncherEvent(container,
                ContainersLauncherEventType.CLEANUP_CONTAINER);
        container.dispatcher.getEventHandler().handle(cleanupRequest);
      }

      container.cleanup();
    }
  }

  /**
   * Transition to RELAUNCHING, SCHEDULED (re-init rollback) or
   * EXITED_WITH_FAILURE state upon CONTAINER_EXITED_WITH_FAILURE.
   * A retry is preferred; otherwise a rollback of a re-initialization is
   * attempted; otherwise the container is treated as failed.
   **/
  @SuppressWarnings("unchecked")  // dispatcher not typed
  static class RetryFailureTransition implements
      MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {

    @Override
    public ContainerState transition(final ContainerImpl container,
        ContainerEvent event) {
      recordExit(container, (ContainerExitEvent) event);

      if (container.shouldRetry(container.exitCode)) {
        // Updates to the retry context should  be protected from concurrent
        // writes. It should only be called from this transition.
        container.retryPolicy.updateRetryContext(container.windowRetryContext);
        container.storeRetryContext();
        doRelaunch(container,
            container.windowRetryContext.getRemainingRetries(),
            container.containerRetryContext.getRetryInterval());
        return ContainerState.RELAUNCHING;
      }

      if (!container.canRollback()) {
        new ExitedWithFailureTransition(true).transition(container, event);
        return ContainerState.EXITED_WITH_FAILURE;
      }

      // Rollback is possible only if the previous launch context is
      // available.
      container.addDiagnostics("Container Re-init Auto Rolled-Back.");
      LOG.info("Rolling back Container reInitialization for [" +
          container.getContainerId() + "] !!");
      container.reInitContext =
          container.reInitContext.createContextForRollback();
      container.metrics.rollbackContainerOnFailure();
      container.metrics.reInitingContainer();
      NMAuditLogger.logSuccess(container.user,
          AuditConstants.START_CONTAINER_REINIT, "ContainerImpl",
          container.containerId.getApplicationAttemptId().getApplicationId(),
          container.containerId);
      new KilledForReInitializationTransition().transition(container, event);
      return ContainerState.SCHEDULED;
    }

    /**
     * Stores the exit code of the failed attempt on the container and, when
     * the exit event carries diagnostics, appends them to the container
     * diagnostics (prefixed with a header unless the policy is NEVER_RETRY).
     */
    private static void recordExit(ContainerImpl container,
        ContainerExitEvent exitEvent) {
      container.exitCode = exitEvent.getExitCode();
      String attemptDiagnostics = exitEvent.getDiagnosticInfo();
      if (attemptDiagnostics == null) {
        return;
      }
      if (container.containerRetryContext.getRetryPolicy()
          != ContainerRetryPolicy.NEVER_RETRY) {
        container.addDiagnostics("Diagnostic message from attempt : \n");
      }
      container.addDiagnostics(attemptDiagnostics + "\n");
    }

    /**
     * Logs the relaunch, marks the container as not launched, ends its
     * running metric and sends the relaunch event - immediately when the
     * retry interval is zero, otherwise from a new thread after sleeping
     * for the retry interval.
     */
    private void doRelaunch(final ContainerImpl container,
        int remainingRetryAttempts, final int retryInterval) {
      final boolean retriesForever =
          remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER;
      if (!retriesForever) {
        LOG.info("Relaunching Container {}. " +
                "remaining retry attempts(after relaunch) {}, " +
                "retry interval {} ms", container.getContainerId(),
            remainingRetryAttempts, retryInterval);
      } else {
        LOG.info("Relaunching Container {}. " +
                "retry interval {} ms", container.getContainerId(),
            retryInterval);
      }

      container.wasLaunched = false;
      container.metrics.endRunningContainer();

      if (retryInterval != 0) {
        // Delay the relaunch without blocking the caller.
        Thread delayedRelaunch = new Thread() {
          @Override
          public void run() {
            try {
              Thread.sleep(retryInterval);
              container.sendRelaunchEvent();
            } catch (InterruptedException e) {
              // Interrupted while waiting: give up on this relaunch.
              return;
            }
          }
        };
        delayedRelaunch.start();
      } else {
        container.sendRelaunchEvent();
      }
    }
  }

  /**
   * @return true when the container's retry policy is anything other than
   *         NEVER_RETRY
   */
  @Override
  public boolean isRetryContextSet() {
    final ContainerRetryPolicy configuredPolicy =
        containerRetryContext.getRetryPolicy();
    return configuredPolicy != ContainerRetryPolicy.NEVER_RETRY;
  }

  /**
   * Decides whether the container should be retried for the given exit code.
   * The SUCCESS, FORCE_KILLED and TERMINATED exit codes are never retried;
   * every other code is delegated to the retry policy.
   *
   * @param errorCode exit code of the finished attempt
   * @return true if the container should be relaunched
   */
  @Override
  public boolean shouldRetry(int errorCode) {
    final boolean neverRetried =
        errorCode == ExitCode.SUCCESS.getExitCode()
            || errorCode == ExitCode.FORCE_KILLED.getExitCode()
            || errorCode == ExitCode.TERMINATED.getExitCode();
    return !neverRetried
        && retryPolicy.shouldRetry(windowRetryContext, errorCode);
  }

  /**
   * Transition to EXITED_WITH_FAILURE for a container that was killed by an
   * external signal. Behaves like {@link ExitedWithFailureTransition} with
   * launcher cleanup enabled, and additionally appends a diagnostic line.
   */
  static class KilledExternallyTransition extends ExitedWithFailureTransition {
    KilledExternallyTransition() {
      // Always request launcher cleanup.
      super(true);
    }

    @Override
    public void transition(ContainerImpl container,
        ContainerEvent event) {
      // Run the regular failure handling first; the extra diagnostic is
      // appended afterwards.
      super.transition(container, event);
      container.addDiagnostics("Killed by external signal\n");
    }
  }

  /**
   * Transition to SCHEDULED and wait for RE-LAUNCH.
   * Switches the container over to the launch context and resources held by
   * its re-initialization context, resets the retry state, and re-submits the
   * container for scheduling. Requires {@code container.reInitContext} to be
   * non-null, since it is dereferenced unconditionally.
   */
  static class KilledForReInitializationTransition extends ContainerTransition {

    @Override
    public void transition(ContainerImpl container,
        ContainerEvent event) {
      LOG.info("Relaunching Container [" + container.getContainerId()
          + "] for re-initialization !!");
      // The old process is gone: mark as not launched and end the running
      // metric before the container is scheduled again.
      container.wasLaunched  = false;
      container.metrics.endRunningContainer();
      container.clearIpAndHost();
      // Remove the container from the resource-monitor. When container
      // is launched again, it is added back to monitoring service.
      container.dispatcher.getEventHandler().handle(
          new ContainerStopMonitoringEvent(container.containerId, true));
      // From here on the container runs with the new launch context.
      container.launchContext = container.reInitContext.newLaunchContext;

      // Re configure the Retry Context
      container.containerRetryContext =
          configureRetryContext(container.context.getConf(),
              container.launchContext, container.containerId);
      // Fresh sliding-window state and policy built from the new context.
      container.windowRetryContext = new SlidingWindowRetryPolicy
          .RetryContext(container.containerRetryContext);
      container.retryPolicy = new SlidingWindowRetryPolicy(clock);

      // Replace the resource set with the merge produced by the re-init
      // context.
      container.resourceSet =
          container.reInitContext.mergedResourceSet(container.resourceSet);
      container.isMarkeForKilling = false;
      // Ensure Resources are decremented.
      container.dispatcher.getEventHandler().handle(
          new ContainerSchedulerEvent(container,
          ContainerSchedulerEventType.CONTAINER_COMPLETED));
      // Hand the container back to the scheduler for the re-launch.
      container.sendScheduleEvent();
    }
  }

  /**
   * Transition from LOCALIZING to LOCALIZATION_FAILED upon receiving
   * RESOURCE_FAILED event. Records the failure diagnostics, cleans up the
   * container and ends its initing metric.
   */
  static class ResourceFailedTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      ContainerResourceFailedEvent localizationFailure =
          (ContainerResourceFailedEvent) event;
      String failureDiagnostics =
          localizationFailure.getDiagnosticMessage() + "\n";
      container.addDiagnostics(failureDiagnostics);

      // Inform the localizer to decrement reference counts and cleanup
      // resources.
      container.cleanup();
      container.metrics.endInitingContainer();
    }
  }

  /**
   * Transition from LOCALIZING to KILLING upon receiving
   * KILL_CONTAINER event. Cleans up the container, ends its initing metric
   * and records the exit status and diagnostics of the kill request.
   */
  static class KillBeforeRunningTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      // Inform the localizer to decrement reference counts and cleanup
      // resources.
      container.cleanup();
      container.metrics.endInitingContainer();

      ContainerKillEvent killRequest = (ContainerKillEvent) event;
      container.exitCode = killRequest.getContainerExitStatus();
      String killDiagnostics = killRequest.getDiagnostic() + "\n";
      container.addDiagnostics(killDiagnostics);
      container.addDiagnostics("Container is killed before being launched.\n");
    }
  }

  /**
   * Remain in KILLING state when receiving a RESOURCE_LOCALIZED request
   * while in the process of killing. The resource is still recorded in the
   * container's resource set; the value returned by that call is not used.
   */
  static class LocalizedResourceDuringKillTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      ContainerResourceLocalizedEvent localizedEvent =
          (ContainerResourceLocalizedEvent) event;
      container.resourceSet.resourceLocalized(
          localizedEvent.getResource(), localizedEvent.getLocation());
    }
  }

  /**
   * Transitions upon receiving KILL_CONTAINER.
   * - SCHEDULED -> KILLING.
   * - RUNNING -> KILLING.
   * - REINITIALIZING -> KILLING.
   * Clears the re-initializing and paused flags, asks the launcher to clean
   * up the container, and records the diagnostics and exit status carried by
   * the kill event.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  static class KillTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.setIsReInitializing(false);
      container.setIsPaused(false);

      // Kill the process/process-grp
      ContainersLauncherEvent cleanupRequest =
          new ContainersLauncherEvent(container,
              ContainersLauncherEventType.CLEANUP_CONTAINER);
      container.dispatcher.getEventHandler().handle(cleanupRequest);

      ContainerKillEvent killRequest = (ContainerKillEvent) event;
      String killDiagnostics = killRequest.getDiagnostic() + "\n";
      container.addDiagnostics(killDiagnostics);
      container.exitCode = killRequest.getContainerExitStatus();
    }
  }

  /**
   * Transitions upon receiving PAUSE_CONTAINER.
   * - LOCALIZED -> KILLING.
   * - REINITIALIZING -> KILLING.
   * Clears the re-initializing flag and asks the launcher to clean up the
   * container.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  static class KillOnPauseTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {

    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.setIsReInitializing(false);
      // Kill the process/process-grp
      ContainersLauncherEvent cleanupRequest =
          new ContainersLauncherEvent(container,
              ContainersLauncherEventType.CLEANUP_CONTAINER);
      container.dispatcher.getEventHandler().handle(cleanupRequest);
    }
  }

  /**
   * Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
   * upon receiving CONTAINER_KILLED_ON_REQUEST. The exit code of the event
   * is adopted only if the container still has its default exit code.
   */
  static class ContainerKilledTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      ContainerExitEvent killedExit = (ContainerExitEvent) event;

      // Do not overwrite an exit code that was already set.
      if (container.hasDefaultExitCode()) {
        container.exitCode = killedExit.getExitCode();
      }

      String exitDiagnostics = killedExit.getDiagnosticInfo();
      if (exitDiagnostics != null) {
        container.addDiagnostics(exitDiagnostics + "\n");
      }

      // The process/process-grp is killed. Decrement reference counts and
      // cleanup resources
      container.cleanup();
    }
  }

  /**
   * Handle the following transitions:
   * - {LOCALIZATION_FAILED, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE,
   *    KILLING, CONTAINER_CLEANEDUP_AFTER_KILL}
   *   -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
   * Releases the container's resource in the node metrics, finalizes the
   * per-container metrics when present, sends the finished events, notifies
   * auxiliary services and requests an out-of-band heartbeat.
   */
  static class ContainerDoneTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    @SuppressWarnings("unchecked")
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.metrics.releaseContainer(
          container.containerTokenIdentifier.getResource());

      if (container.containerMetrics != null) {
        container.containerMetrics
            .recordFinishTimeAndExitCode(clock.getTime(), container.exitCode);
        container.containerMetrics.finished(false);
      }

      container.sendFinishedEvents();

      // if the current state is NEW it means the CONTAINER_INIT was never
      // sent for the event, thus no need to send the CONTAINER_STOP
      final boolean initWasSent = container.getCurrentState()
          != org.apache.hadoop.yarn.api.records.ContainerState.NEW;
      if (initWasSent) {
        AuxServicesEvent stopNotice = new AuxServicesEvent
            (AuxServicesEventType.CONTAINER_STOP, container);
        container.dispatcher.getEventHandler().handle(stopNotice);
      }

      container.context.getNodeStatusUpdater().sendOutofBandHeartBeat();
    }
  }

  /**
   * Handle the following transition:
   * - NEW -> DONE upon KILL_CONTAINER
   * A container recovered as COMPLETED only has its finished events sent;
   * any other container is recorded as killed before launch and then goes
   * through the regular done handling.
   */
  static class KillOnNewTransition extends ContainerDoneTransition {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
        container.sendFinishedEvents();
        return;
      }

      ContainerKillEvent killRequest = (ContainerKillEvent) event;
      container.exitCode = killRequest.getContainerExitStatus();
      String killDiagnostics = killRequest.getDiagnostic() + "\n";
      container.addDiagnostics(killDiagnostics);
      container.addDiagnostics("Container is killed before being launched.\n");
      container.metrics.killedContainer();
      NMAuditLogger.logSuccess(container.user,
          AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
          container.containerId.getApplicationAttemptId().getApplicationId(),
          container.containerId);
      super.transition(container, event);
    }
  }

  /**
   * Handle the following transition:
   * - LOCALIZATION_FAILED -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
   * Counts the container as failed and writes a failure audit record before
   * running the common done handling.
   */
  static class LocalizationFailedToDoneTransition extends
      ContainerDoneTransition {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.metrics.failedContainer();

      final ContainerId failedId = container.containerId;
      final String failureDescription =
          "Container failed with state: " + container.getContainerState();
      NMAuditLogger.logFailure(container.user,
          AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl",
          failureDescription,
          failedId.getApplicationAttemptId().getApplicationId(),
          failedId);

      super.transition(container, event);
    }
  }

  /**
   * Handle the following transition:
   * - EXITED_WITH_SUCCESS -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
   * Ends the running metric if the container was launched (otherwise logs a
   * warning), counts the container as completed and writes a success audit
   * record before running the common done handling.
   */
  static class ExitedWithSuccessToDoneTransition extends
      ContainerDoneTransition {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      if (container.wasLaunched) {
        container.metrics.endRunningContainer();
      } else {
        // The two literals need a separating space; without it the message
        // reads "...and notactually running".
        LOG.warn("Container exited with success despite being killed and not" +
            " actually running");
      }
      container.metrics.completedContainer();
      NMAuditLogger.logSuccess(container.user,
          AuditConstants.FINISH_SUCCESS_CONTAINER, "ContainerImpl",
          container.containerId.getApplicationAttemptId().getApplicationId(),
          container.containerId);
      super.transition(container, event);
    }
  }

  /**
   * Handle the following transition:
   * - EXITED_WITH_FAILURE -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
   * Ends the running metric if the container was launched, counts the
   * container as failed and writes a failure audit record before running
   * the common done handling.
   */
  static class ExitedWithFailureToDoneTransition extends
      ContainerDoneTransition {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      if (container.wasLaunched) {
        container.metrics.endRunningContainer();
      }
      container.metrics.failedContainer();

      final ContainerId failedId = container.containerId;
      final String failureDescription =
          "Container failed with state: " + container.getContainerState();
      NMAuditLogger.logFailure(container.user,
          AuditConstants.FINISH_FAILED_CONTAINER, "ContainerImpl",
          failureDescription,
          failedId.getApplicationAttemptId().getApplicationId(),
          failedId);

      super.transition(container, event);
    }
  }

  /**
   * Handle the following transition:
   * - KILLING -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
   * Counts the container as killed and writes a success audit record for
   * the kill before running the common done handling.
   */
  static class KillingToDoneTransition extends
      ContainerDoneTransition {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.metrics.killedContainer();

      final ContainerId killedId = container.containerId;
      NMAuditLogger.logSuccess(container.user,
          AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
          killedId.getApplicationAttemptId().getApplicationId(),
          killedId);

      super.transition(container, event);
    }
  }

  /**
   * Handle the following transition:
   * CONTAINER_CLEANEDUP_AFTER_KILL -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
   * Ends the running metric if the container was launched, counts the
   * container as killed and writes a success audit record for the kill
   * before running the common done handling.
   */
  static class ContainerCleanedupAfterKillToDoneTransition extends
      ContainerDoneTransition {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      if (container.wasLaunched) {
        container.metrics.endRunningContainer();
      }
      container.metrics.killedContainer();

      final ContainerId killedId = container.containerId;
      NMAuditLogger.logSuccess(container.user,
          AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
          killedId.getApplicationAttemptId().getApplicationId(),
          killedId);

      super.transition(container, event);
    }
  }

  /**
   * Update diagnostics, staying in the same state. The update carried by
   * the event is appended to the container diagnostics followed by a
   * newline.
   */
  static class ContainerDiagnosticsUpdateTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      ContainerDiagnosticsUpdateEvent diagnosticsEvent =
          (ContainerDiagnosticsUpdateEvent) event;
      String diagnosticsLine = diagnosticsEvent.getDiagnosticsUpdate() + "\n";
      container.addDiagnostics(diagnosticsLine);
    }
  }

  /**
   * Transitions upon receiving PAUSE_CONTAINER.
   * - RUNNING -> PAUSING
   * Forwards a PAUSE_CONTAINER request to the launcher and appends the
   * diagnostic of the pause event to the container diagnostics.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  static class PauseContainerTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      // Pause the process/process-grp if it is supported by the container
      ContainersLauncherEvent pauseRequest =
          new ContainersLauncherEvent(container,
              ContainersLauncherEventType.PAUSE_CONTAINER);
      container.dispatcher.getEventHandler().handle(pauseRequest);

      ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event;
      String pauseDiagnostics = pauseEvent.getDiagnostic() + "\n";
      container.addDiagnostics(pauseDiagnostics);
    }
  }

  /**
   * Transitions upon receiving PAUSED_CONTAINER. Flags the container as
   * paused, updates the paused metric and notifies the container scheduler.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  static class PausedContainerTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.setIsPaused(true);
      container.metrics.pausedContainer();

      // Container was PAUSED so tell the scheduler
      ContainerSchedulerEvent pausedNotice =
          new ContainerSchedulerEvent(container,
              ContainerSchedulerEventType.CONTAINER_PAUSED);
      container.dispatcher.getEventHandler().handle(pausedNotice);
    }
  }

  /**
   * Transitions upon receiving RESUME_CONTAINER.
   * - PAUSED -> RUNNING
   * Clears the paused flag, forwards a RESUME_CONTAINER request to the
   * launcher and appends the diagnostic of the resume event to the
   * container diagnostics.
   */
  @SuppressWarnings("unchecked") // dispatcher not typed
  static class ResumeContainerTransition implements
      SingleArcTransition<ContainerImpl, ContainerEvent> {
    @Override
    public void transition(ContainerImpl container, ContainerEvent event) {
      container.setIsPaused(false);

      // Ask the launcher to resume the container.
      ContainersLauncherEvent resumeRequest =
          new ContainersLauncherEvent(container,
              ContainersLauncherEventType.RESUME_CONTAINER);
      container.dispatcher.getEventHandler().handle(resumeRequest);

      ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event;
      String resumeDiagnostics = resumeEvent.getDiagnostic() + "\n";
      container.addDiagnostics(resumeDiagnostics);
    }
  }

  /**
   * Feeds the event to the container state machine while holding the write
   * lock. An invalid transition is logged and otherwise ignored; a state
   * change is logged at info level.
   *
   * @param event the container event to process
   */
  @Override
  public void handle(ContainerEvent event) {
    this.writeLock.lock();
    try {
      final ContainerId eventContainerId = event.getContainerID();
      LOG.debug("Processing {} of type {}", eventContainerId,
          event.getType());
      final ContainerState stateBefore = stateMachine.getCurrentState();
      ContainerState stateAfter;
      try {
        stateAfter = stateMachine.doTransition(event.getType(), event);
      } catch (InvalidStateTransitionException e) {
        LOG.error("Can't handle this event at current state: Current: ["
            + stateBefore + "], eventType: [" + event.getType() + "]," +
            " container: [" + eventContainerId + "]", e);
        // No transition took place.
        stateAfter = null;
      }
      final boolean stateChanged =
          stateAfter != null && stateBefore != stateAfter;
      if (stateChanged) {
        LOG.info("Container " + eventContainerId + " transitioned from "
            + stateBefore
            + " to " + stateAfter);
      }
    } finally {
      this.writeLock.unlock();
    }
  }

  /**
   * @return the string form of this container's id, read under the read
   *         lock
   */
  @Override
  public String toString() {
    this.readLock.lock();
    try {
      final String idText = this.containerId.toString();
      return idText;
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * @return true while the exit code still equals
   *         {@code ContainerExitStatus.INVALID}
   */
  private boolean hasDefaultExitCode() {
    final boolean stillDefault = ContainerExitStatus.INVALID == this.exitCode;
    return stillDefault;
  }

  /**
   * Returns whether the specific resource should be uploaded to the shared
   * cache.
   *
   * @param container the container whose resource set holds the upload
   *                  policies
   * @param resource  the resource to look up
   * @return true only if the upload policy recorded for the resource is
   *         true; false if it is false or if no policy is recorded
   */
  private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
      LocalResourceRequest resource) {
    // The lookup yields null for a resource without a recorded policy;
    // unboxing that null directly would throw a NullPointerException.
    return Boolean.TRUE.equals(
        container.resourceSet.getResourcesUploadPolicies().get(resource));
  }

  /**
   * Records whether the container is paused. When the container goes from
   * paused to not paused, the paused-container metric is ended first.
   *
   * @param paused the new paused flag
   */
  private void setIsPaused(boolean paused) {
    final boolean leavingPausedState = this.wasPaused && !paused;
    if (leavingPausedState) {
      this.metrics.endPausedContainer();
    }
    this.wasPaused = paused;
  }

  /**
   * @return the retry context currently configured for this container
   */
  @VisibleForTesting
  ContainerRetryContext getContainerRetryContext() {
    return this.containerRetryContext;
  }

  /**
   * @return the priority carried by this container's token identifier
   */
  @Override
  public Priority getPriority() {
    final Priority tokenPriority = containerTokenIdentifier.getPriority();
    return tokenPriority;
  }

  /**
   * @return true if the container state is currently RUNNING
   */
  @Override
  public boolean isRunning() {
    return ContainerState.RUNNING == getContainerState();
  }

  /**
   * Records whether the container is re-initializing. When the flag goes
   * from true to false, the re-initing-container metric is ended first.
   *
   * @param isReInitializing the new re-initializing flag
   */
  @Override
  public void setIsReInitializing(boolean isReInitializing) {
    final boolean leavingReInit = this.isReInitializing && !isReInitializing;
    if (leavingReInit) {
      metrics.endReInitingContainer();
    }
    this.isReInitializing = isReInitializing;
  }

  /**
   * @return the current value of the re-initializing flag
   */
  @Override
  public boolean isReInitializing() {
    return isReInitializing;
  }

  /**
   * @return the current value of the marked-for-killing flag
   */
  @Override
  public boolean isMarkedForKilling() {
    return isMarkeForKilling;
  }

  /**
   * @return true if a re-initialization context exists and it reports that
   *         a rollback is possible
   */
  @Override
  public boolean canRollback() {
    if (this.reInitContext == null) {
      return false;
    }
    return this.reInitContext.canRollback();
  }

  /**
   * Commits the upgrade by discarding the re-initialization context, after
   * which {@link #canRollback()} returns false.
   */
  @Override
  public void commitUpgrade() {
    reInitContext = null;
  }

  /**
   * @return true if the recovered status is anything other than REQUESTED
   *         and the container is still in the NEW state
   */
  @Override
  public boolean isRecovering() {
    if (recoveredStatus == RecoveredContainerStatus.REQUESTED) {
      return false;
    }
    return getContainerState() == ContainerState.NEW;
  }

  /**
   * Returns the resource mappings held by this container.
   *
   * @return Resource Mappings of the container
   */
  @Override
  public ResourceMappings getResourceMappings() {
    return this.resourceMappings;
  }

  /**
   * Persists the retry bookkeeping of this container in the state store:
   * the restart times (only when there are any) and the remaining retry
   * attempts. A failure to store either value is logged as a warning and
   * not propagated.
   */
  private void storeRetryContext() {
    final boolean hasRestartTimes =
        windowRetryContext.getRestartTimes() != null
            && !windowRetryContext.getRestartTimes().isEmpty();
    if (hasRestartTimes) {
      try {
        stateStore.storeContainerRestartTimes(containerId,
            windowRetryContext.getRestartTimes());
      } catch (IOException e) {
        LOG.warn(
            "Unable to update finishTimeForRetryAttempts in state store for "
                + containerId, e);
      }
    }

    // The remaining attempts are stored regardless of the restart times.
    try {
      stateStore.storeContainerRemainingRetryAttempts(containerId,
          windowRetryContext.getRemainingRetries());
    } catch (IOException e) {
      LOG.warn(
          "Unable to update remainingRetryAttempts in state store for "
              + containerId, e);
    }
  }

  /**
   * @return the sliding-window retry policy currently used by this container
   */
  @VisibleForTesting
  SlidingWindowRetryPolicy getRetryPolicy() {
    return this.retryPolicy;
  }

  /**
   * @return true if the container state is one of KILLING, DONE,
   *         LOCALIZATION_FAILED, CONTAINER_RESOURCES_CLEANINGUP,
   *         CONTAINER_CLEANEDUP_AFTER_KILL, EXITED_WITH_FAILURE or
   *         EXITED_WITH_SUCCESS
   */
  @Override
  public boolean isContainerInFinalStates() {
    final ContainerState current = getContainerState();
    return EnumSet.of(
        ContainerState.KILLING,
        ContainerState.DONE,
        ContainerState.LOCALIZATION_FAILED,
        ContainerState.CONTAINER_RESOURCES_CLEANINGUP,
        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
        ContainerState.EXITED_WITH_FAILURE,
        ContainerState.EXITED_WITH_SUCCESS).contains(current);
  }

  /**
   * Stores the exposed-ports string for this container.
   *
   * @param ports the value to store
   */
  @Override
  public void setExposedPorts(String ports) {
    exposedPorts = ports;
  }

  /**
   * @return the localization statuses reported by the container's resource
   *         set, read under the read lock
   */
  @Override
  public List<LocalizationStatus> getLocalizationStatuses() {
    this.readLock.lock();
    try {
      final List<LocalizationStatus> statuses =
          resourceSet.getLocalizationStatuses();
      return statuses;
    } finally {
      this.readLock.unlock();
    }
  }

  /**
   * Stores the runtime-specific data object for this container; it can be
   * read back in typed form with {@link #getContainerRuntimeData(Class)}.
   *
   * @param containerRuntimeData the object to store
   */
  public void setContainerRuntimeData(Object containerRuntimeData) {
    Object runtimeData = containerRuntimeData;
    this.containerRuntimeData = runtimeData;
  }

  /**
   * Returns the stored container runtime data cast to the requested type.
   *
   * @param runtimeClass the type the caller expects the stored data to have
   * @param <T> the expected runtime data type
   * @return the stored runtime data as an instance of {@code runtimeClass}
   * @throws ContainerExecutionException if no runtime data has been stored
   *         or the stored data is not an instance of {@code runtimeClass}
   */
  public <T> T getContainerRuntimeData(Class<T> runtimeClass)
      throws ContainerExecutionException {
    if (!runtimeClass.isInstance(containerRuntimeData)) {
      // isInstance is false for null, so the data may be null here; guard the
      // getClass() call so the caller gets the declared exception rather
      // than a NullPointerException.
      String actualClassName = (containerRuntimeData == null)
          ? "null"
          : containerRuntimeData.getClass().getCanonicalName();
      throw new ContainerExecutionException(
          "Runtime class " + actualClassName
          + " is invalid. Expected class " + runtimeClass.getCanonicalName());
    }
    return runtimeClass.cast(containerRuntimeData);
  }

  /**
   * @return the localization counters joined by commas, in array order
   */
  @Override
  public String localizationCountersAsString() {
    StringBuilder joined = new StringBuilder();
    // The first counter is written without a leading separator.
    joined.append(localizationCounts[0]);
    int index = 1;
    while (index < localizationCounts.length) {
      joined.append(',');
      joined.append(localizationCounts[index]);
      index++;
    }
    return joined.toString();
  }
}