ContainerLaunch.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;

import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.TOKEN_FILE_NAME_FMT;

import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;

import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.WindowsSecureContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.yarn.server.security.AMSecretKeys;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;

import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ContainerLaunch implements Callable<Integer> {

  private static final Logger LOG =
       LoggerFactory.getLogger(ContainerLaunch.class);

  private static final String CONTAINER_PRE_LAUNCH_PREFIX = "prelaunch";
  public static final String CONTAINER_PRE_LAUNCH_STDOUT = CONTAINER_PRE_LAUNCH_PREFIX + ".out";
  public static final String CONTAINER_PRE_LAUNCH_STDERR = CONTAINER_PRE_LAUNCH_PREFIX + ".err";

  public static final String CONTAINER_SCRIPT =
    Shell.appendScriptExtension("launch_container");

  public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
  public static final String SYSFS_DIR = "sysfs";

  public static final String KEYSTORE_FILE = "yarn_provided.keystore";
  public static final String TRUSTSTORE_FILE = "yarn_provided.truststore";

  private static final String PID_FILE_NAME_FMT = "%s.pid";
  static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";

  private static final String ADDITIONAL_JDK17_PLUS_OPTIONS =
      "--add-opens=java.base/java.lang=ALL-UNNAMED " +
      "--add-exports=java.base/sun.net.dns=ALL-UNNAMED " +
      "--add-exports=java.base/sun.net.util=ALL-UNNAMED";

  protected final Dispatcher dispatcher;
  protected final ContainerExecutor exec;
  protected final Application app;
  protected final Container container;
  private final Configuration conf;
  private final Context context;
  private final ContainerManagerImpl containerManager;

  protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false);
  protected AtomicBoolean shouldPauseContainer = new AtomicBoolean(false);

  protected AtomicBoolean completed = new AtomicBoolean(false);

  private volatile boolean killedBeforeStart = false;
  private long maxKillWaitTime = 2000;

  protected Path pidFilePath = null;

  protected final LocalDirsHandlerService dirsHandler;

  private final Lock launchLock = new ReentrantLock();

  public ContainerLaunch(Context context, Configuration configuration,
      Dispatcher dispatcher, ContainerExecutor exec, Application app,
      Container container, LocalDirsHandlerService dirsHandler,
      ContainerManagerImpl containerManager) {
    this.context = context;
    this.conf = configuration;
    this.app = app;
    this.exec = exec;
    this.container = container;
    this.dispatcher = dispatcher;
    this.dirsHandler = dirsHandler;
    this.containerManager = containerManager;
    this.maxKillWaitTime =
        conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
            YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS);
  }

  @VisibleForTesting
  public static String expandEnvironment(String var,
      Path containerLogDir) {
    var = var.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
      containerLogDir.toString());
    var = var.replace(ApplicationConstants.CLASS_PATH_SEPARATOR,
      File.pathSeparator);

    if (Shell.isJavaVersionAtLeast(17)) {
      var = var.replace(ApplicationConstants.JVM_ADD_OPENS_VAR, ADDITIONAL_JDK17_PLUS_OPTIONS);
    } else {
      var = var.replace(ApplicationConstants.JVM_ADD_OPENS_VAR, "");
    }

    // replace parameter expansion marker. e.g. {{VAR}} on Windows is replaced
    // as %VAR% and on Linux replaced as "$VAR"
    if (Shell.WINDOWS) {
      var = var.replaceAll("(\\{\\{)|(\\}\\})", "%");
    } else {
      var = var.replace(ApplicationConstants.PARAMETER_EXPANSION_LEFT, "$");
      var = var.replace(ApplicationConstants.PARAMETER_EXPANSION_RIGHT, "");
    }
    return var;
  }

  private void expandAllEnvironmentVars(
      Map<String, String> environment, Path containerLogDir) {
    for (Entry<String, String> entry : environment.entrySet()) {
      String value = entry.getValue();
      value = expandEnvironment(value, containerLogDir);
      entry.setValue(value);
    }
  }

  private void addKeystoreVars(Map<String, String> environment,
      Path containerWorkDir) {
    environment.put(ApplicationConstants.KEYSTORE_FILE_LOCATION_ENV_NAME,
        new Path(containerWorkDir,
            ContainerLaunch.KEYSTORE_FILE).toUri().getPath());
    environment.put(ApplicationConstants.KEYSTORE_PASSWORD_ENV_NAME,
        new String(container.getCredentials().getSecretKey(
            AMSecretKeys.YARN_APPLICATION_AM_KEYSTORE_PASSWORD),
            StandardCharsets.UTF_8));
  }

  private void addTruststoreVars(Map<String, String> environment,
                               Path containerWorkDir) {
    environment.put(
        ApplicationConstants.TRUSTSTORE_FILE_LOCATION_ENV_NAME,
        new Path(containerWorkDir,
            ContainerLaunch.TRUSTSTORE_FILE).toUri().getPath());
    environment.put(ApplicationConstants.TRUSTSTORE_PASSWORD_ENV_NAME,
        new String(container.getCredentials().getSecretKey(
            AMSecretKeys.YARN_APPLICATION_AM_TRUSTSTORE_PASSWORD),
            StandardCharsets.UTF_8));
  }

  @Override
  public Integer call() {
    if (!validateContainerState()) {
      return 0;
    }

    final ContainerLaunchContext launchContext = container.getLaunchContext();
    ContainerId containerID = container.getContainerId();
    String containerIdStr = containerID.toString();
    final List<String> command = launchContext.getCommands();
    int ret = -1;

    Path containerLogDir;
    try {
      Map<Path, List<String>> localResources = getLocalizedResources();

      final String user = container.getUser();
      // /////////////////////////// Variable expansion
      // Before the container script gets written out.
      List<String> newCmds = new ArrayList<String>(command.size());
      String appIdStr = app.getAppId().toString();
      String relativeContainerLogDir = ContainerLaunch
          .getRelativeContainerLogDir(appIdStr, containerIdStr);
      containerLogDir =
          dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
      recordContainerLogDir(containerID, containerLogDir.toString());
      for (String str : command) {
        // TODO: Should we instead work via symlinks without this grammar?
        newCmds.add(expandEnvironment(str, containerLogDir));
      }
      launchContext.setCommands(newCmds);

      // The actual expansion of environment variables happens after calling
      // addConfigsToEnv.  This allows variables specified in NM_ADMIN_USER_ENV
      // to reference user or container-defined variables.
      Map<String, String> environment = launchContext.getEnvironment();
      // /////////////////////////// End of variable expansion

      // Use this to track variables that are added to the environment by nm.
      LinkedHashSet<String> nmEnvVars = new LinkedHashSet<String>();

      FileContext lfs = FileContext.getLocalFSFileContext();

      Path nmPrivateContainerScriptPath = dirsHandler.getLocalPathForWrite(
          getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
              + CONTAINER_SCRIPT);
      Path nmPrivateTokensPath = dirsHandler.getLocalPathForWrite(
          getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
              + String.format(TOKEN_FILE_NAME_FMT, containerIdStr));
      Path nmPrivateKeystorePath = dirsHandler.getLocalPathForWrite(
          getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
              + KEYSTORE_FILE);
      Path nmPrivateTruststorePath = dirsHandler.getLocalPathForWrite(
          getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
              + TRUSTSTORE_FILE);
      Path nmPrivateClasspathJarDir = dirsHandler.getLocalPathForWrite(
          getContainerPrivateDir(appIdStr, containerIdStr));

      // Select the working directory for the container
      Path containerWorkDir = deriveContainerWorkDir();
      recordContainerWorkDir(containerID, containerWorkDir.toString());

      // Select a root dir for all csi volumes for the container
      Path csiVolumesRoot = deriveCsiVolumesRootDir();
      recordContainerCsiVolumesRootDir(containerID, csiVolumesRoot.toString());

      String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
      // pid file should be in nm private dir so that it is not
      // accessible by users
      pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
      List<String> localDirs = dirsHandler.getLocalDirs();
      List<String> localDirsForRead = dirsHandler.getLocalDirsForRead();
      List<String> logDirs = dirsHandler.getLogDirs();
      List<String> filecacheDirs = getNMFilecacheDirs(localDirsForRead);
      List<String> userLocalDirs = getUserLocalDirs(localDirs);
      List<String> containerLocalDirs = getContainerLocalDirs(localDirs);
      List<String> containerLogDirs = getContainerLogDirs(logDirs);
      List<String> userFilecacheDirs = getUserFilecacheDirs(localDirsForRead);
      List<String> applicationLocalDirs = getApplicationLocalDirs(localDirs,
          appIdStr);

      if (!dirsHandler.areDisksHealthy()) {
        ret = ContainerExitStatus.DISKS_FAILED;
        throw new IOException("Most of the disks failed. "
            + dirsHandler.getDisksHealthReport(false));
      }
      List<Path> appDirs = new ArrayList<Path>(localDirs.size());
      for (String localDir : localDirs) {
        Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
        Path userdir = new Path(usersdir, user);
        Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
        appDirs.add(new Path(appsdir, appIdStr));
      }

      byte[] keystore = container.getCredentials().getSecretKey(
          AMSecretKeys.YARN_APPLICATION_AM_KEYSTORE);
      if (keystore != null) {
        try (DataOutputStream keystoreOutStream =
                 lfs.create(nmPrivateKeystorePath,
                     EnumSet.of(CREATE, OVERWRITE))) {
          keystoreOutStream.write(keystore);
        }
      } else {
        nmPrivateKeystorePath = null;
      }
      byte[] truststore = container.getCredentials().getSecretKey(
          AMSecretKeys.YARN_APPLICATION_AM_TRUSTSTORE);
      if (truststore != null) {
        try (DataOutputStream truststoreOutStream =
                 lfs.create(nmPrivateTruststorePath,
                     EnumSet.of(CREATE, OVERWRITE))) {
          truststoreOutStream.write(truststore);
        }
      } else {
        nmPrivateTruststorePath = null;
      }

      // Set the token location too.
      addToEnvMap(environment, nmEnvVars,
          ApplicationConstants.CONTAINER_TOKEN_FILE_ENV_NAME,
          new Path(containerWorkDir,
              FINAL_CONTAINER_TOKENS_FILE).toUri().getPath());

      // /////////// Write out the container-script in the nmPrivate space.
      try (DataOutputStream containerScriptOutStream =
               lfs.create(nmPrivateContainerScriptPath,
                   EnumSet.of(CREATE, OVERWRITE))) {
        addConfigsToEnv(environment);

        expandAllEnvironmentVars(environment, containerLogDir);

        // Sanitize the container's environment
        sanitizeEnv(environment, containerWorkDir, appDirs, userLocalDirs,
            containerLogDirs, localResources, nmPrivateClasspathJarDir,
            nmEnvVars);

        // Add these if needed after expanding so we don't expand key values.
        if (keystore != null) {
          addKeystoreVars(environment, containerWorkDir);
        }
        if (truststore != null) {
          addTruststoreVars(environment, containerWorkDir);
        }

        prepareContainer(localResources, containerLocalDirs);

        // Write out the environment
        exec.writeLaunchEnv(containerScriptOutStream, environment,
            localResources, launchContext.getCommands(),
            containerLogDir, user, nmEnvVars);
      }
      // /////////// End of writing out container-script

      // /////////// Write out the container-tokens in the nmPrivate space.
      try (DataOutputStream tokensOutStream =
               lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE))) {
        Credentials creds = container.getCredentials();
        creds.writeTokenStorageToStream(tokensOutStream);
      }
      // /////////// End of writing out container-tokens

      ret = launchContainer(new ContainerStartContext.Builder()
          .setContainer(container)
          .setLocalizedResources(localResources)
          .setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
          .setNmPrivateTokensPath(nmPrivateTokensPath)
          .setNmPrivateKeystorePath(nmPrivateKeystorePath)
          .setNmPrivateTruststorePath(nmPrivateTruststorePath)
          .setUser(user)
          .setAppId(appIdStr)
          .setContainerWorkDir(containerWorkDir)
          .setContainerCsiVolumesRootDir(csiVolumesRoot)
          .setLocalDirs(localDirs)
          .setLogDirs(logDirs)
          .setFilecacheDirs(filecacheDirs)
          .setUserLocalDirs(userLocalDirs)
          .setContainerLocalDirs(containerLocalDirs)
          .setContainerLogDirs(containerLogDirs)
          .setUserFilecacheDirs(userFilecacheDirs)
          .setApplicationLocalDirs(applicationLocalDirs).build());
    } catch (ConfigurationException e) {
      LOG.error("Failed to launch container due to configuration error.", e);
      dispatcher.getEventHandler().handle(new ContainerExitEvent(
          containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
          e.getMessage()));
      // Mark the node as unhealthy
      context.getNodeStatusUpdater().reportException(e);
      return ret;
    } catch (Throwable e) {
      LOG.warn("Failed to launch container.", e);
      dispatcher.getEventHandler().handle(new ContainerExitEvent(
          containerID, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
          e.getMessage()));
      return ret;
    } finally {
      setContainerCompletedStatus(ret);
    }

    handleContainerExitCode(ret, containerLogDir);
    return ret;
  }

  /**
   * Volumes mount point root:
   *   ${YARN_LOCAL_DIR}/usercache/${user}/filecache/csiVolumes/app/container
   * CSI volumes may creates the mount point with different permission bits.
   * If we create the volume mount under container work dir, it may
   * mess up the existing permission structure, which is restricted by
   * linux container executor. So we put all volume mounts under a same
   * root dir so it is easier cleanup.
   **/
  private Path deriveCsiVolumesRootDir() throws IOException {
    final String containerVolumePath =
        ContainerLocalizer.USERCACHE + Path.SEPARATOR
            + container.getUser() + Path.SEPARATOR
            + ContainerLocalizer.FILECACHE + Path.SEPARATOR
            + ContainerLocalizer.CSI_VOLIUME_MOUNTS_ROOT + Path.SEPARATOR
            + app.getAppId().toString() + Path.SEPARATOR
            + container.getContainerId().toString();
    return dirsHandler.getLocalPathForWrite(containerVolumePath,
        LocalDirAllocator.SIZE_UNKNOWN, false);
  }

  private Path deriveContainerWorkDir() throws IOException {

    final String containerWorkDirPath =
        ContainerLocalizer.USERCACHE +
        Path.SEPARATOR +
        container.getUser() +
        Path.SEPARATOR +
        ContainerLocalizer.APPCACHE +
        Path.SEPARATOR +
        app.getAppId().toString() +
        Path.SEPARATOR +
        container.getContainerId().toString();

    final Path containerWorkDir =
        dirsHandler.getLocalPathForWrite(
          containerWorkDirPath,
          LocalDirAllocator.SIZE_UNKNOWN, false);

    return containerWorkDir;
  }

  private void prepareContainer(Map<Path, List<String>> localResources,
      List<String> containerLocalDirs) throws IOException {

    exec.prepareContainer(new ContainerPrepareContext.Builder()
        .setContainer(container)
        .setLocalizedResources(localResources)
        .setUser(container.getUser())
        .setContainerLocalDirs(containerLocalDirs)
        .setCommands(container.getLaunchContext().getCommands())
        .build());
  }

  protected boolean validateContainerState() {
    // CONTAINER_KILLED_ON_REQUEST should not be missed if the container
    // is already at KILLING
    if (container.getContainerState() == ContainerState.KILLING) {
      dispatcher.getEventHandler().handle(
          new ContainerExitEvent(container.getContainerId(),
              ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
              Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
                  ExitCode.TERMINATED.getExitCode(),
              "Container terminated before launch."));
      return false;
    }

    return true;
  }

  protected List<String> getContainerLogDirs(List<String> logDirs) {
    List<String> containerLogDirs = new ArrayList<>(logDirs.size());
    String appIdStr = app.getAppId().toString();
    String containerIdStr = container.getContainerId().toString();
    String relativeContainerLogDir = ContainerLaunch
        .getRelativeContainerLogDir(appIdStr, containerIdStr);

    for (String logDir : logDirs) {
      containerLogDirs.add(logDir + Path.SEPARATOR + relativeContainerLogDir);
    }

    return containerLogDirs;
  }

  protected List<String> getContainerLocalDirs(List<String> localDirs) {
    List<String> containerLocalDirs = new ArrayList<>(localDirs.size());
    String user = container.getUser();
    String appIdStr = app.getAppId().toString();
    String relativeContainerLocalDir = ContainerLocalizer.USERCACHE
        + Path.SEPARATOR + user + Path.SEPARATOR + ContainerLocalizer.APPCACHE
        + Path.SEPARATOR + appIdStr + Path.SEPARATOR;

    for (String localDir : localDirs) {
      containerLocalDirs.add(localDir + Path.SEPARATOR
          + relativeContainerLocalDir);
    }

    return containerLocalDirs;
  }

  protected List<String> getUserLocalDirs(List<String> localDirs) {
    List<String> userLocalDirs = new ArrayList<>(localDirs.size());
    String user = container.getUser();

    for (String localDir : localDirs) {
      String userLocalDir = localDir + Path.SEPARATOR +
          ContainerLocalizer.USERCACHE + Path.SEPARATOR + user
          + Path.SEPARATOR;

      userLocalDirs.add(userLocalDir);
    }

    return userLocalDirs;
  }

  protected List<String> getNMFilecacheDirs(List<String> localDirs) {
    List<String> filecacheDirs = new ArrayList<>(localDirs.size());

    for (String localDir : localDirs) {
      String filecacheDir = localDir + Path.SEPARATOR +
          ContainerLocalizer.FILECACHE;

      filecacheDirs.add(filecacheDir);
    }

    return filecacheDirs;
  }

  protected List<String> getUserFilecacheDirs(List<String> localDirs) {
    List<String> userFilecacheDirs = new ArrayList<>(localDirs.size());
    String user = container.getUser();
    for (String localDir : localDirs) {
      String userFilecacheDir = localDir + Path.SEPARATOR +
          ContainerLocalizer.USERCACHE + Path.SEPARATOR + user
          + Path.SEPARATOR + ContainerLocalizer.FILECACHE;
      userFilecacheDirs.add(userFilecacheDir);
    }
    return userFilecacheDirs;
  }

  protected List<String> getApplicationLocalDirs(List<String> localDirs,
      String appIdStr) {
    List<String> applicationLocalDirs = new ArrayList<>(localDirs.size());
    String user = container.getUser();
    for (String localDir : localDirs) {
      String appLocalDir = localDir + Path.SEPARATOR +
          ContainerLocalizer.USERCACHE + Path.SEPARATOR + user
          + Path.SEPARATOR + ContainerLocalizer.APPCACHE
          + Path.SEPARATOR + appIdStr;
      applicationLocalDirs.add(appLocalDir);
    }
    return applicationLocalDirs;
  }

  protected Map<Path, List<String>> getLocalizedResources()
      throws YarnException {
    Map<Path, List<String>> localResources = container.getLocalizedResources();
    if (localResources == null) {
      throw RPCUtil.getRemoteException(
          "Unable to get local resources when Container " + container
              + " is at " + container.getContainerState());
    }
    return localResources;
  }

  protected int launchContainer(ContainerStartContext ctx)
      throws IOException, ConfigurationException {
    int launchPrep = prepareForLaunch(ctx);
    if (launchPrep == 0) {
      launchLock.lock();
      try {
        return exec.launchContainer(ctx);
      } finally {
        launchLock.unlock();
      }
    }
    return launchPrep;
  }

  protected int relaunchContainer(ContainerStartContext ctx)
      throws IOException, ConfigurationException {
    int launchPrep = prepareForLaunch(ctx);
    if (launchPrep == 0) {
      launchLock.lock();
      try {
        return exec.relaunchContainer(ctx);
      } finally {
        launchLock.unlock();
      }
    }
    return launchPrep;
  }

  void reapContainer() throws IOException {
    launchLock.lock();
    try {
      // Reap the container
      boolean result = exec.reapContainer(
          new ContainerReapContext.Builder()
              .setContainer(container)
              .setUser(container.getUser())
              .build());
      if (!result) {
        throw new IOException("Reap container failed for container " +
            container.getContainerId());
      }
      cleanupContainerFiles(getContainerWorkDir());
    } finally {
      launchLock.unlock();
    }
  }

  protected int prepareForLaunch(ContainerStartContext ctx) throws IOException {
    ContainerId containerId = container.getContainerId();
    if (container.isMarkedForKilling()) {
      LOG.info("Container " + containerId + " not launched as it has already "
          + "been marked for Killing");
      this.killedBeforeStart = true;
      return ExitCode.TERMINATED.getExitCode();
    }
    // LaunchContainer is a blocking call. We are here almost means the
    // container is launched, so send out the event.
    dispatcher.getEventHandler().handle(new ContainerEvent(
        containerId,
        ContainerEventType.CONTAINER_LAUNCHED));
    context.getNMStateStore().storeContainerLaunched(containerId);

    // Check if the container is signalled to be killed.
    if (!containerAlreadyLaunched.compareAndSet(false, true)) {
      LOG.info("Container " + containerId + " not launched as "
          + "cleanup already called");
      return ExitCode.TERMINATED.getExitCode();
    } else {
      exec.activateContainer(containerId, pidFilePath);
    }
    return ExitCode.SUCCESS.getExitCode();
  }

  protected void setContainerCompletedStatus(int exitCode) {
    ContainerId containerId = container.getContainerId();
    completed.set(true);
    exec.deactivateContainer(containerId);
    try {
      if (!container.shouldRetry(exitCode)) {
        context.getNMStateStore().storeContainerCompleted(containerId,
            exitCode);
      }
    } catch (IOException e) {
      LOG.error("Unable to set exit code for container " + containerId);
    }
  }

  protected void handleContainerExitCode(int exitCode, Path containerLogDir) {
    ContainerId containerId = container.getContainerId();
    LOG.debug("Container {} completed with exit code {}", containerId,
        exitCode);

    StringBuilder diagnosticInfo =
        new StringBuilder("Container exited with a non-zero exit code ");
    diagnosticInfo.append(exitCode);
    diagnosticInfo.append(". ");
    if (exitCode == ExitCode.FORCE_KILLED.getExitCode()
        || exitCode == ExitCode.TERMINATED.getExitCode()) {
      // If the process was killed, Send container_cleanedup_after_kill and
      // just break out of this method.
      dispatcher.getEventHandler().handle(
          new ContainerExitEvent(containerId,
              ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
              diagnosticInfo.toString()));
    } else if (exitCode != 0) {
      handleContainerExitWithFailure(containerId, exitCode, containerLogDir,
          diagnosticInfo);
    } else {
      LOG.info("Container " + containerId + " succeeded ");
      dispatcher.getEventHandler().handle(
          new ContainerEvent(containerId,
              ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
    }
  }

  /**
   * Tries to tail and fetch TAIL_SIZE_IN_BYTES of data from the error log.
   * ErrorLog filename is not fixed and depends upon app, hence file name
   * pattern is used.
   *
   * @param containerID
   * @param ret
   * @param containerLogDir
   * @param diagnosticInfo
   */
  protected void handleContainerExitWithFailure(ContainerId containerID,
      int ret, Path containerLogDir, StringBuilder diagnosticInfo) {
    LOG.warn("Container launch failed : " + diagnosticInfo.toString());

    FileSystem fileSystem = null;
    long tailSizeInBytes =
        conf.getLong(YarnConfiguration.NM_CONTAINER_STDERR_BYTES,
            YarnConfiguration.DEFAULT_NM_CONTAINER_STDERR_BYTES);

    // Append container prelaunch stderr to diagnostics
    try {
      fileSystem = FileSystem.getLocal(conf).getRaw();
      FileStatus preLaunchErrorFileStatus = fileSystem
          .getFileStatus(new Path(containerLogDir, ContainerLaunch.CONTAINER_PRE_LAUNCH_STDERR));

      Path errorFile = preLaunchErrorFileStatus.getPath();
      long fileSize = preLaunchErrorFileStatus.getLen();

      diagnosticInfo.append("Error file: ")
          .append(ContainerLaunch.CONTAINER_PRE_LAUNCH_STDERR).append(".\n");
      ;

      byte[] tailBuffer = tailFile(errorFile, fileSize, tailSizeInBytes);
      diagnosticInfo.append("Last ").append(tailSizeInBytes)
          .append(" bytes of ").append(errorFile.getName()).append(" :\n")
          .append(new String(tailBuffer, StandardCharsets.UTF_8));
    } catch (IOException e) {
      LOG.error("Failed to get tail of the container's prelaunch error log file", e);
    }

    // Append container stderr to diagnostics
    String errorFileNamePattern =
        conf.get(YarnConfiguration.NM_CONTAINER_STDERR_PATTERN,
            YarnConfiguration.DEFAULT_NM_CONTAINER_STDERR_PATTERN);

    try {
      if (fileSystem == null) {
        fileSystem = FileSystem.getLocal(conf).getRaw();
      }
      FileStatus[] errorFileStatuses = fileSystem
          .globStatus(new Path(containerLogDir, errorFileNamePattern));
      if (errorFileStatuses != null && errorFileStatuses.length != 0) {
        Path errorFile = errorFileStatuses[0].getPath();
        long fileSize = errorFileStatuses[0].getLen();

        // if more than one file matches the stderr pattern, take the latest
        // modified file, and also append the file names in the diagnosticInfo
        if (errorFileStatuses.length > 1) {
          String[] errorFileNames = new String[errorFileStatuses.length];
          long latestModifiedTime = errorFileStatuses[0].getModificationTime();
          errorFileNames[0] = errorFileStatuses[0].getPath().getName();
          for (int i = 1; i < errorFileStatuses.length; i++) {
            errorFileNames[i] = errorFileStatuses[i].getPath().getName();
            if (errorFileStatuses[i]
                .getModificationTime() > latestModifiedTime) {
              latestModifiedTime = errorFileStatuses[i].getModificationTime();
              errorFile = errorFileStatuses[i].getPath();
              fileSize = errorFileStatuses[i].getLen();
            }
          }
          diagnosticInfo.append("Error files: ")
              .append(StringUtils.join(", ", errorFileNames)).append(".\n");
        }

        byte[] tailBuffer = tailFile(errorFile, fileSize, tailSizeInBytes);
        String tailBufferMsg = new String(tailBuffer, StandardCharsets.UTF_8);
        diagnosticInfo.append("Last ").append(tailSizeInBytes)
            .append(" bytes of ").append(errorFile.getName()).append(" :\n")
            .append(tailBufferMsg).append("\n")
            .append(analysesErrorMsgOfContainerExitWithFailure(tailBufferMsg));

      }
    } catch (IOException e) {
      LOG.error("Failed to get tail of the container's error log file", e);
    }
    this.dispatcher.getEventHandler()
        .handle(new ContainerExitEvent(containerID,
            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
            diagnosticInfo.toString()));
  }

  private byte[] tailFile(Path filePath, long fileSize, long tailSizeInBytes) throws IOException {
    FSDataInputStream errorFileIS = null;
    FileSystem fileSystem = FileSystem.getLocal(conf).getRaw();
    try {
      long startPosition =
          (fileSize < tailSizeInBytes) ? 0 : fileSize - tailSizeInBytes;
      int bufferSize =
          (int) ((fileSize < tailSizeInBytes) ? fileSize : tailSizeInBytes);
      byte[] tailBuffer = new byte[bufferSize];
      errorFileIS = fileSystem.open(filePath);
      errorFileIS.readFully(startPosition, tailBuffer);
      return tailBuffer;
    } finally {
      IOUtils.cleanupWithLogger(LOG, errorFileIS);
    }
  }

  private String analysesErrorMsgOfContainerExitWithFailure(String errorMsg) {
    StringBuilder analysis = new StringBuilder();
    if (errorMsg.indexOf("Error: Could not find or load main class"
        + " org.apache.hadoop.mapreduce") != -1) {
      analysis.append(
          "Please check whether your <HADOOP_HOME>/etc/hadoop/mapred-site.xml "
          + "contains the below configuration:\n");
      analysis.append("<property>\n")
          .append("  <name>yarn.app.mapreduce.am.env</name>\n")
          .append("  <value>HADOOP_MAPRED_HOME=${full path of your hadoop "
              + "distribution directory}</value>\n")
          .append("</property>\n<property>\n")
          .append("  <name>mapreduce.map.env</name>\n")
          .append("  <value>HADOOP_MAPRED_HOME=${full path of your hadoop "
              + "distribution directory}</value>\n")
          .append("</property>\n<property>\n")
          .append("  <name>mapreduce.reduce.env</name>\n")
          .append("  <value>HADOOP_MAPRED_HOME=${full path of your hadoop "
              + "distribution directory}</value>\n")
          .append("</property>\n");
    }
    return analysis.toString();
  }

  protected String getPidFileSubpath(String appIdStr, String containerIdStr) {
    return getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
        + String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr);
  }

  /**
   * Send a signal to the container.
   *
   *
   * @throws IOException
   */
  public void signalContainer(SignalContainerCommand command)
      throws IOException {
    ContainerId containerId =
        container.getContainerTokenIdentifier().getContainerID();
    String containerIdStr = containerId.toString();
    String user = container.getUser();
    Signal signal = translateCommandToSignal(command);
    if (signal.equals(Signal.NULL)) {
      LOG.info("ignore signal command " + command);
      return;
    }

    LOG.info("Sending signal " + command + " to container " + containerIdStr);

    boolean alreadyLaunched =
        !containerAlreadyLaunched.compareAndSet(false, true);
    if (!alreadyLaunched) {
      LOG.info("Container " + containerIdStr + " not launched."
          + " Not sending the signal");
      return;
    }

    LOG.debug("Getting pid for container {} to send signal to from pid"
        + " file {}", containerIdStr,
        (pidFilePath != null ? pidFilePath.toString() : "null"));

    try {
      // get process id from pid file if available
      // else if shell is still active, get it from the shell
      String processId = getContainerPid();
      if (processId != null) {
        LOG.debug("Sending signal to pid {} as user {} for container {}",
            processId, user, containerIdStr);

        boolean result = exec.signalContainer(
            new ContainerSignalContext.Builder()
                .setContainer(container)
                .setUser(user)
                .setPid(processId)
                .setSignal(signal)
                .build());

        String diagnostics = "Sent signal " + command
            + " (" + signal + ") to pid " + processId
            + " as user " + user
            + " for container " + containerIdStr
            + ", result=" + (result ? "success" : "failed");
        LOG.info(diagnostics);

        dispatcher.getEventHandler().handle(
            new ContainerDiagnosticsUpdateEvent(containerId, diagnostics));
      }
    } catch (Exception e) {
      String message =
          "Exception when sending signal to container " + containerIdStr
              + ": " + StringUtils.stringifyException(e);
      LOG.warn(message);
    }
  }

  @VisibleForTesting
  public static Signal translateCommandToSignal(
      SignalContainerCommand command) {
    Signal signal = Signal.NULL;
    switch (command) {
      case OUTPUT_THREAD_DUMP:
        // TODO for windows support.
        signal = Shell.WINDOWS ? Signal.NULL: Signal.QUIT;
        break;
      case GRACEFUL_SHUTDOWN:
        signal = Signal.TERM;
        break;
      case FORCEFUL_SHUTDOWN:
        signal = Signal.KILL;
        break;
    }
    return signal;
  }

  /**
   * Pause the container.
   * Cancels the launch if the container isn't launched yet. Otherwise asks the
   * executor to pause the container.
   * @throws IOException in case of errors.
   */
  public void pauseContainer() throws IOException {
    ContainerId containerId = container.getContainerId();
    String containerIdStr = containerId.toString();
    LOG.info("Pausing the container " + containerIdStr);

    // The pause event is only handled if the container is in the running state
    // (the container state machine), so we don't check for
    // shouldLaunchContainer over here

    if (!shouldPauseContainer.compareAndSet(false, true)) {
      LOG.info("Container " + containerId + " not paused as "
          + "resume already called");
      return;
    }

    try {
      // Pause the container
      exec.pauseContainer(container);

      // PauseContainer is a blocking call. We are here almost means the
      // container is paused, so send out the event.
      dispatcher.getEventHandler().handle(new ContainerEvent(
          containerId,
          ContainerEventType.CONTAINER_PAUSED));

      try {
        this.context.getNMStateStore().storeContainerPaused(
            container.getContainerId());
      } catch (IOException e) {
        LOG.warn("Could not store container [" + container.getContainerId()
            + "] state. The Container has been paused.", e);
      }
    } catch (Exception e) {
      String message =
          "Exception when trying to pause container " + containerIdStr
              + ": " + StringUtils.stringifyException(e);
      LOG.info(message);
      container.handle(new ContainerKillEvent(container.getContainerId(),
          ContainerExitStatus.PREEMPTED, "Container preempted as there was "
          + " an exception in pausing it."));
    }
  }

  /**
   * Resume the container.
   * Cancels the launch if the container isn't launched yet. Otherwise asks the
   * executor to pause the container.
   * @throws IOException in case of error.
   */
  public void resumeContainer() throws IOException {
    ContainerId containerId = container.getContainerId();
    String containerIdStr = containerId.toString();
    LOG.info("Resuming the container " + containerIdStr);

    // The resume event is only handled if the container is in a paused state
    // so we don't check for the launched flag here.

    // paused flag will be set to true if process already paused
    boolean alreadyPaused = !shouldPauseContainer.compareAndSet(false, true);
    if (!alreadyPaused) {
      LOG.info("Container " + containerIdStr + " not paused."
          + " No resume necessary");
      return;
    }

    // If the container has already started
    try {
      exec.resumeContainer(container);
      // ResumeContainer is a blocking call. We are here almost means the
      // container is resumed, so send out the event.
      dispatcher.getEventHandler().handle(new ContainerEvent(
          containerId,
          ContainerEventType.CONTAINER_RESUMED));

      try {
        this.context.getNMStateStore().removeContainerPaused(
            container.getContainerId());
      } catch (IOException e) {
        LOG.warn("Could not store container [" + container.getContainerId()
            + "] state. The Container has been resumed.", e);
      }
    } catch (Exception e) {
      String message =
          "Exception when trying to resume container " + containerIdStr
              + ": " + StringUtils.stringifyException(e);
      LOG.info(message);
      container.handle(new ContainerKillEvent(container.getContainerId(),
          ContainerExitStatus.PREEMPTED, "Container preempted as there was "
          + " an exception in pausing it."));
    }
  }

  /**
   * Loop through for a time-bounded interval waiting to
   * read the process id from a file generated by a running process.
   * @return Process ID; null when pidFilePath is null
   * @throws Exception
   */
  String getContainerPid() throws Exception {
    if (pidFilePath == null) {
      return null;
    }
    String containerIdStr = 
        container.getContainerId().toString();
    String processId;
    LOG.debug("Accessing pid for container {} from pid file {}",
        containerIdStr, pidFilePath);
    int sleepCounter = 0;
    final int sleepInterval = 100;

    // loop waiting for pid file to show up 
    // until our timer expires in which case we admit defeat
    while (true) {
      processId = ProcessIdFileReader.getProcessId(pidFilePath);
      if (processId != null) {
        LOG.debug("Got pid {} for container {}", processId, containerIdStr);
        break;
      }
      else if ((sleepCounter*sleepInterval) > maxKillWaitTime) {
        LOG.info("Could not get pid for " + containerIdStr
        		+ ". Waited for " + maxKillWaitTime + " ms.");
        break;
      }
      else {
        ++sleepCounter;
        Thread.sleep(sleepInterval);
      }
    }
    return processId;
  }

  public static String getRelativeContainerLogDir(String appIdStr,
      String containerIdStr) {
    return appIdStr + Path.SEPARATOR + containerIdStr;
  }

  protected String getContainerPrivateDir(String appIdStr,
      String containerIdStr) {
    return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr
        + Path.SEPARATOR;
  }

  private String getAppPrivateDir(String appIdStr) {
    return ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
        + appIdStr;
  }

  Context getContext() {
    return context;
  }

  public static abstract class ShellScriptBuilder {
    public static ShellScriptBuilder create() {
      return create(Shell.osType);
    }

    @VisibleForTesting
    public static ShellScriptBuilder create(Shell.OSType osType) {
      return (osType == Shell.OSType.OS_TYPE_WIN) ?
          new WindowsShellScriptBuilder() :
          new UnixShellScriptBuilder();
    }

    private static final String LINE_SEPARATOR =
        System.getProperty("line.separator");
    private final StringBuilder sb = new StringBuilder();

    public abstract void command(List<String> command) throws IOException;

    protected static final String ENV_PRELAUNCH_STDOUT = "PRELAUNCH_OUT";
    protected static final String ENV_PRELAUNCH_STDERR = "PRELAUNCH_ERR";

    private boolean redirectStdOut = false;
    private boolean redirectStdErr = false;

    /**
     * Set stdout for the shell script
     * @param stdoutDir stdout must be an absolute path
     * @param stdOutFile stdout file name
     * @throws IOException thrown when stdout path is not absolute
     */
    public final void stdout(Path stdoutDir, String stdOutFile) throws IOException {
      if (!stdoutDir.isAbsolute()) {
        throw new IOException("Stdout path must be absolute");
      }
      redirectStdOut = true;
      setStdOut(new Path(stdoutDir, stdOutFile));
    }

    /**
     * Set stderr for the shell script
     * @param stderrDir stderr must be an absolute path
     * @param stdErrFile stderr file name
     * @throws IOException thrown when stderr path is not absolute
     */
    public final void stderr(Path stderrDir, String stdErrFile) throws IOException {
      if (!stderrDir.isAbsolute()) {
        throw new IOException("Stdout path must be absolute");
      }
      redirectStdErr = true;
      setStdErr(new Path(stderrDir, stdErrFile));
    }

    protected abstract void setStdOut(Path stdout) throws IOException;

    protected abstract void setStdErr(Path stdout) throws IOException;

    public abstract void env(String key, String value) throws IOException;

    public abstract void whitelistedEnv(String key, String value)
        throws IOException;

    public abstract void echo(String echoStr) throws IOException;

    public final void symlink(Path src, Path dst) throws IOException {
      if (!src.isAbsolute()) {
        throw new IOException("Source must be absolute");
      }
      if (dst.isAbsolute()) {
        throw new IOException("Destination must be relative");
      }
      if (dst.toUri().getPath().indexOf('/') != -1) {
        mkdir(dst.getParent());
      }
      link(src, dst);
    }

    /**
     * Method to copy files that are useful for debugging container failures.
     * This method will be called by ContainerExecutor when setting up the
     * container launch script. The method should take care to make sure files
     * are read-able by the yarn user if the files are to undergo
     * log-aggregation.
     * @param src path to the source file
     * @param dst path to the destination file - should be absolute
     * @throws IOException
     */
    public abstract void copyDebugInformation(Path src, Path dst)
        throws IOException;

    /**
     * Method to dump debug information to a target file. This method will
     * be called by ContainerExecutor when setting up the container launch
     * script.
     * @param output the file to which debug information is to be written
     * @throws IOException
     */
    public abstract void listDebugInformation(Path output) throws IOException;

    @Override
    public String toString() {
      return sb.toString();
    }

    public final void write(PrintStream out) throws IOException {
      out.append(sb);
    }

    protected final void buildCommand(String... command) {
      for (String s : command) {
        sb.append(s);
      }
    }

    protected final void linebreak(String... command) {
      sb.append(LINE_SEPARATOR);
    }

    protected final void line(String... command) {
      buildCommand(command);
      linebreak();
    }

    public void setExitOnFailure() {
      // Dummy implementation
    }

    protected abstract void link(Path src, Path dst) throws IOException;

    protected abstract void mkdir(Path path) throws IOException;

    boolean doRedirectStdOut() {
      return redirectStdOut;
    }

    boolean doRedirectStdErr() {
      return redirectStdErr;
    }

    /**
     * Parse an environment value and returns all environment keys it uses.
     * @param envVal an environment variable's value
     * @return all environment variable names used in <code>envVal</code>.
     */
    public Set<String> getEnvDependencies(final String envVal) {
      return Collections.emptySet();
    }

    /**
     * Returns a dependency ordered version of <code>envs</code>. Does not alter
     * input <code>envs</code> map.
     * @param envs environment map
     * @return a dependency ordered version of <code>envs</code>
     */
    public final Map<String, String> orderEnvByDependencies(
        Map<String, String> envs) {
      if (envs == null || envs.size() < 2) {
        return envs;
      }
      final Map<String, String> ordered = new LinkedHashMap<String, String>();
      class Env {
        private boolean resolved = false;
        private final Collection<Env> deps = new ArrayList<>();
        private final String name;
        private final String value;
        Env(String name, String value) {
          this.name = name;
          this.value = value;
        }
        void resolve() {
          resolved = true;
          for (Env dep : deps) {
            if (!dep.resolved) {
              dep.resolve();
            }
          }
          ordered.put(name, value);
        }
      }
      final Map<String, Env> singletons = new HashMap<>();
      for (Map.Entry<String, String> e : envs.entrySet()) {
        Env env = singletons.get(e.getKey());
        if (env == null) {
          env = new Env(e.getKey(), e.getValue());
          singletons.put(env.name, env);
        }
        for (String depStr : getEnvDependencies(env.value)) {
          if (!envs.containsKey(depStr)) {
            continue;
          }
          Env depEnv = singletons.get(depStr);
          if (depEnv == null) {
            depEnv = new Env(depStr, envs.get(depStr));
            singletons.put(depStr, depEnv);
          }
          env.deps.add(depEnv);
        }
      }
      for (Env env : singletons.values()) {
        if (!env.resolved) {
          env.resolve();
        }
      }
      return ordered;
    }
  }

  private static final class UnixShellScriptBuilder extends ShellScriptBuilder {
    @SuppressWarnings("unused")
    private void errorCheck() {
      line("hadoop_shell_errorcode=$?");
      line("if [[ \"$hadoop_shell_errorcode\" -ne 0 ]]");
      line("then");
      line("  exit $hadoop_shell_errorcode");
      line("fi");
    }

    public UnixShellScriptBuilder() {
      line("#!/bin/bash");
      line();
    }

    @Override
    public void command(List<String> command) {
      line("exec /bin/bash -c \"", StringUtils.join(" ", command), "\"");
    }

    @Override
    public void setStdOut(final Path stdout) throws IOException {
      line("export ", ENV_PRELAUNCH_STDOUT, "=\"", stdout.toString(), "\"");
      // tee is needed for DefaultContainerExecutor error propagation to stdout
      // Close stdout of subprocess to prevent it from writing to the stdout file
      line("exec >\"${" + ENV_PRELAUNCH_STDOUT + "}\"");
    }

    @Override
    public void setStdErr(final Path stderr) throws IOException {
      line("export ", ENV_PRELAUNCH_STDERR, "=\"", stderr.toString(), "\"");
      // tee is needed for DefaultContainerExecutor error propagation to stderr
      // Close stdout of subprocess to prevent it from writing to the stdout file
      line("exec 2>\"${" + ENV_PRELAUNCH_STDERR + "}\"");
    }

    @Override
    public void env(String key, String value) throws IOException {
      line("export ", key, "=\"", value, "\"");
    }

    @Override
    public void whitelistedEnv(String key, String value) throws IOException {
      line("export ", key, "=${", key, ":-", "\"", value, "\"}");
    }

    @Override
    public void echo(final String echoStr) throws IOException {
      line("echo \"" + echoStr + "\"");
    }

    @Override
    protected void link(Path src, Path dst) throws IOException {
      line("ln -sf -- \"", src.toUri().getPath(), "\" \"", dst.toString(), "\"");
    }

    @Override
    protected void mkdir(Path path) throws IOException {
      line("mkdir -p ", path.toString());
    }

    @Override
    public void copyDebugInformation(Path src, Path dest) throws IOException {
      line("# Creating copy of launch script");
      line("cp \"", src.toUri().getPath(), "\" \"", dest.toUri().getPath(),
          "\"");
      // set permissions to 640 because we need to be able to run
      // log aggregation in secure mode as well
      if(dest.isAbsolute()) {
        line("chmod 640 \"", dest.toUri().getPath(), "\"");
      }
    }

    @Override
    public void listDebugInformation(Path output) throws  IOException {
      line("# Determining directory contents");
      line("echo \"ls -l:\" 1>\"", output.toString(), "\"");
      line("ls -l 1>>\"", output.toString(), "\"");

      // don't run error check because if there are loops
      // find will exit with an error causing container launch to fail
      // find will follow symlinks outside the work dir if such sylimks exist
      // (like public/app local resources)
      line("echo \"find -L . -maxdepth 5 -ls:\" 1>>\"", output.toString(),
          "\"");
      line("find -L . -maxdepth 5 -ls 1>>\"", output.toString(), "\"");
      line("echo \"broken symlinks(find -L . -maxdepth 5 -type l -ls):\" 1>>\"",
          output.toString(), "\"");
      line("find -L . -maxdepth 5 -type l -ls 1>>\"", output.toString(), "\"");
    }

    @Override
    public void setExitOnFailure() {
      line("set -o pipefail -e");
    }

    /**
     * Parse <code>envVal</code> using bash-like syntax to extract env variables
     * it depends on.
     */
    @Override
    public Set<String> getEnvDependencies(final String envVal) {
      if (envVal == null || envVal.isEmpty()) {
        return Collections.emptySet();
      }
      final Set<String> deps = new HashSet<>();
      // env/whitelistedEnv dump values inside double quotes
      boolean inDoubleQuotes = true;
      char c;
      int i = 0;
      final int len = envVal.length();
      while (i < len) {
        c = envVal.charAt(i);
        if (c == '"') {
          inDoubleQuotes = !inDoubleQuotes;
        } else if (c == '\'' && !inDoubleQuotes) {
          i++;
          // eat until closing simple quote
          while (i < len) {
            c = envVal.charAt(i);
            if (c == '\\') {
              i++;
            }
            if (c == '\'') {
              break;
            }
            i++;
          }
        } else if (c == '\\') {
          i++;
        } else if (c == '$') {
          i++;
          if (i >= len) {
            break;
          }
          c = envVal.charAt(i);
          if (c == '{') { // for ${... bash like syntax
            i++;
            if (i >= len) {
              break;
            }
            c = envVal.charAt(i);
            if (c == '#') { // for ${#... bash array syntax
              i++;
              if (i >= len) {
                break;
              }
            }
          }
          final int start = i;
          while (i < len) {
            c = envVal.charAt(i);
            if (c != '$' && (
                (i == start && Character.isJavaIdentifierStart(c)) ||
                    (i > start && Character.isJavaIdentifierPart(c)))) {
              i++;
            } else {
              break;
            }
          }
          if (i > start) {
            deps.add(envVal.substring(start, i));
          }
        }
        i++;
      }
      return deps;
    }
  }

  private static final class WindowsShellScriptBuilder
      extends ShellScriptBuilder {

    private static final Pattern VARIABLE_PATTERN = Pattern.compile("%(.*?)%");
    private static final Pattern SPLIT_PATTERN = Pattern.compile(":");

    private void errorCheck() {
      line("@if %errorlevel% neq 0 exit /b %errorlevel%");
    }

    private void lineWithLenCheck(String... commands) throws IOException {
      Shell.checkWindowsCommandLineLength(commands);
      line(commands);
    }

    public WindowsShellScriptBuilder() {
      line("@setlocal");
      line();
    }

    @Override
    public void command(List<String> command) throws IOException {
      lineWithLenCheck("@call ", StringUtils.join(" ", command));
      errorCheck();
    }

    //Dummy implementation
    @Override
    protected void setStdOut(final Path stdout) throws IOException {
    }

    //Dummy implementation
    @Override
    protected void setStdErr(final Path stderr) throws IOException {
    }

    @Override
    public void env(String key, String value) throws IOException {
      lineWithLenCheck("@set ", key, "=", value);
      errorCheck();
    }

    @Override
    public void whitelistedEnv(String key, String value) throws IOException {
      env(key, value);
    }

    @Override
    public void echo(final String echoStr) throws IOException {
      lineWithLenCheck("@echo \"", echoStr, "\"");
    }

    @Override
    protected void link(Path src, Path dst) throws IOException {
      File srcFile = new File(src.toUri().getPath());
      String srcFileStr = srcFile.getPath();
      String dstFileStr = new File(dst.toString()).getPath();
      lineWithLenCheck(String.format("@%s symlink \"%s\" \"%s\"",
          Shell.getWinUtilsPath(), dstFileStr, srcFileStr));
      errorCheck();
    }

    @Override
    protected void mkdir(Path path) throws IOException {
      lineWithLenCheck(String.format("@if not exist \"%s\" mkdir \"%s\"",
          path.toString(), path.toString()));
      errorCheck();
    }

    @Override
    public void copyDebugInformation(Path src, Path dest)
        throws IOException {
      // no need to worry about permissions - in secure mode
      // WindowsSecureContainerExecutor will set permissions
      // to allow NM to read the file
      line("rem Creating copy of launch script");
      lineWithLenCheck(String.format("copy \"%s\" \"%s\"", src.toString(),
          dest.toString()));
    }

    @Override
    public void listDebugInformation(Path output) throws IOException {
      line("rem Determining directory contents");
      lineWithLenCheck(
          String.format("@echo \"dir:\" > \"%s\"", output.toString()));
      lineWithLenCheck(String.format("dir >> \"%s\"", output.toString()));
    }

    /**
     * Parse <code>envVal</code> using cmd/bat-like syntax to extract env
     * variables it depends on.
     */
    public Set<String> getEnvDependencies(final String envVal) {
      if (envVal == null || envVal.isEmpty()) {
        return Collections.emptySet();
      }

      // Example inputs: %var%, %%, %a:b%
      Matcher matcher = VARIABLE_PATTERN.matcher(envVal);
      final Set<String> deps = new HashSet<>();
      while (matcher.find()) {
        String match = matcher.group(1);
        if (!match.isEmpty()) {
          if (match.equals(":")) {
            // Special case, variable name can be a single : character
            deps.add(match);
          } else {
            // Either store the variable name before the : string manipulation
            // character or the whole match. (%var% -> var, %a:b% -> a)
            String[] split = SPLIT_PATTERN.split(match, 2);
            if (!split[0].isEmpty()) {
              deps.add(split[0]);
            }
          }
        }
      }
      return deps;
    }
  }

  private static void addToEnvMap(
      Map<String, String> envMap, Set<String> envSet,
      String envName, String envValue) {
    envMap.put(envName, envValue);
    envSet.add(envName);
  }

  public void sanitizeEnv(Map<String, String> environment, Path pwd,
      List<Path> appDirs, List<String> userLocalDirs, List<String>
      containerLogDirs, Map<Path, List<String>> resources,
      Path nmPrivateClasspathJarDir,
      Set<String> nmVars) throws IOException {
    // Based on discussion in YARN-7654, for ENTRY_POINT enabled
    // docker container, we forward user defined environment variables
    // without node manager environment variables.  This is the reason
    // that we skip sanitizeEnv method.
    boolean overrideDisable = Boolean.parseBoolean(
        environment.get(
            Environment.
                YARN_CONTAINER_RUNTIME_DOCKER_RUN_OVERRIDE_DISABLE.
                    name()));
    if (overrideDisable) {
      environment.remove("WORK_DIR");
      return;
    }

    /**
     * Non-modifiable environment variables
     */

    addToEnvMap(environment, nmVars, Environment.CONTAINER_ID.name(),
        container.getContainerId().toString());

    addToEnvMap(environment, nmVars, Environment.NM_PORT.name(),
      String.valueOf(this.context.getNodeId().getPort()));

    addToEnvMap(environment, nmVars, Environment.NM_HOST.name(),
        this.context.getNodeId().getHost());

    addToEnvMap(environment, nmVars, Environment.NM_HTTP_PORT.name(),
      String.valueOf(this.context.getHttpPort()));

    addToEnvMap(environment, nmVars, Environment.LOCAL_DIRS.name(),
        StringUtils.join(",", appDirs));

    addToEnvMap(environment, nmVars, Environment.LOCAL_USER_DIRS.name(),
        StringUtils.join(",", userLocalDirs));

    addToEnvMap(environment, nmVars, Environment.LOG_DIRS.name(),
      StringUtils.join(",", containerLogDirs));

    addToEnvMap(environment, nmVars, Environment.USER.name(),
        container.getUser());

    addToEnvMap(environment, nmVars, Environment.LOGNAME.name(),
        container.getUser());

    addToEnvMap(environment, nmVars, Environment.HOME.name(),
        conf.get(
            YarnConfiguration.NM_USER_HOME_DIR, 
            YarnConfiguration.DEFAULT_NM_USER_HOME_DIR
            )
        );

    addToEnvMap(environment, nmVars, Environment.PWD.name(), pwd.toString());

    addToEnvMap(environment, nmVars, Environment.LOCALIZATION_COUNTERS.name(),
        container.localizationCountersAsString());

    if (!Shell.WINDOWS) {
      addToEnvMap(environment, nmVars, "JVM_PID", "$$");
    }

    // TODO: Remove Windows check and use this approach on all platforms after
    // additional testing.  See YARN-358.
    if (Shell.WINDOWS) {
      sanitizeWindowsEnv(environment, pwd,
          resources, nmPrivateClasspathJarDir);
    }

    // put AuxiliaryService data to environment
    for (Map.Entry<String, ByteBuffer> meta : containerManager
        .getAuxServiceMetaData().entrySet()) {
      AuxiliaryServiceHelper.setServiceDataIntoEnv(
          meta.getKey(), meta.getValue(), environment);
      nmVars.add(AuxiliaryServiceHelper.getPrefixServiceName(meta.getKey()));
    }
  }

  /**
   * There are some configurations (such as {@value YarnConfiguration#NM_ADMIN_USER_ENV}) whose
   * values need to be added to the environment variables.
   *
   * @param environment The environment variables map to add the configuration values to.
   */
  public void addConfigsToEnv(Map<String, String> environment) {
    // variables here will be forced in, even if the container has
    // specified them.  Note: we do not track these in nmVars, to
    // allow them to be ordered properly if they reference variables
    // defined by the user.
    String defEnvStr = conf.get(YarnConfiguration.DEFAULT_NM_ADMIN_USER_ENV);
    Apps.setEnvFromInputProperty(environment, YarnConfiguration.NM_ADMIN_USER_ENV, defEnvStr, conf,
        File.pathSeparator);

    if (!Shell.WINDOWS) {
      // maybe force path components
      String forcePath = conf.get(YarnConfiguration.NM_ADMIN_FORCE_PATH,
          YarnConfiguration.DEFAULT_NM_ADMIN_FORCE_PATH);
      if (!forcePath.isEmpty()) {
        String userPath = environment.get(Environment.PATH.name());
        environment.remove(Environment.PATH.name());
        if (userPath == null || userPath.isEmpty()) {
          Apps.addToEnvironment(environment, Environment.PATH.name(), forcePath,
              File.pathSeparator);
          Apps.addToEnvironment(environment, Environment.PATH.name(), "$PATH", File.pathSeparator);
        } else {
          Apps.addToEnvironment(environment, Environment.PATH.name(), forcePath,
              File.pathSeparator);
          Apps.addToEnvironment(environment, Environment.PATH.name(), userPath, File.pathSeparator);
        }
      }
    }
  }

  private void sanitizeWindowsEnv(Map<String, String> environment, Path pwd,
      Map<Path, List<String>> resources, Path nmPrivateClasspathJarDir)
      throws IOException {
    String inputClassPath = environment.get(Environment.CLASSPATH.name());

    if (inputClassPath != null && !inputClassPath.isEmpty()) {

      //On non-windows, localized resources
      //from distcache are available via the classpath as they were placed
      //there but on windows they are not available when the classpath
      //jar is created and so they "are lost" and have to be explicitly
      //added to the classpath instead.  This also means that their position
      //is lost relative to other non-distcache classpath entries which will
      //break things like mapreduce.job.user.classpath.first.  An environment
      //variable can be set to indicate that distcache entries should come
      //first

      boolean preferLocalizedJars = Boolean.parseBoolean(
              environment.get(Environment.CLASSPATH_PREPEND_DISTCACHE.name())
      );

      boolean needsSeparator = false;
      StringBuilder newClassPath = new StringBuilder();
      if (!preferLocalizedJars) {
        newClassPath.append(inputClassPath);
        needsSeparator = true;
      }

      // Localized resources do not exist at the desired paths yet, because the
      // container launch script has not run to create symlinks yet.  This
      // means that FileUtil.createJarWithClassPath can't automatically expand
      // wildcards to separate classpath entries for each file in the manifest.
      // To resolve this, append classpath entries explicitly for each
      // resource.
      for (Map.Entry<Path, List<String>> entry : resources.entrySet()) {
        boolean targetIsDirectory = new File(entry.getKey().toUri().getPath())
                .isDirectory();

        for (String linkName : entry.getValue()) {
          // Append resource.
          if (needsSeparator) {
            newClassPath.append(File.pathSeparator);
          } else {
            needsSeparator = true;
          }
          newClassPath.append(pwd.toString())
                  .append(Path.SEPARATOR).append(linkName);

          // FileUtil.createJarWithClassPath must use File.toURI to convert
          // each file to a URI to write into the manifest's classpath.  For
          // directories, the classpath must have a trailing '/', but
          // File.toURI only appends the trailing '/' if it is a directory that
          // already exists.  To resolve this, add the classpath entries with
          // explicit trailing '/' here for any localized resource that targets
          // a directory.  Then, FileUtil.createJarWithClassPath will guarantee
          // that the resulting entry in the manifest's classpath will have a
          // trailing '/', and thus refer to a directory instead of a file.
          if (targetIsDirectory) {
            newClassPath.append(Path.SEPARATOR);
          }
        }
      }
      if (preferLocalizedJars) {
        if (needsSeparator) {
          newClassPath.append(File.pathSeparator);
        }
        newClassPath.append(inputClassPath);
      }

      // When the container launches, it takes the parent process's environment
      // and then adds/overwrites with the entries from the container launch
      // context.  Do the same thing here for correct substitution of
      // environment variables in the classpath jar manifest.
      Map<String, String> mergedEnv = new HashMap<String, String>(
              System.getenv());
      mergedEnv.putAll(environment);

      // this is hacky and temporary - it's to preserve the windows secure
      // behavior but enable non-secure windows to properly build the class
      // path for access to job.jar/lib/xyz and friends (see YARN-2803)
      Path jarDir;
      if (exec instanceof WindowsSecureContainerExecutor) {
        jarDir = nmPrivateClasspathJarDir;
      } else {
        jarDir = pwd;
      }
      String[] jarCp = FileUtil.createJarWithClassPath(
              newClassPath.toString(), jarDir, pwd, mergedEnv);
      // In a secure cluster the classpath jar must be localized to grant access
      Path localizedClassPathJar = exec.localizeClasspathJar(
              new Path(jarCp[0]), pwd, container.getUser());
      String replacementClassPath = localizedClassPathJar.toString() + jarCp[1];
      environment.put(Environment.CLASSPATH.name(), replacementClassPath);
    }
  }

  public static String getExitCodeFile(String pidFile) {
    return pidFile + EXIT_CODE_FILE_SUFFIX;
  }

  private void recordContainerLogDir(ContainerId containerId,
      String logDir) throws IOException{
    container.setLogDir(logDir);
    if (container.isRetryContextSet()) {
      context.getNMStateStore().storeContainerLogDir(containerId, logDir);
    }
  }

  private void recordContainerWorkDir(ContainerId containerId,
      String workDir) throws IOException{
    container.setWorkDir(workDir);
    if (container.isRetryContextSet()) {
      context.getNMStateStore().storeContainerWorkDir(containerId, workDir);
    }
  }

  private void recordContainerCsiVolumesRootDir(ContainerId containerId,
      String volumesRoot) throws IOException {
    container.setCsiVolumesRootDir(volumesRoot);
    // TODO persistent to the NM store...
  }

  protected Path getContainerWorkDir() throws IOException {
    String containerWorkDir = container.getWorkDir();
    if (containerWorkDir == null
        || !dirsHandler.isGoodLocalDir(containerWorkDir)) {
      throw new IOException(
          "Could not find a good work dir " + containerWorkDir
              + " for container " + container);
    }

    return new Path(containerWorkDir);
  }

  /**
   * Clean up container's files for container relaunch or cleanup.
   */
  protected void cleanupContainerFiles(Path containerWorkDir) {
    LOG.debug("cleanup container {} files", containerWorkDir);
    // delete ContainerScriptPath
    deleteAsUser(new Path(containerWorkDir, CONTAINER_SCRIPT));
    // delete TokensPath
    deleteAsUser(new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE));
    // delete sysfs dir
    deleteAsUser(new Path(containerWorkDir, SYSFS_DIR));

    // delete symlinks because launch script will create symlinks again
    try {
      exec.cleanupBeforeRelaunch(container);
    } catch (IOException | InterruptedException e) {
      LOG.warn("{} exec failed to cleanup", container.getContainerId(), e);
    }
  }

  private void deleteAsUser(Path path) {
    try {
      exec.deleteAsUser(new DeletionAsUserContext.Builder()
          .setUser(container.getUser())
          .setSubDir(path)
          .build());
    } catch (Exception e) {
      LOG.warn("Failed to delete " + path, e);
    }
  }

  /**
   * Returns the PID File Path.
   */
  Path getPidFilePath() {
    return pidFilePath;
  }

  /**
   * Marks the container to be launched only if it was not launched.
   *
   * @return true if successful; false otherwise.
   */
  boolean markLaunched() {
    return containerAlreadyLaunched.compareAndSet(false, true);
  }

  /**
   * Returns if the launch is completed or not.
   */
  boolean isLaunchCompleted() {
    return completed.get();
  }

}