TestContainerManager.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager.containermanager;

import org.apache.hadoop.util.Lists;
import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import org.apache.hadoop.yarn.api.records.LocalizationState;
import org.apache.hadoop.yarn.api.records.LocalizationStatus;
import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;

import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import java.util.function.Supplier;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.mockito.Mockito.when;
import org.slf4j.LoggerFactory;

public class TestContainerManager extends BaseContainerManagerTest {

  public TestContainerManager() throws UnsupportedFileSystemException {
    super();
  }

  // Point LOG at a logger named after this test class. LOG is not declared
  // in this class; presumably it is a static field inherited from
  // BaseContainerManagerTest -- confirm against the base class.
  static {
    LOG = LoggerFactory.getLogger(TestContainerManager.class);
  }

  /**
   * Test-only {@link ContainerStateTransitionListener} that records, per
   * container id, the states a container passed through and the types of the
   * events that were processed.
   *
   * <p>The first state stored for a container is the state it was in before
   * the first transition this listener observed; every later entry is the
   * state reached after a transition. Both maps are plain {@link HashMap}s.
   */
  private static class Listener implements ContainerStateTransitionListener {

    /** Observed states per container, in the order they were recorded. */
    private final Map<ContainerId,
        List<org.apache.hadoop.yarn.server.nodemanager.containermanager.
            container.ContainerState>> states = new HashMap<>();
    /** Types of the processed events per container, in processing order. */
    private final Map<ContainerId, List<ContainerEventType>> events =
        new HashMap<>();

    @Override
    public void init(Context context) {
      // Nothing to initialise.
    }

    /**
     * Seeds the history of a container the first time it is seen: stores
     * {@code beforeState} as the initial state and creates an empty event
     * list. Containers that already have a history are left untouched.
     */
    @Override
    public void preTransition(ContainerImpl op,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState beforeState,
        ContainerEvent eventToBeProcessed) {
      ContainerId id = op.getContainerId();
      if (states.containsKey(id)) {
        return;
      }
      List<org.apache.hadoop.yarn.server.nodemanager.containermanager.
          container.ContainerState> history = new ArrayList<>();
      history.add(beforeState);
      states.put(id, history);
      events.put(id, new ArrayList<>());
    }

    /**
     * Appends the state reached and the type of the processed event to the
     * container's history. Relies on {@link #preTransition} having created
     * the entries for this container beforehand.
     */
    @Override
    public void postTransition(ContainerImpl op,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState beforeState,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState afterState,
        ContainerEvent processedEvent) {
      ContainerId id = op.getContainerId();
      states.get(id).add(afterState);
      events.get(id).add(processedEvent.getType());
    }
  }

  // When true, the executor built by createContainerExecutor() sleeps for
  // 10 seconds before delegating each launchContainer call.
  private boolean delayContainers = false;

  /**
   * Builds the executor used by these tests: a Mockito spy around a
   * {@link DefaultContainerExecutor} whose {@code launchContainer} waits ten
   * seconds before launching whenever {@code delayContainers} is set.
   *
   * @return the spied executor, configured with {@code conf}
   */
  @Override
  protected ContainerExecutor createContainerExecutor() {
    DefaultContainerExecutor delayingExecutor = new DefaultContainerExecutor() {
      @Override
      public int launchContainer(ContainerStartContext ctx)
          throws IOException, ConfigurationException {
        if (!delayContainers) {
          return super.launchContainer(ctx);
        }
        try {
          Thread.sleep(10000);
        } catch (InterruptedException e) {
          // Interruption is deliberately ignored; the launch still proceeds.
        }
        return super.launchContainer(ctx);
      }
    };
    delayingExecutor.setConf(conf);
    return spy(delayingExecutor);
  }
  
  /**
   * Creates a {@link ContainerManagerImpl} whose remote caller identity is
   * fixed: every call to {@code getRemoteUgi()} returns a fresh remote user
   * named after attempt 1 of application (0, 0) and carrying an
   * {@link NMTokenIdentifier} built from the current NM token master key.
   *
   * @param delSrvc deletion service handed to the container manager
   * @return the container manager under test
   */
  @Override
  protected ContainerManagerImpl
      createContainerManager(DeletionService delSrvc) {
    return new ContainerManagerImpl(context, exec, delSrvc,
        getNodeStatusUpdater(), metrics, dirsHandler) {

      @Override
      protected UserGroupInformation getRemoteUgi() throws YarnException {
        ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
            ApplicationId.newInstance(0, 0), 1);
        UserGroupInformation remoteUser =
            UserGroupInformation.createRemoteUser(attemptId.toString());
        NMTokenIdentifier nmToken = new NMTokenIdentifier(attemptId,
            context.getNodeId(), user,
            context.getNMTokenSecretManager().getCurrentKey().getKeyId());
        remoteUser.addTokenIdentifier(nmToken);
        return remoteUser;
      }
    };
  }

  /**
   * Starts the container manager, checks the node id host against the local
   * canonical host name, and verifies that asking for the status of a
   * container that was never started ends in an exception.
   *
   * @throws IOException if the local host address cannot be determined
   */
  @Test
  public void testContainerManagerInitialization() throws IOException {

    containerManager.start();

    InetAddress localHost = InetAddress.getLocalHost();
    String canonicalName = localHost.getCanonicalHostName();
    // only check if fqdn is not same as ip
    // api returns ip in case of resolution failure
    boolean nameResolved = !localHost.getHostAddress().equals(canonicalName);
    if (nameResolved) {
      assertEquals(canonicalName, context.getNodeId().getHost());
    }

    // Just do a query for a non-existing container.
    boolean queryFailed;
    try {
      List<ContainerId> ids = new ArrayList<>();
      ContainerId unknownId = createContainerId(0);
      ids.add(unknownId);
      GetContainerStatusesResponse statuses =
          containerManager.getContainerStatuses(
              GetContainerStatusesRequest.newInstance(ids));
      Map<ContainerId, SerializedException> failures =
          statuses.getFailedRequests();
      if (failures.containsKey(unknownId)) {
        // Surface the per-container failure as a thrown exception.
        throw failures.get(unknownId).deSerialize();
      }
      queryFailed = false;
    } catch (Throwable t) {
      queryFailed = true;
    }
    assertTrue(queryFailed);
  }

  /**
   * Verifies that a container's local resource is localized into the expected
   * directory layout and that the localization cache metrics are updated.
   *
   * <p>A first container is started with a single APPLICATION-visibility FILE
   * resource; once it reaches COMPLETE the test checks the directories under
   * {@code localDir}, the content of the localized file and the cache-miss
   * counters. A second container started with the same launch context is then
   * expected to show up in the cache-hit counters.
   *
   * @throws Exception if starting the containers or accessing the files fails
   */
  @Test
  public void testContainerSetup() throws Exception {

    containerManager.start();

    // ////// Create the resources for the container
    File dir = new File(tmpDir, "dir");
    dir.mkdirs();
    File file = new File(dir, "file");
    // try-with-resources: the writer is closed even if the write fails.
    try (PrintWriter fileWriter = new PrintWriter(file)) {
      fileWriter.write("Hello World!");
    }

    // ////// Construct the Container-id
    ContainerId cId = createContainerId(0);

    // ////// Construct the container-spec.
    ContainerLaunchContext containerLaunchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);
    URL resource_alpha =
        URL.fromPath(localFS
            .makeQualified(new Path(file.getAbsolutePath())));
    LocalResource rsrc_alpha =
        recordFactory.newRecordInstance(LocalResource.class);
    rsrc_alpha.setResource(resource_alpha);
    rsrc_alpha.setSize(-1);
    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
    rsrc_alpha.setType(LocalResourceType.FILE);
    rsrc_alpha.setTimestamp(file.lastModified());
    String destinationFile = "dest_file";
    Map<String, LocalResource> localResources = new HashMap<>();
    localResources.put(destinationFile, rsrc_alpha);
    containerLaunchContext.setLocalResources(localResources);

    StartContainerRequest scRequest =
        StartContainerRequest.newInstance(
          containerLaunchContext,
          createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
            user, context.getContainerTokenSecretManager()));
    List<StartContainerRequest> list = new ArrayList<>();
    list.add(scRequest);
    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(list);
    containerManager.startContainers(allRequests);

    BaseContainerManagerTest.waitForContainerState(containerManager, cId,
        ContainerState.COMPLETE, 40);

    // Now ascertain that the resources are localised correctly.
    ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();
    String appIDStr = appId.toString();
    String containerIDStr = cId.toString();
    File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE);
    File userDir = new File(userCacheDir, user);
    File appCache = new File(userDir, ContainerLocalizer.APPCACHE);
    File appDir = new File(appCache, appIDStr);
    File containerDir = new File(appDir, containerIDStr);
    File targetFile = new File(containerDir, destinationFile);
    File sysDir =
        new File(localDir,
            ResourceLocalizationService.NM_PRIVATE_DIR);
    File appSysDir = new File(sysDir, appIDStr);
    File containerSysDir = new File(appSysDir, containerIDStr);

    for (File f : new File[] { localDir, sysDir, userCacheDir, appDir,
        appSysDir,
        containerDir, containerSysDir }) {
      assertTrue(f.exists(), f.getAbsolutePath() + " doesn't exist!!");
      assertTrue(f.isDirectory(),
          f.getAbsolutePath() + " is not a directory!!");
    }
    assertTrue(targetFile.exists(),
        targetFile.getAbsolutePath() + " doesn't exist!!");

    // Now verify the contents of the file. The reader is closed even when an
    // assertion fails, so the file handle is not leaked.
    try (BufferedReader reader =
        new BufferedReader(new FileReader(targetFile))) {
      assertEquals("Hello World!", reader.readLine());
      assertEquals(null, reader.readLine());
    }

    //
    // check the localization counter
    //
    long targetFileSize =
        FileUtil.getDU(targetFile.getCanonicalFile().getParentFile());
    MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics");
    assertCounter("LocalizedCacheMissBytes", targetFileSize, rb);
    assertCounter("LocalizedCacheHitBytes", 0L, rb);
    assertCounter("LocalizedCacheMissFiles", 1L, rb);
    assertCounter("LocalizedCacheHitFiles", 0L, rb);
    assertGaugeGt("LocalizationDurationMillisAvgTime", 0, rb);
    assertGauge("LocalizedCacheHitBytesRatio", 0, rb);
    assertGauge("LocalizedCacheHitFilesRatio", 0, rb);

    // test cache being used
    final ContainerId cid1 = createContainerId(1);
    containerManager.startContainers(StartContainersRequest.newInstance(
        Collections.singletonList(
            StartContainerRequest.newInstance(
                containerLaunchContext,
                createContainerToken(cid1, DUMMY_RM_IDENTIFIER,
                    context.getNodeId(),
                    user,
                    context.getContainerTokenSecretManager())))));
    waitForContainerState(containerManager, cid1, ContainerState.COMPLETE);
    rb = getMetrics("NodeManagerMetrics");
    assertCounter("LocalizedCacheMissBytes", targetFileSize, rb);
    assertCounter("LocalizedCacheHitBytes", targetFileSize, rb);
    assertCounter("LocalizedCacheMissFiles", 1L, rb);
    assertCounter("LocalizedCacheHitFiles", 1L, rb);
    assertGauge("LocalizedCacheHitBytesRatio", 50, rb);
    assertGauge("LocalizedCacheHitFilesRatio", 50, rb);
  }

  /**
   * Checks {@code AuxiliaryLocalPathHandlerImpl.getLocalPathForRead}: with a
   * local dir that contains the requested file it must return a path, and
   * once the spied dirs handler reports no local dirs it must fail with an
   * {@link IOException} whose message contains "Could not find".
   *
   * <p>The temporary file and directory are removed in a {@code finally}
   * that covers the whole scenario, so they are cleaned up even when the
   * set-up or the first assertion fails.
   *
   * @throws Exception if the fixture cannot be created
   */
  @Test
  @Timeout(10)
  public void testAuxPathHandler() throws Exception {
    File testDir = GenericTestUtils
        .getTestDir(TestContainerManager.class.getSimpleName() + "LocDir");
    testDir.mkdirs();
    File testFile = new File(testDir, "test");
    try {
      testFile.createNewFile();
      YarnConfiguration configuration = new YarnConfiguration();
      configuration.set(YarnConfiguration.NM_LOCAL_DIRS,
          testDir.getAbsolutePath());
      LocalDirsHandlerService spyDirHandlerService =
          Mockito.spy(new LocalDirsHandlerService());
      spyDirHandlerService.init(configuration);
      when(spyDirHandlerService.getConfig()).thenReturn(configuration);
      AuxiliaryLocalPathHandler auxiliaryLocalPathHandler =
          new ContainerManagerImpl.AuxiliaryLocalPathHandlerImpl(
              spyDirHandlerService);
      Path p = auxiliaryLocalPathHandler.getLocalPathForRead("test");
      assertTrue(p != null &&
          !spyDirHandlerService.getLocalDirsForRead().isEmpty());

      // With no readable local dirs the lookup has to fail.
      when(spyDirHandlerService.getLocalDirsForRead()).thenReturn(
          new ArrayList<String>());
      try {
        auxiliaryLocalPathHandler.getLocalPathForRead("test");
        fail("Should not have passed!");
      } catch (IOException e) {
        assertTrue(e.getMessage().contains("Could not find"));
      }
    } finally {
      testFile.delete();
      testDir.delete();
    }
  }

  /**
   * Launches a container running a script that writes a greeting and an
   * identifier line (the shell pid on non-Windows platforms) to a start file
   * and then keeps running; afterwards stops the container and checks that it
   * completes with {@link ContainerExitStatus#KILLED_BY_APPMASTER} and that
   * the process is gone.
   *
   * <p>Currently disabled: the {@code @Test} annotation below is commented
   * out.
   *
   * @throws IOException if the script or the start file cannot be accessed
   * @throws InterruptedException if interrupted while waiting
   * @throws YarnException if a container manager call fails
   */
  //@Test
  public void testContainerLaunchAndStop() throws IOException,
      InterruptedException, YarnException {
    containerManager.start();

    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
    File processStartFile =
        new File(tmpDir, "start_file.txt").getAbsoluteFile();

    // ////// Construct the Container-id
    ContainerId cId = createContainerId(0);

    // try-with-resources: the script writer is closed even on failure.
    try (PrintWriter fileWriter = new PrintWriter(scriptFile)) {
      if (Shell.WINDOWS) {
        fileWriter.println("@echo Hello World!> " + processStartFile);
        fileWriter.println("@echo " + cId + ">> " + processStartFile);
        fileWriter.println("@ping -n 100 127.0.0.1 >nul");
      } else {
        fileWriter.write("\numask 0"); // So that start file is readable by the test
        fileWriter.write("\necho Hello World! > " + processStartFile);
        fileWriter.write("\necho $$ >> " + processStartFile);
        fileWriter.write("\nexec sleep 100");
      }
    }

    ContainerLaunchContext containerLaunchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);

    URL resource_alpha =
        URL.fromPath(localFS
            .makeQualified(new Path(scriptFile.getAbsolutePath())));
    LocalResource rsrc_alpha =
        recordFactory.newRecordInstance(LocalResource.class);
    rsrc_alpha.setResource(resource_alpha);
    rsrc_alpha.setSize(-1);
    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
    rsrc_alpha.setType(LocalResourceType.FILE);
    rsrc_alpha.setTimestamp(scriptFile.lastModified());
    String destinationFile = "dest_file";
    Map<String, LocalResource> localResources = new HashMap<>();
    localResources.put(destinationFile, rsrc_alpha);
    containerLaunchContext.setLocalResources(localResources);
    List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
    containerLaunchContext.setCommands(commands);

    StartContainerRequest scRequest =
        StartContainerRequest.newInstance(containerLaunchContext,
          createContainerToken(cId,
            DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
            context.getContainerTokenSecretManager()));
    List<StartContainerRequest> list = new ArrayList<>();
    list.add(scRequest);
    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(list);
    containerManager.startContainers(allRequests);

    int timeoutSecs = 0;
    while (!processStartFile.exists() && timeoutSecs++ < 20) {
      Thread.sleep(1000);
      LOG.info("Waiting for process start-file to be created");
    }
    assertTrue(processStartFile.exists(),
        "ProcessStartFile doesn't exist!");

    // Now verify the contents of the file. The reader is closed even when an
    // assertion fails.
    String pid;
    try (BufferedReader reader =
        new BufferedReader(new FileReader(processStartFile))) {
      assertEquals("Hello World!", reader.readLine());
      // Get the pid of the process
      String pidLine = reader.readLine();
      assertNotNull(pidLine, "ProcessStartFile has no pid line!");
      pid = pidLine.trim();
      // No more lines
      assertEquals(null, reader.readLine());
    }

    // Now test the stop functionality.

    // Assert that the process is alive
    assertTrue(DefaultContainerExecutor.containerIsAlive(pid),
        "Process is not alive!");
    // Once more
    assertTrue(DefaultContainerExecutor.containerIsAlive(pid),
        "Process is not alive!");

    List<ContainerId> containerIds = new ArrayList<>();
    containerIds.add(cId);
    StopContainersRequest stopRequest =
        StopContainersRequest.newInstance(containerIds);
    containerManager.stopContainers(stopRequest);
    BaseContainerManagerTest.waitForContainerState(containerManager, cId,
        ContainerState.COMPLETE);

    GetContainerStatusesRequest gcsRequest =
        GetContainerStatusesRequest.newInstance(containerIds);
    ContainerStatus containerStatus =
        containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
    int expectedExitCode = ContainerExitStatus.KILLED_BY_APPMASTER;
    assertEquals(expectedExitCode, containerStatus.getExitStatus());

    // Assert that the process is not alive anymore
    assertFalse(DefaultContainerExecutor.containerIsAlive(pid),
        "Process is still alive!");
  }

  /**
   * Starts an initial container and runs the shared restart checks on it,
   * expecting that no rollback is possible afterwards.
   *
   * @throws IOException if the start file cannot be read
   * @throws InterruptedException if interrupted while waiting
   * @throws YarnException if a container manager call fails
   */
  @Test
  public void testContainerRestart() throws IOException, InterruptedException,
      YarnException {
    containerManager.start();

    // ////// Construct the Container-id
    final ContainerId containerId = createContainerId(0);
    final File startFile =
        new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
    final String originalPid = prepareInitialContainer(containerId, startFile);

    // The container must be restartable. No rollback context existed before
    // the restart, so rollback must NOT be possible afterwards either.
    doRestartTests(containerId, startFile, "Hello World!", originalPid, false);
  }

  /**
   * Restarts the given container and verifies that the old process dies and
   * a process with a different pid rewrites the start file.
   *
   * @param cId id of the container to restart
   * @param oldStartFile file the container script writes its greeting and
   *     pid to; re-read (up to 20 times, one second apart) until the pid
   *     differs from {@code pid}
   * @param testString expected first line of {@code oldStartFile}
   * @param pid pid of the process running before the restart
   * @param canRollback expected value of {@code container.canRollback()}
   *     after the restart
   * @return the pid read from the start file after the restart
   * @throws YarnException if the restart request fails
   * @throws IOException if the start file cannot be read
   * @throws InterruptedException if interrupted while waiting
   */
  private String doRestartTests(ContainerId cId, File oldStartFile,
      String testString, String pid, boolean canRollback)
      throws YarnException, IOException, InterruptedException {
    int beforeRestart = metrics.getRunningContainers();
    Container container =
        containerManager.getContext().getContainers().get(cId);
    assertFalse(container.isReInitializing());
    containerManager.restartContainer(cId);
    assertTrue(container.isReInitializing());

    // Wait for original process to die and the new process to restart
    int timeoutSecs = 0;
    while (DefaultContainerExecutor.containerIsAlive(pid)
        && (metrics.getRunningContainers() == beforeRestart)
        && container.isReInitializing()
        && timeoutSecs++ < 20) {
      Thread.sleep(1000);
      LOG.info("Waiting for Original process to die.." +
          "and new process to start!!");
    }

    assertFalse(DefaultContainerExecutor.containerIsAlive(pid),
        "Old Process Still alive!!");

    String newPid = null;
    timeoutSecs = 0;
    while (timeoutSecs++ < 20) {
      LOG.info("Waiting for New process file to be created!!");
      // Now verify the contents of the file. try-with-resources closes the
      // reader on every iteration, including when an assertion fails.
      // NOTE(review): the assertions run on every poll, so a read that hits
      // a half-written file fails the test instead of retrying -- confirm
      // this is intended.
      try (BufferedReader reader =
          new BufferedReader(new FileReader(oldStartFile))) {
        assertEquals(testString, reader.readLine());
        // Get the pid of the process
        String pidLine = reader.readLine();
        assertNotNull(pidLine, "Start file has no pid line!");
        newPid = pidLine.trim();
        // No more lines
        assertEquals(null, reader.readLine());
      }
      if (!newPid.equals(pid)) {
        break;
      }
      Thread.sleep(1000);
    }

    // Assert both pids are different
    assertNotEquals(pid, newPid);

    // Container cannot rollback from a restart
    assertEquals(canRollback, container.canRollback());

    return newPid;
  }

  /**
   * Starts an initial container, upgrades (re-initializes) it and verifies
   * that the scheduler's utilization is unchanged, the original process is
   * dead, the new process wrote "Upgrade World!" plus a different pid to the
   * new start file, and the old start file is still readable.
   *
   * @param autoCommit passed through to {@code prepareContainerUpgrade}
   * @return a two-element array: the original pid and the upgraded pid
   * @throws IOException if a start file cannot be read
   * @throws InterruptedException if interrupted while waiting
   * @throws YarnException if a container manager call fails
   */
  private String[] testContainerReInitSuccess(boolean autoCommit)
      throws IOException, InterruptedException, YarnException {
    containerManager.start();
    // ////// Construct the Container-id
    ContainerId cId = createContainerId(0);
    File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();

    String pid = prepareInitialContainer(cId, oldStartFile);

    File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();

    ResourceUtilization beforeUpgrade =
        ResourceUtilization.newInstance(
            containerManager.getContainerScheduler().getCurrentUtilization());
    prepareContainerUpgrade(autoCommit, false, false, cId, newStartFile);
    ResourceUtilization afterUpgrade =
        ResourceUtilization.newInstance(
            containerManager.getContainerScheduler().getCurrentUtilization());
    assertEquals(beforeUpgrade, afterUpgrade,
        "Possible resource leak detected !!");

    // Assert that the First process is not alive anymore
    assertFalse(DefaultContainerExecutor.containerIsAlive(pid),
        "Process is still alive!");

    // Both readers below use try-with-resources so they are closed even when
    // an assertion fails.
    String newPid;
    try (BufferedReader reader =
        new BufferedReader(new FileReader(newStartFile))) {
      assertEquals("Upgrade World!", reader.readLine());

      // Get the pid of the process
      String pidLine = reader.readLine();
      assertNotNull(pidLine, "New start file has no pid line!");
      newPid = pidLine.trim();
      assertNotEquals(pid, newPid, "Old and New Pids must be different !");
      // No more lines
      assertEquals(null, reader.readLine());
    }

    // Verify old file still exists and is accessible by
    // the new process...
    try (BufferedReader reader =
        new BufferedReader(new FileReader(oldStartFile))) {
      assertEquals("Hello World!", reader.readLine());
    }

    // Assert that the New process is alive
    assertTrue(DefaultContainerExecutor.containerIsAlive(newPid),
        "New Process is not alive!");
    return new String[]{pid, newPid};
  }

  /**
   * Upgrades a container with auto-commit requested, verifies that an
   * explicit commit afterwards is rejected with "Nothing to Commit", and
   * compares the states and event types recorded by a {@link Listener}
   * against the expected sequences.
   *
   * @throws IOException if a start file cannot be read
   * @throws InterruptedException if interrupted while waiting
   * @throws YarnException if a container manager call fails
   */
  @Test
  public void testContainerUpgradeSuccessAutoCommit() throws IOException,
      InterruptedException, YarnException {
    Listener listener = new Listener();
    NodeManager.DefaultContainerStateListener stateListener =
        (NodeManager.DefaultContainerStateListener) containerManager.context
            .getContainerStateTransitionListener();
    stateListener.addListener(listener);

    testContainerReInitSuccess(true);

    // Should not be able to Commit (since already auto committed)
    try {
      containerManager.commitLastReInitialization(createContainerId(0));
      fail();
    } catch (Exception expected) {
      assertTrue(expected.getMessage().contains("Nothing to Commit"));
    }

    // State history recorded for the container.
    List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
        ContainerState> recordedStates =
        listener.states.get(createContainerId(0));
    List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
        ContainerState> expectedStates = Arrays.asList(
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.NEW,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.LOCALIZING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.SCHEDULED,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.RUNNING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING_AWAITING_KILL,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING_AWAITING_KILL,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.SCHEDULED,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.RUNNING);
    assertEquals(expectedStates, recordedStates);

    // Event types recorded for the container.
    List<ContainerEventType> recordedEvents =
        listener.events.get(createContainerId(0));
    List<ContainerEventType> expectedEvents = Arrays.asList(
        ContainerEventType.INIT_CONTAINER,
        ContainerEventType.RESOURCE_LOCALIZED,
        ContainerEventType.CONTAINER_LAUNCHED,
        ContainerEventType.REINITIALIZE_CONTAINER,
        ContainerEventType.RESOURCE_LOCALIZED,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        ContainerEventType.CONTAINER_LAUNCHED);
    assertEquals(expectedEvents, recordedEvents);
  }

  /**
   * Upgrades a container without auto-commit, commits the upgrade explicitly
   * and verifies that a later rollback is rejected with
   * "Nothing to rollback to".
   *
   * @throws IOException if a start file cannot be read
   * @throws InterruptedException if interrupted while waiting
   * @throws YarnException if a container manager call fails
   */
  @Test
  public void testContainerUpgradeSuccessExplicitCommit() throws IOException,
      InterruptedException, YarnException {
    testContainerReInitSuccess(false);

    final ContainerId upgradedId = createContainerId(0);
    containerManager.commitLastReInitialization(upgradedId);

    // Should not be able to Rollback once committed
    try {
      containerManager.rollbackLastReInitialization(upgradedId);
      fail();
    } catch (Exception expected) {
      String message = expected.getMessage();
      assertTrue(message.contains("Nothing to rollback to"));
    }
  }

  @Test
  public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
      InterruptedException, YarnException {
    Listener listener = new Listener();
    ((NodeManager.DefaultContainerStateListener)containerManager.context.
        getContainerStateTransitionListener()).addListener(listener);
    String[] pids = testContainerReInitSuccess(false);

    // Test that the container can be Restarted after the successful upgrade.
    // Also, since there is a rollback context present before the restart, it
    // should be possible to rollback the container AFTER the restart.
    pids[1] = doRestartTests(createContainerId(0),
        new File(tmpDir, "start_file_n.txt").getAbsoluteFile(),
        "Upgrade World!", pids[1], true);

    // Delete the old start File..
    File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();

    oldStartFile.delete();

    ContainerId cId = createContainerId(0);
    // Explicit Rollback
    containerManager.rollbackLastReInitialization(cId);

    Container container =
        containerManager.getContext().getContainers().get(cId);
    assertTrue(container.isReInitializing());
    // Original should be dead anyway
    assertFalse(DefaultContainerExecutor.containerIsAlive(pids[0]),
        "Original Process is still alive!");

    // Wait for new container to startup
    int timeoutSecs = 0;
    while (container.isReInitializing() && timeoutSecs++ < 20) {
      Thread.sleep(1000);
      LOG.info("Waiting for ReInitialization to complete..");
    }
    assertFalse(container.isReInitializing());

    timeoutSecs = 0;
    // Wait for new processStartfile to be created
    while (!oldStartFile.exists() && timeoutSecs++ < 20) {
      Thread.sleep(1000);
      LOG.info("Waiting for New process start-file to be created");
    }

    // Now verify the contents of the file
    BufferedReader reader =
        new BufferedReader(new FileReader(oldStartFile));
    assertEquals("Hello World!", reader.readLine());
    // Get the pid of the process
    String rolledBackPid = reader.readLine().trim();
    // No more lines
    assertEquals(null, reader.readLine());

    assertNotEquals(pids[0], rolledBackPid,
        "The Rolled-back process should be a different pid");

    List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
        ContainerState> containerStates =
        listener.states.get(createContainerId(0));
    assertEquals(Arrays.asList(
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.NEW,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.LOCALIZING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.SCHEDULED,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.RUNNING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING_AWAITING_KILL,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING_AWAITING_KILL,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.SCHEDULED,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.RUNNING,
        // This is the successful restart
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING_AWAITING_KILL,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING_AWAITING_KILL,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.SCHEDULED,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.RUNNING,
        // This is the rollback
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING_AWAITING_KILL,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING_AWAITING_KILL,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.SCHEDULED,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.RUNNING), containerStates);

    List<ContainerEventType> containerEventTypes =
        listener.events.get(createContainerId(0));
    assertEquals(Arrays.asList(
        ContainerEventType.INIT_CONTAINER,
        ContainerEventType.RESOURCE_LOCALIZED,
        ContainerEventType.CONTAINER_LAUNCHED,
        ContainerEventType.REINITIALIZE_CONTAINER,
        ContainerEventType.RESOURCE_LOCALIZED,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        ContainerEventType.CONTAINER_LAUNCHED,
        ContainerEventType.REINITIALIZE_CONTAINER,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        ContainerEventType.CONTAINER_LAUNCHED,
        ContainerEventType.ROLLBACK_REINIT,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
  }

  /**
   * Re-initializes a running container with a launch context whose resource
   * is set up to fail localization, then checks that the original process is
   * still alive and that the recorded container states and events match an
   * upgrade that was aborted during localization.
   */
  @Test
  public void testContainerUpgradeLocalizationFailure() throws IOException,
      InterruptedException, YarnException {
    if (Shell.WINDOWS) {
      return;
    }
    containerManager.start();

    // Record every state transition / event the container goes through.
    Listener listener = new Listener();
    NodeManager.DefaultContainerStateListener transitionListener =
        (NodeManager.DefaultContainerStateListener)
            containerManager.context.getContainerStateTransitionListener();
    transitionListener.addListener(listener);

    ContainerId cId = createContainerId(0);
    File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
    String originalPid = prepareInitialContainer(cId, oldStartFile);

    File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();
    // autoCommit=false, failCmd=true, failLoc=true
    prepareContainerUpgrade(false, true, true, cId, newStartFile);

    // The upgrade never got past localization, so the first process
    // must not have been touched.
    boolean originalAlive =
        DefaultContainerExecutor.containerIsAlive(originalPid);
    assertTrue(originalAlive, "Process is NOT alive!");

    List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
        ContainerState> expectedStates = Arrays.asList(
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.NEW,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.LOCALIZING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.SCHEDULED,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.RUNNING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.RUNNING);
    List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
        ContainerState> observedStates =
        listener.states.get(createContainerId(0));
    assertEquals(expectedStates, observedStates);

    List<ContainerEventType> expectedEvents = Arrays.asList(
        ContainerEventType.INIT_CONTAINER,
        ContainerEventType.RESOURCE_LOCALIZED,
        ContainerEventType.CONTAINER_LAUNCHED,
        ContainerEventType.REINITIALIZE_CONTAINER,
        ContainerEventType.RESOURCE_FAILED);
    List<ContainerEventType> observedEvents =
        listener.events.get(createContainerId(0));
    assertEquals(expectedEvents, observedEvents);
  }

  /**
   * Upgrades a running container with auto-commit enabled using a start
   * script that fails, and checks that the original process is no longer
   * alive afterwards.
   */
  @Test
  public void testContainerUpgradeProcessFailure() throws IOException,
      InterruptedException, YarnException {
    if (Shell.WINDOWS) {
      return;
    }
    containerManager.start();

    ContainerId cId = createContainerId(0);
    File initialStartFile =
        new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
    File upgradedStartFile =
        new File(tmpDir, "start_file_n.txt").getAbsoluteFile();

    String originalPid = prepareInitialContainer(cId, initialStartFile);

    // With autoCommit enabled no rollback context is kept, so when the new
    // process fails there is nothing to roll back to and the container is
    // simply terminated.
    prepareContainerUpgrade(true, true, false, cId, upgradedStartFile);

    // The first process must be gone by now.
    boolean originalAlive =
        DefaultContainerExecutor.containerIsAlive(originalPid);
    assertFalse(originalAlive, "Process is still alive!");
  }

  /**
   * Upgrades a running container (auto-commit disabled) with a start script
   * that exits with a failure code, and verifies the resulting rollback:
   * the original process is gone, the old start file has been written by a
   * process with a different pid, and the recorded container states and
   * events match the expected rollback sequence.
   */
  @Test
  public void testContainerUpgradeRollbackDueToFailure() throws IOException,
      InterruptedException, YarnException {
    if (Shell.WINDOWS) {
      return;
    }
    containerManager.start();
    Listener listener = new Listener();
    ((NodeManager.DefaultContainerStateListener)containerManager.context.
        getContainerStateTransitionListener()).addListener(listener);
    // ////// Construct the Container-id
    ContainerId cId = createContainerId(0);
    File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();

    String pid = prepareInitialContainer(cId, oldStartFile);

    File newStartFile = new File(tmpDir, "start_file_n.txt").getAbsoluteFile();

    prepareContainerUpgrade(false, true, false, cId, newStartFile);

    // Assert that the First process is not alive anymore
    assertFalse(DefaultContainerExecutor.containerIsAlive(pid),
        "Original Process is still alive!");

    int timeoutSecs = 0;
    // Wait for oldStartFile to be created
    while (!oldStartFile.exists() && timeoutSecs++ < 20) {
      System.out.println("\nFiles: " +
          Arrays.toString(oldStartFile.getParentFile().list()));
      Thread.sleep(1000);
      LOG.info("Waiting for New process start-file to be created");
    }

    // Now verify the contents of the file. try-with-resources makes sure the
    // file handle is released even when one of the assertions below throws.
    String rolledBackPid;
    try (BufferedReader reader =
        new BufferedReader(new FileReader(oldStartFile))) {
      assertEquals("Hello World!", reader.readLine());
      // Get the pid of the process; fail with a clear message instead of a
      // NullPointerException if the pid line has not been written.
      String pidLine = reader.readLine();
      assertNotNull(pidLine, "Rolled-back process did not record its pid");
      rolledBackPid = pidLine.trim();
      // No more lines
      assertEquals(null, reader.readLine());
    }

    assertNotEquals(pid, rolledBackPid,
        "The Rolled-back process should be a different pid");

    List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
        ContainerState> containerStates =
        listener.states.get(createContainerId(0));
    assertEquals(Arrays.asList(
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.NEW,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.LOCALIZING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.SCHEDULED,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.RUNNING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING_AWAITING_KILL,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.REINITIALIZING_AWAITING_KILL,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.SCHEDULED,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.RUNNING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.RUNNING,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.SCHEDULED,
        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
            ContainerState.RUNNING), containerStates);

    List<ContainerEventType> containerEventTypes =
        listener.events.get(createContainerId(0));
    assertEquals(Arrays.asList(
        ContainerEventType.INIT_CONTAINER,
        ContainerEventType.RESOURCE_LOCALIZED,
        ContainerEventType.CONTAINER_LAUNCHED,
        ContainerEventType.REINITIALIZE_CONTAINER,
        ContainerEventType.RESOURCE_LOCALIZED,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
        ContainerEventType.CONTAINER_LAUNCHED,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
        ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
  }

  /**
   * Writes a new launch script, wraps it in a launch context and asks the
   * Container Manager to re-initialize the given container with it. The
   * re-initialization request is then issued a second time; if that second
   * request is rejected, the rejection message is checked. Finally the
   * method waits (bounded) for the new process start-file to show up.
   *
   * @param autoCommit Enable autoCommit.
   * @param failCmd injects a start script that intentionally fails.
   * @param failLoc injects a bad file Location that will fail localization.
   * @param cId id of the container to re-initialize.
   * @param startFile file the new launch script writes to when it starts.
   */
  private void prepareContainerUpgrade(boolean autoCommit, boolean failCmd,
      boolean failLoc, ContainerId cId, File startFile)
      throws FileNotFoundException, YarnException, InterruptedException {
    // Fresh script file for the upgraded container; writeScriptFile closes
    // the writer once the script has been written.
    File upgradeScript = Shell.appendScriptExtension(tmpDir, "scriptFile_new");
    PrintWriter scriptWriter = new PrintWriter(upgradeScript);
    writeScriptFile(scriptWriter, "Upgrade World!", startFile, cId, failCmd);

    ContainerLaunchContext upgradeContext = prepareContainerLaunchContext(
        upgradeScript, "dest_file_new", failLoc, 0);

    containerManager.reInitializeContainer(cId, upgradeContext, autoCommit);
    // Ask again right away: a rejection, if any, must carry the RE_INIT
    // message.
    try {
      containerManager.reInitializeContainer(cId, upgradeContext, autoCommit);
    } catch (Exception e) {
      assertTrue(e.getMessage().contains("Cannot perform RE_INIT"));
    }

    // Poll once per second for the start-file; the wait is shorter when
    // localization is set up to fail.
    final int maxTimeToWait = failLoc ? 10 : 20;
    for (int waited = 0;
        !startFile.exists() && waited < maxTimeToWait; waited++) {
      Thread.sleep(1000);
      LOG.info("Waiting for New process start-file to be created");
    }
  }

  /**
   * Prepare and start an initial container. This container will be subsequently
   * re-initialized for upgrade. It also waits for the container to start and
   * returns the Pid of the running container.
   *
   * @param cId id of the container to start.
   * @param startFile file the launch script writes its greeting and pid to.
   * @return the pid read from the second line of {@code startFile}.
   */
  private String prepareInitialContainer(ContainerId cId, File startFile)
      throws IOException, YarnException, InterruptedException {
    File scriptFileOld = Shell.appendScriptExtension(tmpDir, "scriptFile");
    PrintWriter fileWriterOld = new PrintWriter(scriptFileOld);

    // writeScriptFile closes the writer after writing the script.
    writeScriptFile(fileWriterOld, "Hello World!", startFile, cId, false);

    ContainerLaunchContext containerLaunchContext =
        prepareContainerLaunchContext(scriptFileOld, "dest_file", false, 4);

    StartContainerRequest scRequest =
        StartContainerRequest.newInstance(containerLaunchContext,
            createContainerToken(cId,
                DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
                context.getContainerTokenSecretManager()));
    List<StartContainerRequest> list = new ArrayList<>();
    list.add(scRequest);
    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(list);
    containerManager.startContainers(allRequests);

    // Poll once per second (at most 20 times) for the start file.
    int timeoutSecs = 0;
    while (!startFile.exists() && timeoutSecs++ < 20) {
      Thread.sleep(1000);
      LOG.info("Waiting for process start-file to be created");
    }
    assertTrue(startFile.exists(), "ProcessStartFile doesn't exist!");

    // Now verify the contents of the file. try-with-resources makes sure the
    // file handle is released even when one of the assertions below throws.
    String pid;
    try (BufferedReader reader =
        new BufferedReader(new FileReader(startFile))) {
      assertEquals("Hello World!", reader.readLine());
      // Get the pid of the process; fail with a clear message instead of a
      // NullPointerException if the pid line has not been written.
      String pidLine = reader.readLine();
      assertNotNull(pidLine, "Process start-file does not contain a pid");
      pid = pidLine.trim();
      // No more lines
      assertEquals(null, reader.readLine());
    }

    // Assert that the process is alive
    assertTrue(DefaultContainerExecutor.containerIsAlive(pid),
        "Process is not alive!");
    // Once more
    assertTrue(DefaultContainerExecutor.containerIsAlive(pid),
        "Process is not alive!");
    return pid;
  }

  /**
   * Writes a container launch script to {@code fileWriter} and closes the
   * writer.
   * <p>
   * On Windows the script echoes {@code startLine} and the container id into
   * {@code processStartFile} and then pings localhost. Elsewhere a normal
   * script echoes {@code startLine} and its own pid into the file and execs
   * {@code sleep 100}, whereas a failing script only appends its pid and
   * exits with code 111.
   *
   * @param fileWriter writer for the script file; closed by this method.
   * @param startLine first line the script writes to the start file.
   * @param processStartFile file the script writes to.
   * @param cId container id, only written by the Windows script.
   * @param isFailure whether the non-Windows script should exit with 111.
   */
  private void writeScriptFile(PrintWriter fileWriter, String startLine,
      File processStartFile, ContainerId cId, boolean isFailure) {
    if (Shell.WINDOWS) {
      fileWriter.println("@echo " + startLine + "> " + processStartFile);
      fileWriter.println("@echo " + cId + ">> " + processStartFile);
      fileWriter.println("@ping -n 100 127.0.0.1 >nul");
    } else if (isFailure) {
      // So that start file is readable by test
      fileWriter.write("\numask 0");
      // Record the pid, then bail out with an error code
      fileWriter.write("\necho $$ >> " + processStartFile);
      fileWriter.write("\nexit 111");
    } else {
      // So that start file is readable by test
      fileWriter.write("\numask 0");
      fileWriter.write("\necho " + startLine + " > " + processStartFile);
      fileWriter.write("\necho $$ >> " + processStartFile);
      fileWriter.write("\nexec sleep 100");
    }
    fileWriter.close();
  }

  /**
   * Builds a launch context that runs {@code scriptFile}, with one local
   * resource registered under {@code destFName} and a retry context that
   * retries on exit code 111.
   *
   * @param scriptFile script to run; also supplies the resource timestamp.
   * @param destFName key under which the local resource is registered.
   * @param putBadFile when true the resource URL points at a file named
   *     "fileToDelete" that is deleted right after the URL is built, instead
   *     of at {@code scriptFile}.
   * @param numRetries retry count passed to the retry context.
   * @return the populated launch context.
   */
  private ContainerLaunchContext prepareContainerLaunchContext(File scriptFile,
      String destFName, boolean putBadFile, int numRetries) {
    ContainerLaunchContext launchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);

    // Decide which file the resource URL refers to.
    File resourceFile = putBadFile
        ? new File(tmpDir, "fileToDelete").getAbsoluteFile()
        : scriptFile;
    URL resourceUrl = URL.fromPath(
        localFS.makeQualified(new Path(resourceFile.getAbsolutePath())));
    if (putBadFile) {
      resourceFile.delete();
    }

    LocalResource resource =
        recordFactory.newRecordInstance(LocalResource.class);
    resource.setResource(resourceUrl);
    resource.setSize(-1);
    resource.setVisibility(LocalResourceVisibility.APPLICATION);
    resource.setType(LocalResourceType.FILE);
    resource.setTimestamp(scriptFile.lastModified());

    Map<String, LocalResource> resourcesByName = new HashMap<>();
    resourcesByName.put(destFName, resource);
    launchContext.setLocalResources(resourcesByName);

    launchContext.setContainerRetryContext(ContainerRetryContext.newInstance(
        ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
        new HashSet<>(Arrays.asList(Integer.valueOf(111))), numRetries, 0));
    launchContext.setCommands(
        Arrays.asList(Shell.getRunScriptCommand(scriptFile)));
    return launchContext;
  }

  /**
   * Launches a container whose script writes a greeting plus an id line
   * (the pid, or the container id on Windows) into a start file and then
   * finishes, optionally with a non-zero exit code. Waits for the container
   * to reach COMPLETE and checks that the reported exit status equals
   * {@code exitCode}.
   *
   * @param exitCode exit code the script should end with; for 0 no explicit
   *     exit statement is written.
   */
  protected void testContainerLaunchAndExit(int exitCode) throws IOException,
      InterruptedException, YarnException {

    File launchScript = Shell.appendScriptExtension(tmpDir, "scriptFile");
    PrintWriter scriptWriter = new PrintWriter(launchScript);
    File processStartFile =
        new File(tmpDir, "start_file.txt").getAbsoluteFile();

    // ////// Construct the Container-id
    ContainerId cId = createContainerId(0);

    final boolean exitsWithError = exitCode != 0;
    if (Shell.WINDOWS) {
      scriptWriter.println("@echo Hello World!> " + processStartFile);
      scriptWriter.println("@echo " + cId + ">> " + processStartFile);
      if (exitsWithError) {
        scriptWriter.println("@exit " + exitCode);
      }
    } else {
      // So that start file is readable by the test
      scriptWriter.write("\numask 0");
      scriptWriter.write("\necho Hello World! > " + processStartFile);
      scriptWriter.write("\necho $$ >> " + processStartFile);
      // Have script throw an exit code at the end
      if (exitsWithError) {
        scriptWriter.write("\nexit " + exitCode);
      }
    }
    scriptWriter.close();

    ContainerLaunchContext launchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);

    // Register the script as the container's only local resource.
    URL scriptUrl = URL.fromPath(
        localFS.makeQualified(new Path(launchScript.getAbsolutePath())));
    LocalResource scriptResource =
        recordFactory.newRecordInstance(LocalResource.class);
    scriptResource.setResource(scriptUrl);
    scriptResource.setSize(-1);
    scriptResource.setVisibility(LocalResourceVisibility.APPLICATION);
    scriptResource.setType(LocalResourceType.FILE);
    scriptResource.setTimestamp(launchScript.lastModified());
    Map<String, LocalResource> localResources = new HashMap<>();
    localResources.put("dest_file", scriptResource);
    launchContext.setLocalResources(localResources);
    launchContext.setCommands(
        Arrays.asList(Shell.getRunScriptCommand(launchScript)));

    // Submit the single start request.
    List<StartContainerRequest> startRequests = new ArrayList<>();
    startRequests.add(StartContainerRequest.newInstance(
        launchContext,
        createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
            user, context.getContainerTokenSecretManager())));
    containerManager.startContainers(
        StartContainersRequest.newInstance(startRequests));

    BaseContainerManagerTest.waitForContainerState(containerManager, cId,
        ContainerState.COMPLETE);

    List<ContainerId> containerIds = new ArrayList<>();
    containerIds.add(cId);
    ContainerStatus containerStatus = containerManager
        .getContainerStatuses(
            GetContainerStatusesRequest.newInstance(containerIds))
        .getContainerStatuses().get(0);

    // Verify exit status matches exit state of script
    assertEquals(exitCode, containerStatus.getExitStatus());
  }
  
  /**
   * Runs the launch-and-exit scenario with a script that ends with exit
   * code 0 and verifies that this exit code is reported.
   */
  @Test
  public void testContainerLaunchAndExitSuccess() throws IOException,
      InterruptedException, YarnException {
    containerManager.start();
    testContainerLaunchAndExit(0);
  }

  /**
   * Runs the launch-and-exit scenario with a script that ends with exit
   * code 50 and verifies that this exit code is reported.
   */
  @Test
  public void testContainerLaunchAndExitFailure() throws IOException,
      InterruptedException, YarnException {
    containerManager.start();
    testContainerLaunchAndExit(50);
  }

  /**
   * Creates a file named {@code fileName} containing "Hello World!" inside
   * the "dir" sub-directory of the test tmp dir and describes it as an
   * APPLICATION-visible FILE local resource.
   *
   * @param fileName name of the file to create.
   * @param symLink key under which the resource is put into the result map.
   * @return a map with the single entry {@code symLink -> resource}.
   */
  private Map<String, LocalResource> setupLocalResources(String fileName,
      String symLink) throws Exception {
    // Create the backing file for the resource.
    File resourceDir = new File(tmpDir, "dir");
    resourceDir.mkdirs();
    File resourceFile = new File(resourceDir, fileName);
    PrintWriter contentWriter = new PrintWriter(resourceFile);
    contentWriter.write("Hello World!");
    contentWriter.close();

    Path qualifiedPath = FileContext.getLocalFSFileContext()
        .makeQualified(new Path(resourceFile.getAbsolutePath()));

    LocalResource localResource =
        recordFactory.newRecordInstance(LocalResource.class);
    localResource.setResource(URL.fromPath(qualifiedPath));
    localResource.setSize(-1);
    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
    localResource.setType(LocalResourceType.FILE);
    localResource.setTimestamp(resourceFile.lastModified());

    Map<String, LocalResource> resourcesBySymLink = new HashMap<>();
    resourcesBySymLink.put(symLink, localResource);
    return resourcesBySymLink;
  }

  /**
   * Starts a container, localizes an additional resource while it is
   * running and verifies the new symlink shows up. Once the container has
   * completed, a further localize request must be rejected.
   */
  @Test
  public void testLocalizingResourceWhileContainerRunning() throws Exception {
    // Real del service
    delSrvc = new DeletionService(exec);
    delSrvc.init(conf);

    ((NodeManager.NMContext)context).setContainerExecutor(exec);
    containerManager = createContainerManager(delSrvc);
    containerManager.init(conf);
    containerManager.start();

    // Launch context with one initial resource and a long running command.
    Map<String, LocalResource> initialResources =
        setupLocalResources("file", "symLink1");
    ContainerLaunchContext launchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);
    launchContext.setLocalResources(initialResources);
    launchContext.setCommands(Arrays.asList("sleep 6"));
    ContainerId cId = createContainerId(0);

    // start the container
    StartContainerRequest startRequest = StartContainerRequest.newInstance(
        launchContext,
        createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
            user, context.getContainerTokenSecretManager()));
    containerManager.startContainers(
        StartContainersRequest.newInstance(Arrays.asList(startRequest)));
    BaseContainerManagerTest
        .waitForContainerState(containerManager, cId, ContainerState.RUNNING);

    BaseContainerManagerTest.waitForApplicationState(containerManager,
        cId.getApplicationAttemptId().getApplicationId(),
        ApplicationState.RUNNING);
    checkResourceLocalized(cId, "symLink1");

    // Localize new local resources while container is running
    Map<String, LocalResource> extraResources =
        setupLocalResources("file2", "symLink2");
    ResourceLocalizationRequest localizeRequest =
        ResourceLocalizationRequest.newInstance(cId, extraResources);
    containerManager.localize(localizeRequest);

    // Verify resource is localized and symlink is created. Any failure of
    // the check just means "not yet", so keep polling until the timeout.
    GenericTestUtils.waitFor(() -> {
      try {
        checkResourceLocalized(cId, "symLink2");
        return true;
      } catch (Throwable e) {
        return false;
      }
    }, 500, 20000);

    BaseContainerManagerTest
        .waitForContainerState(containerManager, cId, ContainerState.COMPLETE);
    // Verify container cannot localize resources while at non-running state.
    try {
      containerManager.localize(localizeRequest);
      fail();
    } catch (YarnException e) {
      boolean rejected = e.getMessage().contains("Cannot perform LOCALIZE");
      assertTrue(rejected);
    }
  }

  /**
   * Asserts that the on-disk layout for a localized container resource is
   * in place: the application and container directories under the user's
   * appcache, the application and container directories under the
   * NM-private directory, and the {@code symLink} entry inside the
   * container directory.
   *
   * @param containerId container whose directories are checked.
   * @param symLink name of the localized entry expected in the container
   *     directory.
   */
  private void checkResourceLocalized(ContainerId containerId, String symLink) {
    String appId =
        containerId.getApplicationAttemptId().getApplicationId().toString();
    File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE);
    File userDir = new File(userCacheDir, user);
    File appCache = new File(userDir, ContainerLocalizer.APPCACHE);
    // localDir/usercache/nobody/appcache/application_0_0000
    File appDir = new File(appCache, appId);
    // localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000
    File containerDir = new File(appDir, containerId.toString());
    // localDir/usercache/nobody/appcache/application_0_0000/container_0_0000_01_000000/symLink1
    File targetFile = new File(containerDir, symLink);

    File sysDir =
        new File(localDir, ResourceLocalizationService.NM_PRIVATE_DIR);
    // localDir/nmPrivate/application_0_0000
    File appSysDir = new File(sysDir, appId);
    // localDir/nmPrivate/application_0_0000/container_0_0000_01_000000
    File containerSysDir = new File(appSysDir, containerId.toString());

    assertTrue(appDir.exists(), "AppDir " + appDir.getAbsolutePath() + " doesn't exist!!");
    assertTrue(appSysDir.exists(), "AppSysDir "
        + appSysDir.getAbsolutePath() + " doesn't exist!!");
    assertTrue(containerDir.exists(), "containerDir "
        + containerDir.getAbsolutePath() + " doesn't exist !");
    // Check the NM-private container dir itself; the previous code tested
    // containerDir a second time under this message.
    assertTrue(containerSysDir.exists(), "containerSysDir "
        + containerSysDir.getAbsolutePath()
        + " doesn't exist !");
    assertTrue(targetFile.exists(),
        "targetFile " + targetFile.getAbsolutePath() + " doesn't exist !!");
  }

  /**
   * Checks local-file cleanup in two stages. After the container completes,
   * its container-level directories and the localized file must be gone while
   * the application-level directories remain. After the application is
   * reported as finished, the application-level directories must be removed
   * as well.
   */
  @Test
  public void testLocalFilesCleanup() throws InterruptedException,
      IOException, YarnException {
    // Real del service
    delSrvc = new DeletionService(exec);
    delSrvc.init(conf);

    containerManager = createContainerManager(delSrvc);
    containerManager.init(conf);
    containerManager.start();

    // Create the resource that the container will localize.
    File dir = new File(tmpDir, "dir");
    dir.mkdirs();
    File file = new File(dir, "file");
    // try-with-resources closes the writer on every exit path.
    try (PrintWriter fileWriter = new PrintWriter(file)) {
      fileWriter.write("Hello World!");
    }

    // Construct the container id.
    ContainerId cId = createContainerId(0);
    ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();

    // Construct the container spec with one APPLICATION-visibility file.
    ContainerLaunchContext containerLaunchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);
    URL resource_alpha =
        URL.fromPath(FileContext.getLocalFSFileContext()
            .makeQualified(new Path(file.getAbsolutePath())));
    LocalResource rsrc_alpha =
        recordFactory.newRecordInstance(LocalResource.class);
    rsrc_alpha.setResource(resource_alpha);
    rsrc_alpha.setSize(-1);
    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
    rsrc_alpha.setType(LocalResourceType.FILE);
    rsrc_alpha.setTimestamp(file.lastModified());
    String destinationFile = "dest_file";
    Map<String, LocalResource> localResources = new HashMap<>();
    localResources.put(destinationFile, rsrc_alpha);
    containerLaunchContext.setLocalResources(localResources);

    StartContainerRequest scRequest =
        StartContainerRequest.newInstance(
          containerLaunchContext,
          createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
            user, context.getContainerTokenSecretManager()));
    List<StartContainerRequest> list = new ArrayList<>();
    list.add(scRequest);
    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(list);
    containerManager.startContainers(allRequests);

    BaseContainerManagerTest.waitForContainerState(containerManager, cId,
        ContainerState.COMPLETE);

    BaseContainerManagerTest.waitForApplicationState(containerManager,
        appId, ApplicationState.RUNNING);

    // The container has completed but the application is still running:
    // container-level paths must be cleaned up, application-level ones kept.
    String appIDStr = appId.toString();
    String containerIDStr = cId.toString();
    File userCacheDir = new File(localDir, ContainerLocalizer.USERCACHE);
    File userDir = new File(userCacheDir, user);
    File appCache = new File(userDir, ContainerLocalizer.APPCACHE);
    File appDir = new File(appCache, appIDStr);
    File containerDir = new File(appDir, containerIDStr);
    File targetFile = new File(containerDir, destinationFile);
    File sysDir =
        new File(localDir,
            ResourceLocalizationService.NM_PRIVATE_DIR);
    File appSysDir = new File(sysDir, appIDStr);
    File containerSysDir = new File(appSysDir, containerIDStr);
    // AppDir should still exist
    assertTrue(appDir.exists(), "AppDir " + appDir.getAbsolutePath()
        + " doesn't exist!!");
    assertTrue(appSysDir.exists(), "AppSysDir " + appSysDir.getAbsolutePath()
        + " doesn't exist!!");
    for (File f : new File[] { containerDir, containerSysDir, targetFile }) {
      assertFalse(f.exists(), f.getAbsolutePath() + " exists!!");
    }

    // Simulate RM sending an AppFinish event.
    containerManager.handle(new CMgrCompletedAppsEvent(
        Arrays.asList(appId), CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN));

    BaseContainerManagerTest.waitForApplicationState(containerManager,
        appId, ApplicationState.FINISHED);

    // Now ascertain that everything belonging to the application is removed.
    for (File f : new File[] { appDir, containerDir, appSysDir,
        containerSysDir, targetFile }) {
      // Wait for deletion. Deletion can happen long after AppFinish because of
      // the async DeletionService, so poll for up to ~15 seconds per path.
      int timeout = 0;
      while (f.exists() && timeout++ < 15) {
        Thread.sleep(1000);
      }
      assertFalse(f.exists(), f.getAbsolutePath() + " exists!!");
    }
  }

  /**
   * Verifies that a start request whose container token carries
   * {@code RM_INVALID_IDENTIFIER} is rejected with an
   * {@link InvalidContainerException}, while a request whose token carries the
   * current {@code DUMMY_RM_IDENTIFIER} does not make
   * {@code startContainers} throw a {@link YarnException}.
   */
  @Test
  public void testContainerLaunchFromPreviousRM() throws IOException,
      InterruptedException, YarnException {
    containerManager.start();

    ContainerLaunchContext containerLaunchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);

    ContainerId cId1 = createContainerId(0);
    ContainerId cId2 = createContainerId(0);
    containerLaunchContext
      .setLocalResources(new HashMap<String, LocalResource>());

    // Construct the Container with Invalid RMIdentifier
    StartContainerRequest startRequest1 =
        StartContainerRequest.newInstance(containerLaunchContext,
          createContainerToken(cId1,
            ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(),
            user, context.getContainerTokenSecretManager()));
    List<StartContainerRequest> list = new ArrayList<>();
    list.add(startRequest1);
    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(list);
    containerManager.startContainers(allRequests);

    boolean catchException = false;
    try {
      StartContainersResponse response = containerManager.startContainers(allRequests);
      // Per-container failures come back in the response; rethrow the one for
      // cId1 so that its type and message can be checked below.
      if(response.getFailedRequests().containsKey(cId1)) {
        throw response.getFailedRequests().get(cId1).deSerialize();
      }
    } catch (Throwable e) {
      e.printStackTrace();
      catchException = true;
      assertTrue(e.getMessage().contains(
        "Container " + cId1 + " rejected as it is allocated by a previous RM"));
      assertTrue(e.getClass().getName()
        .equalsIgnoreCase(InvalidContainerException.class.getName()));
    }

    // Verify that startContainer fail because of invalid container request
    assertTrue(catchException);

    // Construct the Container with a RMIdentifier within current RM
    StartContainerRequest startRequest2 =
        StartContainerRequest.newInstance(containerLaunchContext,
          createContainerToken(cId2,
            DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
            context.getContainerTokenSecretManager()));
    List<StartContainerRequest> list2 = new ArrayList<>();
    // The request must go into list2; adding it to the first list would leave
    // allRequests2 empty and this half of the test would check nothing.
    list2.add(startRequest2);
    StartContainersRequest allRequests2 =
        StartContainersRequest.newInstance(list2);
    containerManager.startContainers(allRequests2);

    boolean noException = true;
    try {
      containerManager.startContainers(allRequests2);
    } catch (YarnException e) {
      noException = false;
    }
    // Verify that startContainer get no YarnException
    assertTrue(noException);
  }

  /**
   * Submits ten start requests in a single call, alternating between the
   * invalid and the valid RM identifier, and checks that the five requests
   * with odd container ids start while the five with even ids are rejected.
   */
  @Test
  public void testMultipleContainersLaunch() throws Exception {
    containerManager.start();

    List<StartContainerRequest> startRequests = new ArrayList<>();
    for (int idx = 0; idx < 10; idx++) {
      ContainerId containerId = createContainerId(idx);
      // Even ids get the invalid RM identifier and are expected to fail.
      boolean evenId = (idx & 1) == 0;
      long rmIdentifier = evenId
          ? ResourceManagerConstants.RM_INVALID_IDENTIFIER
          : DUMMY_RM_IDENTIFIER;
      Token token = createContainerToken(containerId, rmIdentifier,
          context.getNodeId(), user,
          context.getContainerTokenSecretManager());
      ContainerLaunchContext launchContext =
          recordFactory.newRecordInstance(ContainerLaunchContext.class);
      startRequests.add(
          StartContainerRequest.newInstance(launchContext, token));
    }

    StartContainersResponse response = containerManager.startContainers(
        StartContainersRequest.newInstance(startRequests));
    Thread.sleep(5000);

    // Odd ids must all have been started.
    List<ContainerId> started = response.getSuccessfullyStartedContainers();
    assertEquals(5, started.size());
    for (ContainerId startedId : started) {
      assertEquals(1, startedId.getContainerId() & 1);
    }

    // Even ids must all have been rejected with the previous-RM message.
    Map<ContainerId, SerializedException> failed =
        response.getFailedRequests();
    assertEquals(5, failed.size());
    for (Map.Entry<ContainerId, SerializedException> failure
        : failed.entrySet()) {
      ContainerId failedId = failure.getKey();
      assertEquals(0, failedId.getContainerId() & 1);
      String expectedFragment = "Container " + failedId
          + " rejected as it is allocated by a previous RM";
      assertTrue(failure.getValue().getMessage().contains(expectedFragment));
    }
  }

  /**
   * Starts ten containers, then issues one status query and one stop request
   * covering all of them. Containers with odd ids are expected to succeed in
   * both calls; containers with even ids are expected to be reported as
   * failed with a "non-application container" message.
   */
  @Test
  public void testMultipleContainersStopAndGetStatus() throws Exception {
    containerManager.start();
    List<StartContainerRequest> startRequests = new ArrayList<>();
    List<ContainerId> allIds = new ArrayList<>();
    for (int idx = 0; idx < 10; idx++) {
      // Even ids are created with 1 as the second argument (an unauthorized
      // app, per the original test comment); odd ids are created with 0.
      int secondArg = ((idx & 1) == 0) ? 1 : 0;
      ContainerId containerId = createContainerId(idx, secondArg);
      Token token = createContainerToken(containerId, DUMMY_RM_IDENTIFIER,
          context.getNodeId(), user,
          context.getContainerTokenSecretManager());
      ContainerLaunchContext launchContext =
          recordFactory.newRecordInstance(ContainerLaunchContext.class);
      startRequests.add(
          StartContainerRequest.newInstance(launchContext, token));
      allIds.add(containerId);
    }

    // Start everything in one request.
    containerManager.startContainers(
        StartContainersRequest.newInstance(startRequests));
    Thread.sleep(5000);

    // Query the status of every container.
    GetContainerStatusesResponse statusResponse =
        containerManager.getContainerStatuses(
            GetContainerStatusesRequest.newInstance(allIds));
    List<ContainerStatus> statuses = statusResponse.getContainerStatuses();
    assertEquals(5, statuses.size());
    for (ContainerStatus reported : statuses) {
      // Only odd ids should have a status.
      assertEquals(1, reported.getContainerId().getContainerId() & 1);
    }
    Map<ContainerId, SerializedException> statusFailures =
        statusResponse.getFailedRequests();
    assertEquals(5, statusFailures.size());
    for (Map.Entry<ContainerId, SerializedException> failure
        : statusFailures.entrySet()) {
      // Only even ids should be rejected.
      assertEquals(0, failure.getKey().getContainerId() & 1);
      assertTrue(failure.getValue().getMessage()
          .contains("attempted to get status for non-application container"));
    }

    // Stop every container.
    StopContainersResponse stopResponse = containerManager.stopContainers(
        StopContainersRequest.newInstance(allIds));
    List<ContainerId> stopped =
        stopResponse.getSuccessfullyStoppedContainers();
    assertEquals(5, stopped.size());
    for (ContainerId stoppedId : stopped) {
      // Only odd ids should have been stopped.
      assertEquals(1, stoppedId.getContainerId() & 1);
    }
    Map<ContainerId, SerializedException> stopFailures =
        stopResponse.getFailedRequests();
    assertEquals(5, stopFailures.size());
    for (Map.Entry<ContainerId, SerializedException> failure
        : stopFailures.entrySet()) {
      // Only even ids should be rejected.
      assertEquals(0, failure.getKey().getContainerId() & 1);
      assertTrue(failure.getValue().getMessage()
          .contains("attempted to stop non-application container"));
    }
  }

  /**
   * Verifies that start, stop and get-status requests are all rejected for a
   * container id created with {@code createContainerId(0, 1)}, i.e. one that
   * belongs to an unauthorized application according to the test's own
   * comments.
   */
  @Test
  public void testUnauthorizedRequests() throws IOException, YarnException {
    containerManager.start();

    // Create a containerId that belongs to an unauthorized appId
    ContainerId cId = createContainerId(0, 1);

    // startContainers()
    ContainerLaunchContext containerLaunchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);
    StartContainerRequest scRequest =
        StartContainerRequest.newInstance(containerLaunchContext,
            createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
                user, context.getContainerTokenSecretManager()));
    List<StartContainerRequest> list = new ArrayList<>();
    list.add(scRequest);
    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(list);
    StartContainersResponse startResponse =
        containerManager.startContainers(allRequests);

    assertFalse(startResponse.getSuccessfullyStartedContainers().contains(cId),
        "Should not be authorized to start container");
    assertTrue(startResponse.getFailedRequests().containsKey(cId),
        "Start container request should fail");

    // Insert the containerId into context, make it as if it is running
    ContainerTokenIdentifier containerTokenIdentifier =
        BuilderUtils.newContainerTokenIdentifier(scRequest.getContainerToken());
    Container container = new ContainerImpl(conf, null, containerLaunchContext,
        null, metrics, containerTokenIdentifier, context);
    context.getContainers().put(cId, container);

    // stopContainers()
    List<ContainerId> containerIds = new ArrayList<>();
    containerIds.add(cId);
    StopContainersRequest stopRequest =
        StopContainersRequest.newInstance(containerIds);
    StopContainersResponse stopResponse =
        containerManager.stopContainers(stopRequest);

    assertFalse(stopResponse.getSuccessfullyStoppedContainers().contains(cId),
        "Should not be authorized to stop container");
    assertTrue(stopResponse.getFailedRequests().containsKey(cId),
        "Stop container request should fail");

    // getContainerStatuses()
    containerIds = new ArrayList<>();
    containerIds.add(cId);
    GetContainerStatusesRequest request =
        GetContainerStatusesRequest.newInstance(containerIds);
    GetContainerStatusesResponse response =
        containerManager.getContainerStatuses(request);

    // JUnit 5 order is (expected, actual, message); keeping it that way makes
    // a failure report "expected: <0> but was: <n>" rather than the reverse.
    assertEquals(0, response.getContainerStatuses().size(),
        "Should not be authorized to get container status");
    assertTrue(response.getFailedRequests().containsKey(cId),
        "Get status request should fail");
  }

  /**
   * Configures a single auxiliary service named "existService" and then
   * submits a start request whose service data references a different
   * service name; the request must be reported as failed with a
   * "does not exist" message.
   */
  @Test
  public void testStartContainerFailureWithUnknownAuxService() throws Exception {
    final String configuredService = "existService";
    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, configuredService);
    conf.setClass(
        String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, configuredService),
        ServiceA.class, Service.class);
    containerManager.start();

    List<StartContainerRequest> startRequests = new ArrayList<>();

    // Attach service data for an aux service that was never configured.
    ContainerLaunchContext launchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);
    String serviceName = "non_exist_auxService";
    Map<String, ByteBuffer> auxData = new HashMap<>();
    auxData.put(serviceName, ByteBuffer.wrap(serviceName.getBytes()));
    launchContext.setServiceData(auxData);

    ContainerId containerId = createContainerId(0);
    String containerUser = "start_container_fail";
    Token token = createContainerToken(containerId, DUMMY_RM_IDENTIFIER,
        context.getNodeId(), containerUser,
        context.getContainerTokenSecretManager());

    // Submit the single request.
    startRequests.add(StartContainerRequest.newInstance(launchContext, token));
    StartContainersResponse response = containerManager.startContainers(
        StartContainersRequest.newInstance(startRequests));

    Map<ContainerId, SerializedException> failures =
        response.getFailedRequests();
    assertEquals(1, failures.size());
    assertEquals(0, response.getSuccessfullyStartedContainers().size());
    assertTrue(failures.containsKey(containerId));
    String expectedFragment =
        "The auxService:" + serviceName + " does not exist";
    assertTrue(failures.get(containerId).getMessage()
        .contains(expectedFragment));
  }

  /**
   * Test added to verify fix in YARN-644.
   *
   * <p>Calls the authorization entry points of {@link ContainerManagerImpl}
   * with null NM tokens or null container tokens and checks that each call
   * fails with a {@link YarnException} carrying the matching
   * {@code INVALID_NMTOKEN_MSG} or {@code INVALID_CONTAINERTOKEN_MSG} text.
   * If a call does not throw, the captured message stays empty and the
   * following assertion fails.
   */
  @Test
  public void testNullTokens() throws Exception {
    ContainerManagerImpl cMgrImpl =
        new ContainerManagerImpl(context, exec, delSrvc, getNodeStatusUpdater(),
        metrics, dirsHandler);
    String strExceptionMsg = "";
    try {
      cMgrImpl.authorizeStartAndResourceIncreaseRequest(
          null, new ContainerTokenIdentifier(), true);
    } catch(YarnException ye) {
      strExceptionMsg = ye.getMessage();
    }
    // assertEquals takes (expected, actual): the constant is the expectation.
    assertEquals(ContainerManagerImpl.INVALID_NMTOKEN_MSG,
        strExceptionMsg);

    strExceptionMsg = "";
    try {
      cMgrImpl.authorizeStartAndResourceIncreaseRequest(
          new NMTokenIdentifier(), null, true);
    } catch(YarnException ye) {
      strExceptionMsg = ye.getMessage();
    }
    assertEquals(ContainerManagerImpl.INVALID_CONTAINERTOKEN_MSG,
        strExceptionMsg);

    strExceptionMsg = "";
    try {
      cMgrImpl.authorizeGetAndStopContainerRequest(null, null, true, null,
          null);
    } catch(YarnException ye) {
      strExceptionMsg = ye.getMessage();
    }
    assertEquals(ContainerManagerImpl.INVALID_NMTOKEN_MSG,
        strExceptionMsg);

    strExceptionMsg = "";
    try {
      cMgrImpl.authorizeUser(null, null);
    } catch(YarnException ye) {
      strExceptionMsg = ye.getMessage();
    }
    assertEquals(ContainerManagerImpl.INVALID_NMTOKEN_MSG,
        strExceptionMsg);

    // Spy on the manager so that the remote UGI resolves to a fixed user and
    // the NM token lookup for that user returns null.
    ContainerManagerImpl spyContainerMgr = spy(cMgrImpl);
    UserGroupInformation ugInfo = UserGroupInformation.createRemoteUser("a");
    Mockito.when(spyContainerMgr.getRemoteUgi()).thenReturn(ugInfo);
    Mockito.when(spyContainerMgr.
        selectNMTokenIdentifier(ugInfo)).thenReturn(null);

    strExceptionMsg = "";
    try {
      spyContainerMgr.stopContainers(new StopContainersRequestPBImpl());
    } catch(YarnException ye) {
      strExceptionMsg = ye.getMessage();
    }
    assertEquals(ContainerManagerImpl.INVALID_NMTOKEN_MSG,
        strExceptionMsg);

    strExceptionMsg = "";
    try {
      spyContainerMgr.getContainerStatuses(
          new GetContainerStatusesRequestPBImpl());
    } catch(YarnException ye) {
      strExceptionMsg = ye.getMessage();
    }
    assertEquals(ContainerManagerImpl.INVALID_NMTOKEN_MSG,
        strExceptionMsg);

    // Stub out user authorization so that startContainers gets as far as the
    // container-token check for the null token in the request.
    Mockito.doNothing().when(spyContainerMgr).authorizeUser(ugInfo, null);
    List<StartContainerRequest> reqList = new ArrayList<>();
    reqList.add(StartContainerRequest.newInstance(null, null));
    StartContainersRequest reqs = new StartContainersRequestPBImpl();
    reqs.setStartContainerRequests(reqList);
    strExceptionMsg = "";
    try {
      spyContainerMgr.startContainers(reqs);
    } catch(YarnException ye) {
      // Here the expected message is carried by the exception's cause.
      strExceptionMsg = ye.getCause().getMessage();
    }
    assertEquals(ContainerManagerImpl.INVALID_CONTAINERTOKEN_MSG,
        strExceptionMsg);
  }

  /**
   * Starts four containers, then sends an update request containing a token
   * for container 0 and a token for the never-started container 7. The test
   * expects exactly one successful update and exactly one failure, and that
   * failure must be for container 7 with a "not handled by this NodeManager"
   * message.
   */
  @Test
  public void testIncreaseContainerResourceWithInvalidRequests() throws Exception {
    containerManager.start();
    // Start 4 containers (ids 0..3) with the default launch context.
    List<StartContainerRequest> startRequests = new ArrayList<>();
    for (int idx = 0; idx < 4; idx++) {
      ContainerId containerId = createContainerId(idx);
      long rmIdentifier = DUMMY_RM_IDENTIFIER;
      Token startToken = createContainerToken(containerId, rmIdentifier,
          context.getNodeId(), user, context.getContainerTokenSecretManager());
      ContainerLaunchContext launchContext =
          recordFactory.newRecordInstance(ContainerLaunchContext.class);
      startRequests.add(
          StartContainerRequest.newInstance(launchContext, startToken));
    }
    StartContainersResponse response = containerManager.startContainers(
        StartContainersRequest.newInstance(startRequests));

    // All four must start, and be reported in id order.
    List<ContainerId> started = response.getSuccessfullyStartedContainers();
    assertEquals(4, started.size());
    int expectedId = 0;
    for (ContainerId startedId : started) {
      assertEquals(expectedId, startedId.getContainerId());
      expectedId++;
    }

    Thread.sleep(2000);
    // Build the list of update tokens.
    List<Token> increaseTokens = new ArrayList<>();
    // Update token for container-0.
    // NOTE(review): the original comment said this request fails because the
    // container has exited, but the assertions below only accept a failure
    // for container-7 and expect one success - confirm the intended behavior.
    ContainerId cId0 = createContainerId(0);
    Token updateToken =
        createContainerToken(cId0, 1, DUMMY_RM_IDENTIFIER,
            context.getNodeId(), user,
            Resource.newInstance(1234, 3),
            context.getContainerTokenSecretManager(), null);
    increaseTokens.add(updateToken);
    // Update token for container-7, which was never started and therefore
    // must be rejected.
    ContainerId cId7 = createContainerId(7);
    updateToken =
        createContainerToken(cId7, DUMMY_RM_IDENTIFIER,
            context.getNodeId(), user,
            Resource.newInstance(1234, 3),
            context.getContainerTokenSecretManager(), null);
    increaseTokens.add(updateToken);

    ContainerUpdateResponse updateResponse = containerManager.updateContainer(
        ContainerUpdateRequest.newInstance(increaseTokens));

    // Exactly one success and one failure are expected.
    assertEquals(
        1, updateResponse.getSuccessfullyUpdatedContainers().size());
    Map<ContainerId, SerializedException> failures =
        updateResponse.getFailedRequests();
    assertEquals(1, failures.size());
    for (Map.Entry<ContainerId, SerializedException> failure
        : failures.entrySet()) {
      assertNotNull(failure.getValue().getMessage(), "Failed message");
      // Any failure that is not for container-7 is unexpected.
      if (!cId7.equals(failure.getKey())) {
        throw new YarnException("Received failed request from wrong"
            + " container: " + failure.getKey().toString());
      }
      assertTrue(failure.getValue().getMessage()
          .contains("Container " + cId7.toString()
              + " is not handled by this NodeManager"));
    }
  }

  /**
   * Launches a long-sleeping container, raises its resource to (4096, 2) and
   * checks the reported capability immediately, then lowers it to (2048, 2)
   * and polls the reported capability until it matches or the retries run
   * out.
   */
  @Test
  public void testChangeContainerResource() throws Exception {
    containerManager.start();
    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
    PrintWriter scriptWriter = new PrintWriter(scriptFile);
    // Construct the Container-id
    ContainerId cId = createContainerId(0);
    // The script just sleeps so that the container stays in RUNNING state.
    if (!Shell.WINDOWS) {
      scriptWriter.write("\numask 0");
      scriptWriter.write("\nexec sleep 100");
    } else {
      scriptWriter.println("@ping -n 100 127.0.0.1 >nul");
    }
    scriptWriter.close();

    // Describe the script as the container's only local resource.
    ContainerLaunchContext launchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);
    URL scriptUrl = URL.fromPath(
        localFS.makeQualified(new Path(scriptFile.getAbsolutePath())));
    LocalResource scriptResource =
        recordFactory.newRecordInstance(LocalResource.class);
    scriptResource.setResource(scriptUrl);
    scriptResource.setSize(-1);
    scriptResource.setVisibility(LocalResourceVisibility.APPLICATION);
    scriptResource.setType(LocalResourceType.FILE);
    scriptResource.setTimestamp(scriptFile.lastModified());
    String destinationFile = "dest_file";
    Map<String, LocalResource> localResources = new HashMap<>();
    localResources.put(destinationFile, scriptResource);
    launchContext.setLocalResources(localResources);
    launchContext.setCommands(
        Arrays.asList(Shell.getRunScriptCommand(scriptFile)));

    // Start the container.
    Token startToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER,
        context.getNodeId(), user,
        context.getContainerTokenSecretManager());
    List<StartContainerRequest> startRequests = new ArrayList<>();
    startRequests.add(
        StartContainerRequest.newInstance(launchContext, startToken));
    containerManager.startContainers(
        StartContainersRequest.newInstance(startRequests));
    // Make sure the container reaches RUNNING state
    BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
        org.apache.hadoop.yarn.server.nodemanager.
            containermanager.container.ContainerState.RUNNING);

    // Send the increase request.
    Resource targetResource = Resource.newInstance(4096, 2);
    List<Token> increaseTokens = new ArrayList<>();
    increaseTokens.add(createContainerToken(cId, 1, DUMMY_RM_IDENTIFIER,
        context.getNodeId(), user, targetResource,
        context.getContainerTokenSecretManager(), null));
    ContainerUpdateResponse updateResponse = containerManager.updateContainer(
        ContainerUpdateRequest.newInstance(increaseTokens));
    assertEquals(1, updateResponse.getSuccessfullyUpdatedContainers().size());
    assertTrue(updateResponse.getFailedRequests().isEmpty());

    // Check status immediately as resource increase is blocking
    List<ContainerId> containerIds = new ArrayList<>();
    containerIds.add(cId);
    GetContainerStatusesRequest gcsRequest =
        GetContainerStatusesRequest.newInstance(containerIds);
    ContainerStatus containerStatus = containerManager
        .getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
    assertEquals(targetResource, containerStatus.getCapability());

    // Simulate a decrease request
    targetResource = Resource.newInstance(2048, 2);
    List<Token> decreaseTokens = new ArrayList<>();
    decreaseTokens.add(createContainerToken(cId, 2, DUMMY_RM_IDENTIFIER,
        context.getNodeId(), user, targetResource,
        context.getContainerTokenSecretManager(), null));
    updateResponse = containerManager.updateContainer(
        ContainerUpdateRequest.newInstance(decreaseTokens));

    assertEquals(1, updateResponse.getSuccessfullyUpdatedContainers().size());
    assertTrue(updateResponse.getFailedRequests().isEmpty());

    // Poll the status: up to five retries, 200 ms apart, until the reported
    // capability matches the decreased target.
    containerStatus = containerManager
        .getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
    int retry = 0;
    while (!targetResource.equals(containerStatus.getCapability())
        && (retry++ < 5)) {
      Thread.sleep(200);
      containerStatus = containerManager.getContainerStatuses(gcsRequest)
          .getContainerStatuses().get(0);
    }
    assertEquals(targetResource, containerStatus.getCapability());
  }

  /** Runs the launch-and-signal scenario with the OUTPUT_THREAD_DUMP command. */
  @Test
  public void testOutputThreadDumpSignal() throws IOException,
      InterruptedException, YarnException {
    final SignalContainerCommand threadDump =
        SignalContainerCommand.OUTPUT_THREAD_DUMP;
    testContainerLaunchAndSignal(threadDump);
  }

  /** Runs the launch-and-signal scenario with the GRACEFUL_SHUTDOWN command. */
  @Test
  public void testGracefulShutdownSignal() throws IOException,
      InterruptedException, YarnException {
    final SignalContainerCommand gracefulShutdown =
        SignalContainerCommand.GRACEFUL_SHUTDOWN;
    testContainerLaunchAndSignal(gracefulShutdown);
  }

  /** Runs the launch-and-signal scenario with the FORCEFUL_SHUTDOWN command. */
  @Test
  public void testForcefulShutdownSignal() throws IOException,
      InterruptedException, YarnException {
    final SignalContainerCommand forcefulShutdown =
        SignalContainerCommand.FORCEFUL_SHUTDOWN;
    testContainerLaunchAndSignal(forcefulShutdown);
  }

  /**
   * Verifies that a signal-container request travels from a
   * {@code CMgrSignalContainersEvent} (as NodeStatusUpdaterImpl would send)
   * to the ContainerExecutor.
   *
   * <p>Launches one container, waits for its script to create the start
   * file, dispatches the event, and then checks the executor: when the
   * command translates to {@code Signal.NULL} the executor must never be
   * asked to signal; otherwise it must be asked at least once, with the
   * expected container id and signal.
   *
   * @param command the signal command to deliver to the container
   */
  private void testContainerLaunchAndSignal(SignalContainerCommand command)
      throws IOException, InterruptedException, YarnException {

    Signal expectedSignal = ContainerLaunch.translateCommandToSignal(command);
    containerManager.start();

    // Write the launch script; it creates processStartFile when it runs.
    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
    PrintWriter scriptWriter = new PrintWriter(scriptFile);
    File processStartFile =
        new File(tmpDir, "start_file.txt").getAbsoluteFile();
    writeScriptFile(scriptWriter, "Hello world!", processStartFile, null,
        false);

    ContainerLaunchContext launchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);

    // Construct the container id.
    ContainerId cId = createContainerId(0);

    // Describe the script as the container's only local resource.
    URL scriptUrl = URL.fromPath(
        localFS.makeQualified(new Path(scriptFile.getAbsolutePath())));
    LocalResource scriptResource =
        recordFactory.newRecordInstance(LocalResource.class);
    scriptResource.setResource(scriptUrl);
    scriptResource.setSize(-1);
    scriptResource.setVisibility(LocalResourceVisibility.APPLICATION);
    scriptResource.setType(LocalResourceType.FILE);
    scriptResource.setTimestamp(scriptFile.lastModified());
    String destinationFile = "dest_file";
    Map<String, LocalResource> localResources = new HashMap<>();
    localResources.put(destinationFile, scriptResource);
    launchContext.setLocalResources(localResources);
    launchContext.setCommands(
        Arrays.asList(Shell.getRunScriptCommand(scriptFile)));

    // Start the container.
    Token startToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER,
        context.getNodeId(), user,
        context.getContainerTokenSecretManager());
    List<StartContainerRequest> startRequests = new ArrayList<>();
    startRequests.add(
        StartContainerRequest.newInstance(launchContext, startToken));
    containerManager.startContainers(
        StartContainersRequest.newInstance(startRequests));

    // Give the process up to ~20 seconds to create its start file.
    int waitedSecs = 0;
    while (!processStartFile.exists() && waitedSecs++ < 20) {
      Thread.sleep(1000);
      LOG.info("Waiting for process start-file to be created");
    }
    assertTrue(processStartFile.exists(),
        "ProcessStartFile doesn't exist!");

    // Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent
    List<SignalContainerRequest> signalRequests = new ArrayList<>();
    signalRequests.add(SignalContainerRequest.newInstance(cId, command));
    containerManager.handle(new CMgrSignalContainersEvent(signalRequests));

    final ArgumentCaptor<ContainerSignalContext> contextCaptor =
        ArgumentCaptor.forClass(ContainerSignalContext.class);
    if (!expectedSignal.equals(Signal.NULL)) {
      // A real signal must reach the executor within ten seconds.
      verify(exec, timeout(10000).atLeastOnce())
          .signalContainer(contextCaptor.capture());
      ContainerSignalContext delivered = contextCaptor.getAllValues().get(0);
      assertEquals(cId, delivered.getContainer().getContainerId());
      assertEquals(expectedSignal, delivered.getSignal());
    } else {
      // A NULL signal must never be passed to the executor.
      verify(exec, never()).signalContainer(contextCaptor.capture());
    }
  }

  /**
   * Verifies that a start request whose launch context returns a local
   * resource with a null resource URL is reported as failed with a
   * "Null resource URL for local resource" message.
   */
  @Test
  public void testStartContainerFailureWithInvalidLocalResource()
      throws Exception {
    containerManager.start();
    // Build a local resource that is valid except for its null URL.
    LocalResource rsrc_alpha =
        recordFactory.newRecordInstance(LocalResource.class);
    rsrc_alpha.setResource(null);
    rsrc_alpha.setSize(-1);
    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
    rsrc_alpha.setType(LocalResourceType.FILE);
    rsrc_alpha.setTimestamp(System.currentTimeMillis());
    Map<String, LocalResource> localResources = new HashMap<>();
    localResources.put("invalid_resource", rsrc_alpha);
    // The resources are injected through a spy's getLocalResources() rather
    // than through setLocalResources() on the real launch context.
    ContainerLaunchContext containerLaunchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);
    ContainerLaunchContext spyContainerLaunchContext =
        spy(containerLaunchContext);
    Mockito.when(spyContainerLaunchContext.getLocalResources())
        .thenReturn(localResources);

    ContainerId cId = createContainerId(0);
    String user = "start_container_fail";
    Token containerToken =
        createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
            user, context.getContainerTokenSecretManager());
    StartContainerRequest request = StartContainerRequest
        .newInstance(spyContainerLaunchContext, containerToken);

    // start containers
    List<StartContainerRequest> startRequest = new ArrayList<>();
    startRequest.add(request);
    StartContainersRequest requestList =
        StartContainersRequest.newInstance(startRequest);

    StartContainersResponse response =
        containerManager.startContainers(requestList);
    // assertEquals reports the actual sizes on failure, unlike
    // assertTrue(size == n).
    assertEquals(1, response.getFailedRequests().size());
    assertEquals(0, response.getSuccessfullyStartedContainers().size());
    assertTrue(response.getFailedRequests().containsKey(cId));
    assertTrue(response.getFailedRequests().get(cId).getMessage()
        .contains("Null resource URL for local resource"));
  }

  /**
   * Verifies that a start request whose launch context returns a local
   * resource with a null resource type is reported as failed with a
   * "Null resource type for local resource" message.
   */
  @Test
  public void testStartContainerFailureWithNullTypeLocalResource()
      throws Exception {
    containerManager.start();
    // Build a local resource that is valid except for its null type.
    LocalResource rsrc_alpha =
        recordFactory.newRecordInstance(LocalResource.class);
    rsrc_alpha.setResource(URL.fromPath(new Path("./")));
    rsrc_alpha.setSize(-1);
    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
    rsrc_alpha.setType(null);
    rsrc_alpha.setTimestamp(System.currentTimeMillis());
    Map<String, LocalResource> localResources = new HashMap<>();
    localResources.put("null_type_resource", rsrc_alpha);
    // The resources are injected through a spy's getLocalResources() rather
    // than through setLocalResources() on the real launch context.
    ContainerLaunchContext containerLaunchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);
    ContainerLaunchContext spyContainerLaunchContext =
        spy(containerLaunchContext);
    Mockito.when(spyContainerLaunchContext.getLocalResources())
        .thenReturn(localResources);

    ContainerId cId = createContainerId(0);
    String user = "start_container_fail";
    Token containerToken =
        createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
            user, context.getContainerTokenSecretManager());
    StartContainerRequest request = StartContainerRequest
        .newInstance(spyContainerLaunchContext, containerToken);

    // start containers
    List<StartContainerRequest> startRequest = new ArrayList<>();
    startRequest.add(request);
    StartContainersRequest requestList =
        StartContainersRequest.newInstance(startRequest);

    StartContainersResponse response =
        containerManager.startContainers(requestList);
    // assertEquals reports the actual sizes on failure, unlike
    // assertTrue(size == n).
    assertEquals(1, response.getFailedRequests().size());
    assertEquals(0, response.getSuccessfullyStartedContainers().size());
    assertTrue(response.getFailedRequests().containsKey(cId));
    assertTrue(response.getFailedRequests().get(cId).getMessage()
        .contains("Null resource type for local resource"));
  }

  /**
   * Verifies that a start request whose launch context exposes a local
   * resource with a {@code null} visibility is reported as a failed request
   * with a "Null resource visibility for local resource" message, and that
   * no container is reported as successfully started.
   *
   * @throws Exception if starting the container manager or issuing the
   *           request fails unexpectedly
   */
  @Test
  public void testStartContainerFailureWithNullVisibilityLocalResource()
      throws Exception {
    containerManager.start();

    // Local resource that is complete except for its visibility.
    LocalResource rsrc_alpha =
        recordFactory.newRecordInstance(LocalResource.class);
    rsrc_alpha.setResource(URL.fromPath(new Path("./")));
    rsrc_alpha.setSize(-1);
    rsrc_alpha.setVisibility(null);
    rsrc_alpha.setType(LocalResourceType.FILE);
    rsrc_alpha.setTimestamp(System.currentTimeMillis());
    Map<String, LocalResource> localResources = new HashMap<>();
    localResources.put("null_visibility_resource", rsrc_alpha);

    // The spy hands back the map above from getLocalResources(), so the
    // resource reaches the container manager exactly as built here.
    ContainerLaunchContext containerLaunchContext =
        recordFactory.newRecordInstance(ContainerLaunchContext.class);
    ContainerLaunchContext spyContainerLaunchContext =
        spy(containerLaunchContext);
    Mockito.when(spyContainerLaunchContext.getLocalResources())
        .thenReturn(localResources);

    ContainerId cId = createContainerId(0);
    String user = "start_container_fail";
    Token containerToken =
        createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
            user, context.getContainerTokenSecretManager());
    StartContainerRequest request = StartContainerRequest
        .newInstance(spyContainerLaunchContext, containerToken);

    // start containers
    List<StartContainerRequest> startRequest = new ArrayList<>();
    startRequest.add(request);
    StartContainersRequest requestList =
        StartContainersRequest.newInstance(startRequest);

    StartContainersResponse response =
        containerManager.startContainers(requestList);

    // assertEquals reports the actual sizes when the expectation is not met.
    assertEquals(1, response.getFailedRequests().size());
    assertEquals(0, response.getSuccessfullyStartedContainers().size());
    assertTrue(response.getFailedRequests().containsKey(cId));
    String failureMessage =
        response.getFailedRequests().get(cId).getMessage();
    assertTrue(
        failureMessage.contains("Null resource visibility for local resource"),
        "Unexpected failure message: " + failureMessage);
  }

  /**
   * Starts one container with a single resource keyed "dest_file1", then
   * queries its localization statuses and expects exactly one status for
   * that key in the COMPLETED state and no failed requests. The container is
   * stopped at the end.
   *
   * @throws Exception if any container manager call fails unexpectedly
   */
  @Test
  public void testGetLocalizationStatuses() throws Exception {
    containerManager.start();
    ContainerId cId = createContainerId(0, 0);
    Token token = createContainerToken(cId, DUMMY_RM_IDENTIFIER,
        context.getNodeId(), user, context.getContainerTokenSecretManager());

    // Script file handed to the launch context as the resource.
    File script = Shell.appendScriptExtension(tmpDir, "scriptFile_new");
    PrintWriter scriptWriter = new PrintWriter(script);
    File startFile = new File(tmpDir, "file1.txt").getAbsoluteFile();
    writeScriptFile(scriptWriter, "Upgrade World!", startFile, cId, false);

    ContainerLaunchContext launchContext =
        prepareContainerLaunchContext(script, "dest_file1", false, 0);

    // Start the container.
    List<StartContainerRequest> startRequests = Lists.newArrayList(
        StartContainerRequest.newInstance(launchContext, token));
    containerManager.startContainers(
        StartContainersRequest.newInstance(startRequests));
    // Fixed pause before querying; presumably lets localization complete.
    Thread.sleep(5000);

    // Query the localization statuses of the container.
    GetLocalizationStatusesRequest query =
        GetLocalizationStatusesRequest.newInstance(
            Lists.newArrayList(cId));
    GetLocalizationStatusesResponse statusResponse =
        containerManager.getLocalizationStatuses(query);

    List<LocalizationStatus> statuses =
        statusResponse.getLocalizationStatuses().get(cId);
    assertEquals(1, statuses.size());
    LocalizationStatus onlyStatus = statuses.iterator().next();
    assertEquals("dest_file1", onlyStatus.getResourceKey(), "resource key");
    assertEquals(LocalizationState.COMPLETED,
        onlyStatus.getLocalizationState(), "resource status");

    assertEquals(0, statusResponse.getFailedRequests().size());

    // Stop the container.
    containerManager.stopContainers(
        StopContainersRequest.newInstance(Lists.newArrayList(cId)));
  }

  /**
   * Starts two containers that share one launch context with a single
   * resource keyed "dest_file1", then queries localization statuses for both
   * and expects, for each container, exactly one status for that key in the
   * COMPLETED state, with no failed requests. Both containers are stopped at
   * the end.
   *
   * @throws Exception if any container manager call fails unexpectedly
   */
  @Test
  public void testGetLocalizationStatusesMultiContainers() throws Exception {
    containerManager.start();
    ContainerId container1 = createContainerId(0, 0);
    ContainerId container2 = createContainerId(1, 0);

    Token containerToken1 = createContainerToken(container1,
        DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
        context.getContainerTokenSecretManager());
    Token containerToken2 = createContainerToken(container2,
        DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
        context.getContainerTokenSecretManager());

    // localization resource
    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile_new");
    PrintWriter fileWriter = new PrintWriter(scriptFile);
    File file1 = new File(tmpDir, "file1.txt").getAbsoluteFile();

    writeScriptFile(fileWriter, "Upgrade World!", file1, container1, false);

    ContainerLaunchContext containerLaunchContext =
        prepareContainerLaunchContext(scriptFile, "dest_file1", false, 0);

    StartContainerRequest request1 = StartContainerRequest.newInstance(
        containerLaunchContext, containerToken1);
    StartContainerRequest request2 = StartContainerRequest.newInstance(
        containerLaunchContext, containerToken2);

    List<StartContainerRequest> startRequest = new ArrayList<>();
    startRequest.add(request1);
    startRequest.add(request2);

    // start container
    StartContainersRequest requestList = StartContainersRequest.newInstance(
        startRequest);
    containerManager.startContainers(requestList);
    Thread.sleep(5000);

    // Get localization statuses
    GetLocalizationStatusesRequest statusRequest =
        GetLocalizationStatusesRequest.newInstance(
            Lists.newArrayList(container1, container2));

    GetLocalizationStatusesResponse statusResponse =
        containerManager.getLocalizationStatuses(statusRequest);
    assertEquals(2, statusResponse.getLocalizationStatuses().size());

    ContainerId[] containerIds = {container1, container2};
    Arrays.stream(containerIds).forEach(cntnId -> {
      // Look up the statuses of the container being iterated, so that every
      // requested container is verified and not only the first one.
      List<LocalizationStatus> statuses = statusResponse
          .getLocalizationStatuses().get(cntnId);
      assertNotNull(statuses, "no localization statuses for " + cntnId);
      assertEquals(1, statuses.size());
      LocalizationStatus status = statuses.get(0);
      assertEquals("dest_file1",
          status.getResourceKey(), "resource key");
      assertEquals(LocalizationState.COMPLETED,
          status.getLocalizationState(), "resource status");
    });

    assertEquals(0, statusResponse.getFailedRequests().size());

    // stop containers
    StopContainersRequest stopRequest =
        StopContainersRequest.newInstance(Lists.newArrayList(container1,
            container2));
    containerManager.stopContainers(stopRequest);
  }
}