DefaultContainerExecutor.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager;

import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.numaAwarenessEnabled;

import org.apache.hadoop.classification.VisibleForTesting;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.CommandExecutor;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.numa.NumaResourceAllocator;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@code DefaultContainerExecutor} class offers generic container
 * execution services. Process execution is handled in a platform-independent
 * way via {@link ProcessBuilder}.
 */
public class DefaultContainerExecutor extends ContainerExecutor {

  /** Logger used by this executor and its nested script builders. */
  private static final Logger LOG =
       LoggerFactory.getLogger(DefaultContainerExecutor.class);

  /**
   * Longest wrapper script path, in characters, that
   * {@code launchContainer} accepts on Windows before failing fast.
   */
  private static final int WIN_MAX_PATH = 260;

  /**
   * A {@link FileContext} for the local file system.
   */
  protected final FileContext lfs;

  // NOTE(review): not read or written in this part of the file; presumably
  // caches the configured log directory permissions - confirm against the
  // rest of the class.
  private String logDirPermissions = null;

  // Created in init() only when NUMA awareness is enabled; stays null
  // otherwise, which launchContainer() treats as "no NUMA prefix commands".
  private NumaResourceAllocator numaResourceAllocator;


  // The numactl command read from the configuration in init() when NUMA
  // awareness is enabled.
  private String numactl;
  /**
   * No-argument constructor intended for tests. Binds {@link #lfs} to the
   * local file system's {@link FileContext}.
   *
   * @throws RuntimeException wrapping the
   * {@link UnsupportedFileSystemException} if the local file system context
   * cannot be obtained
   */
  @VisibleForTesting
  public DefaultContainerExecutor() {
    FileContext localContext;
    try {
      localContext = FileContext.getLocalFSFileContext();
    } catch (UnsupportedFileSystemException e) {
      throw new RuntimeException(e);
    }
    this.lfs = localContext;
  }

  /**
   * Create an instance with a given {@link FileContext}.
   * The context is stored as-is in {@link #lfs} and used for every local
   * file operation this executor performs.
   *
   * @param lfs the given {@link FileContext}
   */
  DefaultContainerExecutor(FileContext lfs) {
    this.lfs = lfs;
  }

  /**
   * Copy {@code src} to {@code dst} through the {@link #lfs}
   * {@link FileContext}, keeping the source in place and replacing any
   * existing destination.
   *
   * @param src the file to copy
   * @param dst where to copy the file
   * @param owner the owner of the new copy. Ignored here; used only in
   * secure Windows clusters
   * @throws IOException when the copy fails
   * @see WindowsSecureContainerExecutor
   */
  protected void copyFile(Path src, Path dst, String owner) throws IOException {
    final boolean deleteSource = false;
    final boolean overwrite = true;
    lfs.util().copy(src, dst, deleteSource, overwrite);
  }
  
  /**
   * Make a file executable using the {@link #lfs} {@link FileContext}.
   * The file's permissions are set to
   * {@link ContainerExecutor#TASK_LAUNCH_SCRIPT_PERMISSION}.
   *
   * @param script the path to make executable
   * @param owner the new owner for the file. Not used by this
   * implementation; used only in secure Windows clusters
   * @throws IOException when the change mode operation fails
   * @see WindowsSecureContainerExecutor
   */
  protected void setScriptExecutable(Path script, String owner)
      throws IOException {
    lfs.setPermission(script, ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
  }

  /**
   * Initialize the executor. When NUMA awareness is enabled in the
   * configuration, this creates and initializes the NUMA resource allocator
   * and reads the configured numactl command; otherwise nothing is done.
   *
   * @param nmContext the NodeManager context passed to the NUMA allocator
   * @throws IOException if the NUMA allocator cannot be initialized; the
   * underlying {@link YarnException} is attached as the cause
   */
  @Override
  public void init(Context nmContext) throws IOException {
    if (numaAwarenessEnabled(getConf())) {
      numaResourceAllocator = new NumaResourceAllocator(nmContext);
      numactl = this.getConf().get(YarnConfiguration.NM_NUMA_AWARENESS_NUMACTL_CMD,
              YarnConfiguration.DEFAULT_NM_NUMA_AWARENESS_NUMACTL_CMD);
      try {
        numaResourceAllocator.init(this.getConf());
        LOG.info("NUMA resources allocation is enabled in DefaultContainer Executor," +
                " Successfully initialized NUMA resources allocator.");
      } catch (YarnException e) {
        LOG.warn("Improper NUMA configuration provided.", e);
        // Keep the original exception as the cause so callers can see why
        // the NUMA subsystem rejected the configuration.
        throw new IOException(
            "Failed to initialize configured numa subsystem!", e);
      }
    }
  }

  /**
   * Prepare the user and application directories, stage the container
   * tokens into a randomly selected application directory, and run the
   * {@link ContainerLocalizer} by calling it directly.
   *
   * @param ctx carries the token path, NodeManager address, user,
   * application ID, localizer ID and directory handler
   * @throws IOException if a directory or file operation fails
   * @throws InterruptedException declared by the overridden method
   */
  @Override
  public void startLocalizer(LocalizerStartContext ctx)
      throws IOException, InterruptedException {
    final Path privateTokens = ctx.getNmPrivateContainerTokens();
    final InetSocketAddress nodeManagerAddr = ctx.getNmAddr();
    final String user = ctx.getUser();
    final String appId = ctx.getAppId();
    final String locId = ctx.getLocId();
    final LocalDirsHandlerService dirs = ctx.getDirsHandler();
    final List<String> localDirs = dirs.getLocalDirs();
    final List<String> logDirs = dirs.getLogDirs();

    // Build the directory tree top-down: user, caches, application, logs.
    createUserLocalDirs(localDirs, user);
    createUserCacheDirs(localDirs, user);
    createAppDirs(localDirs, user, appId);
    createAppLogDirs(appId, logDirs, user);

    // The application directory is picked at random among the local dirs.
    final Path appStorageDir = getWorkingDir(localDirs, user, appId);

    final String tokenFileName = String.format(TOKEN_FILE_NAME_FMT, locId);
    final Path stagedTokens = new Path(appStorageDir, tokenFileName);
    copyFile(privateTokens, stagedTokens, user);
    LOG.info("Copying from {} to {}", privateTokens, stagedTokens);

    // Give the localizer its own FileContext rooted at the chosen directory,
    // with the same umask as the executor's local file system context.
    final FileContext localizerFc =
        FileContext.getFileContext(lfs.getDefaultFileSystem(), getConf());
    localizerFc.setUMask(lfs.getUMask());
    localizerFc.setWorkingDirectory(appStorageDir);
    LOG.info("Localizer CWD set to {} = {}", appStorageDir,
        localizerFc.getWorkingDirectory());

    // TODO: DO it over RPC for maintaining similarity?
    createContainerLocalizer(user, appId, locId, tokenFileName, localDirs,
        localizerFc).runLocalization(nodeManagerAddr);
  }

  /**
   * Build the {@link ContainerLocalizer} used by {@link #startLocalizer}.
   * Kept as a separate method so that tests can substitute their own
   * localizer.
   *
   * @param user the user who owns the job for which the localization is being
   * run
   * @param appId the ID of the application for which the localization is being
   * run
   * @param locId the ID of the container for which the localization is being
   * run
   * @param tokenFileName the name of the token file passed on to the
   * localizer
   * @param localDirs a list of directories to use as destinations for the
   * localization
   * @param localizerFc the {@link FileContext} to use when localizing files
   * @return the new {@link ContainerLocalizer} instance
   * @throws IOException if {@code user} or {@code locId} is {@code null} or if
   * the container localizer has an initialization failure
   */
  @Private
  @VisibleForTesting
  protected ContainerLocalizer createContainerLocalizer(String user,
      String appId, String locId, String tokenFileName, List<String> localDirs,
      FileContext localizerFc) throws IOException {
    return new ContainerLocalizer(localizerFc, user, appId, locId,
        tokenFileName, getPaths(localDirs),
        RecordFactoryProvider.getRecordFactory(getConf()));
  }

  /**
   * Launch a container: create its directories on every local and log dir,
   * copy tokens, optional keystore/truststore and the launch script into the
   * container work directory, write the platform wrapper script, and run it.
   *
   * @param ctx carries the container, the NodeManager-private source paths,
   * the user, the container work directory and the local/log directories
   * @return 0 when the launch command completes without an
   * {@link IOException}; the terminated exit code when no pid file is set or
   * the container is no longer active; -1 when the failure happens before the
   * launch command is built; otherwise the exit code of the launch command
   * @throws IOException if preparing the directories or files fails, or if
   * the wrapper script path exceeds the Windows path limit
   * @throws ConfigurationException declared by the overridden method
   */
  @Override
  public int launchContainer(ContainerStartContext ctx)
      throws IOException, ConfigurationException {
    Container container = ctx.getContainer();
    Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath();
    Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath();
    Path nmPrivateKeystorePath = ctx.getNmPrivateKeystorePath();
    Path nmPrivateTruststorePath = ctx.getNmPrivateTruststorePath();
    String user = ctx.getUser();
    Path containerWorkDir = ctx.getContainerWorkDir();
    List<String> localDirs = ctx.getLocalDirs();
    List<String> logDirs = ctx.getLogDirs();

    FsPermission dirPerm = new FsPermission(APPDIR_PERM);
    ContainerId containerId = container.getContainerId();

    // create container dirs on all disks
    String containerIdStr = containerId.toString();
    String appIdStr =
            containerId.getApplicationAttemptId().
                getApplicationId().toString();
    for (String sLocalDir : localDirs) {
      Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
      Path userdir = new Path(usersdir, user);
      Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
      Path appDir = new Path(appCacheDir, appIdStr);
      Path containerDir = new Path(appDir, containerIdStr);
      createDir(containerDir, dirPerm, true, user);
    }

    // Create the container log-dirs on all disks
    createContainerLogDirs(appIdStr, containerIdStr, logDirs, user);

    Path tmpDir = new Path(containerWorkDir,
        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
    createDir(tmpDir, dirPerm, false, user);


    // copy container tokens to work dir
    Path tokenDst =
      new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);
    copyFile(nmPrivateTokensPath, tokenDst, user);

    // keystore and truststore are optional
    if (nmPrivateKeystorePath != null) {
      Path keystoreDst =
          new Path(containerWorkDir, ContainerLaunch.KEYSTORE_FILE);
      copyFile(nmPrivateKeystorePath, keystoreDst, user);
    }

    if (nmPrivateTruststorePath != null) {
      Path truststoreDst =
          new Path(containerWorkDir, ContainerLaunch.TRUSTSTORE_FILE);
      copyFile(nmPrivateTruststorePath, truststoreDst, user);
    }

    // copy launch script to work dir
    Path launchDst =
        new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
    copyFile(nmPrivateContainerScriptPath, launchDst, user);

    // Create new local launch wrapper script
    LocalWrapperScriptBuilder sb = getLocalWrapperScriptBuilder(
        containerIdStr, containerWorkDir);

    // Fail fast if attempting to launch the wrapper script would fail due to
    // Windows path length limitation.
    if (Shell.WINDOWS &&
        sb.getWrapperScriptPath().toString().length() > WIN_MAX_PATH) {
      throw new IOException(String.format(
        "Cannot launch container using script at path %s, because it exceeds " +
        "the maximum supported path length of %d characters.  Consider " +
        "configuring shorter directories in %s.", sb.getWrapperScriptPath(),
        WIN_MAX_PATH, YarnConfiguration.NM_LOCAL_DIRS));
    }

    Path pidFile = getPidFilePath(containerId);
    if (pidFile != null) {
      sb.writeLocalWrapperScript(launchDst, pidFile);
    } else {
      LOG.info("Container {} pid file not set. Returning terminated error",
          containerIdStr);
      return ExitCode.TERMINATED.getExitCode();
    }

    // create log dir under app
    // fork script
    Shell.CommandExecutor shExec = null;
    try {
      setScriptExecutable(launchDst, user);
      setScriptExecutable(sb.getWrapperScriptPath(), user);

      // adding numa commands based on configuration
      String[] numaCommands = new String[]{};

      if (numaResourceAllocator != null) {
        try {
          NumaResourceAllocation numaResourceAllocation =
                  numaResourceAllocator.allocateNumaNodes(container);
          if (numaResourceAllocation != null) {
            numaCommands = getNumaCommands(numaResourceAllocation);
          }
        } catch (ResourceHandlerException e) {
          LOG.error("NumaResource Allocation failed!", e);
          throw new IOException("NumaResource Allocation Error!", e);
        }
      }

      shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(),
              containerIdStr, user, pidFile, container.getResource(),
              new File(containerWorkDir.toUri().getPath()),
              container.getLaunchContext().getEnvironment(),
              numaCommands);

      if (isContainerActive(containerId)) {
        shExec.execute();
      } else {
        LOG.info("Container {} was marked as inactive. "
            + "Returning terminated error", containerIdStr);
        return ExitCode.TERMINATED.getExitCode();
      }
    } catch (IOException e) {
      if (null == shExec) {
        // The failure happened before the launch command was built, so there
        // is no exit code or shell output to report. Log the exception here;
        // otherwise the reason for the -1 result would be lost.
        LOG.warn("Failed to prepare the launch of container {}",
            containerIdStr, e);
        return -1;
      }
      int exitCode = shExec.getExitCode();
      LOG.warn("Exit code from container {} is : {}", containerId, exitCode);
      // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was
      // terminated/killed forcefully. In all other cases, log the
      // container-executor's output
      if (exitCode != ExitCode.FORCE_KILLED.getExitCode()
          && exitCode != ExitCode.TERMINATED.getExitCode()) {
        LOG.warn("Exception from container-launch with container ID: {}"
            + " and exit code: {}", containerId, exitCode, e);

        StringBuilder builder = new StringBuilder();
        builder.append("Exception from container-launch.\n")
            .append("Container id: ").append(containerId).append("\n")
            .append("Exit code: ").append(exitCode).append("\n");
        if (!Optional.ofNullable(e.getMessage()).orElse("").isEmpty()) {
          builder.append("Exception message: ")
              .append(e.getMessage()).append("\n");
        }

        if (!shExec.getOutput().isEmpty()) {
          builder.append("Shell output: ")
              .append(shExec.getOutput()).append("\n");
        }
        String diagnostics = builder.toString();
        logOutput(diagnostics);
        container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
            diagnostics));
      } else {
        container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
            "Container killed on request. Exit code is " + exitCode));
      }
      return exitCode;
    } finally {
      // Runs on every exit from the try block, including the early returns.
      if (shExec != null) {
        shExec.close();
      }
      postComplete(containerId);
    }
    return 0;
  }

  /**
   * Relaunch a container. This executor makes no distinction between a first
   * launch and a relaunch: the call is forwarded to
   * {@link #launchContainer(ContainerStartContext)} unchanged.
   *
   * @param ctx the launch context, passed through as-is
   * @return the value returned by {@code launchContainer}
   * @throws IOException if {@code launchContainer} throws it
   * @throws ConfigurationException if {@code launchContainer} throws it
   */
  @Override
  public int relaunchContainer(ContainerStartContext ctx)
      throws IOException, ConfigurationException {
    return launchContainer(ctx);
  }

  /**
   * Assemble the command line for a container and wrap it in a
   * {@link ShellCommandExecutor}. The run command is built first; any NUMA
   * commands supplied are placed in front of it.
   *
   * @param wrapperScriptPath the path to the script to execute
   * @param containerIdStr the container ID
   * @param user the application owner's username
   * @param pidFile the path to the container's PID file
   * @param resource this parameter controls memory and CPU limits.
   * @param workDir If not-null, specifies the directory which should be set
   * as the current working directory for the command. If null,
   * the current working directory is not modified.
   * @param environment the container environment
   * @param numaCommands list of prefix numa commands; may be {@code null} or
   * empty, in which case no prefix is added
   * @return the new {@link ShellCommandExecutor}
   * @see ShellCommandExecutor
   */
  protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
                            String containerIdStr, String user, Path pidFile, Resource resource,
                            File workDir, Map<String, String> environment, String[] numaCommands) {

    final String[] runCommand = getRunCommand(wrapperScriptPath,
        containerIdStr, user, pidFile, this.getConf(), resource);

    // Prefix the run command with the NUMA commands when any were supplied.
    final boolean hasNumaPrefix =
        numaCommands != null && numaCommands.length > 0;
    final String[] command = hasNumaPrefix
        ? concatStringCommands(numaCommands, runCommand)
        : runCommand;

    LOG.info("launchContainer: {}", Arrays.toString(command));

    final long timeout = 0L;
    final boolean inheritParentEnv = false;
    return new ShellCommandExecutor(command, workDir, environment, timeout,
        inheritParentEnv);
  }

  /**
   * Pick the {@link LocalWrapperScriptBuilder} implementation matching the
   * platform this NodeManager runs on.
   *
   * @param containerIdStr the container ID; only the Windows builder uses it
   * @param containerWorkDir the container's working directory
   * @return a new {@link LocalWrapperScriptBuilder}
   */
  protected LocalWrapperScriptBuilder getLocalWrapperScriptBuilder(
      String containerIdStr, Path containerWorkDir) {
    if (Shell.WINDOWS) {
      return new WindowsLocalWrapperScriptBuilder(containerIdStr,
          containerWorkDir);
    }
    return new UnixLocalWrapperScriptBuilder(containerWorkDir);
  }

  /**
   * Base class for the platform-specific builders that write the wrapper
   * script used to launch a container. Subclasses supply the script content;
   * this class decides where the script lives and opens the output stream.
   */
  protected abstract class LocalWrapperScriptBuilder {

    /** Location of the wrapper script inside the container work directory. */
    private final Path wrapperScriptPath;

    /**
     * Create a builder whose wrapper script is placed in the given container
     * working directory, with the platform's script extension appended.
     *
     * @param containerWorkDir the working directory for the container
     */
    protected LocalWrapperScriptBuilder(Path containerWorkDir) {
      String scriptName =
          Shell.appendScriptExtension("default_container_executor");
      this.wrapperScriptPath = new Path(containerWorkDir, scriptName);
    }

    /**
     * Return the path for the wrapper script.
     *
     * @return the path for the wrapper script
     */
    public Path getWrapperScriptPath() {
      return wrapperScriptPath;
    }

    /**
     * Create (or overwrite) the wrapper script at
     * {@link #getWrapperScriptPath()} and fill it with the platform-specific
     * content.
     *
     * @param launchDst the script to launch
     * @param pidFile the file that will hold the PID
     * @throws IOException if the wrapper script cannot be created
     * @see #getWrapperScriptPath
     */
    public void writeLocalWrapperScript(Path launchDst, Path pidFile)
        throws IOException {
      try (DataOutputStream rawOut =
               lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE));
           PrintStream scriptOut =
               new PrintStream(rawOut, false, "UTF-8")) {
        writeLocalWrapperScript(launchDst, pidFile, scriptOut);
      }
    }

    /**
     * Emit the content of the wrapper script to the given stream.
     *
     * @param launchDst the script to launch
     * @param pidFile the file that will hold the PID
     * @param pout the stream to use to write out the wrapper script
     */
    protected abstract void writeLocalWrapperScript(Path launchDst,
        Path pidFile, PrintStream pout);
  }

  /**
   * This class is an instance of {@link LocalWrapperScriptBuilder} for
   * non-Windows hosts. It writes two bash scripts: a session script that
   * records the PID and execs the launch script, and a wrapper script that
   * runs the session script and records its exit code.
   */
  private final class UnixLocalWrapperScriptBuilder
      extends LocalWrapperScriptBuilder {
    /** Location of the session script inside the container work directory. */
    private final Path sessionScriptPath;

    /**
     * Create an instance for the given container path.
     *
     * @param containerWorkDir the container's working directory
     */
    public UnixLocalWrapperScriptBuilder(Path containerWorkDir) {
      super(containerWorkDir);
      this.sessionScriptPath = new Path(containerWorkDir,
          Shell.appendScriptExtension("default_container_executor_session"));
    }

    /**
     * Write the session script first, then the wrapper script that invokes
     * it.
     *
     * @param launchDst the script to launch
     * @param pidFile the file that will hold the PID
     * @throws IOException if either script cannot be created
     */
    @Override
    public void writeLocalWrapperScript(Path launchDst, Path pidFile)
        throws IOException {
      writeSessionScript(launchDst, pidFile);
      super.writeLocalWrapperScript(launchDst, pidFile);
    }

    /**
     * Emit the wrapper script: run the session script, then store its exit
     * code in the exit code file via a temporary file and a move.
     *
     * @param launchDst the script to launch (not referenced directly; the
     * session script runs it)
     * @param pidFile the file that will hold the PID; the exit code file name
     * is derived from it
     * @param pout the stream to use to write out the wrapper script
     */
    @Override
    public void writeLocalWrapperScript(Path launchDst, Path pidFile,
        PrintStream pout) {
      String exitCodeFile = ContainerLaunch.getExitCodeFile(
          pidFile.toString());
      String tmpFile = exitCodeFile + ".tmp";
      pout.println("#!/bin/bash");
      pout.println("/bin/bash \"" + sessionScriptPath.toString() + "\"");
      pout.println("rc=$?");
      pout.println("echo $rc > \"" + tmpFile + "\"");
      pout.println("/bin/mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\"");
      pout.println("exit $rc");
    }

    /**
     * Write the session script, which records the shell's PID in the PID
     * file and then execs the launch script (under setsid when available),
     * and mark it executable.
     *
     * @param launchDst the script to launch
     * @param pidFile the file that will hold the PID
     * @throws IOException if the session script cannot be created or its
     * permissions cannot be set
     */
    private void writeSessionScript(Path launchDst, Path pidFile)
        throws IOException {
      try (DataOutputStream out =
               lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE));
           PrintStream pout =
               new PrintStream(out, false, "UTF-8")) {
        // We need to do a move as writing to a file is not atomic
        // Process reading a file being written to may get garbled data
        // hence write pid to tmp file first followed by a mv
        String tmpPidFile = pidFile.toString() + ".tmp";
        pout.println("#!/bin/bash");
        pout.println();
        // Quote the paths, as the wrapper script above does, so that
        // directories containing whitespace do not break the redirect or
        // the move.
        pout.println("echo $$ > \"" + tmpPidFile + "\"");
        pout.println("/bin/mv -f \"" + tmpPidFile + "\" \"" + pidFile + "\"");
        String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
        pout.printf("%s /bin/bash \"%s\"", exec, launchDst.toUri().getPath());
      }
      lfs.setPermission(sessionScriptPath,
          ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
    }
  }

  /**
   * The {@link LocalWrapperScriptBuilder} used on Windows hosts. Its wrapper
   * script records the container ID in the PID file and then calls the
   * launch script.
   */
  private final class WindowsLocalWrapperScriptBuilder
      extends LocalWrapperScriptBuilder {

    /** Container ID written into the PID file in place of a process ID. */
    private final String containerIdStr;

    /**
     * Create an instance for the given container and working directory.
     *
     * @param containerIdStr the container ID
     * @param containerWorkDir the container's working directory
     */
    public WindowsLocalWrapperScriptBuilder(String containerIdStr,
        Path containerWorkDir) {
      super(containerWorkDir);
      this.containerIdStr = containerIdStr;
    }

    /**
     * Emit the Windows wrapper script.
     *
     * @param launchDst the script to launch
     * @param pidFile the file that will hold the container ID
     * @param pout the stream to use to write out the wrapper script
     */
    @Override
    public void writeLocalWrapperScript(Path launchDst, Path pidFile,
        PrintStream pout) {
      // TODO: exit code script for Windows

      // On Windows, the pid is the container ID, so that it can also serve as
      // the name of the job object created by winutils for task management.
      // Write to temp file followed by atomic move.
      final String pidFilePath = new File(pidFile.toString()).getPath();
      final String tmpPidFilePath = pidFilePath + ".tmp";
      pout.println("@echo " + containerIdStr + " > " + tmpPidFilePath);
      pout.println("@move /Y " + tmpPidFilePath + " " + pidFilePath);
      pout.println("@call " + launchDst.toString());
    }
  }

  /**
   * Deliver a signal to a container's process.
   *
   * @param ctx carries the user, the pid and the signal to send
   * @return {@code true} if the signal was sent; {@code false} if the process
   * was not alive before the attempt, or the attempt failed and the process
   * is no longer alive
   * @throws IOException if sending the signal fails while the process is
   * still alive, or if the liveness check itself fails
   */
  @Override
  public boolean signalContainer(ContainerSignalContext ctx)
      throws IOException {
    final String user = ctx.getUser();
    final String pid = ctx.getPid();
    final Signal signal = ctx.getSignal();
    LOG.debug("Sending signal {} to pid {} as user {}",
        signal.getValue(), pid, user);

    if (containerIsAlive(pid)) {
      try {
        killContainer(pid, signal);
        return true;
      } catch (IOException e) {
        // A failed kill only matters if the process survived it.
        if (containerIsAlive(pid)) {
          throw e;
        }
      }
    }
    return false;
  }

  /**
   * No-op for reaping containers within the DefaultContainerExecutor.
   * The context is not inspected.
   *
   * @param ctx Encapsulates information necessary for reaping containers.
   * @return true given no operations are needed.
   */
  @Override
  public boolean reapContainer(ContainerReapContext ctx) {
    return true;
  }

  /**
   * Report whether the process behind a container is alive.
   *
   * @param ctx carries the pid to check
   * @return the result of {@link #containerIsAlive(String)} for that pid
   * @throws IOException if the liveness check fails
   */
  @Override
  public boolean isContainerAlive(ContainerLivenessContext ctx)
      throws IOException {
    return containerIsAlive(ctx.getPid());
  }

  /**
   * Check whether a process is alive by running the platform's
   * "is process alive" shell command for it.
   *
   * @param pid String pid
   * @return {@code true} if the command exits successfully, {@code false} if
   * it exits with a non-zero code
   * @throws IOException if the command to test process liveliness fails
   */
  @VisibleForTesting
  public static boolean containerIsAlive(String pid) throws IOException {
    boolean alive;
    try {
      ShellCommandExecutor probe =
          new ShellCommandExecutor(Shell.getCheckProcessIsAliveCommand(pid));
      probe.execute();
      // a clean exit means the process exists
      alive = true;
    } catch (ExitCodeException e) {
      // a non-zero exit code means the process does not exist
      alive = false;
    }
    return alive;
  }

  /**
   * Run the platform's kill command to send a signal to a process.
   *
   * @param pid the pid of the process [group] to signal.
   * @param signal signal to send
   * @throws IOException if the command to kill the process fails
   */
  protected void killContainer(String pid, Signal signal) throws IOException {
    final String[] killCommand =
        Shell.getSignalKillCommand(signal.getValue(), pid);
    final ShellCommandExecutor killer = new ShellCommandExecutor(killCommand);
    killer.execute();
  }

  /**
   * Recursively delete paths through the {@link #lfs} {@link FileContext}.
   * With no base directories, the sub-directory is treated as an absolute
   * path and deleted. Otherwise the sub-directory is deleted under each base
   * directory, or the base directory itself when no sub-directory is given;
   * a path that no longer exists is skipped.
   *
   * @param ctx carries the sub-directory and the base directories
   * @throws IOException if a delete fails
   * @throws InterruptedException declared by the overridden method
   */
  @Override
  public void deleteAsUser(DeletionAsUserContext ctx)
      throws IOException, InterruptedException {
    final Path subDir = ctx.getSubDir();
    final List<Path> baseDirs = ctx.getBasedirs();

    final boolean noBaseDirs = baseDirs == null || baseDirs.isEmpty();
    if (noBaseDirs) {
      LOG.info("Deleting absolute path : {}", subDir);
      final boolean deleted = lfs.delete(subDir, true);
      if (!deleted) {
        //Maybe retry
        LOG.warn("delete returned false for path: [{}]", subDir);
      }
      return;
    }

    for (Path baseDir : baseDirs) {
      final Path target;
      if (subDir == null) {
        target = baseDir;
      } else {
        target = new Path(baseDir, subDir);
      }
      LOG.info("Deleting path : {}", target);
      try {
        final boolean deleted = lfs.delete(target, true);
        if (!deleted) {
          LOG.warn("delete returned false for path: [{}]", target);
        }
      } catch (FileNotFoundException ignored) {
        // Nothing to delete under this base directory; move on to the next.
      }
    }
  }

  /**
   * Create a symbolic link by delegating to
   * {@link FileUtil#symLink(String, String)}.
   *
   * @param target the path the link points to
   * @param symlink the path of the link to create
   * @throws IOException if {@code FileUtil.symLink} throws it
   */
  @Override
  public void symLink(String target, String symlink) throws IOException {
    FileUtil.symLink(target, symlink);
  }

  /**
   * Permissions for user dir.
   * $local.dir/usercache/$user
   * Octal 0750: owner rwx, group r-x, others none.
   */
  static final short USER_PERM = (short)0750;
  /**
   * Permissions for user appcache dir.
   * $local.dir/usercache/$user/appcache
   * Octal 0710: owner rwx, group --x, others none.
   */
  static final short APPCACHE_PERM = (short)0710;
  /**
   * Permissions for user filecache dir.
   * $local.dir/usercache/$user/filecache
   * Octal 0710: owner rwx, group --x, others none.
   */
  static final short FILECACHE_PERM = (short)0710;
  /**
   * Permissions for user app dir.
   * $local.dir/usercache/$user/appcache/$appId
   * Octal 0710: owner rwx, group --x, others none.
   */
  static final short APPDIR_PERM = (short)0710;

  /**
   * Return the remaining space reported by the {@link #lfs} file system
   * status for the given path.
   *
   * @param base the path to query
   * @return the remaining space as reported by the file system status
   * @throws IOException if the file system status cannot be obtained
   */
  private long getDiskFreeSpace(Path base) throws IOException {
    return lfs.getFsStatus(base).getRemaining();
  }

  /**
   * Build the application directory path under a local dir:
   * the user's appcache directory followed by the application ID.
   *
   * @param base the local directory
   * @param user the user who owns the application
   * @param appId the application ID
   * @return the application directory path
   */
  private Path getApplicationDir(Path base, String user, String appId) {
    final Path appcacheDir = getAppcacheDir(base, user);
    return new Path(appcacheDir, appId);
  }

  /**
   * Build the per-user cache directory path under a local dir:
   * the {@link ContainerLocalizer#USERCACHE} directory followed by the user
   * name.
   *
   * @param base the local directory
   * @param user the user name
   * @return the user's cache directory path
   */
  private Path getUserCacheDir(Path base, String user) {
    final Path allUsersDir = new Path(base, ContainerLocalizer.USERCACHE);
    return new Path(allUsersDir, user);
  }

  /**
   * Build the user's appcache directory path under a local dir:
   * the user's cache directory followed by
   * {@link ContainerLocalizer#APPCACHE}.
   *
   * @param base the local directory
   * @param user the user name
   * @return the user's appcache directory path
   */
  private Path getAppcacheDir(Path base, String user) {
    final Path userCacheDir = getUserCacheDir(base, user);
    return new Path(userCacheDir, ContainerLocalizer.APPCACHE);
  }

  /**
   * Build the user's filecache directory path under a local dir:
   * the user's cache directory followed by
   * {@link ContainerLocalizer#FILECACHE}.
   *
   * @param base the local directory
   * @param user the user name
   * @return the user's filecache directory path
   */
  private Path getFileCacheDir(Path base, String user) {
    final Path userCacheDir = getUserCacheDir(base, user);
    return new Path(userCacheDir, ContainerLocalizer.FILECACHE);
  }

  /**
   * Choose one application directory at random from the local storage
   * directories. Each directory is weighted by the free space reported for
   * its application directory; a directory whose free space cannot be read
   * is given a weight of zero.
   *
   * @param localDirs the target directories from which to select
   * @param user the user who owns the application
   * @param appId the application ID
   * @return the selected directory
   * @throws IOException if the total free space over all directories is not
   * positive, so that no directory can be selected
   */
  protected Path getWorkingDir(List<String> localDirs, String user,
      String appId) throws IOException {
    final long[] availableOnDisk = new long[localDirs.size()];
    long totalAvailable = 0L;
    int slot = 0;

    // First pass: record the free space per directory and the overall sum.
    for (String localDir : localDirs) {
      final Path candidate =
          getApplicationDir(new Path(localDir), user, appId);
      long free;
      try {
        free = getDiskFreeSpace(candidate);
      } catch (IOException e) {
        LOG.warn("Unable to get Free Space for {}", candidate, e);
        free = 0L;
      }
      availableOnDisk[slot] = free;
      slot++;
      totalAvailable += free;
    }

    // Without any free space there is nothing to choose from.
    if (totalAvailable <= 0L) {
      throw new IOException("Not able to find a working directory for " + user);
    }

    // Map a random position within the total onto the directory that owns
    // that position.
    final long randomPosition = RandomUtils.nextLong() % totalAvailable;
    final int chosen = pickDirectory(randomPosition, availableOnDisk);

    return getApplicationDir(new Path(localDirs.get(chosen)), user, appId);
  }

  /**
   * Map a random position onto a directory index, treating the available
   * sizes as consecutive ranges.
   *
   * @param randomPosition the position to map; the caller passes a value
   * below the sum of {@code availableOnDisk}
   * @param availableOnDisk the available size at each directory; the caller
   * ensures at least one entry is non-zero
   * @return the index of the directory whose range contains the position
   */
  @Private
  @VisibleForTesting
  int pickDirectory(long randomPosition, final long[] availableOnDisk) {
    int dir = 0;
    // Leading directories without space can never be chosen. Because the
    // total is positive and the position is below the total, a directory
    // with space is always found.
    while (availableOnDisk[dir] == 0L) {
      dir++;
    }
    // Walk the ranges, consuming each directory's share of the position.
    long remaining = randomPosition;
    while (remaining >= availableOnDisk[dir]) {
      remaining -= availableOnDisk[dir];
      dir++;
    }
    return dir;
  }

  /**
   * Create a directory through the {@link #lfs} {@link FileContext} and make
   * sure it ends up with the requested permissions. If applying the context's
   * umask to the requested permissions would change them, the permissions
   * are set explicitly after creation.
   *
   * @param dirPath the target directory
   * @param perms the target permissions for the target directory
   * @param createParent whether the parent directories should also be created
   * @param user the user as whom the target directory should be created.
   * Used only on secure Windows hosts.
   * @throws IOException if there's a failure performing a file operation
   * @see WindowsSecureContainerExecutor
   */
  protected void createDir(Path dirPath, FsPermission perms,
      boolean createParent, String user) throws IOException {
    lfs.mkdir(dirPath, perms, createParent);
    final FsPermission maskedPerms = perms.applyUMask(lfs.getUMask());
    final boolean umaskAltersPerms = !perms.equals(maskedPerms);
    if (umaskAltersPerms) {
      lfs.setPermission(dirPath, perms);
    }
  }

  /**
   * Create the per-user directory in each local directory.
   * <ul>
   * <li>$local.dir/usercache/$user</li>
   * </ul>
   * A failure in one local directory is logged and skipped; the call only
   * fails when the directory could not be created anywhere.
   *
   * @param localDirs the target directories to create
   * @param user the user whose local cache directories should be initialized
   * @throws IOException if the user directory could not be created in any of
   * the local directories
   */
  void createUserLocalDirs(List<String> localDirs, String user)
      throws IOException {
    final FsPermission userPerms = new FsPermission(USER_PERM);
    boolean createdSomewhere = false;
    for (String localDir : localDirs) {
      // create $local.dir/usercache/$user and its immediate parent
      try {
        final Path userDir = getUserCacheDir(new Path(localDir), user);
        createDir(userDir, userPerms, true, user);
        createdSomewhere = true;
      } catch (IOException e) {
        LOG.warn("Unable to create the user directory : {}", localDir, e);
      }
    }
    if (createdSomewhere) {
      return;
    }
    throw new IOException("Not able to initialize user directories "
        + "in any of the configured local directories for user " + user);
  }


  /**
   * Initialize the application cache and file cache directories of a user
   * under each local directory.
   * <ul>
   * <li>$local.dir/usercache/$user</li>
   * <li>$local.dir/usercache/$user/appcache</li>
   * <li>$local.dir/usercache/$user/filecache</li>
   * </ul>
   * Individual failures are logged and tolerated. The call fails only when
   * the appcache directory, or the filecache directory, could not be created
   * under any of the local directories.
   *
   * @param localDirs the target directories to create
   * @param user the user whose local cache directories should be initialized
   * @throws IOException if there's an issue initializing the cache
   * directories
   */
  void createUserCacheDirs(List<String> localDirs, String user)
      throws IOException {
    LOG.info("Initializing user {}", user);

    final FsPermission appcachePermission = new FsPermission(APPCACHE_PERM);
    final FsPermission filecachePermission = new FsPermission(FILECACHE_PERM);
    boolean anyAppcacheCreated = false;
    boolean anyFilecacheCreated = false;

    for (String localDir : localDirs) {
      final Path base = new Path(localDir);

      // create $local.dir/usercache/$user/appcache
      final Path appcacheDir = getAppcacheDir(base, user);
      try {
        createDir(appcacheDir, appcachePermission, true, user);
        anyAppcacheCreated = true;
      } catch (IOException e) {
        LOG.warn("Unable to create app cache directory : {}", appcacheDir, e);
      }

      // create $local.dir/usercache/$user/filecache
      final Path filecacheDir = getFileCacheDir(base, user);
      try {
        createDir(filecacheDir, filecachePermission, true, user);
        anyFilecacheCreated = true;
      } catch (IOException e) {
        LOG.warn("Unable to create file cache directory : {}",
            filecacheDir, e);
      }
    }

    // The appcache failure is reported first when both kinds failed.
    if (!anyAppcacheCreated) {
      throw new IOException("Not able to initialize app-cache directories "
          + "in any of the configured local directories for user " + user);
    }
    if (!anyFilecacheCreated) {
      throw new IOException(
          "Not able to initialize distributed-cache directories "
              + "in any of the configured local directories for user "
              + user);
    }
  }

  /**
   * Initialize the application directory of a user under each local
   * directory.
   * <ul>
   * <li>$local.dir/usercache/$user/appcache/$appid</li>
   * </ul>
   * A failure on an individual local directory is logged and skipped; the
   * call only fails when the directory could not be created anywhere.
   *
   * @param localDirs the target directories to create
   * @param user the user whose local cache directories should be initialized
   * @param appId the application ID
   * @throws IOException if the application directory could not be created in
   * any of the given local directories
   */
  void createAppDirs(List<String> localDirs, String user, String appId)
      throws IOException {
    boolean initAppDirStatus = false;
    FsPermission appperms = new FsPermission(APPDIR_PERM);
    for (String localDir : localDirs) {
      Path fullAppDir = getApplicationDir(new Path(localDir), user, appId);
      // create $local.dir/usercache/$user/appcache/$appId
      try {
        createDir(fullAppDir, appperms, true, user);
        initAppDirStatus = true;
      } catch (IOException e) {
        LOG.warn("Unable to create app directory {}",
            fullAppDir, e);
      }
    }
    if (!initAppDirStatus) {
      // appId is already a String: concatenate it directly so that a null id
      // cannot replace this IOException with a NullPointerException.
      throw new IOException("Not able to initialize app directories "
          + "in any of the configured local directories for app "
          + appId);
    }
  }

  /**
   * Create the application log directory ($log.dir/$appid) under each of the
   * given log directories. A failure on an individual log directory is logged
   * and skipped; the call only fails when the directory could not be created
   * anywhere.
   *
   * @param appId the application ID
   * @param logDirs the target directories to create
   * @param user the user passed through to
   * {@link #createDir(Path, FsPermission, boolean, String)}
   * @throws IOException if there's an issue initializing the application log
   * directories
   */
  void createAppLogDirs(String appId, List<String> logDirs, String user)
      throws IOException {
    final FsPermission logDirPermission =
        new FsPermission(getLogDirPermissions());
    boolean createdAny = false;

    for (String rootLogDir : logDirs) {
      // create $log.dir/$appid
      final Path appLogDir = new Path(rootLogDir, appId);
      try {
        createDir(appLogDir, logDirPermission, true, user);
        createdAny = true;
      } catch (IOException e) {
        // Keep going: other log directories may still be usable.
        LOG.warn("Unable to create the app-log directory : {}", appLogDir, e);
      }
    }

    if (createdAny) {
      return;
    }
    throw new IOException("Not able to initialize app-log directories "
        + "in any of the configured local directories for app " + appId);
  }

  /**
   * Create the container log directory ($log.dir/$appid/$containerid) under
   * each of the given log directories. A failure on an individual log
   * directory is logged and skipped; the call only fails when the directory
   * could not be created anywhere.
   *
   * @param appId the application ID
   * @param containerId the container ID
   * @param logDirs the target directories to create
   * @param user the user as whom the directories should be created.
   * Used only on secure Windows hosts.
   * @throws IOException if there's an issue initializing the container log
   * directories
   */
  void createContainerLogDirs(String appId, String containerId,
      List<String> logDirs, String user) throws IOException {
    boolean containerLogDirStatus = false;
    FsPermission containerLogDirPerms = new
        FsPermission(getLogDirPermissions());
    for (String rootLogDir : logDirs) {
      // create $log.dir/$appid/$containerid
      Path appLogDir = new Path(rootLogDir, appId);
      Path containerLogDir = new Path(appLogDir, containerId);
      try {
        createDir(containerLogDir, containerLogDirPerms, true, user);
      } catch (IOException e) {
        // Report the directory that actually failed to be created, not its
        // parent application log directory.
        LOG.warn("Unable to create the container-log directory : {}",
            containerLogDir, e);
        continue;
      }
      containerLogDirStatus = true;
    }
    if (!containerLogDirStatus) {
      throw new IOException(
          "Not able to initialize container-log directories "
              + "in any of the configured local directories for container "
              + containerId);
    }
  }

  /**
   * Return the container log directory permissions. The value is read from
   * the configuration on first use and then kept in a field, so later calls
   * return the stored value until {@link #clearLogDirPermissions()} resets it.
   *
   * @return the default container log directory permissions
   */
  @VisibleForTesting
  public String getLogDirPermissions() {
    if (this.logDirPermissions != null) {
      return this.logDirPermissions;
    }
    // Not loaded yet: look the value up, falling back to the default.
    this.logDirPermissions = getConf().get(
        YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR_LOG_DIRS_PERMISSIONS,
        YarnConfiguration.NM_DEFAULT_CONTAINER_EXECUTOR_LOG_DIRS_PERMISSIONS_DEFAULT);
    return this.logDirPermissions;
  }

  /**
   * Reset the stored log directory permissions to {@code null} so that the
   * next call to {@link #getLogDirPermissions()} reads the value from the
   * configuration again. Intended for repeatable testing.
   */
  @VisibleForTesting
  public void clearLogDirPermissions() {
    this.logDirPermissions = null;
  }

  /**
   * Placeholder for executing a command inside a container. This
   * implementation performs no work and always returns {@code null}.
   *
   * @param ctx Encapsulates information necessary for exec containers.
   * @return always {@code null} in this implementation
   * @throws ContainerExecutionException declared by the overridden method;
   * never thrown by this implementation
   */
  @Override
  public IOStreamPair execContainer(ContainerExecContext ctx)
      throws ContainerExecutionException {
    return null;
  }

  /**
   * Convert a list of directory names into a list of {@link Path} objects,
   * preserving the order of the input.
   *
   * @param dirs the directory names to convert
   * @return the list of paths of given local directories
   */
  private static List<Path> getPaths(List<String> dirs) {
    final List<Path> converted = new ArrayList<>(dirs.size());
    for (String dir : dirs) {
      converted.add(new Path(dir));
    }
    return converted;
  }

  /**
   * Not implemented by this executor: every call fails with a
   * {@link ServiceStateException}.
   *
   * @param ctx unused by this implementation
   * @param user unused by this implementation
   * @param appId unused by this implementation
   * @param spec unused by this implementation
   * @throws IOException declared by the overridden method; this
   * implementation never throws it
   * @throws ServiceStateException always, with the message
   * "Implementation unavailable"
   */
  @Override
  public void updateYarnSysFS(Context ctx, String user,
      String appId, String spec) throws IOException {
    throw new ServiceStateException("Implementation unavailable");
  }

  /**
   * Reacquire a container. When a NUMA resource allocator is set, the
   * container's NUMA resources are recovered first; the call is then
   * delegated to the superclass. {@link #postComplete(ContainerId)} is always
   * invoked afterwards, whether the delegation returns or throws.
   *
   * @param ctx the reacquisition context; its container ID is used for the
   * NUMA recovery and for the final cleanup
   * @return the value returned by the superclass implementation
   * @throws IOException if thrown by the recovery or by the superclass
   * @throws InterruptedException if thrown by the superclass
   */
  @Override
  public int reacquireContainer(ContainerReacquisitionContext ctx)
          throws IOException, InterruptedException {
    try {
      // NUMA handling is optional: skip recovery when no allocator is set.
      if (numaResourceAllocator != null) {
        numaResourceAllocator.recoverNumaResource(ctx.getContainerId());
      }
      return super.reacquireContainer(ctx);
    } finally {
      // Runs on both normal return and exception.
      postComplete(ctx.getContainerId());
    }
  }

  /**
   * Clean up after a container: release the NUMA resources held for it.
   * Does nothing when no NUMA resource allocator is set. A failure to
   * release is logged as a warning and not propagated.
   *
   * @param containerId containerId of running container
   */
  public void postComplete(final ContainerId containerId) {
    if (numaResourceAllocator == null) {
      return;
    }
    try {
      numaResourceAllocator.releaseNumaResource(containerId);
    } catch (ResourceHandlerException e) {
      // Best effort: the failure is reported but must not fail the caller.
      LOG.warn("NumaResource release failed for " +
              "containerId: {}. Exception: ", containerId, e);
    }
  }

  /**
   * Build the command prefix for the given NUMA allocation. The result has
   * three elements: the configured {@code numactl} value, an
   * {@code --interleave=} option listing the memory nodes and a
   * {@code --cpunodebind=} option listing the CPU nodes, each list joined
   * with commas.
   *
   * @param resourceAllocation NonNull NumaResourceAllocation object reference
   * @return Array of numa specific commands
   */
  String[] getNumaCommands(NumaResourceAllocation resourceAllocation) {
    return new String[] {
        numactl,
        "--interleave=" + String.join(",", resourceAllocation.getMemNodes()),
        "--cpunodebind=" + String.join(",", resourceAllocation.getCpuNodes())
    };
  }

  /**
   * Concatenate two command arrays.
   * <p>
   * When one of the arrays is {@code null} or empty, the other argument is
   * returned as is (the same reference, not a copy). In particular the result
   * is {@code null} when the first array is {@code null} or empty and the
   * second one is {@code null}. Otherwise a new array is returned and the
   * inputs are left untouched.
   *
   * @param firstStringArray  Array of String
   * @param secondStringArray Array of String
   * @return combined array of string where first elements are from firstStringArray
   * and later are the elements from secondStringArray
   */
  String[] concatStringCommands(String[] firstStringArray, String[] secondStringArray) {
    // Nothing to prepend: the result is whatever the second argument is.
    // This also covers the case where both arguments are null.
    if (firstStringArray == null || firstStringArray.length == 0) {
      return secondStringArray;
    }
    // Nothing to append: the (non-empty) first array is the result.
    if (secondStringArray == null || secondStringArray.length == 0) {
      return firstStringArray;
    }

    final int firstLength = firstStringArray.length;
    final int secondLength = secondStringArray.length;
    final String[] combined = new String[firstLength + secondLength];
    System.arraycopy(firstStringArray, 0, combined, 0, firstLength);
    System.arraycopy(secondStringArray, 0, combined, firstLength, secondLength);
    return combined;
  }

  /**
   * Replace the NUMA resource allocator held by this executor.
   *
   * @param numaResourceAllocator the allocator to use; {@code null} makes
   * {@link #postComplete(ContainerId)} skip the NUMA resource release
   */
  @VisibleForTesting
  public void setNumaResourceAllocator(NumaResourceAllocator numaResourceAllocator) {
    this.numaResourceAllocator = numaResourceAllocator;
  }

  /**
   * Set the {@code numactl} value that
   * {@link #getNumaCommands(NumaResourceAllocation)} places first in the
   * command array it builds.
   *
   * @param numactl the value to store
   */
  @VisibleForTesting
  public void setNumactl(String numactl) {
    this.numactl = numactl;
  }

}