ContainerExecutor.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.ShellScriptBuilder;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerPrepareContext;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.CONTAINER_PRE_LAUNCH_STDERR;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.CONTAINER_PRE_LAUNCH_STDOUT;

/**
 * This class is abstraction of the mechanism used to launch a container on the
 * underlying OS.  All executor implementations must extend ContainerExecutor.
 */
public abstract class ContainerExecutor implements Configurable {
  private static final Logger LOG =
       LoggerFactory.getLogger(ContainerExecutor.class);
  protected static final String WILDCARD = "*";
  public static final String TOKEN_FILE_NAME_FMT = "%s.tokens";

  /**
   * The permissions to use when creating the launch script.
   */
  public static final FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
      FsPermission.createImmutable((short)0700);

  /**
   * The relative path to which debug information will be written.
   *
   * @see ShellScriptBuilder#listDebugInformation
   */
  public static final String DIRECTORY_CONTENTS = "directory.info";

  private Configuration conf;
  private final ConcurrentMap<ContainerId, Path> pidFiles =
      new ConcurrentHashMap<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private String[] whitelistVars;
  private int exitCodeFileTimeout =
      YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_EXIT_FILE_TIMEOUT;
  private int containerExitCode;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    if (conf != null) {
      whitelistVars = conf.get(YarnConfiguration.NM_ENV_WHITELIST,
          YarnConfiguration.DEFAULT_NM_ENV_WHITELIST).split(",");
      exitCodeFileTimeout = conf.getInt(
          YarnConfiguration.NM_CONTAINER_EXECUTOR_EXIT_FILE_TIMEOUT,
          YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_EXIT_FILE_TIMEOUT);
    }
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Run the executor initialization steps.
   * Verify that the necessary configs and permissions are in place.
   *
   * @param nmContext Context of NM
   * @throws IOException if initialization fails
   */
  public abstract void init(Context nmContext) throws IOException;

  public void start() {}

  public void stop() {}

  /**
   * This function localizes the JAR file on-demand.
   * On Windows the ContainerLaunch creates a temporary special JAR manifest of
   * other JARs to workaround the CLASSPATH length. In a secure cluster this
   * JAR must be localized so that the container has access to it.
   * The default implementation returns the classpath passed to it, which
   * is expected to have been created in the node manager's <i>fprivate</i>
   * folder, which will not work with secure Windows clusters.
   *
   * @param jarPath the path to the JAR to localize
   * @param target the directory where the JAR file should be localized
   * @param owner the name of the user who should own the localized file
   * @return the path to the localized JAR file
   * @throws IOException if localization fails
   */
  public Path localizeClasspathJar(Path jarPath, Path target, String owner)
      throws IOException {
    return jarPath;
  }


  /**
   * Prepare the environment for containers in this application to execute.
   * <pre>
   * For $x in local.dirs
   *   create $x/$user/$appId
   * Copy $nmLocal/appTokens {@literal ->} $N/$user/$appId
   * For $rsrc in private resources
   *   Copy $rsrc {@literal ->} $N/$user/filecache/[idef]
   * For $rsrc in job resources
   *   Copy $rsrc {@literal ->} $N/$user/$appId/filecache/idef
   * </pre>
   *
   * @param ctx LocalizerStartContext that encapsulates necessary information
   *            for starting a localizer.
   * @throws IOException for most application init failures
   * @throws InterruptedException if application init thread is halted by NM
   * @throws ConfigurationException if config error was found
   */
  public abstract void startLocalizer(LocalizerStartContext ctx)
      throws IOException, InterruptedException, ConfigurationException;

  /**
   * Prepare the container prior to the launch environment being written.
   * @param ctx Encapsulates information necessary for launching containers.
   * @throws IOException if errors occur during container preparation
   */
  public void prepareContainer(ContainerPrepareContext ctx) throws
      IOException{
  }

  /**
   * Launch the container on the node. This is a blocking call and returns only
   * when the container exits.
   * @param ctx Encapsulates information necessary for launching containers.
   * @return the return status of the launch
   * @throws IOException if the container launch fails
   * @throws ConfigurationException if config error was found
   */
  public abstract int launchContainer(ContainerStartContext ctx) throws
      IOException, ConfigurationException;

  /**
   * Relaunch the container on the node. This is a blocking call and returns
   * only when the container exits.
   * @param ctx Encapsulates information necessary for relaunching containers.
   * @return the return status of the relaunch
   * @throws IOException if the container relaunch fails
   * @throws ConfigurationException if config error was found
   */
  public abstract int relaunchContainer(ContainerStartContext ctx) throws
      IOException, ConfigurationException;

  /**
   * Signal container with the specified signal.
   *
   * @param ctx Encapsulates information necessary for signaling containers.
   * @return returns true if the operation succeeded
   * @throws IOException if signaling the container fails
   */
  public abstract boolean signalContainer(ContainerSignalContext ctx)
      throws IOException;

  /**
   * Perform the steps necessary to reap the container.
   *
   * @param ctx Encapsulates information necessary for reaping containers.
   * @return returns true if the operation succeeded.
   * @throws IOException if reaping the container fails.
   */
  public abstract boolean reapContainer(ContainerReapContext ctx)
      throws IOException;

  /**
   * Perform interactive docker command into running container.
   *
   * @param ctx Encapsulates information necessary for exec containers.
   * @return return input/output stream if the operation succeeded.
   * @throws ContainerExecutionException if container exec fails.
   */
  public abstract IOStreamPair execContainer(ContainerExecContext ctx)
      throws ContainerExecutionException;

  /**
   * Delete specified directories as a given user.
   *
   * @param ctx Encapsulates information necessary for deletion.
   * @throws IOException if delete fails
   * @throws InterruptedException if interrupted while waiting for the deletion
   * operation to complete
   */
  public abstract void deleteAsUser(DeletionAsUserContext ctx)
      throws IOException, InterruptedException;

  /**
   * Create a symlink file which points to the target.
   * @param target The target for symlink
   * @param symlink the symlink file
   * @throws IOException Error when creating symlinks
   */
  public abstract void symLink(String target, String symlink)
      throws IOException;

  /**
   * Check if a container is alive.
   * @param ctx Encapsulates information necessary for container liveness check.
   * @return true if container is still alive
   * @throws IOException if there is a failure while checking the container
   * status
   */
  public abstract boolean isContainerAlive(ContainerLivenessContext ctx)
      throws IOException;


  public Map<String, LocalResource> getLocalResources(Container container)
      throws IOException {
    return container.getLaunchContext().getLocalResources();
  }

  /**
   * Update cluster information inside container.
   *
   * @param ctx ContainerRuntimeContext
   * @param user Owner of application
   * @param appId YARN application ID
   * @param spec Service Specification
   * @throws IOException if there is a failure while writing spec to disk
   */
  public abstract void updateYarnSysFS(Context ctx, String user,
      String appId, String spec) throws IOException;

  /**
   * Recover an already existing container. This is a blocking call and returns
   * only when the container exits.  Note that the container must have been
   * activated prior to this call.
   *
   * @param ctx encapsulates information necessary to reacquire container
   * @return The exit code of the pre-existing container
   * @throws IOException if there is a failure while reacquiring the container
   * @throws InterruptedException if interrupted while waiting to reacquire
   * the container
   */
  public int reacquireContainer(ContainerReacquisitionContext ctx)
      throws IOException, InterruptedException {
    Container container = ctx.getContainer();
    String user = ctx.getUser();
    ContainerId containerId = ctx.getContainerId();
    Path pidPath = getPidFilePath(containerId);

    if (pidPath == null) {
      LOG.warn("{} is not active, returning terminated error", containerId);
      containerExitCode = ExitCode.TERMINATED.getExitCode();
      return ExitCode.TERMINATED.getExitCode();
    }

    String pid = ProcessIdFileReader.getProcessId(pidPath);

    if (pid == null) {
      throw new IOException("Unable to determine pid for " + containerId);
    }

    LOG.info("Reacquiring {} with pid {}", containerId, pid);

    ContainerLivenessContext livenessContext = new ContainerLivenessContext
        .Builder()
        .setContainer(container)
        .setUser(user)
        .setPid(pid)
        .build();

    while (isContainerAlive(livenessContext)) {
      Thread.sleep(1000);
    }

    // wait for exit code file to appear
    final int sleepMsec = 100;
    int msecLeft = this.exitCodeFileTimeout;
    String exitCodeFile = ContainerLaunch.getExitCodeFile(pidPath.toString());
    File file = new File(exitCodeFile);

    while (!file.exists() && msecLeft >= 0) {
      if (!isContainerActive(containerId)) {
        LOG.info("{} was deactivated", containerId);
        containerExitCode = ExitCode.TERMINATED.getExitCode();
        return ExitCode.TERMINATED.getExitCode();
      }

      Thread.sleep(sleepMsec);

      msecLeft -= sleepMsec;
    }

    if (msecLeft < 0) {
      throw new IOException("Timeout while waiting for exit code from "
          + containerId);
    }

    try {
      containerExitCode = Integer.parseInt(
          FileUtils.readFileToString(file, StandardCharsets.UTF_8).trim());
      return containerExitCode;
    } catch (NumberFormatException e) {
      throw new IOException("Error parsing exit code from pid " + pid, e);
    }
  }

  /**
   * This method writes out the launch environment of a container to the
   * default container launch script. For the default container script path see
   * {@link ContainerLaunch#CONTAINER_SCRIPT}.
   *
   * @param out the output stream to which the environment is written (usually
   * a script file which will be executed by the Launcher)
   * @param environment the environment variables and their values
   * @param resources the resources which have been localized for this
   * container. Symlinks will be created to these localized resources
   * @param command the command that will be run
   * @param logDir the log dir to which to copy debugging information
   * @param user the username of the job owner
   * @param nmVars the set of environment vars that are explicitly set by NM
   * @throws IOException if any errors happened writing to the OutputStream,
   * while creating symlinks
   */
  public void writeLaunchEnv(OutputStream out, Map<String, String> environment,
      Map<Path, List<String>> resources, List<String> command, Path logDir,
      String user, LinkedHashSet<String> nmVars) throws IOException {
    this.writeLaunchEnv(out, environment, resources, command, logDir, user,
        ContainerLaunch.CONTAINER_SCRIPT, nmVars);
  }

  /**
   * This method writes out the launch environment of a container to a specified
   * path.
   *
   * @param out the output stream to which the environment is written (usually
   * a script file which will be executed by the Launcher)
   * @param environment the environment variables and their values
   * @param resources the resources which have been localized for this
   * container. Symlinks will be created to these localized resources
   * @param command the command that will be run
   * @param logDir the log dir to which to copy debugging information
   * @param user the username of the job owner
   * @param outFilename the path to which to write the launch environment
   * @param nmVars the set of environment vars that are explicitly set by NM
   * @throws IOException if any errors happened writing to the OutputStream,
   * while creating symlinks
   */
  @VisibleForTesting
  public void writeLaunchEnv(OutputStream out, Map<String, String> environment,
      Map<Path, List<String>> resources, List<String> command, Path logDir,
      String user, String outFilename, LinkedHashSet<String> nmVars)
      throws IOException {

    ContainerLaunch.ShellScriptBuilder sb =
        ContainerLaunch.ShellScriptBuilder.create();

    // Add "set -o pipefail -e" to validate launch_container script.
    sb.setExitOnFailure();

    //Redirect stdout and stderr for launch_container script
    sb.stdout(logDir, CONTAINER_PRE_LAUNCH_STDOUT);
    sb.stderr(logDir, CONTAINER_PRE_LAUNCH_STDERR);


    if (environment != null) {
      sb.echo("Setting up env variables");
      // Whitelist environment variables are treated specially.
      // Only add them if they are not already defined in the environment.
      // Add them using special syntax to prevent them from eclipsing
      // variables that may be set explicitly in the container image (e.g,
      // in a docker image).  Put these before the others to ensure the
      // correct expansion is used.
      for(String var : whitelistVars) {
        if (!environment.containsKey(var)) {
          String val = getNMEnvVar(var);
          if (val != null) {
            sb.whitelistedEnv(var, val);
          }
        }
      }
      // Now write vars that were set explicitly by nodemanager, preserving
      // the order they were written in.
      for (String nmEnvVar : nmVars) {
        sb.env(nmEnvVar, environment.get(nmEnvVar));
      }
      // Now write the remaining environment variables.
      for (Map.Entry<String, String> env :
           sb.orderEnvByDependencies(environment).entrySet()) {
        if (!nmVars.contains(env.getKey())) {
          sb.env(env.getKey(), env.getValue());
        }
      }
    }

    if (resources != null) {
      sb.echo("Setting up job resources");
      Map<Path, Path> symLinks = resolveSymLinks(resources, user);
      for (Map.Entry<Path, Path> symLink : symLinks.entrySet()) {
        sb.symlink(symLink.getKey(), symLink.getValue());
      }
    }

    // dump debugging information if configured
    if (shouldWriteDebugInformation(getConf())) {
      sb.echo("Copying debugging information");
      sb.copyDebugInformation(new Path(outFilename),
          new Path(logDir, outFilename));
      sb.listDebugInformation(new Path(logDir, DIRECTORY_CONTENTS));
    }
    sb.echo("Launching container");
    sb.command(command);

    PrintStream pout = null;
    try {
      pout = new PrintStream(out, false, "UTF-8");
      sb.write(pout);
    } finally {
      if (out != null) {
        out.close();
      }
    }
  }

  /**
   * Return the files in the target directory. If retrieving the list of files
   * requires specific access rights, that access will happen as the
   * specified user. The list will not include entries for "." or "..".
   *
   * @param user the user as whom to access the target directory
   * @param dir the target directory
   * @return a list of files in the target directory
   */
  protected File[] readDirAsUser(String user, Path dir) {
    return new File(dir.toString()).listFiles();
  }

  private boolean shouldWriteDebugInformation(Configuration config) {
    return config != null && (
            config.getBoolean(
                YarnConfiguration.NM_LOG_CONTAINER_DEBUG_INFO,
                YarnConfiguration.DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO
            ) || (
            config.getBoolean(
                YarnConfiguration.NM_LOG_CONTAINER_DEBUG_INFO_ON_ERROR,
                YarnConfiguration.DEFAULT_NM_LOG_CONTAINER_DEBUG_INFO_ON_ERROR
            ) && containerExitCode != 0));
  }

  /**
   * The container exit code.
   */
  public enum ExitCode {
    SUCCESS(0),
    FORCE_KILLED(137),
    TERMINATED(143),
    LOST(154);

    private final int code;

    private ExitCode(int exitCode) {
      this.code = exitCode;
    }

    /**
     * Get the exit code as an int.
     * @return the exit code as an int
     */
    public int getExitCode() {
      return code;
    }

    @Override
    public String toString() {
      return String.valueOf(code);
    }
  }

  /**
   * The constants for the signals.
   */
  public enum Signal {
    NULL(0, "NULL"),
    QUIT(3, "SIGQUIT"),
    KILL(9, "SIGKILL"),
    TERM(15, "SIGTERM");

    private final int value;
    private final String str;

    private Signal(int value, String str) {
      this.str = str;
      this.value = value;
    }

    /**
     * Get the signal number.
     * @return the signal number
     */
    public int getValue() {
      return value;
    }

    @Override
    public String toString() {
      return str;
    }
  }

  /**
   * Log each line of the output string as INFO level log messages.
   *
   * @param output the output string to log
   */
  protected void logOutput(String output) {
    String shExecOutput = output;

    if (shExecOutput != null) {
      for (String str : shExecOutput.split("\n")) {
        LOG.info(str);
      }
    }
  }

  /**
   * Get the pidFile of the container.
   *
   * @param containerId the container ID
   * @return the path of the pid-file for the given containerId.
   */
  protected Path getPidFilePath(ContainerId containerId) {
    return this.pidFiles.get(containerId);
  }

  /**
   * Return a command line to execute the given command in the OS shell.
   * On Windows, the {code}groupId{code} parameter can be used to launch
   * and associate the given GID with a process group. On
   * non-Windows hosts, the {code}groupId{code} parameter is ignored.
   *
   * @param command the command to execute
   * @param groupId the job owner's GID
   * @param userName the job owner's username
   * @param pidFile the path to the container's PID file
   * @param config the configuration
   * @return the command line to execute
   */
  protected String[] getRunCommand(String command, String groupId,
      String userName, Path pidFile, Configuration config) {
    return getRunCommand(command, groupId, userName, pidFile, config, null);
  }

  /**
   * Return a command line to execute the given command in the OS shell.
   * On Windows, the {code}groupId{code} parameter can be used to launch
   * and associate the given GID with a process group. On
   * non-Windows hosts, the {code}groupId{code} parameter is ignored.
   *
   * @param command the command to execute
   * @param groupId the job owner's GID for Windows. On other operating systems
   * it is ignored.
   * @param userName the job owner's username for Windows. On other operating
   * systems it is ignored.
   * @param pidFile the path to the container's PID file on Windows. On other
   * operating systems it is ignored.
   * @param config the configuration
   * @param resource on Windows this parameter controls memory and CPU limits.
   * If null, no limits are set. On other operating systems it is ignored.
   * @return the command line to execute
   */
  protected String[] getRunCommand(String command, String groupId,
      String userName, Path pidFile, Configuration config, Resource resource) {
    if (Shell.WINDOWS) {
      return getRunCommandForWindows(command, groupId, userName, pidFile,
          config, resource);
    } else {
      return getRunCommandForOther(command, config);
    }

  }

  /**
   * Return a command line to execute the given command in the OS shell.
   * The {code}groupId{code} parameter can be used to launch
   * and associate the given GID with a process group.
   *
   * @param command the command to execute
   * @param groupId the job owner's GID
   * @param userName the job owner's username
   * @param pidFile the path to the container's PID file
   * @param config the configuration
   * @param resource this parameter controls memory and CPU limits.
   * If null, no limits are set.
   * @return the command line to execute
   */
  protected String[] getRunCommandForWindows(String command, String groupId,
      String userName, Path pidFile, Configuration config, Resource resource) {
    int cpuRate = -1;
    int memory = -1;

    if (resource != null) {
      if (config.getBoolean(
          YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED,
          YarnConfiguration.
            DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED)) {
        memory = (int) resource.getMemorySize();
      }

      if (config.getBoolean(
          YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED,
          YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) {
        int containerVCores = resource.getVirtualCores();
        int nodeVCores = NodeManagerHardwareUtils.getVCores(config);
        int nodeCpuPercentage =
            NodeManagerHardwareUtils.getNodeCpuPercentage(config);

        float containerCpuPercentage =
            (float)(nodeCpuPercentage * containerVCores) / nodeVCores;

        // CPU should be set to a percentage * 100, e.g. 20% cpu rate limit
        // should be set as 20 * 100.
        cpuRate = Math.min(10000, (int)(containerCpuPercentage * 100));
      }
    }

    return new String[] {
        Shell.getWinUtilsPath(),
        "task",
        "create",
        "-m",
        String.valueOf(memory),
        "-c",
        String.valueOf(cpuRate),
        groupId,
        "cmd /c " + command
    };
  }

  /**
   * Return a command line to execute the given command in the OS shell.
   *
   * @param command the command to execute
   * @param config the configuration
   * @return the command line to execute
   */
  protected String[] getRunCommandForOther(String command,
      Configuration config) {
    List<String> retCommand = new ArrayList<>();
    boolean containerSchedPriorityIsSet = false;
    int containerSchedPriorityAdjustment =
        YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;

    if (config.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) !=
        null) {
      containerSchedPriorityIsSet = true;
      containerSchedPriorityAdjustment = config
          .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
          YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
    }

    if (containerSchedPriorityIsSet) {
      retCommand.addAll(Arrays.asList("nice", "-n",
          Integer.toString(containerSchedPriorityAdjustment)));
    }

    retCommand.addAll(Arrays.asList("bash", command));

    return retCommand.toArray(new String[retCommand.size()]);
  }

  /**
   * Return whether the container is still active.
   *
   * @param containerId the target container's ID
   * @return true if the container is active
   */
  protected boolean isContainerActive(ContainerId containerId) {
    return this.pidFiles.containsKey(containerId);
  }

  @VisibleForTesting
  protected String getNMEnvVar(String varname) {
    return System.getenv(varname);
  }

  /**
   * Mark the container as active.
   *
   * @param containerId the container ID
   * @param pidFilePath the path where the executor should write the PID
   * of the launched process
   */
  public void activateContainer(ContainerId containerId, Path pidFilePath) {
    this.pidFiles.put(containerId, pidFilePath);
  }

  // LinuxContainerExecutor overrides this method and behaves differently.
  public String[] getIpAndHost(Container container)
      throws ContainerExecutionException {
    return getLocalIpAndHost(container);
  }

  // ipAndHost[0] contains ip.
  // ipAndHost[1] contains hostname.
  public static String[] getLocalIpAndHost(Container container) {
    String[] ipAndHost = new String[2];
    try {
      InetAddress address = InetAddress.getLocalHost();
      ipAndHost[0] = address.getHostAddress();
      ipAndHost[1] = address.getHostName();
    } catch (UnknownHostException e) {
      LOG.error("Unable to get Local hostname and ip for {}", container
          .getContainerId(), e);
    }
    return ipAndHost;
  }

  /**
   * Mark the container as inactive. For inactive containers this
   * method has no effect.
   *
   * @param containerId the container ID
   */
  public void deactivateContainer(ContainerId containerId) {
    this.pidFiles.remove(containerId);
  }

  /**
   * Pause the container. The default implementation is to raise a kill event.
   * Specific executor implementations can override this behavior.
   * @param container
   *          the Container
   */
  public void pauseContainer(Container container) {
    LOG.warn("{} doesn't support pausing.", container.getContainerId());
    throw new UnsupportedOperationException();
  }

  /**
   * Resume the container from pause state. The default implementation ignores
   * this event. Specific implementations can override this behavior.
   * @param container
   *          the Container
   */
  public void resumeContainer(Container container) {
    LOG.warn("{} doesn't support resume.", container.getContainerId());
    throw new UnsupportedOperationException();
  }

  /**
   * Perform any cleanup before the next launch of the container.
   * @param container         container
   */
  public void cleanupBeforeRelaunch(Container container)
      throws IOException, InterruptedException {
    if (container.getLocalizedResources() != null) {

      Map<Path, Path> symLinks = resolveSymLinks(
          container.getLocalizedResources(), container.getUser());

      for (Map.Entry<Path, Path> symLink : symLinks.entrySet()) {
        LOG.debug("{} deleting {}", container.getContainerId(),
            symLink.getValue());
        deleteAsUser(new DeletionAsUserContext.Builder()
            .setUser(container.getUser())
            .setSubDir(symLink.getValue())
            .build());
      }
    }
  }

  /**
   * Get the process-identifier for the container.
   *
   * @param containerID the container ID
   * @return the process ID of the container if it has already launched,
   * or null otherwise
   */
  public String getProcessId(ContainerId containerID) {
    String pid = null;
    Path pidFile = pidFiles.get(containerID);

    // If PID is null, this container hasn't launched yet.
    if (pidFile != null) {
      try {
        pid = ProcessIdFileReader.getProcessId(pidFile);
      } catch (IOException e) {
        LOG.error("Got exception reading pid from pid-file {}", pidFile, e);
      }
    }

    return pid;
  }

  /**
   * This class will signal a target container after a specified delay.
   * @see #signalContainer
   */
  public static class DelayedProcessKiller extends Thread {
    private final Container container;
    private final String user;
    private final String pid;
    private final long delay;
    private final Signal signal;
    private final ContainerExecutor containerExecutor;

    /**
     * Basic constructor.
     *
     * @param container the container to signal
     * @param user the user as whow to send the signal
     * @param pid the PID of the container process
     * @param delayMS the period of time to wait in millis before signaling
     * the container
     * @param signal the signal to send
     * @param containerExecutor the executor to use to send the signal
     */
    public DelayedProcessKiller(Container container, String user, String pid,
        long delayMS, Signal signal, ContainerExecutor containerExecutor) {
      this.container = container;
      this.user = user;
      this.pid = pid;
      this.delay = delayMS;
      this.signal = signal;
      this.containerExecutor = containerExecutor;
      setName("Task killer for " + pid);
      setDaemon(false);
    }

    @Override
    public void run() {
      try {
        Thread.sleep(delay);
        containerExecutor.signalContainer(new ContainerSignalContext.Builder()
            .setContainer(container)
            .setUser(user)
            .setPid(pid)
            .setSignal(signal)
            .build());
      } catch (InterruptedException e) {
        interrupt();
      } catch (IOException e) {
        String message = "Exception when user " + user + " killing task " + pid
            + " in DelayedProcessKiller: " + StringUtils.stringifyException(e);
        LOG.warn(message);
        container.handle(new ContainerDiagnosticsUpdateEvent(
            container.getContainerId(), message));
      }
    }
  }

  private Map<Path, Path> resolveSymLinks(Map<Path,
      List<String>> resources, String user) {
    Map<Path, Path> symLinks = new HashMap<>();
    for (Map.Entry<Path, List<String>> resourceEntry :
        resources.entrySet()) {
      for (String linkName : resourceEntry.getValue()) {
        if (new Path(linkName).getName().equals(WILDCARD)) {
          // If this is a wildcarded path, link to everything in the
          // directory from the working directory
          for (File wildLink : readDirAsUser(user, resourceEntry.getKey())) {
            symLinks.put(new Path(wildLink.toString()),
                new Path(wildLink.getName()));
          }
        } else {
          symLinks.put(resourceEntry.getKey(), new Path(linkName));
        }
      }
    }
    return symLinks;
  }

  public String getExposedPorts(Container container)
      throws ContainerExecutionException {
    return null;
  }
}