TestNodeManagerResync.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestNodeManagerResync {
  // Scratch directories for the NodeManager under test, all rooted at
  // target/<fully-qualified test class name>. They are created in setup()
  // and the whole tree is deleted again in tearDown().
  static final File basedir =
      new File("target", TestNodeManagerResync.class.getName());
  static final File tmpDir = new File(basedir, "tmpDir");
  static final File logsDir = new File(basedir, "logs");
  static final File remoteLogsDir = new File(basedir, "remotelogs");
  static final File nmLocalDir = new File(basedir, "nm0");
  static final File processStartFile = new File(tmpDir, "start_file.txt")
    .getAbsoluteFile();

  static final RecordFactory recordFactory = RecordFactoryProvider
      .getRecordFactory(null);
  // User name passed when creating container tokens.
  static final String user = "nobody";
  private FileContext localFS;
  // Two-party barrier on which the test thread and the thread executing
  // rebootNodeStatusUpdaterAndRegisterWithRM() meet.
  private CyclicBarrier syncBarrier;
  // Two-party barrier between TestNodeStatusUpdaterImpl4 and
  // ContainerUpdateResourceThread.
  private CyclicBarrier updateBarrier;
  // Number of times TestNodeStatusUpdaterImpl1 entered its resync override.
  private AtomicInteger resyncThreadCount;
  // Set to true when an assertion fails outside the main test thread.
  private final AtomicBoolean assertionFailedInThread =
      new AtomicBoolean(false);
  // Set to true by the shutDown() overrides. The object is also used as the
  // monitor for wait()/notify(), so the reference must never change.
  private final AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
  private final NodeManagerEvent resyncEvent =
      new NodeManagerEvent(NodeManagerEventType.RESYNC);
  private final long DUMMY_RM_IDENTIFIER = 1234;

  protected static final Logger LOG =
       LoggerFactory.getLogger(TestNodeManagerResync.class);

  /**
   * Prepares the local file context, the scratch directories and fresh
   * synchronization primitives before every test.
   */
  @BeforeEach
  public void setup() throws UnsupportedFileSystemException {
    localFS = FileContext.getLocalFSFileContext();

    File[] scratchDirs = {tmpDir, logsDir, remoteLogsDir, nmLocalDir};
    for (File scratchDir : scratchDirs) {
      scratchDir.mkdirs();
    }

    // Both barriers are shared by exactly two parties.
    final int parties = 2;
    syncBarrier = new CyclicBarrier(parties);
    updateBarrier = new CyclicBarrier(parties);
    resyncThreadCount = new AtomicInteger(0);
  }

  /**
   * Recursively removes the test base directory and clears the
   * assertion-failure flag after every test.
   */
  @AfterEach
  public void tearDown() throws IOException, InterruptedException {
    final Path testRoot = new Path(basedir.getPath());
    final boolean recursive = true;
    localFS.delete(testRoot, recursive);
    assertionFailedInThread.set(false);
  }

  /**
   * Runs the shared resync scenario with work-preserving recovery disabled.
   */
  @Test
  public void testKillContainersOnResync() throws IOException,
      InterruptedException, YarnException {
    final boolean preserveContainers = false;
    testContainerPreservationOnResyncImpl(
        new TestNodeManager1(preserveContainers), preserveContainers);
  }

  /**
   * Runs the shared resync scenario with work-preserving recovery enabled.
   */
  @Test
  public void testPreserveContainersOnResyncKeepingContainers() throws
      IOException,
      InterruptedException, YarnException {
    final boolean preserveContainers = true;
    testContainerPreservationOnResyncImpl(
        new TestNodeManager1(preserveContainers), preserveContainers);
  }

  /**
   * Shared scenario for the container-preservation tests: starts the given
   * NM, launches one container, sends a RESYNC event and waits on
   * {@code syncBarrier} for the resync thread before checking the result.
   *
   * @param nm the node manager under test
   * @param isWorkPreservingRestartEnabled value written to
   *     {@link YarnConfiguration#RM_WORK_PRESERVING_RECOVERY_ENABLED}
   * @throws IOException if the NM configuration or container start fails
   * @throws YarnException if starting the container fails
   * @throws InterruptedException if the test thread is interrupted while
   *     waiting on the barrier
   */
  protected void testContainerPreservationOnResyncImpl(TestNodeManager1 nm,
      boolean isWorkPreservingRestartEnabled)
      throws IOException, YarnException, InterruptedException {
    int port = ServerSocketUtil.getPort(49153, 10);
    YarnConfiguration conf = createNMConfig(port);
    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
        isWorkPreservingRestartEnabled);

    try {
      nm.init(conf);
      nm.start();
      ContainerId cId = TestNodeManagerShutdown.createContainerId();
      TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir,
          processStartFile, port);

      nm.setExistingContainerId(cId);
      assertEquals(1, nm.getNMRegistrationCount());
      nm.getNMDispatcher().getEventHandler().handle(resyncEvent);
      try {
        syncBarrier.await();
      } catch (BrokenBarrierException e) {
        // Keep going so the assertions below report the real outcome, but
        // leave a trace of why the synchronization did not complete.
        LOG.warn("Barrier broken while waiting for the resync thread", e);
      }
      assertEquals(2, nm.getNMRegistrationCount());
      // Only containers should be killed on resync, apps should lie around.
      // That way local resources for apps can be used beyond resync without
      // relocalization
      assertTrue(nm.getNMContext().getApplications()
          .containsKey(cId.getApplicationAttemptId().getApplicationId()));
      assertFalse(assertionFailedInThread.get());
    } finally {
      nm.stop();
    }
  }

  /**
   * Sends several RESYNC events back to back and checks that the NM
   * re-registers only once and that shutdown is not requested.
   */
  @SuppressWarnings("resource")
  @Test
  @Timeout(value = 30)
  public void testNMMultipleResyncEvent()
      throws IOException, InterruptedException {
    final TestNodeManager1 nodeManager = new TestNodeManager1(false);
    final YarnConfiguration nmConf = createNMConfig();
    final int totalResyncEvents = 4;

    try {
      nodeManager.init(nmConf);
      nodeManager.start();
      assertEquals(1, nodeManager.getNMRegistrationCount());

      int sent = 0;
      while (sent < totalResyncEvents) {
        nodeManager.getNMDispatcher().getEventHandler().handle(resyncEvent);
        sent++;
      }

      ((DrainDispatcher) nodeManager.getNMDispatcher()).await();
      LOG.info("NM dispatcher drained");

      // Block until the resync thread reaches the barrier as well
      try {
        syncBarrier.await();
      } catch (BrokenBarrierException e) {
      }
      LOG.info("Barrier wait done for the resync thread");

      // Only one additional registration is expected
      assertEquals(2, nodeManager.getNMRegistrationCount());
      assertFalse(isNMShutdownCalled.get(), "NM shutdown called.");
    } finally {
      nodeManager.stop();
    }
  }

  /**
   * Sends a RESYNC event to a {@link TestNodeManager3}, whose second
   * registration throws, and waits until the NM's {@code shutDown} override
   * has been invoked.
   *
   * @throws InterruptedException if the test thread is interrupted while
   *     waiting for the shutdown notification; the interrupt is propagated
   *     rather than swallowed so that the {@code @Timeout} can abort a test
   *     that would otherwise wait forever
   */
  @SuppressWarnings("resource")
  @Test
  @Timeout(value = 10)
  public void testNMshutdownWhenResyncThrowException() throws IOException,
      InterruptedException, YarnException {
    TestNodeManager3 nm = new TestNodeManager3();
    YarnConfiguration conf = createNMConfig();
    try {
      nm.init(conf);
      nm.start();
      assertEquals(1, nm.getNMRegistrationCount());
      nm.getNMDispatcher().getEventHandler()
          .handle(new NodeManagerEvent(NodeManagerEventType.RESYNC));

      synchronized (isNMShutdownCalled) {
        // Loop guards against spurious wakeups; an interrupt ends the wait.
        while (!isNMShutdownCalled.get()) {
          isNMShutdownCalled.wait();
        }
      }

      assertTrue(isNMShutdownCalled.get(), "NM shutdown not called.");
    } finally {
      nm.stop();
    }
  }

  /**
   * Starts a container, triggers a resource update on a separate thread and
   * then sends a RESYNC event. The actual checks run in the resync thread of
   * {@link TestNodeManager4}; this method only waits for it and inspects the
   * failure flag.
   */
  @SuppressWarnings("resource")
  @Test
  @Timeout(value = 60)
  public void testContainerResourceIncreaseIsSynchronizedWithRMResync()
      throws IOException, InterruptedException, YarnException {
    final TestNodeManager4 nodeManager = new TestNodeManager4();
    final YarnConfiguration nmConf = createNMConfig();
    nmConf.setBoolean(
        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
    try {
      nodeManager.init(nmConf);
      nodeManager.start();
      // Bring one container up to RUNNING
      nodeManager.startContainer();
      // Resource increase runs concurrently on its own thread
      nodeManager.updateContainerResource();
      // A RESYNC event stands in for an RM restart
      LOG.info("Sending out RESYNC event");
      NodeManagerEvent resync =
          new NodeManagerEvent(NodeManagerEventType.RESYNC);
      nodeManager.getNMDispatcher().getEventHandler().handle(resync);
      try {
        syncBarrier.await();
      } catch (BrokenBarrierException e) {
        e.printStackTrace();
      }
      assertFalse(assertionFailedInThread.get());
    } finally {
      nodeManager.stop();
    }
  }

  // Tests that when the NM receives a RESYNC response to a heartbeat, it
  // re-sends the container statuses that were already reported in that
  // heartbeat when it re-registers with the RM.
  //
  // All checks run inside the mock ResourceTracker callbacks; a failed
  // assertion there is caught and recorded in assertionFailedInThread, which
  // the main thread inspects after meeting the resync thread on syncBarrier.
  @SuppressWarnings("resource")
  @Test
  public void testNMSentContainerStatusOnResync() throws Exception {
    final ContainerStatus testCompleteContainer =
        TestNodeStatusUpdater.createContainerStatus(2, ContainerState.COMPLETE);
    final Container container =
        TestNodeStatusUpdater.getMockContainer(testCompleteContainer);
    NMContainerStatus report =
        createNMContainerStatus(2, ContainerState.COMPLETE);
    when(container.getNMContainerStatus()).thenReturn(report);
    NodeManager nm = new NodeManager() {
      // Number of registerNodeManager() calls seen so far.
      int registerCount = 0;

      @Override
      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
        return new TestNodeStatusUpdaterResync(context, dispatcher,
          healthChecker, metrics) {
          @Override
          protected ResourceTracker createResourceTracker() {
            return new MockResourceTracker() {
              @Override
              public RegisterNodeManagerResponse registerNodeManager(
                  RegisterNodeManagerRequest request) throws YarnException,
                  IOException {
                if (registerCount == 0) {
                  // first register, no containers info.
                  try {
                    assertEquals(0, request.getNMContainerStatuses()
                      .size());
                  } catch (AssertionError error) {
                    error.printStackTrace();
                    assertionFailedInThread.set(true);
                  }
                  // put the completed container into the context
                  getNMContext().getContainers().put(
                    testCompleteContainer.getContainerId(), container);
                  // also register a mock application for that container's
                  // application id
                  getNMContext().getApplications().put(
                      testCompleteContainer.getContainerId()
                          .getApplicationAttemptId().getApplicationId(),
                      mock(Application.class));
                } else {
                  // second register contains the completed container info.
                  List<NMContainerStatus> statuses =
                      request.getNMContainerStatuses();
                  try {
                    assertEquals(1, statuses.size());
                    assertEquals(testCompleteContainer.getContainerId(),
                      statuses.get(0).getContainerId());
                  } catch (AssertionError error) {
                    error.printStackTrace();
                    assertionFailedInThread.set(true);
                  }
                }
                registerCount++;
                return super.registerNodeManager(request);
              }

              @Override
              public NodeHeartbeatResponse nodeHeartbeat(
                  NodeHeartbeatRequest request) {
                // the heartbeat is expected to carry the completed
                // container info
                List<ContainerStatus> statuses =
                    request.getNodeStatus().getContainersStatuses();
                try {
                  assertEquals(1, statuses.size());
                  assertEquals(testCompleteContainer.getContainerId(),
                    statuses.get(0).getContainerId());
                } catch (AssertionError error) {
                  error.printStackTrace();
                  assertionFailedInThread.set(true);
                }

                // every heartbeat handled by this mock is answered with
                // RESYNC.
                return YarnServerBuilderUtils.newNodeHeartbeatResponse(1,
                  NodeAction.RESYNC, null, null, null, null, 1000L);
              }
            };
          }
        };
      }
    };
    YarnConfiguration conf = createNMConfig();
    try {
      nm.init(conf);
      nm.start();

      // Meet the resync thread, which awaits the same barrier in
      // TestNodeStatusUpdaterResync.
      try {
        syncBarrier.await();
      } catch (BrokenBarrierException e) {
      }
      assertFalse(assertionFailedInThread.get());
    } finally {
      nm.stop();
    }
  }

  /**
   * Common base class for testing NM resync behavior: runs the regular
   * resync and then meets the main test thread on {@code syncBarrier}.
   */
  class TestNodeStatusUpdaterResync extends MockNodeStatusUpdater {
    public TestNodeStatusUpdaterResync(Context context, Dispatcher dispatcher,
        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
      super(context, dispatcher, healthChecker, metrics);
    }

    /**
     * Performs the real resync, records any {@link AssertionError} raised
     * while doing so in {@code assertionFailedInThread}, and always awaits
     * {@code syncBarrier} afterwards so the main test thread is released
     * even when the resync fails.
     */
    @Override
    protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
      try {
        try {
          super.rebootNodeStatusUpdaterAndRegisterWithRM();
        } catch (AssertionError ae) {
          ae.printStackTrace();
          assertionFailedInThread.set(true);
        } finally {
          // Wait here so as to sync with the main test thread.
          syncBarrier.await();
        }
      } catch (InterruptedException | BrokenBarrierException ignored) {
        // Nothing left to do on this thread; the main thread checks the
        // failure flag once it gets past the barrier.
      }
    }
  }

  /**
   * Builds an NM configuration bound to the loopback interface.
   *
   * @param port port used for the NM address
   * @return configuration with addresses, directories and log retention set
   * @throws IOException if a free port cannot be obtained
   */
  private YarnConfiguration createNMConfig(int port) throws IOException {
    final String loopbackPrefix = "127.0.0.1:";
    final YarnConfiguration nmConf = new YarnConfiguration();

    // 5GB of physical memory
    nmConf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024);

    // Addresses
    nmConf.set(YarnConfiguration.NM_ADDRESS, loopbackPrefix + port);
    int localizerPort = ServerSocketUtil.getPort(49155, 10);
    nmConf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS,
        loopbackPrefix + localizerPort);
    int webappPort =
        ServerSocketUtil.getPort(YarnConfiguration.DEFAULT_NM_WEBAPP_PORT, 10);
    nmConf.set(YarnConfiguration.NM_WEBAPP_ADDRESS,
        loopbackPrefix + webappPort);

    // Directories
    String logDirs = logsDir.getAbsolutePath();
    nmConf.set(YarnConfiguration.NM_LOG_DIRS, logDirs);
    String remoteLogDir = remoteLogsDir.getAbsolutePath();
    nmConf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir);
    String localDirs = nmLocalDir.getAbsolutePath();
    nmConf.set(YarnConfiguration.NM_LOCAL_DIRS, localDirs);

    nmConf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
    return nmConf;
  }

  /**
   * Builds an NM configuration whose NM port is picked by
   * {@code ServerSocketUtil.getPort(49156, 10)}.
   */
  private YarnConfiguration createNMConfig() throws IOException {
    final int nmPort = ServerSocketUtil.getPort(49156, 10);
    return createNMConfig(nmPort);
  }

  class TestNodeManager1 extends NodeManager {

    private int registrationCount = 0;
    private boolean containersShouldBePreserved;
    private ContainerId existingCid;

    public TestNodeManager1(boolean containersShouldBePreserved) {
      this.containersShouldBePreserved = containersShouldBePreserved;
    }

    public void setExistingContainerId(ContainerId cId) {
      existingCid = cId;
    }

    @Override
    protected AsyncDispatcher createNMDispatcher() {
      return new DrainDispatcher();
    }

    @Override
    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
      return new TestNodeStatusUpdaterImpl1(context, dispatcher,
          healthChecker, metrics);
    }

    public int getNMRegistrationCount() {
      return registrationCount;
    }

    @Override
    protected void shutDown(int exitCode) {
      synchronized (isNMShutdownCalled) {
        isNMShutdownCalled.set(true);
        isNMShutdownCalled.notify();
      }
    }

    class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater {

      public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher,
          NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
        super(context, dispatcher, healthChecker, metrics);
      }

      @Override
      protected void registerWithRM() throws YarnException, IOException {
        super.registerWithRM();
        registrationCount++;
      }

      @Override
      protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
        ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
        .containermanager.container.Container> containers =
            getNMContext().getContainers();
        if (resyncThreadCount.incrementAndGet() > 1) {
          throw new YarnRuntimeException("Multiple resync thread created!");
        }
        try {
          try {
            if (containersShouldBePreserved) {
              assertFalse(containers.isEmpty());
              assertTrue(containers.containsKey(existingCid));
              ContainerState state = containers.get(existingCid)
                  .cloneAndGetContainerStatus().getState();
              // Wait till RUNNING state...
              int counter = 50;
              while (state != ContainerState.RUNNING && counter > 0) {
                Thread.sleep(100);
                counter--;
              }
              assertEquals(ContainerState.RUNNING,
                  containers.get(existingCid)
                  .cloneAndGetContainerStatus().getState());
            } else {
              // ensure that containers are empty or are completed before
              // restart nodeStatusUpdater
              if (!containers.isEmpty()) {
                assertEquals(ContainerState.COMPLETE,
                    containers.get(existingCid)
                        .cloneAndGetContainerStatus().getState());
              }
            }
            super.rebootNodeStatusUpdaterAndRegisterWithRM();
          }
          catch (AssertionError ae) {
            ae.printStackTrace();
            assertionFailedInThread.set(true);
          }
          finally {
            syncBarrier.await();
          }
        } catch (InterruptedException e) {
        } catch (BrokenBarrierException e) {
        } catch (AssertionError ae) {
          ae.printStackTrace();
          assertionFailedInThread.set(true);
        }
      }
    }
  }

  /**
   * NodeManager whose status updater throws on every registration after the
   * first one, and whose shutDown() only records that it was called.
   */
  class TestNodeManager3 extends NodeManager {

    private int registrationCount = 0;

    public int getNMRegistrationCount() {
      return registrationCount;
    }

    @Override
    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
      NodeStatusUpdater failingUpdater = new TestNodeStatusUpdaterImpl3(
          context, dispatcher, healthChecker, metrics);
      return failingUpdater;
    }

    /** Flags the shutdown request and notifies a thread waiting on the flag. */
    @Override
    protected void shutDown(int exitCode) {
      synchronized (isNMShutdownCalled) {
        isNMShutdownCalled.set(true);
        isNMShutdownCalled.notify();
      }
    }

    class TestNodeStatusUpdaterImpl3 extends MockNodeStatusUpdater {

      public TestNodeStatusUpdaterImpl3(Context context, Dispatcher dispatcher,
          NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
        super(context, dispatcher, healthChecker, metrics);
      }

      /** Registers normally, then fails from the second registration on. */
      @Override
      protected void registerWithRM() throws YarnException, IOException {
        super.registerWithRM();
        if (++registrationCount > 1) {
          throw new YarnRuntimeException("Registration with RM failed.");
        }
      }
    }
  }

  class TestNodeManager4 extends NodeManager {

    // Thread that issues the container resource update; it is created and
    // started by updateContainerResource().
    private Thread containerUpdateResourceThread = null;

    /** Installs the status updater that performs the resync-time checks. */
    @Override
    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
      NodeStatusUpdater checkingUpdater = new TestNodeStatusUpdaterImpl4(
          context, dispatcher, healthChecker, metrics);
      return checkingUpdater;
    }

    /**
     * Creates a container manager with its authorization and token hooks
     * turned into no-ops, except that authorizing a start or resource
     * increase request sleeps for two seconds.
     */
    @Override
    protected ContainerManagerImpl createContainerManager(Context context,
        ContainerExecutor exec, DeletionService del,
        NodeStatusUpdater nodeStatusUpdater,
        ApplicationACLsManager aclsManager,
        LocalDirsHandlerService dirsHandler) {
      ContainerManagerImpl relaxedManager = new ContainerManagerImpl(context,
          exec, del, nodeStatusUpdater, metrics, dirsHandler) {

        @Override
        protected void authorizeStartAndResourceIncreaseRequest(
            NMTokenIdentifier tokenId,
            ContainerTokenIdentifier containerTokenId,
            boolean isStart) throws YarnException {
          // Stall for 2 seconds to mimic a long-running increase action.
          // A RESYNC event sent by the RM in the meantime is expected to
          // block until the increase action has finished.
          // See testContainerResourceIncreaseIsSynchronizedWithRMResync()
          try {
            Thread.sleep(2000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }

        @Override
        protected void authorizeUser(UserGroupInformation ugi,
            NMTokenIdentifier tokenId) {
          // intentionally empty
        }

        @Override
        protected void authorizeGetAndStopContainerRequest(
            ContainerId id, Container target,
            boolean isStop, NMTokenIdentifier tokenId,
            String caller)
            throws YarnException {
          // intentionally empty
        }

        @Override
        protected NMTokenIdentifier selectNMTokenIdentifier(
            UserGroupInformation ugi) {
          return new NMTokenIdentifier();
        }

        @Override
        protected void updateNMTokenIdentifier(
            NMTokenIdentifier tokenId)
                throws SecretManager.InvalidToken {
          // intentionally empty
        }

        @Override
        public Map<String, ByteBuffer> getAuxServiceMetaData() {
          return new HashMap<>();
        }
      };
      return relaxedManager;
    }

    /**
     * Starts a container in the NM and waits until it is in RUNNING state.
     * The container runs a generated script that sleeps (pings on Windows)
     * so that it stays alive for the duration of the test.
     *
     * @throws IOException if the script cannot be written or the container
     *     token cannot be created
     * @throws InterruptedException if interrupted while waiting for RUNNING
     * @throws YarnException if the start request is rejected
     */
    public void startContainer()
        throws IOException, InterruptedException, YarnException {
      LOG.info("Start a container and wait until it is in RUNNING state");
      File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
      // try-with-resources closes the writer even if a write fails
      try (PrintWriter fileWriter = new PrintWriter(scriptFile)) {
        if (Shell.WINDOWS) {
          fileWriter.println("@ping -n 100 127.0.0.1 >nul");
        } else {
          fileWriter.write("\numask 0");
          fileWriter.write("\nexec sleep 100");
        }
      }
      ContainerLaunchContext containerLaunchContext =
          recordFactory.newRecordInstance(ContainerLaunchContext.class);
      URL scriptUrl =
          URL.fromPath(localFS
              .makeQualified(new Path(scriptFile.getAbsolutePath())));
      LocalResource scriptResource =
          recordFactory.newRecordInstance(LocalResource.class);
      scriptResource.setResource(scriptUrl);
      scriptResource.setSize(-1);
      scriptResource.setVisibility(LocalResourceVisibility.APPLICATION);
      scriptResource.setType(LocalResourceType.FILE);
      scriptResource.setTimestamp(scriptFile.lastModified());
      String destinationFile = "dest_file";
      Map<String, LocalResource> localResources = new HashMap<>();
      localResources.put(destinationFile, scriptResource);
      containerLaunchContext.setLocalResources(localResources);
      List<String> commands =
          Arrays.asList(Shell.getRunScriptCommand(scriptFile));
      containerLaunchContext.setCommands(commands);
      Resource resource = Resource.newInstance(1024, 1);
      StartContainerRequest scRequest =
          StartContainerRequest.newInstance(
              containerLaunchContext,
              getContainerToken(resource));
      List<StartContainerRequest> list = new ArrayList<>();
      list.add(scRequest);
      StartContainersRequest allRequests =
          StartContainersRequest.newInstance(list);
      getContainerManager().startContainers(allRequests);
      // Make sure the container reaches RUNNING state
      ContainerId cId = TestContainerManager.createContainerId(0);
      BaseContainerManagerTest.waitForNMContainerState(
          getContainerManager(), cId,
          org.apache.hadoop.yarn.server.nodemanager.
              containermanager.container.ContainerState.RUNNING);
    }

    // Kicks off the container resource increase on a thread of its own
    public void updateContainerResource()
        throws InterruptedException {
      LOG.info("Increase a container resource in a separate thread");
      Thread updater = new ContainerUpdateResourceThread();
      containerUpdateResourceThread = updater;
      updater.start();
    }

    class TestNodeStatusUpdaterImpl4 extends MockNodeStatusUpdater {

      public TestNodeStatusUpdaterImpl4(Context context, Dispatcher dispatcher,
          NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
        super(context, dispatcher, healthChecker, metrics);
      }

      @Override
      protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
        try {
          try {
            // Check status before registerWithRM
            List<ContainerId> containerIds = new ArrayList<>();
            ContainerId cId = TestContainerManager.createContainerId(0);
            containerIds.add(cId);
            GetContainerStatusesRequest gcsRequest =
                GetContainerStatusesRequest.newInstance(containerIds);
            ContainerStatus containerStatus = getContainerManager()
                .getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
            assertEquals(Resource.newInstance(1024, 1),
                containerStatus.getCapability());
            updateBarrier.await();
            // Call the actual rebootNodeStatusUpdaterAndRegisterWithRM().
            // This function should be synchronized with
            // updateContainer().
            updateBarrier.await();
            super.rebootNodeStatusUpdaterAndRegisterWithRM();
            // Check status after registerWithRM
            containerStatus = getContainerManager()
                .getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
            assertEquals(Resource.newInstance(4096, 2),
                containerStatus.getCapability());
          } catch (AssertionError ae) {
            ae.printStackTrace();
            assertionFailedInThread.set(true);
          }   finally {
            syncBarrier.await();
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }

    class ContainerUpdateResourceThread extends Thread {
      @Override
      public void run() {
        // Construct container resource increase request
        List<Token> increaseTokens = new ArrayList<Token>();
        // Add increase request.
        Resource targetResource = Resource.newInstance(4096, 2);
        try{
          try {
            updateBarrier.await();
            increaseTokens.add(getContainerToken(targetResource, 1));
            ContainerUpdateRequest updateRequest =
                ContainerUpdateRequest.newInstance(increaseTokens);
            ContainerUpdateResponse updateResponse =
                getContainerManager()
                    .updateContainer(updateRequest);
            assertEquals(1,
                updateResponse.getSuccessfullyUpdatedContainers().size());
            assertTrue(updateResponse.getFailedRequests().isEmpty());
          } catch (Exception e) {
            e.printStackTrace();
          } finally {
            updateBarrier.await();
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }

    /**
     * Creates a container token for the test container (id 0) with the given
     * resource.
     */
    private Token getContainerToken(Resource resource) throws IOException {
      final ContainerId containerId = TestContainerManager.createContainerId(0);
      final Context nmContext = getNMContext();
      return TestContainerManager.createContainerToken(
          containerId, DUMMY_RM_IDENTIFIER,
          nmContext.getNodeId(), user, resource,
          nmContext.getContainerTokenSecretManager(), null);
    }

    /**
     * Creates a container token for the test container (id 0) with the given
     * resource and container version.
     */
    private Token getContainerToken(Resource resource, int version)
        throws IOException {
      final ContainerId containerId = TestContainerManager.createContainerId(0);
      final Context nmContext = getNMContext();
      return TestContainerManager.createContainerToken(
          containerId, version, DUMMY_RM_IDENTIFIER,
          nmContext.getNodeId(), user, resource,
          nmContext.getContainerTokenSecretManager(), null);
    }
  }

  /**
   * Builds an {@link NMContainerStatus} for container {@code id} of attempt 1
   * of application (0, 1), with a 1024 MB / 1 vcore resource and priority 10.
   *
   * @param id container sequence number
   * @param containerState state to report for the container
   * @return the container status record
   */
  public static NMContainerStatus createNMContainerStatus(int id,
      ContainerState containerState) {
    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
        ApplicationId.newInstance(0, 1), 1);
    ContainerId containerId = ContainerId.newContainerId(attemptId, id);
    return NMContainerStatus.newInstance(containerId, 0, containerState,
        Resource.newInstance(1024, 1), "recover container", 0,
        Priority.newInstance(10), 0);
  }
}