TestResourceLocalizationService.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyShort;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalResourceStatusPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerStatusPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerRunner;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.LocalizerTracker;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService.PublicLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.mockito.internal.matchers.VarargMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TestResourceLocalizationService {

  static final Path basedir =
      new Path("target", TestResourceLocalizationService.class.getName());
  static Server mockServer;

  private Configuration conf;
  private AbstractFileSystem spylfs;
  private FileContext lfs;
  private NMContext nmContext;
  private NodeManagerMetrics metrics;
  @BeforeAll
  public static void setupClass() {
    mockServer = mock(Server.class);
    doReturn(new InetSocketAddress(123)).when(mockServer).getListenerAddress();
  }

  @BeforeEach
  public void setup() throws IOException {
    conf = new Configuration();
    spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
    lfs = FileContext.getFileContext(spylfs, conf);

    String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString();
    conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
    nmContext = new NMContext(new NMContainerTokenSecretManager(
      conf), new NMTokenSecretManagerInNM(), null,
      new ApplicationACLsManager(conf), new NMNullStateStoreService(), false,
          conf);
    metrics = mock(NodeManagerMetrics.class);
  }

  @AfterEach
  public void cleanup() throws IOException {
    conf = null;
    try {
      FileUtils.deleteDirectory(new File(basedir.toString()));
    } catch (IOException | IllegalArgumentException e) {
      // ignore
    }
  }
  
  @Test
  public void testLocalizationInit() throws Exception {
    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(new Configuration());

    ContainerExecutor exec = mock(ContainerExecutor.class);
    DeletionService delService = spy(new DeletionService(exec));
    delService.init(conf);
    delService.start();

    List<Path> localDirs = new ArrayList<Path>();
    String[] sDirs = new String[4];
    for (int i = 0; i < 4; ++i) {
      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
      sDirs[i] = localDirs.get(i).toString();
    }
    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);

    LocalDirsHandlerService diskhandler = new LocalDirsHandlerService();
    diskhandler.init(conf);

    ResourceLocalizationService locService =
      spy(new ResourceLocalizationService(dispatcher, exec, delService,
            diskhandler, nmContext, metrics));
    doReturn(lfs)
      .when(locService).getLocalFileContext(isA(Configuration.class));
    try {
      dispatcher.start();

      // initialize ResourceLocalizationService
      locService.init(conf);

      final FsPermission defaultPerm = new FsPermission((short)0755);

      // verify directory creation
      for (Path p : localDirs) {
        p = new Path((new URI(p.toString())).getPath());
        Path usercache = new Path(p, ContainerLocalizer.USERCACHE);
        verify(spylfs)
          .mkdir(eq(usercache),
              eq(defaultPerm), eq(true));
        Path publicCache = new Path(p, ContainerLocalizer.FILECACHE);
        verify(spylfs)
          .mkdir(eq(publicCache),
              eq(defaultPerm), eq(true));
        Path nmPriv = new Path(p, ResourceLocalizationService.NM_PRIVATE_DIR);
        verify(spylfs).mkdir(eq(nmPriv),
            eq(ResourceLocalizationService.NM_PRIVATE_PERM), eq(true));
      }
    } finally {
      dispatcher.stop();
      delService.stop();
    }
  }

  @Test
  public void testDirectoryCleanupOnNewlyCreatedStateStore()
      throws IOException, URISyntaxException {
    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(new Configuration());

    ContainerExecutor exec = mock(ContainerExecutor.class);
    DeletionService delService = spy(new DeletionService(exec));
    delService.init(conf);
    delService.start();

    List<Path> localDirs = new ArrayList<Path>();
    String[] sDirs = new String[4];
    for (int i = 0; i < 4; ++i) {
      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
      sDirs[i] = localDirs.get(i).toString();
    }
    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);

    LocalDirsHandlerService diskhandler = new LocalDirsHandlerService();
    diskhandler.init(conf);

    NMStateStoreService nmStateStoreService = mock(NMStateStoreService.class);
    when(nmStateStoreService.canRecover()).thenReturn(true);
    when(nmStateStoreService.isNewlyCreated()).thenReturn(true);

    ResourceLocalizationService locService =
        spy(new ResourceLocalizationService(dispatcher, exec, delService,
            diskhandler, nmContext, metrics));
    doReturn(lfs)
        .when(locService).getLocalFileContext(isA(Configuration.class));
    try {
      dispatcher.start();

      // initialize ResourceLocalizationService
      locService.init(conf);

      final FsPermission defaultPerm = new FsPermission((short)0755);

      // verify directory creation
      for (Path p : localDirs) {
        p = new Path((new URI(p.toString())).getPath());
        Path usercache = new Path(p, ContainerLocalizer.USERCACHE);
        verify(spylfs)
            .rename(eq(usercache), any(Path.class), any());
        verify(spylfs)
            .mkdir(eq(usercache),
                eq(defaultPerm), eq(true));
        Path publicCache = new Path(p, ContainerLocalizer.FILECACHE);
        verify(spylfs)
            .rename(eq(usercache), any(Path.class), any());
        verify(spylfs)
            .mkdir(eq(publicCache),
                eq(defaultPerm), eq(true));
        Path nmPriv = new Path(p, ResourceLocalizationService.NM_PRIVATE_DIR);
        verify(spylfs)
            .rename(eq(usercache), any(Path.class), any());
        verify(spylfs).mkdir(eq(nmPriv),
            eq(ResourceLocalizationService.NM_PRIVATE_PERM), eq(true));
      }
    } finally {
      dispatcher.stop();
      delService.stop();
    }
  }

  @Test
  @SuppressWarnings("unchecked") // mocked generics
  public void testResourceRelease() throws Exception {
    List<Path> localDirs = new ArrayList<Path>();
    String[] sDirs = new String[4];
    for (int i = 0; i < 4; ++i) {
      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
      sDirs[i] = localDirs.get(i).toString();
    }
    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);

    LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
    DrainDispatcher dispatcher = new DrainDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
    dispatcher.register(ApplicationEventType.class, applicationBus);
    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
    dispatcher.register(ContainerEventType.class, containerBus);
    //Ignore actual localization
    EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
    dispatcher.register(LocalizerEventType.class, localizerBus);

    ContainerExecutor exec = mock(ContainerExecutor.class);
    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
    dirsHandler.init(conf);

    DeletionService delService = new DeletionService(exec);
    delService.init(new Configuration());
    delService.start();

    ResourceLocalizationService rawService =
      new ResourceLocalizationService(dispatcher, exec, delService,
            dirsHandler, nmContext, metrics);
    ResourceLocalizationService spyService = spy(rawService);
    doReturn(mockServer).when(spyService).createServer();
    doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
        isA(Configuration.class));
    doReturn(lfs).when(spyService)
        .getLocalFileContext(isA(Configuration.class));
    try {
      spyService.init(conf);
      spyService.start();

      final String user = "user0";
      // init application
      final Application app = mock(Application.class);
      final ApplicationId appId =
          BuilderUtils.newApplicationId(314159265358979L, 3);
      when(app.getUser()).thenReturn(user);
      when(app.getAppId()).thenReturn(appId);
      spyService.handle(new ApplicationLocalizationEvent(
          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
      dispatcher.await();
            
      //Get a handle on the trackers after they're setup with INIT_APP_RESOURCES
      LocalResourcesTracker appTracker =
          spyService.getLocalResourcesTracker(
              LocalResourceVisibility.APPLICATION, user, appId);
      LocalResourcesTracker privTracker =
          spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
              user, appId);
      LocalResourcesTracker pubTracker =
          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
              user, appId);

      // init container.
      final Container c = getMockContainer(appId, 42, user);
      
      // init resources
      Random r = new Random();
      long seed = r.nextLong();
      System.out.println("SEED: " + seed);
      r.setSeed(seed);
      
      // Send localization requests for one resource of each type.
      final LocalResource privResource = getPrivateMockedResource(r);
      final LocalResourceRequest privReq =
          new LocalResourceRequest(privResource);
      
      final LocalResource pubResource = getPublicMockedResource(r);
      final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
      final LocalResource pubResource2 = getPublicMockedResource(r);
      final LocalResourceRequest pubReq2 =
          new LocalResourceRequest(pubResource2);
      
      final LocalResource appResource = getAppMockedResource(r);
      final LocalResourceRequest appReq = new LocalResourceRequest(appResource);
      
      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
          new HashMap<LocalResourceVisibility, 
                      Collection<LocalResourceRequest>>();
      req.put(LocalResourceVisibility.PRIVATE,
          Collections.singletonList(privReq));
      req.put(LocalResourceVisibility.PUBLIC,
          Collections.singletonList(pubReq));
      req.put(LocalResourceVisibility.APPLICATION,
          Collections.singletonList(appReq));
      
      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
        new HashMap<LocalResourceVisibility, 
                    Collection<LocalResourceRequest>>();
      req2.put(LocalResourceVisibility.PRIVATE,
          Collections.singletonList(privReq));
      req2.put(LocalResourceVisibility.PUBLIC,
          Collections.singletonList(pubReq2));
      
      Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
      pubRsrcs.add(pubReq);
      pubRsrcs.add(pubReq2);
      
      // Send Request event
      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
      spyService.handle(new ContainerLocalizationRequestEvent(c, req2));
      dispatcher.await();

      int privRsrcCount = 0;
      for (LocalizedResource lr : privTracker) {
        privRsrcCount++;
        assertEquals(2, lr.getRefCount(), "Incorrect reference count");
        assertEquals(privReq, lr.getRequest());
      }
      assertEquals(1, privRsrcCount);

      int pubRsrcCount = 0;
      for (LocalizedResource lr : pubTracker) {
        pubRsrcCount++;
        assertEquals(1, lr.getRefCount(), "Incorrect reference count");
        pubRsrcs.remove(lr.getRequest());
      }
      assertEquals(0, pubRsrcs.size());
      assertEquals(2, pubRsrcCount);

      int appRsrcCount = 0;
      for (LocalizedResource lr : appTracker) {
        appRsrcCount++;
        assertEquals(1, lr.getRefCount(), "Incorrect reference count");
        assertEquals(appReq, lr.getRequest());
      }
      assertEquals(1, appRsrcCount);
      
      //Send Cleanup Event
      spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
      verify(mockLocallilzerTracker)
        .cleanupPrivLocalizers("container_314159265358979_0003_01_000042");
      req2.remove(LocalResourceVisibility.PRIVATE);
      spyService.handle(new ContainerLocalizationCleanupEvent(c, req2));
      dispatcher.await();
      
      pubRsrcs.add(pubReq);
      pubRsrcs.add(pubReq2);

      privRsrcCount = 0;
      for (LocalizedResource lr : privTracker) {
        privRsrcCount++;
        assertEquals(1, lr.getRefCount(), "Incorrect reference count");
        assertEquals(privReq, lr.getRequest());
      }
      assertEquals(1, privRsrcCount);

      pubRsrcCount = 0;
      for (LocalizedResource lr : pubTracker) {
        pubRsrcCount++;
        assertEquals(0, lr.getRefCount(), "Incorrect reference count");
        pubRsrcs.remove(lr.getRequest());
      }
      assertEquals(0, pubRsrcs.size());
      assertEquals(2, pubRsrcCount);

      appRsrcCount = 0;
      for (LocalizedResource lr : appTracker) {
        appRsrcCount++;
      }
      assertEquals(0, appRsrcCount);
    } finally {
      dispatcher.stop();
      delService.stop();
    }
  }
  
  @Test
  @SuppressWarnings("unchecked") // mocked generics
  public void testRecovery() throws Exception {
    final String user1 = "user1";
    final String user2 = "user2";
    final ApplicationId appId1 = ApplicationId.newInstance(1, 1);
    final ApplicationId appId2 = ApplicationId.newInstance(1, 2);

    List<Path> localDirs = new ArrayList<Path>();
    String[] sDirs = new String[4];
    for (int i = 0; i < 4; ++i) {
      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
      sDirs[i] = localDirs.get(i).toString();
    }
    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);

    NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
    stateStore.init(conf);
    stateStore.start();
    DrainDispatcher dispatcher = new DrainDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
    dispatcher.register(ApplicationEventType.class, applicationBus);
    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
    dispatcher.register(ContainerEventType.class, containerBus);
    //Ignore actual localization
    EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
    dispatcher.register(LocalizerEventType.class, localizerBus);

    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
    dirsHandler.init(conf);

    ResourceLocalizationService spyService =
        createSpyService(dispatcher, dirsHandler, stateStore);
    try {
      spyService.init(conf);
      spyService.start();

      final Application app1 = mock(Application.class);
      when(app1.getUser()).thenReturn(user1);
      when(app1.getAppId()).thenReturn(appId1);
      final Application app2 = mock(Application.class);
      when(app2.getUser()).thenReturn(user2);
      when(app2.getAppId()).thenReturn(appId2);
      spyService.handle(new ApplicationLocalizationEvent(
          LocalizationEventType.INIT_APPLICATION_RESOURCES, app1));
      spyService.handle(new ApplicationLocalizationEvent(
          LocalizationEventType.INIT_APPLICATION_RESOURCES, app2));
      dispatcher.await();

      //Get a handle on the trackers after they're setup with INIT_APP_RESOURCES
      LocalResourcesTracker appTracker1 =
          spyService.getLocalResourcesTracker(
              LocalResourceVisibility.APPLICATION, user1, appId1);
      LocalResourcesTracker privTracker1 =
          spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
              user1, null);
      LocalResourcesTracker appTracker2 =
          spyService.getLocalResourcesTracker(
              LocalResourceVisibility.APPLICATION, user2, appId2);
      LocalResourcesTracker pubTracker =
          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
              null, null);

      // init containers
      final Container c1 = getMockContainer(appId1, 1, user1);
      final Container c2 = getMockContainer(appId2, 2, user2);

      // init resources
      Random r = new Random();
      long seed = r.nextLong();
      System.out.println("SEED: " + seed);
      r.setSeed(seed);

      // Send localization requests of each type.
      final LocalResource privResource1 = getPrivateMockedResource(r);
      final LocalResourceRequest privReq1 =
          new LocalResourceRequest(privResource1);
      final LocalResource privResource2 = getPrivateMockedResource(r);
      final LocalResourceRequest privReq2 =
          new LocalResourceRequest(privResource2);

      final LocalResource pubResource1 = getPublicMockedResource(r);
      final LocalResourceRequest pubReq1 =
          new LocalResourceRequest(pubResource1);
      final LocalResource pubResource2 = getPublicMockedResource(r);
      final LocalResourceRequest pubReq2 =
          new LocalResourceRequest(pubResource2);

      final LocalResource appResource1 = getAppMockedResource(r);
      final LocalResourceRequest appReq1 =
          new LocalResourceRequest(appResource1);
      final LocalResource appResource2 = getAppMockedResource(r);
      final LocalResourceRequest appReq2 =
          new LocalResourceRequest(appResource2);
      final LocalResource appResource3 = getAppMockedResource(r);
      final LocalResourceRequest appReq3 =
          new LocalResourceRequest(appResource3);

      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req1 =
          new HashMap<LocalResourceVisibility,
                      Collection<LocalResourceRequest>>();
      req1.put(LocalResourceVisibility.PRIVATE,
          Arrays.asList(new LocalResourceRequest[] { privReq1, privReq2 }));
      req1.put(LocalResourceVisibility.PUBLIC,
          Collections.singletonList(pubReq1));
      req1.put(LocalResourceVisibility.APPLICATION,
          Collections.singletonList(appReq1));

      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
        new HashMap<LocalResourceVisibility,
                    Collection<LocalResourceRequest>>();
      req2.put(LocalResourceVisibility.APPLICATION,
          Arrays.asList(new LocalResourceRequest[] { appReq2, appReq3 }));
      req2.put(LocalResourceVisibility.PUBLIC,
          Collections.singletonList(pubReq2));

      // Send Request event
      spyService.handle(new ContainerLocalizationRequestEvent(c1, req1));
      spyService.handle(new ContainerLocalizationRequestEvent(c2, req2));
      dispatcher.await();

      // Simulate start of localization for all resources
      privTracker1.getPathForLocalization(privReq1,
          dirsHandler.getLocalPathForWrite(
              ContainerLocalizer.USERCACHE + user1), null);
      privTracker1.getPathForLocalization(privReq2,
          dirsHandler.getLocalPathForWrite(
              ContainerLocalizer.USERCACHE + user1), null);
      LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1);
      LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2);
      appTracker1.getPathForLocalization(appReq1,
          dirsHandler.getLocalPathForWrite(
              ContainerLocalizer.APPCACHE + appId1), null);
      LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1);
      appTracker2.getPathForLocalization(appReq2,
          dirsHandler.getLocalPathForWrite(
              ContainerLocalizer.APPCACHE + appId2), null);
      LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2);
      appTracker2.getPathForLocalization(appReq3,
          dirsHandler.getLocalPathForWrite(
              ContainerLocalizer.APPCACHE + appId2), null);
      LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3);
      pubTracker.getPathForLocalization(pubReq1,
          dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE),
          null);
      LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1);
      pubTracker.getPathForLocalization(pubReq2,
          dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE),
          null);
      LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2);

      // Simulate completion of localization for most resources with
      // possibly different sizes than in the request
      assertNotNull(privLr1.getLocalPath(), "Localization not started");
      privTracker1.handle(new ResourceLocalizedEvent(privReq1,
          privLr1.getLocalPath(), privLr1.getSize() + 5));
      assertNotNull(privLr2.getLocalPath(), "Localization not started");
      privTracker1.handle(new ResourceLocalizedEvent(privReq2,
          privLr2.getLocalPath(), privLr2.getSize() + 10));
      assertNotNull(appLr1.getLocalPath(), "Localization not started");
      appTracker1.handle(new ResourceLocalizedEvent(appReq1,
          appLr1.getLocalPath(), appLr1.getSize()));
      assertNotNull(appLr3.getLocalPath(), "Localization not started");
      appTracker2.handle(new ResourceLocalizedEvent(appReq3,
          appLr3.getLocalPath(), appLr3.getSize() + 7));
      assertNotNull(pubLr1.getLocalPath(), "Localization not started");
      pubTracker.handle(new ResourceLocalizedEvent(pubReq1,
          pubLr1.getLocalPath(), pubLr1.getSize() + 1000));
      assertNotNull(pubLr2.getLocalPath(), "Localization not started");
      pubTracker.handle(new ResourceLocalizedEvent(pubReq2,
          pubLr2.getLocalPath(), pubLr2.getSize() + 99999));

      dispatcher.await();
      assertEquals(ResourceState.LOCALIZED, privLr1.getState());
      assertEquals(ResourceState.LOCALIZED, privLr2.getState());
      assertEquals(ResourceState.LOCALIZED, appLr1.getState());
      assertEquals(ResourceState.DOWNLOADING, appLr2.getState());
      assertEquals(ResourceState.LOCALIZED, appLr3.getState());
      assertEquals(ResourceState.LOCALIZED, pubLr1.getState());
      assertEquals(ResourceState.LOCALIZED, pubLr2.getState());

      // restart and recover
      spyService = createSpyService(dispatcher, dirsHandler, stateStore);
      spyService.init(conf);
      spyService.recoverLocalizedResources(
          stateStore.loadLocalizationState());
      dispatcher.await();

      appTracker1 = spyService.getLocalResourcesTracker(
              LocalResourceVisibility.APPLICATION, user1, appId1);
      privTracker1 = spyService.getLocalResourcesTracker(
          LocalResourceVisibility.PRIVATE, user1, null);
      appTracker2 = spyService.getLocalResourcesTracker(
              LocalResourceVisibility.APPLICATION, user2, appId2);
      pubTracker = spyService.getLocalResourcesTracker(
          LocalResourceVisibility.PUBLIC, null, null);

      LocalizedResource recoveredRsrc =
          privTracker1.getLocalizedResource(privReq1);
      assertEquals(privReq1, recoveredRsrc.getRequest());
      assertEquals(privLr1.getLocalPath(), recoveredRsrc.getLocalPath());
      assertEquals(privLr1.getSize(), recoveredRsrc.getSize());
      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
      recoveredRsrc = privTracker1.getLocalizedResource(privReq2);
      assertEquals(privReq2, recoveredRsrc.getRequest());
      assertEquals(privLr2.getLocalPath(), recoveredRsrc.getLocalPath());
      assertEquals(privLr2.getSize(), recoveredRsrc.getSize());
      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
      recoveredRsrc = appTracker1.getLocalizedResource(appReq1);
      assertEquals(appReq1, recoveredRsrc.getRequest());
      assertEquals(appLr1.getLocalPath(), recoveredRsrc.getLocalPath());
      assertEquals(appLr1.getSize(), recoveredRsrc.getSize());
      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
      recoveredRsrc = appTracker2.getLocalizedResource(appReq2);
      assertNull(recoveredRsrc, "in-progress resource should not be present");
      recoveredRsrc = appTracker2.getLocalizedResource(appReq3);
      assertEquals(appReq3, recoveredRsrc.getRequest());
      assertEquals(appLr3.getLocalPath(), recoveredRsrc.getLocalPath());
      assertEquals(appLr3.getSize(), recoveredRsrc.getSize());
      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
    } finally {
      dispatcher.stop();
      stateStore.close();
    }
  }
  

  @Test
  @Timeout(value = 10)
  @SuppressWarnings("unchecked") // mocked generics
  public void testLocalizerRunnerException() throws Exception {
    DrainDispatcher dispatcher = new DrainDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
    dispatcher.register(ApplicationEventType.class, applicationBus);
    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
    dispatcher.register(ContainerEventType.class, containerBus);

    ContainerExecutor exec = mock(ContainerExecutor.class);
    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
    LocalDirsHandlerService dirsHandlerSpy = spy(dirsHandler);
    dirsHandlerSpy.init(conf);

    DeletionService delServiceReal = new DeletionService(exec);
    DeletionService delService = spy(delServiceReal);
    delService.init(new Configuration());
    delService.start();

    ResourceLocalizationService rawService =
        new ResourceLocalizationService(dispatcher, exec, delService,
            dirsHandlerSpy, nmContext, metrics);
    ResourceLocalizationService spyService = spy(rawService);
    doReturn(mockServer).when(spyService).createServer();
    try {
      spyService.init(conf);
      spyService.start();

      // init application
      final Application app = mock(Application.class);
      final ApplicationId appId =
          BuilderUtils.newApplicationId(314159265358979L, 3);
      when(app.getUser()).thenReturn("user0");
      when(app.getAppId()).thenReturn(appId);
      spyService.handle(new ApplicationLocalizationEvent(
          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
      dispatcher.await();

      Random r = new Random();
      long seed = r.nextLong();
      System.out.println("SEED: " + seed);
      r.setSeed(seed);
      final Container c = getMockContainer(appId, 42, "user0");
      final LocalResource resource1 = getPrivateMockedResource(r);
      System.out.println("Here 4");
      
      final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
        new HashMap<LocalResourceVisibility, 
                    Collection<LocalResourceRequest>>();
      List<LocalResourceRequest> privateResourceList =
          new ArrayList<LocalResourceRequest>();
      privateResourceList.add(req1);
      rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);

      final Constructor<?>[] constructors =
          FSError.class.getDeclaredConstructors();
      constructors[0].setAccessible(true);
      FSError fsError =
          (FSError) constructors[0].newInstance(new IOException("Disk Error"));

      Mockito
        .doThrow(fsError)
        .when(dirsHandlerSpy)
        .getLocalPathForWrite(isA(String.class));
      spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
      Thread.sleep(1000);
      dispatcher.await();
      // Verify if ContainerResourceFailedEvent is invoked on FSError
      verify(containerBus).handle(isA(ContainerResourceFailedEvent.class));
    } finally {
      spyService.stop();
      dispatcher.stop();
      delService.stop();
    }
  }

  @Test
  @Timeout(value = 10)
  @SuppressWarnings({"unchecked", "methodlength"}) // mocked generics
  public void testLocalizationHeartbeat() throws Exception {
    List<Path> localDirs = new ArrayList<Path>();
    String[] sDirs = new String[1];
    // Making sure that we have only one local disk so that it will only be
    // selected for consecutive resource localization calls.  This is required
    // to test LocalCacheDirectoryManager.
    localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
    sDirs[0] = localDirs.get(0).toString();

    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
    // Adding configuration to make sure there is only one file per
    // directory
    conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "37");
    DrainDispatcher dispatcher = new DrainDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
    dispatcher.register(ApplicationEventType.class, applicationBus);
    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
    dispatcher.register(ContainerEventType.class, containerBus);

    ContainerExecutor exec = mock(ContainerExecutor.class);
    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
    dirsHandler.init(conf);

    DeletionService delServiceReal = new DeletionService(exec);
    DeletionService delService = spy(delServiceReal);
    delService.init(new Configuration());
    delService.start();

    ResourceLocalizationService rawService =
      new ResourceLocalizationService(dispatcher, exec, delService,
            dirsHandler, nmContext, metrics);
    ResourceLocalizationService spyService = spy(rawService);
    doReturn(mockServer).when(spyService).createServer();
    doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
    FsPermission defaultPermission =
        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
    FsPermission nmPermission =
        ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
    final Path userDir =
        new Path(sDirs[0].substring("file:".length()),
          ContainerLocalizer.USERCACHE);
    final Path fileDir =
        new Path(sDirs[0].substring("file:".length()),
          ContainerLocalizer.FILECACHE);
    final Path sysDir =
        new Path(sDirs[0].substring("file:".length()),
          ResourceLocalizationService.NM_PRIVATE_DIR);
    final FileStatus fs =
        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
          defaultPermission, "", "", new Path(sDirs[0]));
    final FileStatus nmFs =
        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
          nmPermission, "", "", sysDir);

    doAnswer(new Answer<FileStatus>() {
      @Override
      public FileStatus answer(InvocationOnMock invocation) throws Throwable {
        Object[] args = invocation.getArguments();
        if (args.length > 0) {
          if (args[0].equals(userDir) || args[0].equals(fileDir)) {
            return fs;
          }
        }
        return nmFs;
      }
    }).when(spylfs).getFileStatus(isA(Path.class));

    try {
      spyService.init(conf);
      spyService.start();

      // init application
      final Application app = mock(Application.class);
      final ApplicationId appId =
          BuilderUtils.newApplicationId(314159265358979L, 3);
      when(app.getUser()).thenReturn("user0");
      when(app.getAppId()).thenReturn(appId);
      spyService.handle(new ApplicationLocalizationEvent(
          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
      ArgumentMatcher<ApplicationEvent> matchesAppInit =
          evt -> evt.getType() == ApplicationEventType.APPLICATION_INITED
              && appId == evt.getApplicationID();
      dispatcher.await();
      verify(applicationBus).handle(argThat(matchesAppInit));

      // init container rsrc, localizer
      Random r = new Random();
      long seed = r.nextLong();
      System.out.println("SEED: " + seed);
      r.setSeed(seed);
      final Container c = getMockContainer(appId, 42, "user0");
      FSDataOutputStream out =
        new FSDataOutputStream(new DataOutputBuffer(), null);
      doReturn(out).when(spylfs).createInternal(isA(Path.class),
          isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
          anyLong(), isA(Progressable.class), isA(ChecksumOpt.class), anyBoolean());
      final LocalResource resource1 = getPrivateMockedResource(r);
      LocalResource resource2 = null;
      do {
        resource2 = getPrivateMockedResource(r);
      } while (resource2 == null || resource2.equals(resource1));
      LocalResource resource3 = null;
      do {
        resource3 = getPrivateMockedResource(r);
      } while (resource3 == null || resource3.equals(resource1)
          || resource3.equals(resource2));
      // above call to make sure we don't get identical resources.
      
      final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
      final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
      final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
        new HashMap<LocalResourceVisibility, 
                    Collection<LocalResourceRequest>>();
      List<LocalResourceRequest> privateResourceList =
          new ArrayList<LocalResourceRequest>();
      privateResourceList.add(req1);
      privateResourceList.add(req2);
      privateResourceList.add(req3);
      rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
      spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
      // Sigh. Thread init of private localizer not accessible
      Thread.sleep(1000);
      dispatcher.await();
      String appStr = appId.toString();
      String ctnrStr = c.getContainerId().toString();
      ArgumentCaptor<LocalizerStartContext> contextCaptor = ArgumentCaptor
          .forClass(LocalizerStartContext.class);
      verify(exec).startLocalizer(contextCaptor.capture());

      LocalizerStartContext context = contextCaptor.getValue();
      Path localizationTokenPath = context.getNmPrivateContainerTokens();

      assertEquals("user0", context.getUser());
      assertEquals(appStr, context.getAppId());
      assertEquals(ctnrStr, context.getLocId());

      // heartbeat from localizer
      LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
      LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class);
      LocalResourceStatus rsrc2success = mock(LocalResourceStatus.class);
      LocalResourceStatus rsrc3success = mock(LocalResourceStatus.class);
      LocalizerStatus stat = mock(LocalizerStatus.class);
      when(stat.getLocalizerId()).thenReturn(ctnrStr);
      when(rsrc1success.getResource()).thenReturn(resource1);
      when(rsrc2pending.getResource()).thenReturn(resource2);
      when(rsrc2success.getResource()).thenReturn(resource2);
      when(rsrc3success.getResource()).thenReturn(resource3);
      when(rsrc1success.getLocalSize()).thenReturn(4344L);
      when(rsrc2success.getLocalSize()).thenReturn(2342L);
      when(rsrc3success.getLocalSize()).thenReturn(5345L);
      URL locPath = getPath("/cache/private/blah");
      when(rsrc1success.getLocalPath()).thenReturn(locPath);
      when(rsrc2success.getLocalPath()).thenReturn(locPath);
      when(rsrc3success.getLocalPath()).thenReturn(locPath);
      when(rsrc1success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
      when(rsrc2pending.getStatus()).thenReturn(ResourceStatusType.FETCH_PENDING);
      when(rsrc2success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
      when(rsrc3success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);

      // Four heartbeats with sending:
      // 1 - empty
      // 2 - resource1 FETCH_SUCCESS
      // 3 - resource2 FETCH_PENDING
      // 4 - resource2 FETCH_SUCCESS, resource3 FETCH_SUCCESS
      List<LocalResourceStatus> rsrcs4 = new ArrayList<LocalResourceStatus>();
      rsrcs4.add(rsrc2success);
      rsrcs4.add(rsrc3success);
      when(stat.getResources())
        .thenReturn(Collections.<LocalResourceStatus>emptyList())
        .thenReturn(Collections.singletonList(rsrc1success))
        .thenReturn(Collections.singletonList(rsrc2pending))
        .thenReturn(rsrcs4)
        .thenReturn(Collections.<LocalResourceStatus>emptyList());

      String localPath = Path.SEPARATOR + ContainerLocalizer.USERCACHE +
          Path.SEPARATOR + "user0" + Path.SEPARATOR +
          ContainerLocalizer.FILECACHE;

      // First heartbeat
      LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
      assertEquals(1, response.getResourceSpecs().size());
      assertEquals(req1,
        new LocalResourceRequest(response.getResourceSpecs().get(0).getResource()));
      URL localizedPath =
          response.getResourceSpecs().get(0).getDestinationDirectory();
      // Appending to local path unique number(10) generated as a part of
      // LocalResourcesTracker
      assertTrue(localizedPath.getFile().endsWith(
        localPath + Path.SEPARATOR + "10"));

      // Second heartbeat
      response = spyService.heartbeat(stat);
      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
      assertEquals(1, response.getResourceSpecs().size());
      assertEquals(req2, new LocalResourceRequest(response.getResourceSpecs()
        .get(0).getResource()));
      localizedPath =
          response.getResourceSpecs().get(0).getDestinationDirectory();
      // Resource's destination path should be now inside sub directory 0 as
      // LocalCacheDirectoryManager will be used and we have restricted number
      // of files per directory to 1.
      assertTrue(localizedPath.getFile().endsWith(
        localPath + Path.SEPARATOR + "0" + Path.SEPARATOR + "11"));

      // Third heartbeat
      response = spyService.heartbeat(stat);
      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
      assertEquals(1, response.getResourceSpecs().size());
      assertEquals(req3, new LocalResourceRequest(response.getResourceSpecs()
          .get(0).getResource()));
      localizedPath =
          response.getResourceSpecs().get(0).getDestinationDirectory();
      assertTrue(localizedPath.getFile().endsWith(
          localPath + Path.SEPARATOR + "1" + Path.SEPARATOR + "12"));

      response = spyService.heartbeat(stat);
      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());

      spyService.handle(new ContainerLocalizationEvent(
          LocalizationEventType.CONTAINER_RESOURCES_LOCALIZED, c));

      // get shutdown after receive CONTAINER_RESOURCES_LOCALIZED event
      response = spyService.heartbeat(stat);
      assertEquals(LocalizerAction.DIE, response.getLocalizerAction());

      dispatcher.await();
      // verify container notification
      ArgumentMatcher<ContainerEvent> matchesContainerLoc =
          evt -> evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
              && c.getContainerId() == evt.getContainerID();
      // total 3 resource localzation calls. one for each resource.
      verify(containerBus, times(3)).handle(argThat(matchesContainerLoc));
        
      // Verify deletion of localization token.
      verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
          delService, null, localizationTokenPath, null)));
    } finally {
      spyService.stop();
      dispatcher.stop();
      delService.stop();
    }
  }

  private static class DownloadingPathsMatcher implements
      ArgumentMatcher<Path[]>, VarargMatcher {
    static final long serialVersionUID = 0;

    private transient Set<Path> matchPaths;

    DownloadingPathsMatcher(Set<Path> matchPaths) {
      this.matchPaths = matchPaths;
    }

    @Override
    public boolean matches(Path[] downloadingPaths) {
      if (matchPaths.size() != downloadingPaths.length) {
        return false;
      }
      for (Path downloadingPath : downloadingPaths) {
        if (!matchPaths.contains(downloadingPath)) {
          return false;
        }
      }
      return true;
    }

    private void readObject(ObjectInputStream os) throws NotSerializableException {
      throw new NotSerializableException(this.getClass().getName());
    }
  }

  private static class DummyExecutor extends DefaultContainerExecutor {
    private volatile boolean stopLocalization = false;
    private AtomicInteger numLocalizers = new AtomicInteger(0);
    @Override
    public void startLocalizer(LocalizerStartContext ctx)
        throws IOException, InterruptedException {
      numLocalizers.incrementAndGet();
      while (!stopLocalization) {
        Thread.yield();
      }
    }
    private void waitForLocalizers(int num) {
      while (numLocalizers.intValue() < num) {
        Thread.yield();
      }
    }
    private void yieldForLocalizers(int num) {
      for (int i = 0; i < num; i++) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          continue;
        }
      }
    }
    private void setStopLocalization() {
      stopLocalization = true;
    }
    private int getNumLocalizers() {
      return numLocalizers.get();
    }
  }

  @Test
  @Timeout(value = 20)
  public void testDownloadingResourcesOnContainerKill() throws Exception {
    List<Path> localDirs = new ArrayList<Path>();
    String[] sDirs = new String[1];
    localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
    sDirs[0] = localDirs.get(0).toString();

    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);

    DummyExecutor exec = new DummyExecutor();
    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
    dirsHandler.init(conf);

    DeletionService delServiceReal = new DeletionService(exec);
    DeletionService delService = spy(delServiceReal);
    delService.init(new Configuration());
    delService.start();

    DrainDispatcher dispatcher = getDispatcher(conf);
    ResourceLocalizationService rawService = new ResourceLocalizationService(
        dispatcher, exec, delService, dirsHandler, nmContext, metrics);

    ResourceLocalizationService spyService = spy(rawService);
    doReturn(mockServer).when(spyService).createServer();
    doReturn(lfs).when(spyService).
        getLocalFileContext(isA(Configuration.class));
    FsPermission defaultPermission =
        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
    FsPermission nmPermission =
        ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
    final Path userDir =
        new Path(sDirs[0].substring("file:".length()),
          ContainerLocalizer.USERCACHE);
    final Path fileDir =
        new Path(sDirs[0].substring("file:".length()),
          ContainerLocalizer.FILECACHE);
    final Path sysDir =
        new Path(sDirs[0].substring("file:".length()),
          ResourceLocalizationService.NM_PRIVATE_DIR);
    final FileStatus fs =
        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
          defaultPermission, "", "", new Path(sDirs[0]));
    final FileStatus nmFs =
        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
          nmPermission, "", "", sysDir);

    doAnswer(new Answer<FileStatus>() {
      @Override
      public FileStatus answer(InvocationOnMock invocation) throws Throwable {
        Object[] args = invocation.getArguments();
        if (args.length > 0) {
          if (args[0].equals(userDir) || args[0].equals(fileDir)) {
            return fs;
          }
        }
        return nmFs;
      }
    }).when(spylfs).getFileStatus(isA(Path.class));

    try {
      spyService.init(conf);
      spyService.start();

      doLocalization(spyService, dispatcher, exec, delService);

    } finally {
      spyService.stop();
      dispatcher.stop();
      delService.stop();
    }
  }

  @Test
  public void testResourceLocalizationReqsAfterContainerKill()
      throws Exception {
    List<Path> localDirs = new ArrayList<Path>();
    String[] sDirs = new String[1];
    localDirs.add(lfs.makeQualified(new Path(basedir, 0 + "")));
    sDirs[0] = localDirs.get(0).toString();

    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);

    DummyExecutor exec = new DummyExecutor();
    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
    dirsHandler.init(conf);

    DeletionService delServiceReal = new DeletionService(exec);
    DeletionService delService = spy(delServiceReal);
    delService.init(new Configuration());
    delService.start();

    DrainDispatcher dispatcher = getDispatcher(conf);
    ResourceLocalizationService rawService = new ResourceLocalizationService(
        dispatcher, exec, delService, dirsHandler, nmContext, metrics);

    ResourceLocalizationService spyService = spy(rawService);
    doReturn(mockServer).when(spyService).createServer();
    doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
    FsPermission defaultPermission =
        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
    FsPermission nmPermission =
        ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
    final Path userDir =
        new Path(sDirs[0].substring("file:".length()),
            ContainerLocalizer.USERCACHE);
    final Path fileDir =
        new Path(sDirs[0].substring("file:".length()),
            ContainerLocalizer.FILECACHE);
    final Path sysDir =
        new Path(sDirs[0].substring("file:".length()),
            ResourceLocalizationService.NM_PRIVATE_DIR);
    final FileStatus fs =
        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
            defaultPermission, "", "", new Path(sDirs[0]));
    final FileStatus nmFs =
        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
            nmPermission, "", "", sysDir);

    doAnswer(new Answer<FileStatus>() {
      @Override
      public FileStatus answer(InvocationOnMock invocation) throws Throwable {
        Object[] args = invocation.getArguments();
        if (args.length > 0) {
          if (args[0].equals(userDir) || args[0].equals(fileDir)) {
            return fs;
          }
        }
        return nmFs;
      }
    }).when(spylfs).getFileStatus(isA(Path.class));

    try {
      spyService.init(conf);
      spyService.start();

      doLocalizationAfterCleanup(spyService, dispatcher, exec, delService);

    } finally {
      spyService.stop();
      dispatcher.stop();
      delService.stop();
    }
  }

  private DrainDispatcher getDispatcher(Configuration config) {
    DrainDispatcher dispatcher = new DrainDispatcher();
    dispatcher.init(config);
    dispatcher.start();
    return dispatcher;
  }

  @SuppressWarnings("unchecked")
  private EventHandler<ApplicationEvent> getApplicationBus(
      DrainDispatcher dispatcher) {
    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
    dispatcher.register(ApplicationEventType.class, applicationBus);
    return applicationBus;
  }

  @SuppressWarnings("unchecked")
  private EventHandler<ContainerEvent> getContainerBus(
      DrainDispatcher dispatcher) {
    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
    dispatcher.register(ContainerEventType.class, containerBus);
    return containerBus;
  }

  private void initApp(ResourceLocalizationService spyService,
      EventHandler<ApplicationEvent> applicationBus, Application app,
      ApplicationId appId, DrainDispatcher dispatcher) {
    spyService.handle(new ApplicationLocalizationEvent(
        LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
    ArgumentMatcher<ApplicationEvent> matchesAppInit =
        evt -> evt.getType() == ApplicationEventType.APPLICATION_INITED
                && appId == evt.getApplicationID();
    dispatcher.await();
    verify(applicationBus).handle(argThat(matchesAppInit));
  }

  private void doLocalization(ResourceLocalizationService spyService,
      DrainDispatcher dispatcher, DummyExecutor exec,
      DeletionService delService)
      throws IOException, URISyntaxException, InterruptedException {
    final Application app = mock(Application.class);
    final ApplicationId appId =
        BuilderUtils.newApplicationId(314159265358979L, 3);
    String user = "user0";
    when(app.getUser()).thenReturn(user);
    when(app.getAppId()).thenReturn(appId);
    List<LocalResource> resources = initializeLocalizer(appId);
    LocalResource resource1 = resources.get(0);
    LocalResource resource2 = resources.get(1);
    LocalResource resource3 = resources.get(2);
    final Container c1 = getMockContainer(appId, 42, "user0");
    final Container c2 = getMockContainer(appId, 43, "user0");

    EventHandler<ApplicationEvent> applicationBus =
        getApplicationBus(dispatcher);
    EventHandler<ContainerEvent> containerBus = getContainerBus(dispatcher);
    initApp(spyService, applicationBus, app, appId, dispatcher);

    // Send localization requests for container c1 and c2.
    final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
    final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
    final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
        new HashMap<LocalResourceVisibility,
            Collection<LocalResourceRequest>>();
    List<LocalResourceRequest> privateResourceList =
        new ArrayList<LocalResourceRequest>();
    privateResourceList.add(req1);
    privateResourceList.add(req2);
    privateResourceList.add(req3);
    rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
    spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs));

    final LocalResourceRequest req11 = new LocalResourceRequest(resource2);
    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs1 =
        new HashMap<LocalResourceVisibility,
            Collection<LocalResourceRequest>>();
    List<LocalResourceRequest> privateResourceList1 =
        new ArrayList<LocalResourceRequest>();
    privateResourceList1.add(req11);
    rsrcs1.put(LocalResourceVisibility.PRIVATE, privateResourceList1);
    spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs1));

    dispatcher.await();
    // Wait for localizers of both container c1 and c2 to begin.
    exec.waitForLocalizers(2);
    LocalizerRunner locC1 =
        spyService.getLocalizerRunner(c1.getContainerId().toString());

    LocalizerStatus stat = mockLocalizerStatus(c1, resource1, resource2);

    // First heartbeat which schedules first resource.
    LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
    assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());

    // Second heartbeat which reports first resource as success.
    // Second resource is scheduled.
    response = spyService.heartbeat(stat);
    assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
    final String locPath1 =
        response.getResourceSpecs().get(0).getDestinationDirectory().getFile();

    // Third heartbeat which reports second resource as pending.
    // Third resource is scheduled.
    response = spyService.heartbeat(stat);
    assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
    final String locPath2 =
        response.getResourceSpecs().get(0).getDestinationDirectory().getFile();

    // Container c1 is killed which leads to cleanup
    spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs));

    // This heartbeat will indicate to container localizer to die as localizer
    // runner has stopped.
    response = spyService.heartbeat(stat);
    assertEquals(LocalizerAction.DIE, response.getLocalizerAction());

    exec.setStopLocalization();
    dispatcher.await();
    // verify container notification
    ArgumentMatcher<ContainerEvent> successContainerLoc =
        evt -> evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
            && c1.getContainerId() == evt.getContainerID();
    // Only one resource gets localized for container c1.
    verify(containerBus).handle(argThat(successContainerLoc));

    Set<Path> paths =
        Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
            new Path(locPath2), new Path(locPath2 + "_tmp"));
    // Wait for localizer runner thread for container c1 to finish.
    while (locC1.getState() != Thread.State.TERMINATED) {
      Thread.sleep(50);
    }
    // Verify if downloading resources were submitted for deletion.
    verify(delService, times(2)).delete(argThat(new FileDeletionMatcher(
        delService, user, null, new ArrayList<>(paths))));

    LocalResourcesTracker tracker = spyService.getLocalResourcesTracker(
        LocalResourceVisibility.PRIVATE, "user0", appId);
    // Container c1 was killed but this resource was localized before kill
    // hence its not removed despite ref cnt being 0.
    LocalizedResource rsrc1 = tracker.getLocalizedResource(req1);
    assertNotNull(rsrc1);
    assertThat(rsrc1.getState()).isEqualTo(ResourceState.LOCALIZED);
    assertThat(rsrc1.getRefCount()).isEqualTo(0);

    // Container c1 was killed but this resource is referenced by container c2
    // as well hence its ref cnt is 1.
    LocalizedResource rsrc2 = tracker.getLocalizedResource(req2);
    assertNotNull(rsrc2);
    assertThat(rsrc2.getState()).isEqualTo(ResourceState.DOWNLOADING);
    assertThat(rsrc2.getRefCount()).isEqualTo(1);

    // As container c1 was killed and this resource was not referenced by any
    // other container, hence its removed.
    LocalizedResource rsrc3 = tracker.getLocalizedResource(req3);
    assertNull(rsrc3);
  }

  private void doLocalizationAfterCleanup(
      ResourceLocalizationService spyService,
      DrainDispatcher dispatcher, DummyExecutor exec,
      DeletionService delService)
      throws IOException, URISyntaxException, InterruptedException {
    final Application app = mock(Application.class);
    final ApplicationId appId =
        BuilderUtils.newApplicationId(314159265358979L, 3);
    String user = "user0";
    when(app.getUser()).thenReturn(user);
    when(app.getAppId()).thenReturn(appId);
    List<LocalResource> resources = initializeLocalizer(appId);
    LocalResource resource1 = resources.get(0);
    LocalResource resource2 = resources.get(1);
    LocalResource resource3 = resources.get(2);
    final Container c1 = getMockContainer(appId, 42, "user0");
    final Container c2 = getMockContainer(appId, 43, "user0");

    EventHandler<ApplicationEvent> applicationBus =
        getApplicationBus(dispatcher);
    EventHandler<ContainerEvent> containerBus = getContainerBus(dispatcher);
    initApp(spyService, applicationBus, app, appId, dispatcher);

    // Send localization requests for container c1 and c2.
    final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
    final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
    final LocalResourceRequest req3 = new LocalResourceRequest(resource3);
    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
        new HashMap<LocalResourceVisibility,
            Collection<LocalResourceRequest>>();
    List<LocalResourceRequest> privateResourceList =
        new ArrayList<LocalResourceRequest>();
    rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);

    // Start Localization without any resources (so we can simulate the
    // resource requests being delayed until after cleanup.
    spyService.handle(new ContainerLocalizationRequestEvent(c1, rsrcs));
    dispatcher.await();

    // Kill c1 which leads to cleanup
    spyService.handle(new ContainerLocalizationCleanupEvent(c1, rsrcs));
    dispatcher.await();

    // Now we will send the resource requests and releases directly to tracker
    privateResourceList.add(req1);
    privateResourceList.add(req2);
    privateResourceList.add(req3);

    rsrcs.put(LocalResourceVisibility.PRIVATE, privateResourceList);
    LocalizerContext locCtx =
        new LocalizerContext(user, c1.getContainerId(), c1.getCredentials());
    LocalResourcesTracker tracker =
        spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
            user, null);
    for (LocalResourceRequest req : privateResourceList) {
      tracker.handle(
          new ResourceRequestEvent(req, LocalResourceVisibility.PRIVATE,
              locCtx));
    }
    dispatcher.await();
    for (LocalResourceRequest req : privateResourceList) {
      tracker.handle(
          new ResourceReleaseEvent(req, c1.getContainerId()));
    }
    dispatcher.await();

    // Now start a second container with the same list of resources
    spyService.handle(new ContainerLocalizationRequestEvent(c2, rsrcs));
    dispatcher.await();

    // Wait for localizers to begin (should only be one for container2)
    exec.yieldForLocalizers(2);
    assertThat(exec.getNumLocalizers()).isEqualTo(1);

    LocalizerRunner locC2 =
        spyService.getLocalizerRunner(c2.getContainerId().toString());
    LocalizerStatus stat = mockLocalizerStatus(c2, resource1, resource2);

    // First heartbeat which schedules first resource.
    LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
    assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());

    // Second heartbeat which reports first resource as success.
    // Second resource is scheduled.
    response = spyService.heartbeat(stat);
    assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
    final String locPath1 =
        response.getResourceSpecs().get(0).getDestinationDirectory().getFile();

    // Third heartbeat which reports second resource as pending.
    // Third resource is scheduled.
    response = spyService.heartbeat(stat);
    assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
    final String locPath2 =
        response.getResourceSpecs().get(0).getDestinationDirectory().getFile();

    // Container c2 is killed which leads to cleanup
    spyService.handle(new ContainerLocalizationCleanupEvent(c2, rsrcs));

    // This heartbeat will indicate to container localizer to die as localizer
    // runner has stopped.
    response = spyService.heartbeat(stat);
    assertEquals(LocalizerAction.DIE, response.getLocalizerAction());

    exec.setStopLocalization();
    dispatcher.await();

    // verify container notification
    ArgumentMatcher<ContainerEvent> successContainerLoc =
        evt -> evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
            && c2.getContainerId() == evt.getContainerID();
    // Only one resource gets localized for container c2.
    verify(containerBus).handle(argThat(successContainerLoc));

    Set<Path> paths =
        Sets.newHashSet(new Path(locPath1), new Path(locPath1 + "_tmp"),
            new Path(locPath2), new Path(locPath2 + "_tmp"));
    // Wait for localizer runner thread for container c1 to finish.
    while (locC2.getState() != Thread.State.TERMINATED) {
      Thread.sleep(50);
    }
    // Verify if downloading resources were submitted for deletion.
    verify(delService, times(3)).delete(argThat(new FileDeletionMatcher(
        delService, user, null, new ArrayList<>(paths))));

    // Container c2 was killed but this resource was localized before kill
    // hence its not removed despite ref cnt being 0.
    LocalizedResource rsrc1 = tracker.getLocalizedResource(req1);
    assertNotNull(rsrc1);
    assertThat(rsrc1.getState()).isEqualTo(ResourceState.LOCALIZED);
    assertThat(rsrc1.getRefCount()).isEqualTo(0);

    // Container c1 and c2 were killed before this finished downloading
    // these should no longer be there.
    LocalizedResource rsrc2 = tracker.getLocalizedResource(req2);
    assertNull(rsrc2);
    LocalizedResource rsrc3 = tracker.getLocalizedResource(req3);
    assertNull(rsrc3);

    // Double-check that we never created a Localizer for C1
    assertThat(exec.getNumLocalizers()).isEqualTo(1);
  }

  private LocalizerStatus mockLocalizerStatus(Container c1,
      LocalResource resource1, LocalResource resource2) {
    final String containerIdStr = c1.getContainerId().toString();
    // Heartbeats from container localizer
    LocalResourceStatus rsrc1success = mock(LocalResourceStatus.class);
    LocalResourceStatus rsrc2pending = mock(LocalResourceStatus.class);
    LocalizerStatus stat = mock(LocalizerStatus.class);
    when(stat.getLocalizerId()).thenReturn(containerIdStr);
    when(rsrc1success.getResource()).thenReturn(resource1);
    when(rsrc2pending.getResource()).thenReturn(resource2);
    when(rsrc1success.getLocalSize()).thenReturn(4344L);
    URL locPath = getPath("/some/path");
    when(rsrc1success.getLocalPath()).thenReturn(locPath);
    when(rsrc1success.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
    when(rsrc2pending.getStatus()).thenReturn(ResourceStatusType.FETCH_PENDING);

    when(stat.getResources())
        .thenReturn(Collections.<LocalResourceStatus> emptyList())
        .thenReturn(Collections.singletonList(rsrc1success))
        .thenReturn(Collections.singletonList(rsrc2pending))
        .thenReturn(Collections.singletonList(rsrc2pending))
        .thenReturn(Collections.<LocalResourceStatus> emptyList());
    return stat;
  }

  @SuppressWarnings("unchecked")
  private List<LocalResource> initializeLocalizer(ApplicationId appId)
      throws IOException {
    // Initialize localizer.
    Random r = new Random();
    long seed = r.nextLong();
    System.out.println("SEED: " + seed);
    r.setSeed(seed);
    FSDataOutputStream out =
        new FSDataOutputStream(new DataOutputBuffer(), null);
    doReturn(out).when(spylfs).createInternal(isA(Path.class),
        isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
        anyLong(), isA(Progressable.class), isA(ChecksumOpt.class),
        anyBoolean());
    final LocalResource resource1 = getPrivateMockedResource(r);
    LocalResource resource2 = null;
    do {
      resource2 = getPrivateMockedResource(r);
    } while (resource2 == null || resource2.equals(resource1));
    LocalResource resource3 = null;
    do {
      resource3 = getPrivateMockedResource(r);
    } while (resource3 == null || resource3.equals(resource1)
        || resource3.equals(resource2));
    return Arrays.asList(resource1, resource2, resource3);
  }

  @Test
  @SuppressWarnings("unchecked")
  public void testPublicResourceInitializesLocalDir() throws Exception {

    // Setup state to simulate restart NM with existing state meaning no
    // directory creation during initialization
    NMStateStoreService spyStateStore = spy(nmContext.getNMStateStore());
    when(spyStateStore.canRecover()).thenReturn(true);
    NMContext spyContext = spy(nmContext);
    when(spyContext.getNMStateStore()).thenReturn(spyStateStore);

    List<Path> localDirs = new ArrayList<Path>();
    String[] sDirs = new String[4];
    for (int i = 0; i < 4; ++i) {
      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
      sDirs[i] = localDirs.get(i).toString();
    }
    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);


    DrainDispatcher dispatcher = new DrainDispatcher();
    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
    dispatcher.register(ApplicationEventType.class, applicationBus);
    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
    dispatcher.register(ContainerEventType.class, containerBus);

    ContainerExecutor exec = mock(ContainerExecutor.class);
    DeletionService delService = mock(DeletionService.class);
    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
    dirsHandler.init(conf);

    dispatcher.init(conf);
    dispatcher.start();

    try {
      ResourceLocalizationService rawService =
          new ResourceLocalizationService(dispatcher, exec, delService,
              dirsHandler, spyContext, metrics);
      ResourceLocalizationService spyService = spy(rawService);
      doReturn(mockServer).when(spyService).createServer();
      doReturn(lfs).when(spyService).getLocalFileContext(
          isA(Configuration.class));

      spyService.init(conf);

      final FsPermission defaultPerm = new FsPermission((short)0755);

      // verify directory is not created at initialization
      for (Path p : localDirs) {
        p = new Path((new URI(p.toString())).getPath());
        Path publicCache = new Path(p, ContainerLocalizer.FILECACHE);
        verify(spylfs, never())
            .mkdir(eq(publicCache),eq(defaultPerm), eq(true));
      }

      spyService.start();

      final String user = "user0";
      // init application
      final Application app = mock(Application.class);
      final ApplicationId appId =
          BuilderUtils.newApplicationId(314159265358979L, 3);
      when(app.getUser()).thenReturn(user);
      when(app.getAppId()).thenReturn(appId);
      spyService.handle(new ApplicationLocalizationEvent(
          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
      dispatcher.await();

      // init container.
      final Container c = getMockContainer(appId, 42, user);

      // init resources
      Random r = new Random();
      long seed = r.nextLong();
      System.out.println("SEED: " + seed);
      r.setSeed(seed);

      // Queue up public resource localization
      final LocalResource pubResource1 = getPublicMockedResource(r);
      final LocalResourceRequest pubReq1 =
          new LocalResourceRequest(pubResource1);

      LocalResource pubResource2 = null;
      do {
        pubResource2 = getPublicMockedResource(r);
      } while (pubResource2 == null || pubResource2.equals(pubResource1));
      // above call to make sure we don't get identical resources.
      final LocalResourceRequest pubReq2 =
          new LocalResourceRequest(pubResource2);

      Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
      pubRsrcs.add(pubReq1);
      pubRsrcs.add(pubReq2);

      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
          new HashMap<LocalResourceVisibility,
              Collection<LocalResourceRequest>>();
      req.put(LocalResourceVisibility.PUBLIC, pubRsrcs);

      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
      dispatcher.await();

      verify(spyService, times(1)).checkAndInitializeLocalDirs();

      // verify directory creation
      for (Path p : localDirs) {
        p = new Path((new URI(p.toString())).getPath());
        Path publicCache = new Path(p, ContainerLocalizer.FILECACHE);
        verify(spylfs).mkdir(eq(publicCache),eq(defaultPerm), eq(true));
      }
    } finally {
      dispatcher.stop();
    }
  }

  @Test
  @SuppressWarnings("unchecked")
  public void testPublicCacheDirPermission() throws Exception {

    // Setup state to simulate restart NM with existing state meaning no
    // directory creation during initialization
    NMStateStoreService spyStateStore = spy(nmContext.getNMStateStore());
    when(spyStateStore.canRecover()).thenReturn(true);
    NMContext spyContext = spy(nmContext);
    when(spyContext.getNMStateStore()).thenReturn(spyStateStore);

    Path localDir = new Path("target", "testPublicCacheDirPermission");
    String sDir = lfs.makeQualified(localDir).toString();

    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDir);
    conf.setInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, 38);

    DrainDispatcher dispatcher = new DrainDispatcher();
    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
    dispatcher.register(ApplicationEventType.class, applicationBus);
    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
    dispatcher.register(ContainerEventType.class, containerBus);

    ContainerExecutor exec = mock(ContainerExecutor.class);
    DeletionService delService = mock(DeletionService.class);
    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
    dirsHandler.init(conf);

    dispatcher.init(conf);
    dispatcher.start();

    try {
      ResourceLocalizationService rawService = new ResourceLocalizationService(
          dispatcher, exec, delService, dirsHandler, spyContext, null);
      ResourceLocalizationService spyService = spy(rawService);
      doReturn(mockServer).when(spyService).createServer();
      doReturn(lfs).when(spyService)
          .getLocalFileContext(isA(Configuration.class));

      spyService.init(conf);
      spyService.start();

      final FsPermission expectedPerm = new FsPermission((short) 0755);
      Path publicCache = new Path(localDir, ContainerLocalizer.FILECACHE);
      FsPermission wrongPerm = new FsPermission((short) 0700);
      Path overflowFolder = new Path(publicCache, "0");
      lfs.mkdir(overflowFolder, wrongPerm, false);

      spyService.lfs.setUMask(new FsPermission((short) 0777));

      final String user = "user0";
      // init application
      final Application app = mock(Application.class);
      final ApplicationId appId = BuilderUtils
          .newApplicationId(314159265358979L, 3);
      when(app.getUser()).thenReturn(user);
      when(app.getAppId()).thenReturn(appId);
      spyService.handle(new ApplicationLocalizationEvent(
          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
      dispatcher.await();

      // init container.
      final Container c = getMockContainer(appId, 42, user);

      // init resources
      Random r = new Random();
      long seed = r.nextLong();
      System.out.println("SEED: " + seed);
      r.setSeed(seed);

      Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
      for (int i = 0; i < 3; i++) {
        LocalResource pubResource = getPublicMockedResource(r, true, conf,
            sDir);
        LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
        pubRsrcs.add(pubReq);
      }

      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
          new HashMap<LocalResourceVisibility,
              Collection<LocalResourceRequest>>();
      req.put(LocalResourceVisibility.PUBLIC, pubRsrcs);

      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
      dispatcher.await();

      // verify directory creation

      assertEquals(expectedPerm, lfs.getFileStatus(overflowFolder).getPermission(),
          "Cache directory permissions filecache/0 is incorrect");

    } finally {
      dispatcher.stop();
    }
  }

  @Test
  @Timeout(value = 20)
  @SuppressWarnings("unchecked")
  public void testLocalizerHeartbeatWhenAppCleaningUp() throws Exception {
    conf.set(YarnConfiguration.NM_LOCAL_DIRS,
        lfs.makeQualified(new Path(basedir, 0 + "")).toString());
    // Start dispatcher.
    DrainDispatcher dispatcher = new DrainDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
    dispatcher.register(ApplicationEventType.class, mock(EventHandler.class));
    dispatcher.register(ContainerEventType.class, mock(EventHandler.class));

    DummyExecutor exec = new DummyExecutor();
    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
    dirsHandler.init(conf);
    // Start resource localization service.
    ResourceLocalizationService rawService = new ResourceLocalizationService(
        dispatcher, exec, mock(DeletionService.class), dirsHandler, nmContext,
        metrics);
    ResourceLocalizationService spyService = spy(rawService);
    doReturn(mockServer).when(spyService).createServer();
    doReturn(lfs).when(spyService).
        getLocalFileContext(isA(Configuration.class));
    try {
      spyService.init(conf);
      spyService.start();

      // Init application resources.
      final Application app = mock(Application.class);
      final ApplicationId appId = BuilderUtils.newApplicationId(1234567890L, 3);
      when(app.getUser()).thenReturn("user0");
      when(app.getAppId()).thenReturn(appId);
      when(app.toString()).thenReturn(appId.toString());
      spyService.handle(new ApplicationLocalizationEvent(
          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
      dispatcher.await();

      // Initialize localizer.
      Random r = new Random();
      long seed = r.nextLong();
      System.out.println("SEED: " + seed);
      r.setSeed(seed);
      final Container c = getMockContainer(appId, 46, "user0");
      FSDataOutputStream out =
          new FSDataOutputStream(new DataOutputBuffer(), null);
      doReturn(out).when(spylfs).createInternal(isA(Path.class),
          isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
          anyLong(), isA(Progressable.class), isA(ChecksumOpt.class),
          anyBoolean());
      final LocalResource resource1 = getAppMockedResource(r);
      final LocalResource resource2 = getAppMockedResource(r);

      // Send localization requests for container.
      // 2 resources generated with APPLICATION visibility.
      final LocalResourceRequest req1 = new LocalResourceRequest(resource1);
      final LocalResourceRequest req2 = new LocalResourceRequest(resource2);
      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
          new HashMap<LocalResourceVisibility,
              Collection<LocalResourceRequest>>();
      List<LocalResourceRequest> appResourceList = Arrays.asList(req1, req2);
      rsrcs.put(LocalResourceVisibility.APPLICATION, appResourceList);
      spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
      dispatcher.await();

      // Wait for localization to begin.
      exec.waitForLocalizers(1);
      final String containerIdStr = c.getContainerId().toString();
      LocalizerRunner locRunnerForContainer =
          spyService.getLocalizerRunner(containerIdStr);
      // Heartbeats from container localizer
      LocalResourceStatus rsrcSuccess = mock(LocalResourceStatus.class);
      LocalizerStatus stat = mock(LocalizerStatus.class);
      when(stat.getLocalizerId()).thenReturn(containerIdStr);
      when(rsrcSuccess.getResource()).thenReturn(resource1);
      when(rsrcSuccess.getLocalSize()).thenReturn(4344L);
      when(rsrcSuccess.getLocalPath()).thenReturn(getPath("/some/path"));
      when(rsrcSuccess.getStatus()).
          thenReturn(ResourceStatusType.FETCH_SUCCESS);
      when(stat.getResources()).
          thenReturn(Collections.<LocalResourceStatus>emptyList());

      // First heartbeat which schedules first resource.
      LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction(),
          "NM should tell localizer to be LIVE in Heartbeat.");

      // Cleanup container.
      spyService.handle(new ContainerLocalizationCleanupEvent(c, rsrcs));
      dispatcher.await();
      try {
        /*Directly send heartbeat to introduce race as container
          is being cleaned up.*/
        locRunnerForContainer.processHeartbeat(
              Collections.singletonList(rsrcSuccess));
      } catch (Exception e) {
        fail("Exception should not have been thrown on processing heartbeat");
      }
      // Cleanup application.
      spyService.handle(new ApplicationLocalizationEvent(
          LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app));
      dispatcher.await();
      try {
        // Directly send heartbeat to introduce race as app is being cleaned up.
        locRunnerForContainer.processHeartbeat(
            Collections.singletonList(rsrcSuccess));
      } catch (Exception e) {
        fail("Exception should not have been thrown on processing heartbeat");
      }
      // Send another heartbeat.
      response = spyService.heartbeat(stat);
      assertEquals(LocalizerAction.DIE, response.getLocalizerAction(),
          "NM should tell localizer to DIE in Heartbeat.");
      exec.setStopLocalization();
    } finally {
      spyService.stop();
      dispatcher.stop();
    }
  }

  @Test
  @Timeout(value = 20)
  @SuppressWarnings("unchecked") // mocked generics
  public void testFailedPublicResource() throws Exception {
    List<Path> localDirs = new ArrayList<Path>();
    String[] sDirs = new String[4];
    for (int i = 0; i < 4; ++i) {
      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
      sDirs[i] = localDirs.get(i).toString();
    }
    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);

    DrainDispatcher dispatcher = new DrainDispatcher();
    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
    dispatcher.register(ApplicationEventType.class, applicationBus);
    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
    dispatcher.register(ContainerEventType.class, containerBus);

    ContainerExecutor exec = mock(ContainerExecutor.class);
    DeletionService delService = mock(DeletionService.class);
    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
    dirsHandler.init(conf);

    dispatcher.init(conf);
    dispatcher.start();

    try {
      ResourceLocalizationService rawService =
          new ResourceLocalizationService(dispatcher, exec, delService,
              dirsHandler, nmContext, metrics);
      ResourceLocalizationService spyService = spy(rawService);
      doReturn(mockServer).when(spyService).createServer();
      doReturn(lfs).when(spyService).getLocalFileContext(
          isA(Configuration.class));

      spyService.init(conf);
      spyService.start();

      final String user = "user0";
      // init application
      final Application app = mock(Application.class);
      final ApplicationId appId =
          BuilderUtils.newApplicationId(314159265358979L, 3);
      when(app.getUser()).thenReturn(user);
      when(app.getAppId()).thenReturn(appId);
      spyService.handle(new ApplicationLocalizationEvent(
          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
      dispatcher.await();

      // init container.
      final Container c = getMockContainer(appId, 42, user);

      // init resources
      Random r = new Random();
      long seed = r.nextLong();
      System.out.println("SEED: " + seed);
      r.setSeed(seed);

      // cause chmod to fail after a delay
      final CyclicBarrier barrier = new CyclicBarrier(2);
      doAnswer(new Answer<Void>() {
          public Void answer(InvocationOnMock invocation) throws IOException {
            try {
              barrier.await();
            } catch (InterruptedException e) {
            } catch (BrokenBarrierException e) {
            }
            throw new IOException("forced failure");
          }
        }).when(spylfs)
            .setPermission(isA(Path.class), isA(FsPermission.class));

      // Queue up two localization requests for the same public resource
      final LocalResource pubResource = getPublicMockedResource(r);
      final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);

      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
          new HashMap<LocalResourceVisibility,
                      Collection<LocalResourceRequest>>();
      req.put(LocalResourceVisibility.PUBLIC,
          Collections.singletonList(pubReq));

      Set<LocalResourceRequest> pubRsrcs = new HashSet<LocalResourceRequest>();
      pubRsrcs.add(pubReq);

      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
      dispatcher.await();

      // allow the chmod to fail now that both requests have been queued
      barrier.await();
      verify(containerBus, timeout(5000).times(2))
          .handle(isA(ContainerResourceFailedEvent.class));
    } finally {
      dispatcher.stop();
    }
  }
  
  /*
   * Test case for handling RejectedExecutionException and IOException which can
   * be thrown when adding public resources to the pending queue.
   * RejectedExecutionException can be thrown either due to the incoming queue
   * being full or if the ExecutorCompletionService threadpool is shutdown.
   * Since it's hard to simulate the queue being full, this test just shuts down
   * the threadpool and makes sure the exception is handled. If anything is
   * messed up the async dispatcher thread will cause a system exit causing the
   * test to fail.
   */
  @Test
  @SuppressWarnings("unchecked")
  public void testPublicResourceAddResourceExceptions() throws Exception {
    List<Path> localDirs = new ArrayList<Path>();
    String[] sDirs = new String[4];
    for (int i = 0; i < 4; ++i) {
      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
      sDirs[i] = localDirs.get(i).toString();
    }
    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);

    DrainDispatcher dispatcher = new DrainDispatcher();
    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
    dispatcher.register(ApplicationEventType.class, applicationBus);
    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
    dispatcher.register(ContainerEventType.class, containerBus);

    ContainerExecutor exec = mock(ContainerExecutor.class);
    DeletionService delService = mock(DeletionService.class);
    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
    LocalDirsHandlerService dirsHandlerSpy = spy(dirsHandler);
    dirsHandlerSpy.init(conf);

    dispatcher.init(conf);
    dispatcher.start();

    try {
      ResourceLocalizationService rawService =
          new ResourceLocalizationService(dispatcher, exec, delService,
              dirsHandlerSpy, nmContext, metrics);
      ResourceLocalizationService spyService = spy(rawService);
      doReturn(mockServer).when(spyService).createServer();
      doReturn(lfs).when(spyService).getLocalFileContext(
        isA(Configuration.class));

      spyService.init(conf);
      spyService.start();

      final String user = "user0";
      // init application
      final Application app = mock(Application.class);
      final ApplicationId appId =
          BuilderUtils.newApplicationId(314159265358979L, 3);
      when(app.getUser()).thenReturn(user);
      when(app.getAppId()).thenReturn(appId);
      spyService.handle(new ApplicationLocalizationEvent(
        LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
      dispatcher.await();

      // init resources
      Random r = new Random();
      r.setSeed(r.nextLong());

      // Queue localization request for the public resource
      final LocalResource pubResource = getPublicMockedResource(r);
      final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
          new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
      req
        .put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq));

      // init container.
      final Container c = getMockContainer(appId, 42, user);

      // first test ioexception
      Mockito
        .doThrow(new IOException())
        .when(dirsHandlerSpy)
        .getLocalPathForWrite(isA(String.class), Mockito.anyLong(),
          Mockito.anyBoolean());
      // send request
      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
      dispatcher.await();
      LocalResourcesTracker tracker =
          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
            user, appId);
      assertNull(tracker.getLocalizedResource(pubReq));

      // test IllegalArgumentException
      String name = Long.toHexString(r.nextLong());
      URL url = getPath("/local/PRIVATE/" + name + "/");
      final LocalResource rsrc =
          BuilderUtils.newLocalResource(url, LocalResourceType.FILE,
          LocalResourceVisibility.PUBLIC, r.nextInt(1024) + 1024L,
          r.nextInt(1024) + 2048L, false);
      final LocalResourceRequest pubReq1 = new LocalResourceRequest(rsrc);
      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req1 =
          new HashMap<LocalResourceVisibility, 
          Collection<LocalResourceRequest>>();
      req1.put(LocalResourceVisibility.PUBLIC,
          Collections.singletonList(pubReq1));
      Mockito
        .doCallRealMethod()
        .when(dirsHandlerSpy)
        .getLocalPathForWrite(isA(String.class), Mockito.anyLong(),
          Mockito.anyBoolean());
      // send request
      spyService.handle(new ContainerLocalizationRequestEvent(c, req1));
      dispatcher.await();
      tracker =
          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
          user, appId);
      assertNull(tracker.getLocalizedResource(pubReq));

      // test RejectedExecutionException by shutting down the thread pool
      PublicLocalizer publicLocalizer = spyService.getPublicLocalizer();
      publicLocalizer.threadPool.shutdown();

      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
      dispatcher.await();
      tracker =
          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
            user, appId);
      assertNull(tracker.getLocalizedResource(pubReq));

    } finally {
      // if we call stop with events in the queue, an InterruptedException gets
      // thrown resulting in the dispatcher thread causing a system exit
      dispatcher.await();
      dispatcher.stop();
    }
  }

  @Test
  @Timeout(value = 100)
  @SuppressWarnings("unchecked")
  public void testParallelDownloadAttemptsForPrivateResource() throws Exception {

    DrainDispatcher dispatcher1 = null;
    try {
      dispatcher1 = new DrainDispatcher();
      String user = "testuser";
      ApplicationId appId = BuilderUtils.newApplicationId(1, 1);

      // creating one local directory
      List<Path> localDirs = new ArrayList<Path>();
      String[] sDirs = new String[1];
      for (int i = 0; i < 1; ++i) {
        localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
        sDirs[i] = localDirs.get(i).toString();
      }
      conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);

      LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
      localDirHandler.init(conf);
      // Registering event handlers
      EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
      dispatcher1.register(ApplicationEventType.class, applicationBus);
      EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
      dispatcher1.register(ContainerEventType.class, containerBus);

      ContainerExecutor exec = mock(ContainerExecutor.class);
      DeletionService delService = mock(DeletionService.class);
      LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
      // initializing directory handler.
      dirsHandler.init(conf);

      dispatcher1.init(conf);
      dispatcher1.start();

      ResourceLocalizationService rls =
          new ResourceLocalizationService(dispatcher1, exec, delService,
              localDirHandler, nmContext, metrics);
      dispatcher1.register(LocalizationEventType.class, rls);
      rls.init(conf);

      rls.handle(createApplicationLocalizationEvent(user, appId));

      LocalResourceRequest req =
          new LocalResourceRequest(new Path("file:///tmp"), 123L,
            LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "");

      // We need to pre-populate the LocalizerRunner as the
      // Resource Localization Service code internally starts them which
      // definitely we don't want.

      // creating new containers and populating corresponding localizer runners

      // Container - 1
      Container container1 = createMockContainer(user, 1);
      String localizerId1 = container1.getContainerId().toString();
      rls.getPrivateLocalizers().put(
        localizerId1,
        rls.new LocalizerRunner(new LocalizerContext(user, container1
          .getContainerId(), null), localizerId1));
      LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1);

      dispatcher1.getEventHandler().handle(
        createContainerLocalizationEvent(container1,
          LocalResourceVisibility.PRIVATE, req));
      assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 1, 5000));

      // Container - 2 now makes the request.
      ContainerImpl container2 = createMockContainer(user, 2);
      String localizerId2 = container2.getContainerId().toString();
      rls.getPrivateLocalizers().put(
        localizerId2,
        rls.new LocalizerRunner(new LocalizerContext(user, container2
          .getContainerId(), null), localizerId2));
      LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2);
      dispatcher1.getEventHandler().handle(
        createContainerLocalizationEvent(container2,
          LocalResourceVisibility.PRIVATE, req));
      assertTrue(waitForPrivateDownloadToStart(rls, localizerId2, 1, 5000));

      // Retrieving localized resource.
      LocalResourcesTracker tracker =
          rls.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE, user,
            appId);
      LocalizedResource lr = tracker.getLocalizedResource(req);
      // Resource would now have moved into DOWNLOADING state
      assertEquals(ResourceState.DOWNLOADING, lr.getState());
      // Resource should have one permit
      assertEquals(1, lr.sem.availablePermits());

      // Resource Localization Service receives first heart beat from
      // ContainerLocalizer for container1
      LocalizerHeartbeatResponse response1 =
          rls.heartbeat(createLocalizerStatus(localizerId1));

      // Resource must have been added to scheduled map
      assertEquals(1, localizerRunner1.scheduled.size());
      // Checking resource in the response and also available permits for it.
      assertEquals(req.getResource(), response1.getResourceSpecs()
        .get(0).getResource().getResource());
      assertEquals(0, lr.sem.availablePermits());

      // Resource Localization Service now receives first heart beat from
      // ContainerLocalizer for container2
      LocalizerHeartbeatResponse response2 =
          rls.heartbeat(createLocalizerStatus(localizerId2));

      // Resource must not have been added to scheduled map
      assertEquals(0, localizerRunner2.scheduled.size());
      // No resource is returned in response
      assertEquals(0, response2.getResourceSpecs().size());

      // ContainerLocalizer - 1 now sends failed resource heartbeat.
      rls.heartbeat(createLocalizerStatusForFailedResource(localizerId1, req));

      // Resource Localization should fail and state is modified accordingly.
      // Also Local should be release on the LocalizedResource.
      assertTrue(waitForResourceState(lr, rls, req,
          LocalResourceVisibility.PRIVATE, user, appId, ResourceState.FAILED,
          5000));
      assertTrue(lr.getState().equals(ResourceState.FAILED));
      assertEquals(0, localizerRunner1.scheduled.size());

      // Now Container-2 once again sends heart beat to resource localization
      // service

      // Now container-2 again try to download the resource it should still
      // not get the resource as the resource is now not in DOWNLOADING state.
      response2 = rls.heartbeat(createLocalizerStatus(localizerId2));

      // Resource must not have been added to scheduled map.
      // Also as the resource has failed download it will be removed from
      // pending list.
      assertEquals(0, localizerRunner2.scheduled.size());
      assertEquals(0, localizerRunner2.pending.size());
      assertEquals(0, response2.getResourceSpecs().size());

    } finally {
      if (dispatcher1 != null) {
        dispatcher1.stop();
      }
    }
  }
  
  

  @Test
  @Timeout(value = 10)
  @SuppressWarnings("unchecked")
  public void testLocalResourcePath() throws Exception {

    // test the local path where application and user cache files will be
    // localized.

    DrainDispatcher dispatcher1 = null;
    try {
      dispatcher1 = new DrainDispatcher();
      String user = "testuser";
      ApplicationId appId = BuilderUtils.newApplicationId(1, 1);

      // creating one local directory
      List<Path> localDirs = new ArrayList<Path>();
      String[] sDirs = new String[1];
      for (int i = 0; i < 1; ++i) {
        localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
        sDirs[i] = localDirs.get(i).toString();
      }
      conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);

      LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService();
      localDirHandler.init(conf);
      // Registering event handlers
      EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
      dispatcher1.register(ApplicationEventType.class, applicationBus);
      EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
      dispatcher1.register(ContainerEventType.class, containerBus);

      ContainerExecutor exec = mock(ContainerExecutor.class);
      DeletionService delService = mock(DeletionService.class);
      LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
      // initializing directory handler.
      dirsHandler.init(conf);

      dispatcher1.init(conf);
      dispatcher1.start();

      ResourceLocalizationService rls =
          new ResourceLocalizationService(dispatcher1, exec, delService,
              localDirHandler, nmContext, metrics);
      dispatcher1.register(LocalizationEventType.class, rls);
      rls.init(conf);

      rls.handle(createApplicationLocalizationEvent(user, appId));

      // We need to pre-populate the LocalizerRunner as the
      // Resource Localization Service code internally starts them which
      // definitely we don't want.

      // creating new container and populating corresponding localizer runner

      // Container - 1
      Container container1 = createMockContainer(user, 1);
      String localizerId1 = container1.getContainerId().toString();
      rls.getPrivateLocalizers().put(
        localizerId1,
        rls.new LocalizerRunner(new LocalizerContext(user, container1
          .getContainerId(), null), localizerId1));

      // Creating two requests for container
      // 1) Private resource
      // 2) Application resource
      LocalResourceRequest reqPriv =
          new LocalResourceRequest(new Path("file:///tmp1"), 123L,
            LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, "");
      List<LocalResourceRequest> privList =
          new ArrayList<LocalResourceRequest>();
      privList.add(reqPriv);

      LocalResourceRequest reqApp =
          new LocalResourceRequest(new Path("file:///tmp2"), 123L,
            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, "");
      List<LocalResourceRequest> appList =
          new ArrayList<LocalResourceRequest>();
      appList.add(reqApp);

      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
          new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
      rsrcs.put(LocalResourceVisibility.APPLICATION, appList);
      rsrcs.put(LocalResourceVisibility.PRIVATE, privList);

      dispatcher1.getEventHandler().handle(
        new ContainerLocalizationRequestEvent(container1, rsrcs));

      // Now waiting for resource download to start. Here actual will not start
      // Only the resources will be populated into pending list.
      assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 2, 5000));

      // Validating user and application cache paths

      String userCachePath =
          StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0)
            .toUri().getRawPath(), ContainerLocalizer.USERCACHE, user,
            ContainerLocalizer.FILECACHE));
      String userAppCachePath =
          StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0)
            .toUri().getRawPath(), ContainerLocalizer.USERCACHE, user,
            ContainerLocalizer.APPCACHE, appId.toString(),
            ContainerLocalizer.FILECACHE));

      // Now the Application and private resources may come in any order
      // for download.
      // For User cahce :
      // returned destinationPath = user cache path + random number
      // For App cache :
      // returned destinationPath = user app cache path + random number

      int returnedResources = 0;
      boolean appRsrc = false, privRsrc = false;
      while (returnedResources < 2) {
        LocalizerHeartbeatResponse response =
            rls.heartbeat(createLocalizerStatus(localizerId1));
        for (ResourceLocalizationSpec resourceSpec : response
          .getResourceSpecs()) {
          returnedResources++;
          Path destinationDirectory =
              new Path(resourceSpec.getDestinationDirectory().getFile());
          if (resourceSpec.getResource().getVisibility() ==
              LocalResourceVisibility.APPLICATION) {
            appRsrc = true;
            assertEquals(userAppCachePath, destinationDirectory
              .getParent().toUri().toString());
          } else if (resourceSpec.getResource().getVisibility() == 
              LocalResourceVisibility.PRIVATE) {
            privRsrc = true;
            assertEquals(userCachePath, destinationDirectory.getParent()
              .toUri().toString());
          } else {
            throw new Exception("Unexpected resource received.");
          }
        }
      }
      // We should receive both the resources (Application and Private)
      assertTrue(appRsrc && privRsrc);
    } finally {
      if (dispatcher1 != null) {
        dispatcher1.stop();
      }
    }
  }

  private LocalizerStatus createLocalizerStatusForFailedResource(
      String localizerId, LocalResourceRequest req) {
    LocalizerStatus status = createLocalizerStatus(localizerId);
    LocalResourceStatus resourceStatus = new LocalResourceStatusPBImpl();
    resourceStatus.setException(SerializedException
      .newInstance(new YarnException("test")));
    resourceStatus.setStatus(ResourceStatusType.FETCH_FAILURE);
    resourceStatus.setResource(req);
    status.addResourceStatus(resourceStatus);
    return status;
  }

  private LocalizerStatus createLocalizerStatus(String localizerId1) {
    LocalizerStatus status = new LocalizerStatusPBImpl();
    status.setLocalizerId(localizerId1);
    return status;
  }

  private LocalizationEvent createApplicationLocalizationEvent(String user,
      ApplicationId appId) {
    Application app = mock(Application.class);
    when(app.getUser()).thenReturn(user);
    when(app.getAppId()).thenReturn(appId);
    return new ApplicationLocalizationEvent(
      LocalizationEventType.INIT_APPLICATION_RESOURCES, app);
  }

  @Test
  @Timeout(value = 100)
  @SuppressWarnings("unchecked")
  public void testParallelDownloadAttemptsForPublicResource() throws Exception {

    DrainDispatcher dispatcher1 = null;
    String user = "testuser";
    try {
      // creating one local directory
      List<Path> localDirs = new ArrayList<Path>();
      String[] sDirs = new String[1];
      for (int i = 0; i < 1; ++i) {
        localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
        sDirs[i] = localDirs.get(i).toString();
      }
      conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);

      // Registering event handlers
      EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
      dispatcher1 = new DrainDispatcher();
      dispatcher1.register(ApplicationEventType.class, applicationBus);
      EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
      dispatcher1.register(ContainerEventType.class, containerBus);

      ContainerExecutor exec = mock(ContainerExecutor.class);
      DeletionService delService = mock(DeletionService.class);
      LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
      // initializing directory handler.
      dirsHandler.init(conf);

      dispatcher1.init(conf);
      dispatcher1.start();

      // Creating and initializing ResourceLocalizationService but not starting
      // it as otherwise it will remove requests from pending queue.
      ResourceLocalizationService rawService =
          new ResourceLocalizationService(dispatcher1, exec, delService,
              dirsHandler, nmContext, metrics);
      ResourceLocalizationService spyService = spy(rawService);
      dispatcher1.register(LocalizationEventType.class, spyService);
      spyService.init(conf);

      // Initially pending map should be empty for public localizer
      assertEquals(0, spyService.getPublicLocalizer().pending.size());

      LocalResourceRequest req =
          new LocalResourceRequest(new Path("/tmp"), 123L,
            LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, "");

      // Initializing application
      ApplicationImpl app = mock(ApplicationImpl.class);
      ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
      when(app.getAppId()).thenReturn(appId);
      when(app.getUser()).thenReturn(user);
      dispatcher1.getEventHandler().handle(
        new ApplicationLocalizationEvent(
          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));

      // Container - 1

      // container requesting the resource
      ContainerImpl container1 = createMockContainer(user, 1);
      dispatcher1.getEventHandler().handle(
        createContainerLocalizationEvent(container1,
          LocalResourceVisibility.PUBLIC, req));

      // Waiting for resource to change into DOWNLOADING state.
      assertTrue(waitForResourceState(null, spyService, req,
        LocalResourceVisibility.PUBLIC, user, null, ResourceState.DOWNLOADING,
        5000));

      // Waiting for download to start.
      assertTrue(waitForPublicDownloadToStart(spyService, 1, 5000));

      LocalizedResource lr =
          getLocalizedResource(spyService, req, LocalResourceVisibility.PUBLIC,
            user, null);
      // Resource would now have moved into DOWNLOADING state
      assertEquals(ResourceState.DOWNLOADING, lr.getState());

      // pending should have this resource now.
      assertEquals(1, spyService.getPublicLocalizer().pending.size());
      // Now resource should have 0 permit.
      assertEquals(0, lr.sem.availablePermits());

      // Container - 2

      // Container requesting the same resource.
      ContainerImpl container2 = createMockContainer(user, 2);
      dispatcher1.getEventHandler().handle(
        createContainerLocalizationEvent(container2,
          LocalResourceVisibility.PUBLIC, req));

      // Waiting for download to start. This should return false as new download
      // will not start
      assertFalse(waitForPublicDownloadToStart(spyService, 2, 5000));

      // Now Failing the resource download. As a part of it
      // resource state is changed and then lock is released.
      ResourceFailedLocalizationEvent locFailedEvent =
          new ResourceFailedLocalizationEvent(
              req,new Exception("test").toString());
      spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, user,
        null).handle(locFailedEvent);

      // Waiting for resource to change into FAILED state.
      assertTrue(waitForResourceState(lr, spyService, req,
        LocalResourceVisibility.PUBLIC, user, null, ResourceState.FAILED, 5000));
      assertTrue(waitForResourceState(lr, spyService, req,
          LocalResourceVisibility.APPLICATION, user, appId, ResourceState.FAILED, 5000));

      // releasing lock as a part of download failed process.
      lr.unlock();
      // removing pending download request.
      spyService.getPublicLocalizer().pending.clear();

      LocalizerContext lc = mock(LocalizerContext.class);
      when(lc.getContainerId()).thenReturn(ContainerId.newContainerId(
          ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 1), 1),
          1L));

      // Now I need to simulate a race condition wherein Event is added to
      // dispatcher before resource state changes to either FAILED or LOCALIZED
      // Hence sending event directly to dispatcher.
      LocalizerResourceRequestEvent localizerEvent =
          new LocalizerResourceRequestEvent(lr, null, lc, null);

      dispatcher1.getEventHandler().handle(localizerEvent);
      // Waiting for download to start. This should return false as new download
      // will not start
      assertFalse(waitForPublicDownloadToStart(spyService, 1, 5000));
      // Checking available permits now.
      assertEquals(1, lr.sem.availablePermits());

    } finally {
      if (dispatcher1 != null) {
        dispatcher1.stop();
      }
    }

  }

  private boolean waitForPrivateDownloadToStart(
      ResourceLocalizationService service, String localizerId, int size,
      int maxWaitTime) {
    List<LocalizerResourceRequestEvent> pending = null;
    // Waiting for localizer to be created.
    do {
      if (service.getPrivateLocalizers().get(localizerId) != null) {
        pending = service.getPrivateLocalizers().get(localizerId).pending;
      }
      if (pending == null) {
        try {
          maxWaitTime -= 20;
          Thread.sleep(20);
        } catch (Exception e) {
        }
      } else {
        break;
      }
    } while (maxWaitTime > 0);
    if (pending == null) {
      return false;
    }
    do {
      if (pending.size() == size) {
        return true;
      } else {
        try {
          maxWaitTime -= 20;
          Thread.sleep(20);
        } catch (Exception e) {
        }
      }
    } while (maxWaitTime > 0);
    return pending.size() == size;
  }

  private boolean waitForPublicDownloadToStart(
      ResourceLocalizationService service, int size, int maxWaitTime) {
    Map<Future<Path>, LocalizerResourceRequestEvent> pending = null;
    // Waiting for localizer to be created.
    do {
      if (service.getPublicLocalizer() != null) {
        pending = service.getPublicLocalizer().pending;
      }
      if (pending == null) {
        try {
          maxWaitTime -= 20;
          Thread.sleep(20);
        } catch (Exception e) {
        }
      } else {
        break;
      }
    } while (maxWaitTime > 0);
    if (pending == null) {
      return false;
    }
    do {
      if (pending.size() == size) {
        return true;
      } else {
        try {
          maxWaitTime -= 20;
          Thread.sleep(20);
        } catch (InterruptedException e) {
        }
      }
    } while (maxWaitTime > 0);
    return pending.size() == size;

  }

  private LocalizedResource getLocalizedResource(
      ResourceLocalizationService service, LocalResourceRequest req,
      LocalResourceVisibility vis, String user, ApplicationId appId) {
    return service.getLocalResourcesTracker(vis, user, appId)
      .getLocalizedResource(req);
  }

  private boolean waitForResourceState(LocalizedResource lr,
      ResourceLocalizationService service, LocalResourceRequest req,
      LocalResourceVisibility vis, String user, ApplicationId appId,
      ResourceState resourceState, long maxWaitTime) {
    LocalResourcesTracker tracker = null;
    // checking tracker is created
    do {
      if (tracker == null) {
        tracker = service.getLocalResourcesTracker(vis, user, appId);
      }
      if (tracker != null && lr == null) {
        lr = tracker.getLocalizedResource(req);
      }
      if (lr != null) {
        break;
      } else {
        try {
          maxWaitTime -= 20;
          Thread.sleep(20);
        } catch (InterruptedException e) {
        }
      }
    } while (maxWaitTime > 0);
    // this will wait till resource state is changed to (resourceState).
    if (lr == null) {
      return false;
    }
    do {
      if (!lr.getState().equals(resourceState)) {
        try {
          maxWaitTime -= 50;
          Thread.sleep(50);
        } catch (InterruptedException e) {
        }
      } else {
        break;
      }
    } while (maxWaitTime > 0);
    return lr.getState().equals(resourceState);
  }

  private ContainerLocalizationRequestEvent createContainerLocalizationEvent(
      Container container, LocalResourceVisibility vis,
      LocalResourceRequest req) {
    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> reqs =
        new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
    List<LocalResourceRequest> resourceList =
        new ArrayList<LocalResourceRequest>();
    resourceList.add(req);
    reqs.put(vis, resourceList);
    return new ContainerLocalizationRequestEvent(container, reqs);
  }

  private ContainerImpl createMockContainer(String user, int containerId) {
    ContainerImpl container = mock(ContainerImpl.class);
    when(container.getContainerId()).thenReturn(
      BuilderUtils.newContainerId(1, 1, 1, containerId));
    when(container.getUser()).thenReturn(user);
    Credentials mockCredentials = mock(Credentials.class);
    when(container.getCredentials()).thenReturn(mockCredentials);
    when(container.getContainerState()).thenReturn(ContainerState.LOCALIZING);
    return container;
  }

  private static URL getPath(String path) {
    URL url = BuilderUtils.newURL("file", null, 0, path);
    return url;
  }

  private static LocalResource getMockedResource(Random r, 
      LocalResourceVisibility vis) {
    String name = Long.toHexString(r.nextLong());
    URL url = getPath("/local/PRIVATE/" + name);
    LocalResource rsrc =
        BuilderUtils.newLocalResource(url, LocalResourceType.FILE, vis,
            r.nextInt(1024) + 1024L, r.nextInt(1024) + 2048L, false);
    return rsrc;
  }

  private static LocalResource getMockedResource(Random r,
      LocalResourceVisibility vis, boolean create, Configuration conf,
      String path) {
    String name = Long.toHexString(r.nextLong());
    Path newpath = new Path(path + "/local", name);
    File file = new File(
        Path.getPathWithoutSchemeAndAuthority(newpath).toString());
    try {
      FileSystem.create(FileSystem.get(conf), newpath,
          new FsPermission((short) 0755));
      file.deleteOnExit();
    } catch (IOException e) {
      // Failed to create test resource
      e.printStackTrace();
    }
    LocalResource mockedResource = BuilderUtils.newLocalResource(
        URL.fromPath(newpath), LocalResourceType.FILE, vis,
        file.getTotalSpace(), file.lastModified(), false);
    return mockedResource;
  }
  
  private static LocalResource getAppMockedResource(Random r) {
    return getMockedResource(r, LocalResourceVisibility.APPLICATION);
  }

  private static LocalResource getPublicMockedResource(Random r, boolean create,
      Configuration conf, String path) {
    return getMockedResource(r, LocalResourceVisibility.PUBLIC, create, conf,
        path);
  }
  
  private static LocalResource getPublicMockedResource(Random r) {
    return getMockedResource(r, LocalResourceVisibility.PUBLIC);
  }
  
  private static LocalResource getPrivateMockedResource(Random r) {
    return getMockedResource(r, LocalResourceVisibility.PRIVATE);
  }

  private static Container getMockContainer(ApplicationId appId, int id,
      String user) throws IOException {
    Container c = mock(Container.class);
    ApplicationAttemptId appAttemptId =
        BuilderUtils.newApplicationAttemptId(appId, 1);
    ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
    when(c.getUser()).thenReturn(user);
    when(c.getContainerId()).thenReturn(cId);
    Credentials creds = new Credentials();
    Token<? extends TokenIdentifier> tk = getToken(id);
    String fingerprint = ResourceLocalizationService.buildTokenFingerprint(tk);
    assertNotNull(fingerprint);
    assertTrue(fingerprint.matches("^(([0-9a-f]){2} ){9}([0-9a-f]){2}$"),
        "Expected token fingerprint of 10 hex bytes delimited by space.");
    creds.addToken(new Text("tok" + id), tk);
    when(c.getCredentials()).thenReturn(creds);
    when(c.toString()).thenReturn(cId.toString());
    when(c.getContainerState()).thenReturn(ContainerState.LOCALIZING);
    return c;
  }

  private ResourceLocalizationService createSpyService(
      DrainDispatcher dispatcher, LocalDirsHandlerService dirsHandler,
      NMStateStoreService stateStore) {
    ContainerExecutor exec = mock(ContainerExecutor.class);
    LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
    DeletionService delService = mock(DeletionService.class);
    NMContext nmContext =
        new NMContext(new NMContainerTokenSecretManager(conf),
          new NMTokenSecretManagerInNM(), null,
          new ApplicationACLsManager(conf), stateStore, false, conf);
    ResourceLocalizationService rawService =
      new ResourceLocalizationService(dispatcher, exec, delService,
            dirsHandler, nmContext, metrics);
    ResourceLocalizationService spyService = spy(rawService);
    doReturn(mockServer).when(spyService).createServer();
    doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
        isA(Configuration.class));
    doReturn(lfs).when(spyService)
        .getLocalFileContext(isA(Configuration.class));
    return spyService;
  }

  @SuppressWarnings({ "unchecked", "rawtypes" })
  static Token<? extends TokenIdentifier> getToken(int id) {
    return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(),
        new Text("kind" + id), new Text("service" + id));
  }
  
  /*
   * Test to ensure ResourceLocalizationService can handle local dirs going bad.
   * Test first sets up all the components required, then sends events to fetch
   * a private, app and public resource. It then sends events to clean up the
   * container and the app and ensures the right delete calls were made.
   */
  @Test
  @SuppressWarnings("unchecked")
  // mocked generics
  public void testFailedDirsResourceRelease() throws Exception {
    // setup components
    File f = new File(basedir.toString());
    String[] sDirs = new String[4];
    List<Path> localDirs = new ArrayList<Path>(sDirs.length);
    for (int i = 0; i < 4; ++i) {
      sDirs[i] = f.getAbsolutePath() + i;
      localDirs.add(new Path(sDirs[i]));
    }
    List<Path> containerLocalDirs = new ArrayList<Path>(localDirs.size());
    List<Path> appLocalDirs = new ArrayList<Path>(localDirs.size());
    List<Path> nmLocalContainerDirs = new ArrayList<Path>(localDirs.size());
    List<Path> nmLocalAppDirs = new ArrayList<Path>(localDirs.size());
    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
    conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 500);

    LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class);
    DrainDispatcher dispatcher = new DrainDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
    dispatcher.register(ApplicationEventType.class, applicationBus);
    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
    dispatcher.register(ContainerEventType.class, containerBus);
    // Ignore actual localization
    EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
    dispatcher.register(LocalizerEventType.class, localizerBus);

    ContainerExecutor exec = mock(ContainerExecutor.class);
    LocalDirsHandlerService mockDirsHandler =
        mock(LocalDirsHandlerService.class);
    doReturn(new ArrayList<String>(Arrays.asList(sDirs))).when(
        mockDirsHandler).getLocalDirsForCleanup();

    DeletionService delService = mock(DeletionService.class);

    // setup mocks
    ResourceLocalizationService rawService =
        new ResourceLocalizationService(dispatcher, exec, delService,
            mockDirsHandler, nmContext, metrics);
    ResourceLocalizationService spyService = spy(rawService);
    doReturn(mockServer).when(spyService).createServer();
    doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
      isA(Configuration.class));
    doReturn(lfs).when(spyService)
      .getLocalFileContext(isA(Configuration.class));
    FsPermission defaultPermission =
        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
    FsPermission nmPermission =
        ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
    final FileStatus fs =
        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
          defaultPermission, "", "", localDirs.get(0));
    final FileStatus nmFs =
        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
          nmPermission, "", "", localDirs.get(0));

    final String user = "user0";
    // init application
    final Application app = mock(Application.class);
    final ApplicationId appId =
        BuilderUtils.newApplicationId(314159265358979L, 3);
    when(app.getUser()).thenReturn(user);
    when(app.getAppId()).thenReturn(appId);
    when(app.toString()).thenReturn(appId.toString());

    // init container.
    final Container c = getMockContainer(appId, 42, user);

    // setup local app dirs
    List<String> tmpDirs = mockDirsHandler.getLocalDirs();
    for (int i = 0; i < tmpDirs.size(); ++i) {
      Path usersdir = new Path(tmpDirs.get(i), ContainerLocalizer.USERCACHE);
      Path userdir = new Path(usersdir, user);
      Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
      Path appDir = new Path(allAppsdir, appId.toString());
      Path containerDir =
          new Path(appDir, c.getContainerId().toString());
      containerLocalDirs.add(containerDir);
      appLocalDirs.add(appDir);

      Path sysDir =
          new Path(tmpDirs.get(i), ResourceLocalizationService.NM_PRIVATE_DIR);
      Path appSysDir = new Path(sysDir, appId.toString());
      Path containerSysDir = new Path(appSysDir, c.getContainerId().toString());

      nmLocalContainerDirs.add(containerSysDir);
      nmLocalAppDirs.add(appSysDir);
    }

    try {
      spyService.init(conf);
      spyService.start();

      spyService.handle(new ApplicationLocalizationEvent(
        LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
      dispatcher.await();

      // Get a handle on the trackers after they're setup with
      // INIT_APP_RESOURCES
      LocalResourcesTracker appTracker =
          spyService.getLocalResourcesTracker(
            LocalResourceVisibility.APPLICATION, user, appId);
      LocalResourcesTracker privTracker =
          spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
            user, appId);
      LocalResourcesTracker pubTracker =
          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
            user, appId);

      // init resources
      Random r = new Random();
      long seed = r.nextLong();
      r.setSeed(seed);

      // Send localization requests, one for each type of resource
      final LocalResource privResource = getPrivateMockedResource(r);
      final LocalResourceRequest privReq =
          new LocalResourceRequest(privResource);

      final LocalResource appResource = getAppMockedResource(r);
      final LocalResourceRequest appReq = new LocalResourceRequest(appResource);

      final LocalResource pubResource = getPublicMockedResource(r);
      final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);

      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
          new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
      req.put(LocalResourceVisibility.PRIVATE,
        Collections.singletonList(privReq));
      req.put(LocalResourceVisibility.APPLICATION,
        Collections.singletonList(appReq));
      req
        .put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq));

      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
          new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
      req2.put(LocalResourceVisibility.PRIVATE,
        Collections.singletonList(privReq));

      // Send Request event
      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
      spyService.handle(new ContainerLocalizationRequestEvent(c, req2));
      dispatcher.await();

      int privRsrcCount = 0;
      for (LocalizedResource lr : privTracker) {
        privRsrcCount++;
        assertEquals(2, lr.getRefCount(), "Incorrect reference count");
        assertEquals(privReq, lr.getRequest());
      }
      assertEquals(1, privRsrcCount);

      int appRsrcCount = 0;
      for (LocalizedResource lr : appTracker) {
        appRsrcCount++;
        assertEquals(1, lr.getRefCount(), "Incorrect reference count");
        assertEquals(appReq, lr.getRequest());
      }
      assertEquals(1, appRsrcCount);

      int pubRsrcCount = 0;
      for (LocalizedResource lr : pubTracker) {
        pubRsrcCount++;
        assertEquals(1, lr.getRefCount(), "Incorrect reference count");
        assertEquals(pubReq, lr.getRequest());
      }
      assertEquals(1, pubRsrcCount);

      // setup mocks for test, a set of dirs with IOExceptions and let the rest
      // go through
      for (int i = 0; i < containerLocalDirs.size(); ++i) {
        if (i == 2) {
          Mockito.doThrow(new IOException()).when(spylfs)
            .getFileStatus(eq(containerLocalDirs.get(i)));
          Mockito.doThrow(new IOException()).when(spylfs)
            .getFileStatus(eq(nmLocalContainerDirs.get(i)));
        } else {
          doReturn(fs).when(spylfs)
            .getFileStatus(eq(containerLocalDirs.get(i)));
          doReturn(nmFs).when(spylfs).getFileStatus(
            eq(nmLocalContainerDirs.get(i)));
        }
      }

      // Send Cleanup Event
      spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
      verify(mockLocallilzerTracker).cleanupPrivLocalizers(
        "container_314159265358979_0003_01_000042");

      // match cleanup events with the mocks we setup earlier
      for (int i = 0; i < containerLocalDirs.size(); ++i) {
        if (i == 2) {
          try {
            verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
                delService, user, containerLocalDirs.get(i), null)));
            verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
                delService, null, nmLocalContainerDirs.get(i), null)));
            fail("deletion attempts for invalid dirs");
          } catch (Throwable e) {
            continue;
          }
        } else {
          verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
              delService, user, containerLocalDirs.get(i), null)));
          verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
              delService, null, nmLocalContainerDirs.get(i), null)));
        }
      }

      ArgumentMatcher<ApplicationEvent> matchesAppDestroy =
          evt -> evt.getType() ==
              ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP
              && appId == evt.getApplicationID();

      dispatcher.await();

      // setup mocks again, this time throw UnsupportedFileSystemException and
      // IOExceptions
      for (int i = 0; i < containerLocalDirs.size(); ++i) {
        if (i == 3) {
          Mockito.doThrow(new IOException()).when(spylfs)
            .getFileStatus(eq(appLocalDirs.get(i)));
          Mockito.doThrow(new UnsupportedFileSystemException("test"))
            .when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i)));
        } else {
          doReturn(fs).when(spylfs).getFileStatus(eq(appLocalDirs.get(i)));
          doReturn(nmFs).when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i)));
        }
      }
      LocalizationEvent destroyApp =
          new ApplicationLocalizationEvent(
            LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app);
      spyService.handle(destroyApp);
      // Waits for APPLICATION_RESOURCES_CLEANEDUP event to be handled.
      dispatcher.await();
      verify(applicationBus).handle(argThat(matchesAppDestroy));

      // verify we got the right delete calls
      for (int i = 0; i < containerLocalDirs.size(); ++i) {
        if (i == 3) {
          try {
            verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
                delService, user, containerLocalDirs.get(i), null)));
            verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
                delService, null, nmLocalContainerDirs.get(i), null)));
            fail("deletion attempts for invalid dirs");
          } catch (Throwable e) {
            continue;
          }
        } else {
          verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
              delService, user, containerLocalDirs.get(i), null)));
          verify(delService, times(1)).delete(argThat(new FileDeletionMatcher(
              delService, null, nmLocalContainerDirs.get(i), null)));
        }
      }

    } finally {
      dispatcher.stop();
      delService.stop();
    }
  }

  @Test
  @SuppressWarnings("unchecked")
  public void testDirHandler() throws Exception {
    File f = new File(basedir.toString());
    String[] sDirs = new String[4];
    List<Path> localDirs = new ArrayList<Path>(sDirs.length);
    for (int i = 0; i < 4; ++i) {
      sDirs[i] = f.getAbsolutePath() + i;
      localDirs.add(new Path(sDirs[i]));
    }
    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
    LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
    DrainDispatcher dispatcher = new DrainDispatcher();
    dispatcher.init(conf);
    dispatcher.start();
    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
    dispatcher.register(ApplicationEventType.class, applicationBus);
    EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
    dispatcher.register(LocalizerEventType.class, localizerBus);

    ContainerExecutor exec = mock(ContainerExecutor.class);
    LocalDirsHandlerService mockDirsHandler =
        mock(LocalDirsHandlerService.class);
    doReturn(new ArrayList<String>(Arrays.asList(sDirs))).when(
        mockDirsHandler).getLocalDirsForCleanup();
    // setup mocks
    DeletionService delService = mock(DeletionService.class);
    ResourceLocalizationService rawService =
        new ResourceLocalizationService(dispatcher, exec, delService,
            mockDirsHandler, nmContext, metrics);
    ResourceLocalizationService spyService = spy(rawService);
    doReturn(mockServer).when(spyService).createServer();
    doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
        isA(Configuration.class));

    final String user = "user0";
    // init application
    final Application app = mock(Application.class);
    final ApplicationId appId =
        BuilderUtils.newApplicationId(314159265358979L, 3);
    when(app.getUser()).thenReturn(user);
    when(app.getAppId()).thenReturn(appId);
    when(app.toString()).thenReturn(appId.toString());
    try {
      spyService.init(conf);
      spyService.start();

      spyService.handle(new ApplicationLocalizationEvent(
          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
      dispatcher.await();

      LocalResourcesTracker appTracker =
          spyService.getLocalResourcesTracker(
              LocalResourceVisibility.APPLICATION, user, appId);
      LocalResourcesTracker privTracker =
          spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
              user, appId);
      LocalResourcesTracker pubTracker =
          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
              user, appId);
      assertNotNull(((LocalResourcesTrackerImpl)appTracker).getDirsHandler(),
          "dirHandler for appTracker is null!");
      assertNotNull(((LocalResourcesTrackerImpl)privTracker).getDirsHandler(),
          "dirHandler for privTracker is null!");
      assertNotNull(((LocalResourcesTrackerImpl)pubTracker).getDirsHandler(),
          "dirHandler for pubTracker is null!");
    } finally {
      dispatcher.stop();
      delService.stop();
    }
  }
}