ResourceLocalizationService.java

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;

import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;

import org.apache.hadoop.yarn.exceptions.ConfigurationException;
import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskValidator;
import org.apache.hadoop.util.DiskValidatorFactory;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.DirectoryCollection.DirsChangeListener;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheCleaner.LocalCacheCleanerStats;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
import org.apache.hadoop.yarn.util.FSDownload;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ResourceLocalizationService extends CompositeService
    implements EventHandler<LocalizationEvent>, LocalizationProtocol {

  private static final Logger LOG =
       LoggerFactory.getLogger(ResourceLocalizationService.class);
  // Name of the NM-private directory that is resolved under each local dir.
  public static final String NM_PRIVATE_DIR = "nmPrivate";
  // Owner-only permission (octal 0700).
  public static final FsPermission NM_PRIVATE_PERM = new FsPermission((short) 0700);
  // Octal 0755: owner read/write/execute, group and others read/execute.
  private static final FsPermission PUBLIC_FILECACHE_FOLDER_PERMS =
      new FsPermission((short) 0755);

  // RPC server for the LocalizationProtocol; created and started in
  // serviceStart(), stopped in serviceStop().
  private Server server;
  // Read from configuration in serviceInit() and replaced in serviceStart()
  // with the address returned by Configuration#updateConnectAddr.
  private InetSocketAddress localizationServerAddress;
  // Cache target size in bytes (the configured MB value shifted left by 20).
  @VisibleForTesting
  long cacheTargetSize;
  // Delay between cache cleanup runs, in milliseconds.
  private long cacheCleanupPeriod;

  private final ContainerExecutor exec;
  protected final Dispatcher dispatcher;
  private final DeletionService delService;
  // Created in serviceInit() and registered for LocalizerEventType events.
  private LocalizerTracker localizerTracker;
  private RecordFactory recordFactory;
  // Single-threaded scheduler that runs the periodic cache cleanup task.
  private final ScheduledExecutorService cacheCleanup;
  // Assigned in createServer() only when security is enabled.
  private LocalizerTokenSecretManager secretManager;
  private NMStateStoreService stateStore;
  @VisibleForTesting
  final NodeManagerMetrics metrics;

  // Tracker for PUBLIC visibility resources; created in serviceInit().
  @VisibleForTesting
  LocalResourcesTracker publicRsrc;

  private LocalDirsHandlerService dirsHandler;
  // Listeners are created in serviceInit(), registered in serviceStart() and
  // deregistered in serviceStop().
  private DirsChangeListener localDirsChangeListener;
  private DirsChangeListener logDirsChangeListener;
  private Context nmContext;
  private DiskValidator diskValidator;

  /**
   * Map of LocalResourceTrackers keyed by username, for private
   * resources.
   */
  @VisibleForTesting
  final ConcurrentMap<String, LocalResourcesTracker> privateRsrc =
    new ConcurrentHashMap<String,LocalResourcesTracker>();

  /**
   * Map of LocalResourceTrackers keyed by the string form of the
   * application id, for application resources.
   */
  private final ConcurrentMap<String,LocalResourcesTracker> appRsrc =
    new ConcurrentHashMap<String,LocalResourcesTracker>();
  
  // Local file system context; assigned in serviceInit().
  FileContext lfs;

  public ResourceLocalizationService(Dispatcher dispatcher,
      ContainerExecutor exec, DeletionService delService,
      LocalDirsHandlerService dirsHandler, Context context,
      NodeManagerMetrics metrics) {

    super(ResourceLocalizationService.class.getName());
    this.exec = exec;
    this.dispatcher = dispatcher;
    this.delService = delService;
    this.dirsHandler = dirsHandler;

    this.cacheCleanup = new HadoopScheduledThreadPoolExecutor(1,
        new ThreadFactoryBuilder()
          .setNameFormat("ResourceLocalizationService Cache Cleanup")
          .build());
    this.stateStore = context.getNMStateStore();
    this.nmContext = context;
    this.metrics = metrics;
  }

  /**
   * Obtains a {@link FileContext} for the local file system.
   *
   * @param conf configuration handed to
   *        {@link FileContext#getLocalFSFileContext(Configuration)}
   * @return the local file system context
   * @throws YarnRuntimeException if the local file system cannot be accessed;
   *         the underlying {@link IOException} is attached as the cause
   */
  FileContext getLocalFileContext(Configuration conf) {
    try {
      return FileContext.getLocalFSFileContext(conf);
    } catch (IOException e) {
      // Chain the original failure so the root cause is not lost.
      throw new YarnRuntimeException("Failed to access local fs", e);
    }
  }

  /**
   * Checks the configured per-directory file limit of the local cache.
   *
   * @param conf configuration to validate
   * @throws YarnRuntimeException if the configured limit is 36 or lower
   */
  private void validateConf(Configuration conf) {
    final String limitKey =
        YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY;
    final int configuredLimit = conf.getInt(limitKey,
        YarnConfiguration.DEFAULT_NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY);

    // Acceptable value: report it and return.
    if (configuredLimit > 36) {
      LOG.info("per directory file limit = " + configuredLimit);
      return;
    }

    LOG.error(limitKey + " parameter is configured with very low value.");
    throw new YarnRuntimeException(
        limitKey + " parameter is configured with a value less than 37.");
  }

  /**
   * Initializes the service from configuration: validates the settings,
   * creates the public resource tracker, prepares the local file context,
   * reads cache and address settings, and sets up the localizer tracker and
   * the directory change listeners.
   *
   * @param conf service configuration
   * @throws Exception if initialization fails; failures while preparing the
   *         local file context or directories surface as
   *         {@link YarnRuntimeException}
   */
  @Override
  public void serviceInit(Configuration conf) throws Exception {
    validateConf(conf);
    publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher, true,
        conf, stateStore, dirsHandler);
    recordFactory = RecordFactoryProvider.getRecordFactory(conf);

    try {
      lfs = getLocalFileContext(conf);
      lfs.setUMask(new FsPermission((short) FsPermission.DEFAULT_UMASK));

      // The clean-up/initialize sequence only runs when the state store
      // cannot recover or has just been created.
      final boolean startClean =
          !stateStore.canRecover() || stateStore.isNewlyCreated();
      if (startClean) {
        cleanUpLocalDirs(lfs, delService);
        cleanupLogDirs(lfs, delService);
        initializeLocalDirs(lfs);
        initializeLogDirs(lfs);
      }
    } catch (Exception e) {
      throw new YarnRuntimeException(
          "Failed to initialize LocalizationService", e);
    }

    diskValidator = DiskValidatorFactory.getInstance(
        YarnConfiguration.DEFAULT_DISK_VALIDATOR);

    // The target size is configured in MB; shifting by 20 converts to bytes.
    final long targetSizeMb = conf.getLong(
        YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB,
        YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB);
    cacheTargetSize = targetSizeMb << 20;
    cacheCleanupPeriod = conf.getLong(
        YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS,
        YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
    localizationServerAddress = conf.getSocketAddr(
        YarnConfiguration.NM_BIND_HOST,
        YarnConfiguration.NM_LOCALIZER_ADDRESS,
        YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
        YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);

    localizerTracker = createLocalizerTracker(conf);
    addService(localizerTracker);
    dispatcher.register(LocalizerEventType.class, localizerTracker);

    // The listeners are only created here; they are registered with the
    // dirs handler in serviceStart().
    localDirsChangeListener = new DirsChangeListener() {
      @Override
      public void onDirsChanged() {
        checkAndInitializeLocalDirs();
      }
    };
    logDirsChangeListener = new DirsChangeListener() {
      @Override
      public void onDirsChanged() {
        initializeLogDirs(lfs);
      }
    };

    super.serviceInit(conf);
  }

  /**
   * Recover localized resources after an NM restart.
   *
   * The public tracker is recovered first. Then, for every recovered user,
   * the private tracker and each per-application tracker are created if they
   * are not already registered, and recovered.
   *
   * @param state recovered localization state
   * @throws URISyntaxException propagated from recovering a tracker
   * @throws IOException propagated from the recovery iterators
   */
  public void recoverLocalizedResources(RecoveredLocalizationState state)
      throws URISyntaxException, IOException {
    final LocalResourceTrackerState publicState =
        state.getPublicTrackerState();
    recoverTrackerResources(publicRsrc, publicState);

    try (RecoveryIterator<Map.Entry<String, RecoveredUserResources>> users =
             state.getIterator()) {
      while (users.hasNext()) {
        final Map.Entry<String, RecoveredUserResources> recovered =
            users.next();
        final String user = recovered.getKey();
        final RecoveredUserResources userResources = recovered.getValue();

        // Per-user (private) tracker: reuse one that is already registered.
        final LocalResourceTrackerState privateState =
            userResources.getPrivateTrackerState();
        final LocalResourcesTracker freshUserTracker =
            new LocalResourcesTrackerImpl(user, null, dispatcher, true,
                super.getConfig(), stateStore, dirsHandler);
        final LocalResourcesTracker knownUserTracker =
            privateRsrc.putIfAbsent(user, freshUserTracker);
        final LocalResourcesTracker userTracker =
            knownUserTracker != null ? knownUserTracker : freshUserTracker;
        recoverTrackerResources(userTracker, privateState);

        // Per-application trackers belonging to this user.
        for (Map.Entry<ApplicationId, LocalResourceTrackerState> perApp :
            userResources.getAppTrackerStates().entrySet()) {
          final LocalResourceTrackerState appState = perApp.getValue();
          final ApplicationId appId = perApp.getKey();
          final String appKey = appId.toString();
          final LocalResourcesTracker freshAppTracker =
              new LocalResourcesTrackerImpl(user, appId, dispatcher, false,
                  super.getConfig(), stateStore, dirsHandler);
          final LocalResourcesTracker knownAppTracker =
              appRsrc.putIfAbsent(appKey, freshAppTracker);
          final LocalResourcesTracker appTracker =
              knownAppTracker != null ? knownAppTracker : freshAppTracker;
          recoverTrackerResources(appTracker, appState);
        }
      }
    }
  }

  /**
   * Replays the recovered state of a single tracker.
   *
   * Completed resources are reported to the tracker with their recorded
   * path and size. Resources whose localization had only started are
   * reported with size 0 and then immediately removed from the tracker.
   *
   * @param tracker tracker receiving the recovered resources
   * @param state recovered state for that tracker
   * @throws URISyntaxException propagated from building a resource request
   * @throws IOException propagated from the recovery iterators
   */
  private void recoverTrackerResources(LocalResourcesTracker tracker,
      LocalResourceTrackerState state) throws URISyntaxException, IOException {
    // Pass 1: resources that were fully localized.
    try (RecoveryIterator<LocalizedResourceProto> completed =
             state.getCompletedResourcesIterator()) {
      if (completed != null) {
        while (completed.hasNext()) {
          final LocalizedResourceProto recovered = completed.next();
          final LocalResource resource =
              new LocalResourcePBImpl(recovered.getResource());
          final LocalResourceRequest request =
              new LocalResourceRequest(resource);
          LOG.debug("Recovering localized resource {} at {}",
              request, recovered.getLocalPath());
          final Path localizedAt = new Path(recovered.getLocalPath());
          tracker.handle(new ResourceRecoveredEvent(request, localizedAt,
              recovered.getSize()));
        }
      }
    }

    // Pass 2: resources whose localization had started but not completed.
    try (RecoveryIterator<Map.Entry<LocalResourceProto, Path>> started =
             state.getStartedResourcesIterator()) {
      if (started != null) {
        while (started.hasNext()) {
          final Map.Entry<LocalResourceProto, Path> pending = started.next();
          final LocalResource resource =
              new LocalResourcePBImpl(pending.getKey());
          final LocalResourceRequest request =
              new LocalResourceRequest(resource);
          final Path partialPath = pending.getValue();
          tracker.handle(new ResourceRecoveredEvent(request, partialPath, 0));

          // delete any in-progress localizations, containers will request again
          LOG.info("Deleting in-progress localization for " + request + " at "
               + partialPath);
          tracker.remove(tracker.getLocalizedResource(request), delService);
        }
      }
    }

    // TODO: remove untracked directories in local filesystem
  }

  /**
   * Handles a localizer heartbeat by delegating to the localizer tracker.
   *
   * @param status status reported by the localizer
   * @return the response produced by the tracker
   */
  @Override
  public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
    final LocalizerHeartbeatResponse reply =
        localizerTracker.processHeartbeat(status);
    return reply;
  }

  /**
   * Starts the service: schedules the periodic cache cleanup, creates and
   * starts the RPC server, records the connect address, starts the child
   * services and finally registers the directory change listeners.
   *
   * @throws Exception if a child service fails to start
   */
  @Override
  public void serviceStart() throws Exception {
    // The same period is used for the initial delay and between runs.
    final Runnable cleanupTask = new CacheCleanup(dispatcher);
    cacheCleanup.scheduleWithFixedDelay(cleanupTask, cacheCleanupPeriod,
        cacheCleanupPeriod, TimeUnit.MILLISECONDS);

    server = createServer();
    server.start();
    // Record the address the listener is actually bound to.
    localizationServerAddress = getConfig().updateConnectAddr(
        YarnConfiguration.NM_BIND_HOST,
        YarnConfiguration.NM_LOCALIZER_ADDRESS,
        YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
        server.getListenerAddress());
    LOG.info("Localizer started on port " + server.getPort());

    super.serviceStart();

    // Listeners are registered only after the child services have started.
    dirsHandler.registerLocalDirsChangeListener(localDirsChangeListener);
    dirsHandler.registerLogDirsChangeListener(logDirsChangeListener);
  }

  /**
   * Factory for the localizer tracker used by this service.
   *
   * @param conf configuration passed to the tracker
   * @return a new {@link LocalizerTracker}
   */
  LocalizerTracker createLocalizerTracker(Configuration conf) {
    final LocalizerTracker tracker = new LocalizerTracker(conf);
    return tracker;
  }

  /**
   * Builds the RPC server that serves the {@link LocalizationProtocol}.
   *
   * A token secret manager is created only when security is enabled, and the
   * service ACL is refreshed only when authorization is switched on in the
   * configuration.
   *
   * @return the configured server; it is not started here
   */
  Server createServer() {
    final Configuration conf = getConfig();
    final YarnRPC rpc = YarnRPC.create(conf);

    if (UserGroupInformation.isSecurityEnabled()) {
      secretManager = new LocalizerTokenSecretManager();
    }

    final Server rpcServer = rpc.getServer(LocalizationProtocol.class, this,
        localizationServerAddress, conf, secretManager,
        conf.getInt(YarnConfiguration.NM_LOCALIZER_CLIENT_THREAD_COUNT,
            YarnConfiguration.DEFAULT_NM_LOCALIZER_CLIENT_THREAD_COUNT));

    // Enable service authorization?
    final boolean authorizationEnabled = conf.getBoolean(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
    if (authorizationEnabled) {
      rpcServer.refreshServiceAcl(conf, NMPolicyProvider.getInstance());
    }

    return rpcServer;
  }

  /**
   * Stops the service: deregisters the directory change listeners, stops the
   * RPC server if one was created, shuts down the cache cleanup scheduler and
   * then stops the child services.
   *
   * @throws Exception if a child service fails to stop
   */
  @Override
  public void serviceStop() throws Exception {
    dirsHandler.deregisterLocalDirsChangeListener(localDirsChangeListener);
    dirsHandler.deregisterLogDirsChangeListener(logDirsChangeListener);

    // The server is only set once serviceStart() has run.
    if (null != server) {
      server.stop();
    }

    cacheCleanup.shutdown();
    super.serviceStop();
  }

  /**
   * Dispatches a localization event to the handler matching its type.
   *
   * @param event event to process
   * @throws YarnRuntimeException if the event type is not recognized
   */
  @Override
  public void handle(LocalizationEvent event) {
    // TODO: create log dir as $logdir/$user/$appId
    switch (event.getType()) {
    case INIT_APPLICATION_RESOURCES: {
      final ApplicationLocalizationEvent initEvent =
          (ApplicationLocalizationEvent) event;
      handleInitApplicationResources(initEvent.getApplication());
      break;
    }
    case LOCALIZE_CONTAINER_RESOURCES: {
      final ContainerLocalizationRequestEvent requestEvent =
          (ContainerLocalizationRequestEvent) event;
      handleInitContainerResources(requestEvent);
      break;
    }
    case CONTAINER_RESOURCES_LOCALIZED: {
      final ContainerLocalizationEvent localizedEvent =
          (ContainerLocalizationEvent) event;
      handleContainerResourcesLocalized(localizedEvent);
      break;
    }
    case CACHE_CLEANUP:
      handleCacheCleanup();
      break;
    case CLEANUP_CONTAINER_RESOURCES: {
      final ContainerLocalizationCleanupEvent cleanupEvent =
          (ContainerLocalizationCleanupEvent) event;
      handleCleanupContainerResources(cleanupEvent);
      break;
    }
    case DESTROY_APPLICATION_RESOURCES: {
      final ApplicationLocalizationEvent destroyEvent =
          (ApplicationLocalizationEvent) event;
      handleDestroyApplicationResources(destroyEvent.getApplication());
      break;
    }
    default:
      throw new YarnRuntimeException("Unknown localization event: " + event);
    }
  }
  
  /**
   * Registers the resource trackers for an application and then signals
   * that the application has been initialized.
   *
   * A private tracker keyed by the user and an application tracker keyed by
   * the application id are registered unless they already exist.
   *
   * @param app application being initialized
   */
  @SuppressWarnings("unchecked")
  private void handleInitApplicationResources(Application app) {
    // 0) Create application tracking structs
    final String userName = app.getUser();
    final LocalResourcesTracker userTracker = new LocalResourcesTrackerImpl(
        userName, null, dispatcher, true, super.getConfig(), stateStore,
        dirsHandler);
    privateRsrc.putIfAbsent(userName, userTracker);

    final String appKey = app.getAppId().toString();
    final LocalResourcesTracker appTracker = new LocalResourcesTrackerImpl(
        app.getUser(), app.getAppId(), dispatcher, false, super.getConfig(),
        stateStore, dirsHandler);
    appRsrc.putIfAbsent(appKey, appTracker);

    // 1) Signal container init
    //
    // This is handled by the ApplicationImpl state machine and allows
    // containers to proceed with launching.
    final ApplicationInitedEvent initedEvent =
        new ApplicationInitedEvent(app.getAppId());
    dispatcher.getEventHandler().handle(initedEvent);
  }
  
  /**
   * Forwards every resource requested by a container to the
   * {@link LocalResourcesTracker} matching the resource's visibility, as a
   * {@link ResourceRequestEvent}.
   *
   * Nothing is forwarded unless the container is in the LOCALIZING, RUNNING
   * or REINITIALIZING state.
   *
   * @param rsrcReqs localization request carrying the container and the
   *        requested resources grouped by visibility
   */
  private void handleInitContainerResources(
      ContainerLocalizationRequestEvent rsrcReqs) {
    final Container container = rsrcReqs.getContainer();
    final EnumSet<ContainerState> localizableStates = EnumSet.of(
        ContainerState.LOCALIZING, ContainerState.RUNNING,
        ContainerState.REINITIALIZING);
    if (!localizableStates.contains(container.getContainerState())) {
      LOG.warn(container.getContainerId() + " is at "
          + container.getContainerState()
          + " state, do not localize resources.");
      return;
    }

    // create a loading cache for the file statuses
    final LoadingCache<Path, Future<FileStatus>> statusCache =
        CacheBuilder.newBuilder()
            .build(FSDownload.createStatusCacheLoader(getConfig()));
    final LocalizerContext localizerContext = new LocalizerContext(
        container.getUser(), container.getContainerId(),
        container.getCredentials(), statusCache);

    final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
        requested = rsrcReqs.getRequestedResources();
    for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>>
        byVisibility : requested.entrySet()) {
      final LocalResourceVisibility visibility = byVisibility.getKey();
      final LocalResourcesTracker tracker = getLocalResourcesTracker(
          visibility, container.getUser(),
          container.getContainerId().getApplicationAttemptId()
              .getApplicationId());
      for (LocalResourceRequest request : byVisibility.getValue()) {
        tracker.handle(
            new ResourceRequestEvent(request, visibility, localizerContext));
        LOG.debug("Localizing {} for container {}",
            request.getPath(), container.getContainerId());
      }
    }
  }

  /**
   * Ends the localization of a container once its resources are localized.
   * The container id string is used as the localizer id.
   *
   * @param event event carrying the container whose localization finished
   */
  private void handleContainerResourcesLocalized(
      ContainerLocalizationEvent event) {
    final ContainerId containerId = event.getContainer().getContainerId();
    localizerTracker.endContainerLocalization(containerId.toString());
  }

  /**
   * Runs one cache cleanup pass and publishes its statistics.
   *
   * The public tracker and every per-user private tracker are handed to a
   * {@link LocalCacheCleaner}; application trackers are not added here.
   *
   * @return statistics of the cleanup pass
   */
  @VisibleForTesting
  LocalCacheCleanerStats handleCacheCleanup() {
    final LocalCacheCleaner cacheCleaner =
        new LocalCacheCleaner(delService, cacheTargetSize);
    cacheCleaner.addResources(publicRsrc);
    for (LocalResourcesTracker userTracker : privateRsrc.values()) {
      cacheCleaner.addResources(userTracker);
    }

    final LocalCacheCleanerStats cleanupStats = cacheCleaner.cleanCache();
    // The detailed report is only produced when debug logging is on.
    if (LOG.isDebugEnabled()) {
      LOG.debug(cleanupStats.toStringDetailed());
    } else if (LOG.isInfoEnabled()) {
      LOG.info(cleanupStats.toString());
    }

    // Update metrics
    metrics.setCacheSizeBeforeClean(cleanupStats.getCacheSizeBeforeClean());
    metrics.setTotalBytesDeleted(cleanupStats.getTotalDelSize());
    metrics.setPrivateBytesDeleted(cleanupStats.getPrivateDelSize());
    metrics.setPublicBytesDeleted(cleanupStats.getPublicDelSize());
    return cleanupStats;
  }


  /**
   * Releases the resources held by a container, cleans up its private
   * localizer, submits its directories for deletion and finally reports
   * CONTAINER_RESOURCES_CLEANEDUP for the container.
   *
   * @param rsrcCleanup cleanup event carrying the container and the
   *        resources to release, grouped by visibility
   */
  @SuppressWarnings("unchecked")
  private void handleCleanupContainerResources(
      ContainerLocalizationCleanupEvent rsrcCleanup) {
    final Container container = rsrcCleanup.getContainer();
    final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
        toRelease = rsrcCleanup.getResources();

    // Release every resource on the tracker matching its visibility.
    for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>>
        byVisibility : toRelease.entrySet()) {
      final LocalResourcesTracker tracker = getLocalResourcesTracker(
          byVisibility.getKey(), container.getUser(),
          container.getContainerId().getApplicationAttemptId()
              .getApplicationId());
      for (LocalResourceRequest request : byVisibility.getValue()) {
        tracker.handle(
            new ResourceReleaseEvent(request, container.getContainerId()));
      }
    }

    final String localizerId = container.getContainerId().toString();
    localizerTracker.cleanupPrivLocalizers(localizerId);

    // Delete the container directories
    final String user = container.getUser();
    // NOTE(review): the directory name comes from Container#toString(), not
    // from the container id - presumably they are equal; confirm.
    final String containerDirName = container.toString();
    final String appDirName = container.getContainerId()
        .getApplicationAttemptId().getApplicationId().toString();

    // Try deleting from good local dirs and full local dirs because a dir might
    // have gone bad while the app was running(disk full). In addition
    // a dir might have become good while the app was running.
    // Check if the container dir exists and if it does, try to delete it
    for (String localDir : dirsHandler.getLocalDirsForCleanup()) {
      // Delete the user-owned container-dir
      final Path userCacheRoot =
          new Path(localDir, ContainerLocalizer.USERCACHE);
      final Path userRoot = new Path(userCacheRoot, user);
      final Path appCacheRoot =
          new Path(userRoot, ContainerLocalizer.APPCACHE);
      final Path userAppDir = new Path(appCacheRoot, appDirName);
      submitDirForDeletion(user, new Path(userAppDir, containerDirName));

      // Delete the nmPrivate container-dir
      final Path privateRoot = new Path(localDir, NM_PRIVATE_DIR);
      final Path privateAppDir = new Path(privateRoot, appDirName);
      submitDirForDeletion(null, new Path(privateAppDir, containerDirName));
    }

    final ContainerEvent cleanedUp = new ContainerEvent(
        container.getContainerId(),
        ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP);
    dispatcher.getEventHandler().handle(cleanedUp);
  }
  
  /**
   * Submits a directory to the deletion service if it can be found on the
   * local file system. This is best-effort: no exception is propagated.
   *
   * @param userName user passed to the deletion task; callers in this class
   *        pass {@code null} for NM-private directories
   * @param dir directory to delete
   */
  private void submitDirForDeletion(String userName, Path dir) {
    try {
      // Probe first so that no deletion task is queued for a missing dir.
      lfs.getFileStatus(dir);
      FileDeletionTask deletionTask = new FileDeletionTask(delService, userName,
          dir, null);
      delService.delete(deletionTask);
    } catch (FileNotFoundException fnfe) {
      // Expected: the directory does not exist here, nothing to delete.
      return;
    } catch (UnsupportedFileSystemException ue) {
      LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue);
    } catch (IOException ie) {
      // Still best-effort, but leave a trace instead of hiding the failure.
      LOG.debug("Skipping deletion of {} because its status could not be"
          + " determined", dir, ie);
    }
  }


  /**
   * Tears down the localization state of an application: removes its
   * tracker and the tracker's localized resources from the state store,
   * submits the application directories for deletion and finally reports
   * APPLICATION_RESOURCES_CLEANEDUP for the application.
   *
   * @param application application whose resources are destroyed
   */
  @SuppressWarnings({"unchecked"})
  private void handleDestroyApplicationResources(Application application) {
    final String userName = application.getUser();
    final ApplicationId appId = application.getAppId();
    // NOTE(review): the directory name comes from Application#toString(), not
    // from the application id - presumably they are equal; confirm.
    final String appIDStr = application.toString();

    final LocalResourcesTracker appLocalRsrcsTracker =
        appRsrc.remove(appId.toString());
    if (appLocalRsrcsTracker != null) {
      for (LocalizedResource rsrc : appLocalRsrcsTracker) {
        Path localPath = rsrc.getLocalPath();
        if (localPath != null) {
          try {
            stateStore.removeLocalizedResource(userName, appId, localPath);
          } catch (IOException e) {
            // Keep going so one failure does not block the remaining cleanup.
            LOG.error("Unable to remove resource " + rsrc + " for " + appIDStr
                + " from state store", e);
          }
        }
      }
    } else {
      LOG.warn("Removing uninitialized application " + application);
    }

    // Delete the application directories
    for (String localDir : dirsHandler.getLocalDirsForCleanup()) {

      // Delete the user-owned app-dir
      Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
      Path userdir = new Path(usersdir, userName);
      Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
      Path appDir = new Path(allAppsdir, appIDStr);
      submitDirForDeletion(userName, appDir);

      // Delete the nmPrivate app-dir
      Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
      Path appSysDir = new Path(sysDir, appIDStr);
      submitDirForDeletion(null, appSysDir);
    }

    // TODO: decrement reference counts of all resources associated with this
    // app

    dispatcher.getEventHandler().handle(new ApplicationEvent(
          appId,
          ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
  }


  /**
   * Selects the tracker responsible for resources of the given visibility.
   *
   * @param visibility visibility of the resource
   * @param user user whose private tracker is looked up for PRIVATE
   * @param appId application whose tracker is looked up for APPLICATION
   * @return the public tracker for PUBLIC (and any unlisted value); otherwise
   *         the tracker registered for the user or application, or
   *         {@code null} if none is registered
   */
  LocalResourcesTracker getLocalResourcesTracker(
      LocalResourceVisibility visibility, String user, ApplicationId appId) {
    switch (visibility) {
      case PRIVATE:
        return privateRsrc.get(user);
      case APPLICATION:
        return appRsrc.get(appId.toString());
      case PUBLIC:
      default:
        return publicRsrc;
    }
  }

  /**
   * Builds the relative path of a user's file cache, starting at ".".
   *
   * @param user user name
   * @return the path segments joined with {@link Path#SEPARATOR}
   */
  private String getUserFileCachePath(String user) {
    final List<String> segments = Arrays.asList(".",
        ContainerLocalizer.USERCACHE, user, ContainerLocalizer.FILECACHE);
    return StringUtils.join(Path.SEPARATOR, segments);
  }

  /**
   * Builds the relative path of an application's file cache, starting at
   * ".".
   *
   * @param user user name
   * @param appId application id string
   * @return the path segments joined with {@link Path#SEPARATOR}
   */
  private String getAppFileCachePath(String user, String appId) {
    final List<String> segments = Arrays.asList(".",
        ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId,
        ContainerLocalizer.FILECACHE);
    return StringUtils.join(Path.SEPARATOR, segments);
  }
  
  /**
   * Exposes the public localizer held by the localizer tracker.
   *
   * @return the tracker's public localizer
   */
  @VisibleForTesting
  @Private
  public PublicLocalizer getPublicLocalizer() {
    final LocalizerTracker tracker = localizerTracker;
    return tracker.publicLocalizer;
  }

  /**
   * Looks up the private localizer registered under a localizer id.
   *
   * @param locId localizer id
   * @return the value stored under that id in the tracker's private
   *         localizer map
   */
  @VisibleForTesting
  @Private
  public LocalizerRunner getLocalizerRunner(String locId) {
    final Map<String, LocalizerRunner> runners =
        localizerTracker.privLocalizers;
    return runners.get(locId);
  }
  
  /**
   * Exposes the tracker's map of private localizers keyed by localizer id.
   * The map itself is returned, not a copy.
   *
   * @return the private localizer map
   */
  @VisibleForTesting
  @Private
  public Map<String, LocalizerRunner> getPrivateLocalizers() {
    final LocalizerTracker tracker = localizerTracker;
    return tracker.privLocalizers;
  }
  
  /**
   * Sub-component handling the spawning of {@link ContainerLocalizer}s
   */
  class LocalizerTracker extends AbstractService implements EventHandler<LocalizerEvent>  {

    // Localizer that receives PUBLIC resource requests; started in
    // serviceStart() and interrupted in serviceStop().
    private final PublicLocalizer publicLocalizer;
    // Private localizers keyed by localizer id; heartbeat and request
    // handling access it while synchronized on the map itself.
    private final Map<String,LocalizerRunner> privLocalizers;
    // Map created with capacity maxRecentlyCleaned; presumably remembers
    // recently cleaned-up localizer ids - confirm against its usage.
    private final Map<String, String> recentlyCleanedLocalizers;
    // Capacity passed to the recentlyCleanedLocalizers map.
    private final int maxRecentlyCleaned = 128;

    /**
     * Creates a tracker backed by a new, empty {@link HashMap} of private
     * localizers.
     *
     * @param conf configuration passed on to the main constructor
     */
    LocalizerTracker(Configuration conf) {
      this(conf, new HashMap<String, LocalizerRunner>());
    }

    /**
     * Creates a tracker that uses the supplied map of private localizers.
     *
     * @param conf configuration used to build the public localizer
     * @param privLocalizers map of private localizers keyed by localizer id;
     *        the map is stored, not copied
     */
    LocalizerTracker(Configuration conf,
        Map<String,LocalizerRunner> privLocalizers) {
      super(LocalizerTracker.class.getName());
      this.privLocalizers = privLocalizers;
      this.recentlyCleanedLocalizers = new LRUCacheHashMap<String, String>(
          maxRecentlyCleaned, false);
      this.publicLocalizer = new PublicLocalizer(conf);
    }
    
    /**
     * Starts the public localizer, then completes the regular service start.
     *
     * @throws Exception if the service start fails
     */
    @Override
    public synchronized void serviceStart() throws Exception {
      // The public localizer is started before the service start completes.
      this.publicLocalizer.start();
      super.serviceStart();
    }

    /**
     * Routes a heartbeat to the private localizer it belongs to.
     *
     * @param status heartbeat status carrying the localizer id and resources
     * @return the localizer's response, or a response with action DIE when no
     *         localizer is registered under that id
     */
    public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
      final String localizerId = status.getLocalizerId();
      synchronized (privLocalizers) {
        final LocalizerRunner runner = privLocalizers.get(localizerId);
        if (runner != null) {
          return runner.processHeartbeat(status.getResources());
        }

        // TODO process resources anyway
        LOG.info("Unknown localizer with localizerId " + localizerId
            + " is sending heartbeat. Ordering it to DIE");
        final LocalizerHeartbeatResponse dieResponse =
            recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
        dieResponse.setLocalizerAction(LocalizerAction.DIE);
        return dieResponse;
      }
    }
    
    @Override
    public void serviceStop() throws Exception {
      for (LocalizerRunner localizer : privLocalizers.values()) {
        localizer.interrupt();
      }
      publicLocalizer.interrupt();
      super.serviceStop();
    }

    @Override
    public void handle(LocalizerEvent event) {
      String locId = event.getLocalizerId();
      switch (event.getType()) {
      case REQUEST_RESOURCE_LOCALIZATION:
        // 0) find running localizer or start new thread
        LocalizerResourceRequestEvent req =
          (LocalizerResourceRequestEvent)event;
        switch (req.getVisibility()) {
        case PUBLIC:
          publicLocalizer.addResource(req);
          break;
        case PRIVATE:
        case APPLICATION:
          synchronized (privLocalizers) {
            LocalizerRunner localizer = privLocalizers.get(locId);
            if (localizer != null && localizer.killContainerLocalizer.get()) {
              // Old localizer thread has been stopped, remove it and create
              // a new localizer thread.
              LOG.info("New " + event.getType() + " localize request for "
                  + locId + ", remove old private localizer.");
              privLocalizers.remove(locId);
              localizer.interrupt();
              localizer = null;
            }
            if (null == localizer) {
              // Don't create a new localizer if this one has been recently
              // cleaned up - this can happen if localization requests come
              // in after cleanupPrivLocalizers has been called.
              if (recentlyCleanedLocalizers.containsKey(locId)) {
                LOG.info(
                    "Skipping localization request for recently cleaned " +
                    "localizer " + locId + " resource:" + req.getResource());
                break;
              }
              LOG.info("Created localizer for " + locId);
              localizer = new LocalizerRunner(req.getContext(), locId);
              privLocalizers.put(locId, localizer);
              localizer.start();
            }
            // 1) propagate event
            localizer.addResource(req);
          }
          break;
        }
        break;
      }
    }

    public void cleanupPrivLocalizers(String locId) {
      synchronized (privLocalizers) {
        LocalizerRunner localizer = privLocalizers.get(locId);
        recentlyCleanedLocalizers.put(locId, locId);
        if (null == localizer) {
          return; // ignore; already gone
        }
        privLocalizers.remove(locId);
        localizer.interrupt();
      }
    }

    public void endContainerLocalization(String locId) {
      LocalizerRunner localizer;
      synchronized (privLocalizers) {
        localizer = privLocalizers.get(locId);
        if (null == localizer) {
          return; // ignore
        }
      }
      localizer.endContainerLocalization();
    }
  }
  

  /**
   * Creates the fixed-size thread pool used for public resource downloads.
   * The pool size is read from
   * {@link YarnConfiguration#NM_LOCALIZER_FETCH_THREAD_COUNT} and the threads
   * are named {@code "PublicLocalizer #<n>"}.
   *
   * @param conf configuration providing the thread count
   * @return the new executor
   */
  private static ExecutorService createLocalizerExecutor(Configuration conf) {
    final int fetchThreads = conf.getInt(
        YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT,
        YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT);
    final ThreadFactory namedThreads =
        new ThreadFactoryBuilder().setNameFormat("PublicLocalizer #%d").build();
    return HadoopExecutors.newFixedThreadPool(fetchThreads, namedThreads);
  }


  /**
   * Thread that localizes PUBLIC resources. Downloads are submitted to a
   * thread pool by {@link #addResource}; {@link #run()} takes completed
   * downloads off the completion queue and reports success or failure to
   * the public resource tracker.
   */
  class PublicLocalizer extends Thread {

    final FileContext lfs;
    final Configuration conf;
    final ExecutorService threadPool;
    // Completion service wrapping threadPool; run() takes finished downloads
    // from it.
    final CompletionService<Path> queue;
    // Its shared between public localizer and dispatcher thread.
    final Map<Future<Path>,LocalizerResourceRequestEvent> pending;

    /**
     * Sets up the local file context, the pending map and the download
     * thread pool. The thread itself is not started here.
     *
     * @param conf configuration used for the file context, the executor and
     *        the downloads
     */
    PublicLocalizer(Configuration conf) {
      super("Public Localizer");
      this.lfs = getLocalFileContext(conf);
      this.conf = conf;
      this.pending = Collections.synchronizedMap(
          new HashMap<Future<Path>, LocalizerResourceRequestEvent>());
      this.threadPool = createLocalizerExecutor(conf);
      this.queue = new ExecutorCompletionService<Path>(threadPool);
    }

    /**
     * Submits a download for the requested public resource when this caller
     * can lock the resource and the resource is in DOWNLOADING state;
     * otherwise the request is skipped. Failures to pick a destination or to
     * submit the download are reported to the public tracker as a failed
     * localization and the resource lock is released.
     *
     * @param request the localization request for a PUBLIC resource
     */
    public void addResource(LocalizerResourceRequestEvent request) {
      // TODO handle failures, cancellation, requests by other containers
      LocalizedResource rsrc = request.getResource();
      LocalResourceRequest key = rsrc.getRequest();
      LOG.info("Downloading public resource: " + key);
      /*
       * Here multiple containers may request the same resource. So we need
       * to start downloading only when
       * 1) ResourceState == DOWNLOADING
       * 2) We are able to acquire non blocking semaphore lock.
       * If not we will skip this resource as either it is getting downloaded
       * or it FAILED / LOCALIZED.
       */

      if (rsrc.tryAcquire()) {
        if (rsrc.getState() == ResourceState.DOWNLOADING) {
          LocalResource resource = request.getResource().getRequest();
          try {
            Path publicRootPath =
                dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
                    + ContainerLocalizer.FILECACHE,
                  ContainerLocalizer.getEstimatedSize(resource), true);
            Path publicDirDestPath =
                publicRsrc.getPathForLocalization(key, publicRootPath,
                    delService);
            if (publicDirDestPath == null) {
              // No destination was handed out, so no download is submitted
              // and nothing will release the lock later; release it here
              // instead of leaking it.
              rsrc.unlock();
              return;
            }
            if (!publicDirDestPath.getParent().equals(publicRootPath)) {
              createParentDirs(publicDirDestPath, publicRootPath);
              if (diskValidator != null) {
                diskValidator.checkStatus(
                    new File(publicDirDestPath.toUri().getPath()));
              } else {
                throw new DiskChecker.DiskErrorException(
                    "Disk Validator is null!");
              }
            }

            // explicitly synchronize pending here to avoid future task
            // completing and being dequeued before pending updated
            synchronized (pending) {
              pending.put(queue.submit(new FSDownload(lfs, null, conf,
                  publicDirDestPath, resource, request.getContext().getStatCache())),
                  request);
            }
          } catch (IOException e) {
            rsrc.unlock();
            publicRsrc.handle(new ResourceFailedLocalizationEvent(request
              .getResource().getRequest(), e.getMessage()));
            LOG.error("Local path for public localization is not found. "
                + " May be disks failed.", e);
          } catch (IllegalArgumentException ie) {
            rsrc.unlock();
            publicRsrc.handle(new ResourceFailedLocalizationEvent(request
                .getResource().getRequest(), ie.getMessage()));
            LOG.error("Local path for public localization is not found. "
                + " Incorrect path. " + request.getResource().getRequest()
                .getPath(), ie);
          } catch (RejectedExecutionException re) {
            rsrc.unlock();
            publicRsrc.handle(new ResourceFailedLocalizationEvent(request
              .getResource().getRequest(), re.getMessage()));
            LOG.error("Failed to submit rsrc " + rsrc + " for download."
                + " Either queue is full or threadpool is shutdown.", re);
          }
        } else {
          LOG.debug("Skip downloading resource: {} since it's in"
                + " state: {}", key, rsrc.getState());
          rsrc.unlock();
        }
      } else {
        LOG.debug("Skip downloading resource: {} since it is locked"
              + " by other threads", key);
      }
    }

    /**
     * Recursively creates {@code destDirPath} and its ancestors below
     * {@code destDirRoot}, outermost first. {@code destDirRoot} itself is not
     * created.
     *
     * @param destDirPath directory to create; {@code null} ends the recursion
     * @param destDirRoot root at which the recursion stops
     * @throws IOException if a directory cannot be created
     */
    private void createParentDirs(Path destDirPath, Path destDirRoot)
        throws IOException {
      if (destDirPath == null || destDirPath.equals(destDirRoot)) {
        return;
      }
      createParentDirs(destDirPath.getParent(), destDirRoot);
      createDir(destDirPath, PUBLIC_FILECACHE_FOLDER_PERMS);
    }

    /**
     * Creates a single directory and sets its permissions explicitly when
     * applying the file context's umask to {@code perms} would alter them.
     *
     * @param dirPath directory to create
     * @param perms permissions the directory should end up with
     * @throws IOException if creation or the permission change fails
     */
    private void createDir(Path dirPath, FsPermission perms)
        throws IOException {
      lfs.mkdir(dirPath, perms, false);
      if (!perms.equals(perms.applyUMask(lfs.getUMask()))) {
        lfs.setPermission(dirPath, perms);
      }
    }

    /**
     * Takes completed downloads off the completion queue until the thread is
     * interrupted. A successful download is reported to the public tracker
     * as localized; a failed one is reported as failed both to the public
     * tracker and, when present, to the APPLICATION tracker of the requesting
     * container. The download pool is shut down when the loop exits.
     */
    @Override
    public void run() {
      try {
        // TODO shutdown, better error handling esp. DU
        while (!Thread.currentThread().isInterrupted()) {
          try {
            Future<Path> completed = queue.take();
            LocalizerResourceRequestEvent assoc = pending.remove(completed);
            try {
              if (null == assoc) {
                LOG.error("Localized unknown resource to " + completed);
                // TODO delete
                // NOTE(review): this return leaves run(), so the thread ends
                // and the finally block below shuts the pool down.
                return;
              }
              Path local = completed.get();
              LocalResourceRequest key = assoc.getResource().getRequest();
              publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil
                .getDU(new File(local.toUri()))));
              assoc.getResource().unlock();
            } catch (ExecutionException e) {
              String user = assoc.getContext().getUser();
              ApplicationId applicationId = assoc.getContext().getContainerId().getApplicationAttemptId().getApplicationId();
              LocalResourcesTracker tracker =
                getLocalResourcesTracker(LocalResourceVisibility.APPLICATION, user, applicationId);
              final String diagnostics = "Failed to download resource " +
                  assoc.getResource() + " " + e.getCause();
              if(tracker != null) {
                tracker.handle(new ResourceFailedLocalizationEvent(
                    assoc.getResource().getRequest(), diagnostics));
              }
              publicRsrc.handle(new ResourceFailedLocalizationEvent(
                  assoc.getResource().getRequest(), diagnostics));
              LOG.error(diagnostics);
              assoc.getResource().unlock();
            } catch (CancellationException e) {
              // ignore; shutting down
            }
          } catch (InterruptedException e) {
            return;
          }
        }
      } catch(Throwable t) {
        LOG.error("Error: Shutting down", t);
      } finally {
        LOG.info("Public cache exiting");
        threadPool.shutdownNow();
      }
    }

  }

  /**
   * Runs the {@link ContainerLocalizer} itself in a separate process with
   * access to user's credentials. One {@link LocalizerRunner} per localizerId.
   * <p>
   * Requests are queued in {@code pending}; each heartbeat from the spawned
   * localizer updates the status of resources in {@code scheduled} and hands
   * out at most one further resource to download.
   */
  class LocalizerRunner extends Thread {

    final LocalizerContext context;
    final String localizerId;
    // Resources handed to the localizer and not yet reported as fetched or
    // failed. NOTE(review): a plain HashMap touched by processHeartbeat()
    // and by run()'s cleanup; no lock is taken on it in this class.
    final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
    // Its a shared list between Private Localizer and dispatcher thread.
    final List<LocalizerResourceRequestEvent> pending;
    // Set by endContainerLocalization(); makes the next heartbeat answer DIE.
    private final AtomicBoolean killContainerLocalizer = new AtomicBoolean(false);

    // TODO: threadsafe, use outer?
    private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(getConfig());

    // Name of the credentials file written under the NM private directory.
    private final String tokenFileName;

    /**
     * Creates a runner for one localizer id. The token file name is derived
     * from the id plus the current time in hex.
     *
     * @param context localizer context (user, container id, credentials)
     * @param localizerId id of the localizer this runner serves
     */
    LocalizerRunner(LocalizerContext context, String localizerId) {
      super("LocalizerRunner for " + localizerId);
      this.context = context;
      this.localizerId = localizerId;
      this.pending =
          Collections
            .synchronizedList(new ArrayList<LocalizerResourceRequestEvent>());
      this.scheduled = new HashMap<>();
      tokenFileName =  String.format(ContainerExecutor.TOKEN_FILE_NAME_FMT,
         localizerId + Long.toHexString(System.currentTimeMillis()));
    }

    /**
     * Queues a resource request for this localizer.
     *
     * @param request the request to queue
     */
    public void addResource(LocalizerResourceRequestEvent request) {
      pending.add(request);
    }

    /**
     * Flags this runner so that the next heartbeat is answered with DIE.
     */
    public void endContainerLocalization() {
      killContainerLocalizer.set(true);
    }

    /**
     * Find next resource to be given to a spawned localizer.
     * <p>
     * Walks {@code pending} in order. Entries whose resource is no longer in
     * DOWNLOADING state, or for which no local path could be obtained, are
     * removed. The first entry that can be locked and given a path is moved
     * to {@code scheduled}; its lock stays held until a heartbeat or the
     * cleanup in {@link #run()} releases it.
     *
     * @param user user the localizer runs as
     * @param applicationId application the localizer belongs to
     * @return the next resource to be localized, or {@code null} if none is
     *         currently available
     */
    private ResourceLocalizationSpec findNextResource(
        String user, ApplicationId applicationId) {
      synchronized (pending) {
        for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
            i.hasNext();) {
          LocalizerResourceRequestEvent evt = i.next();
          LocalizedResource nRsrc = evt.getResource();
          // Resource download should take place ONLY if resource is in
          // Downloading state
          if (nRsrc.getState() != ResourceState.DOWNLOADING) {
            i.remove();
            continue;
          }
          /*
           * Multiple containers will try to download the same resource. So the
           * resource download should start only if
           * 1) We can acquire a non blocking semaphore lock on resource
           * 2) Resource is still in DOWNLOADING state
           */
          if (nRsrc.tryAcquire()) {
            if (nRsrc.getState() == ResourceState.DOWNLOADING) {
              LocalResourceRequest nextRsrc = nRsrc.getRequest();
              LocalResource next =
                  recordFactory.newRecordInstance(LocalResource.class);
              next.setResource(URL.fromPath(nextRsrc.getPath()));
              next.setTimestamp(nextRsrc.getTimestamp());
              next.setType(nextRsrc.getType());
              next.setVisibility(evt.getVisibility());
              next.setPattern(evt.getPattern());
              ResourceLocalizationSpec nextSpec = null;
              try {
                LocalResourcesTracker tracker = getLocalResourcesTracker(
                    next.getVisibility(), user, applicationId);
                if (tracker != null) {
                  Path localPath = getPathForLocalization(next, tracker);
                  if (localPath != null) {
                    nextSpec = NodeManagerBuilderUtils.
                        newResourceLocalizationSpec(next, localPath);
                  }
                }
              } catch (IOException e) {
                LOG.error("local path for PRIVATE localization could not be " +
                    "found. Disks might have failed.", e);
              } catch (IllegalArgumentException e) {
                LOG.error("Incorrect path for PRIVATE localization."
                    + next.getResource().getFile(), e);
              } catch (URISyntaxException e) {
                LOG.error(
                    "Got exception in parsing URL of LocalResource:"
                        + next.getResource(), e);
              }
              if (nextSpec != null) {
                scheduled.put(nextRsrc, evt);
                return nextSpec;
              } else {
                // We failed to get a path for this, don't try to localize this
                // resource again.
                nRsrc.unlock();
                i.remove();
                continue;
              }
            } else {
              // Need to release acquired lock
              nRsrc.unlock();
            }
          }
        }
        return null;
      }
    }

    /**
     * Processes one heartbeat from the spawned localizer: applies the
     * reported statuses to the scheduled resources, then either orders the
     * localizer to DIE (a fetch failed or localization was ended) or answers
     * LIVE with at most one further resource to download.
     *
     * @param remoteResourceStatuses statuses reported by the localizer
     * @return the heartbeat response
     */
    LocalizerHeartbeatResponse processHeartbeat(
        List<LocalResourceStatus> remoteResourceStatuses) {
      LocalizerHeartbeatResponse response =
        recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
      String user = context.getUser();
      ApplicationId applicationId =
          context.getContainerId().getApplicationAttemptId().getApplicationId();

      boolean fetchFailed = false;
      // Update resource statuses.
      for (LocalResourceStatus stat : remoteResourceStatuses) {
        LocalResource rsrc = stat.getResource();
        LocalResourceRequest req = null;
        try {
          req = new LocalResourceRequest(rsrc);
        } catch (URISyntaxException e) {
          LOG.error(
              "Got exception in parsing URL of LocalResource:"
                  + rsrc.getResource(), e);
          continue;
        }
        LocalizerResourceRequestEvent assoc = scheduled.get(req);
        if (assoc == null) {
          // internal error
          LOG.error("Unknown resource reported: " + req);
          continue;
        }
        LocalResourcesTracker tracker =
            getLocalResourcesTracker(req.getVisibility(), user, applicationId);
        if (tracker == null) {
          // This is likely due to a race between heartbeat and
          // app cleaning up.
          continue;
        }
        switch (stat.getStatus()) {
          case FETCH_SUCCESS:
            // notify resource
            try {
              tracker.handle(new ResourceLocalizedEvent(req,
                  stat.getLocalPath().toPath(), stat.getLocalSize()));
            } catch (URISyntaxException e) {
              // The tracker is not notified in this case; log it rather than
              // dropping the error silently.
              LOG.error("Invalid local path reported for localized resource {}",
                  req, e);
            }

            // unlocking the resource and removing it from scheduled resource
            // list
            assoc.getResource().unlock();
            scheduled.remove(req);
            break;
          case FETCH_PENDING:
            break;
          case FETCH_FAILURE:
            final String diagnostics = stat.getException().toString();
            LOG.warn("{} failed for {} : {}", req, localizerId, diagnostics);
            fetchFailed = true;
            tracker.handle(new ResourceFailedLocalizationEvent(req,
                diagnostics));

            // unlocking the resource and removing it from scheduled resource
            // list
            assoc.getResource().unlock();
            scheduled.remove(req);
            break;
          default:
            LOG.info("Unknown status: " + stat.getStatus());
            fetchFailed = true;
            tracker.handle(new ResourceFailedLocalizationEvent(req,
                stat.getException().getMessage()));
            break;
        }
      }
      if (fetchFailed || killContainerLocalizer.get()) {
        response.setLocalizerAction(LocalizerAction.DIE);
        return response;
      }

      // Give the localizer resources for remote-fetching.
      List<ResourceLocalizationSpec> rsrcs =
          new ArrayList<ResourceLocalizationSpec>();

      /*
       * TODO : It doesn't support multiple downloads per ContainerLocalizer
       * at the same time. We need to think whether we should support this.
       */
      ResourceLocalizationSpec next = findNextResource(user, applicationId);
      if (next != null) {
        rsrcs.add(next);
      }

      response.setLocalizerAction(LocalizerAction.LIVE);
      response.setResourceSpecs(rsrcs);
      return response;
    }

    /**
     * Picks the local destination for a PRIVATE or APPLICATION resource: the
     * user file cache for PRIVATE visibility, the application file cache for
     * anything else.
     *
     * @param rsrc resource to localize
     * @param tracker tracker asked for the final path
     * @return the path returned by the tracker
     * @throws IOException if no local directory is available for writing
     * @throws URISyntaxException if the resource URL cannot be parsed
     */
    private Path getPathForLocalization(LocalResource rsrc,
        LocalResourcesTracker tracker) throws IOException, URISyntaxException {
      String user = context.getUser();
      ApplicationId appId =
          context.getContainerId().getApplicationAttemptId().getApplicationId();
      LocalResourceVisibility vis = rsrc.getVisibility();
      String cacheDirectory = null;
      if (vis == LocalResourceVisibility.PRIVATE) {// PRIVATE Only
        cacheDirectory = getUserFileCachePath(user);
      } else {// APPLICATION ONLY
        cacheDirectory = getAppFileCachePath(user, appId.toString());
      }
      Path dirPath =
          dirsHandler.getLocalPathForWrite(cacheDirectory,
            ContainerLocalizer.getEstimatedSize(rsrc), false);
      return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
          dirPath, delService);
    }

    /**
     * Writes the credentials file, then starts the localizer through the
     * container executor. On any failure a
     * {@link ContainerResourceFailedEvent} is dispatched for the container.
     * In all cases the resources still in {@code scheduled} are unlocked,
     * their localization and {@code _tmp} directories are scheduled for
     * deletion, and a deletion task is submitted for the credentials file.
     */
    @Override
    @SuppressWarnings("unchecked") // dispatcher not typed
    public void run() {
      Path nmPrivateCTokensPath = null;
      Throwable exception = null;
      try {
        // Get nmPrivateDir
        nmPrivateCTokensPath = dirsHandler.getLocalPathForWrite(
            NM_PRIVATE_DIR + Path.SEPARATOR + tokenFileName);

        // 0) init queue, etc.
        // 1) write credentials to private dir
        writeCredentials(nmPrivateCTokensPath);
        // 2) exec initApplication and wait
        if (dirsHandler.areDisksHealthy()) {
          exec.startLocalizer(new LocalizerStartContext.Builder()
              .setNmPrivateContainerTokens(nmPrivateCTokensPath)
              .setNmAddr(localizationServerAddress)
              .setUser(context.getUser())
              .setAppId(context.getContainerId()
                  .getApplicationAttemptId().getApplicationId().toString())
              .setLocId(localizerId)
              .setDirsHandler(dirsHandler)
              .build());
        } else {
          throw new IOException("All disks failed. "
              + dirsHandler.getDisksHealthReport(false));
        }
        // TODO handle ExitCodeException separately?
      } catch (ConfigurationException e) {
        exception = e;
        LOG.error("Failed to launch localizer for {}, due to configuration error. " +
            "Marking the node unhealthy.", localizerId, e);
        nmContext.getNodeStatusUpdater().reportException(e);
      } catch (Exception | FSError e) {
        exception = e;
      } finally {
        if (exception != null) {
          LOG.info("Localizer failed for "+localizerId, exception);
          // On error, report failure to Container and signal ABORT
          // Notify resource of failed localization
          ContainerId cId = context.getContainerId();
          dispatcher.getEventHandler().handle(new ContainerResourceFailedEvent(
              cId, null, exception.getMessage()));
        }
        List<Path> paths = new ArrayList<Path>();
        for (LocalizerResourceRequestEvent event : scheduled.values()) {
          // This means some resources were in downloading state. Schedule
          // deletion task for localization dir and tmp dir used for downloading
          Path locRsrcPath = event.getResource().getLocalPath();
          if (locRsrcPath != null) {
            Path locRsrcDirPath = locRsrcPath.getParent();
            paths.add(locRsrcDirPath);
            paths.add(new Path(locRsrcDirPath + "_tmp"));
          }
          event.getResource().unlock();
        }
        if (!paths.isEmpty()) {
          FileDeletionTask deletionTask = new FileDeletionTask(delService,
              context.getUser(), null, paths);
          delService.delete(deletionTask);
        }
        // NOTE(review): nmPrivateCTokensPath is still null here when
        // getLocalPathForWrite above threw; the task is submitted regardless.
        FileDeletionTask deletionTask = new FileDeletionTask(delService, null,
            nmPrivateCTokensPath, null);
        delService.delete(deletionTask);
      }
    }

    /**
     * Looks up the system credentials the NM context holds for the
     * application of the given localizer context.
     *
     * @param localizerContext context identifying the application
     * @return the credentials, or {@code null} when none are stored
     * @throws IOException declared for callers; not thrown by the code
     *         visible in this method
     */
    private Credentials getSystemCredentialsSentFromRM(
        LocalizerContext localizerContext) throws IOException {
      ApplicationId appId =
          localizerContext.getContainerId().getApplicationAttemptId()
            .getApplicationId();
      Credentials systemCredentials =
          nmContext.getSystemCredentialsForApps().get(appId);
      if (systemCredentials == null) {
        return null;
      }

      LOG.debug("Adding new framework-token for {} for localization: {}",
          appId, systemCredentials.getAllTokens());

      return systemCredentials;
    }
    
    /**
     * Writes the credentials for the localizer to the given file. With
     * security enabled, system credentials stored for the application
     * replace the context's credentials when present, and a freshly created
     * localizer token is added to a copy of the credentials before writing.
     *
     * @param nmPrivateCTokensPath file to create (or overwrite) and write to
     * @throws IOException if the file cannot be created or written
     */
    private void writeCredentials(Path nmPrivateCTokensPath)
        throws IOException {
      Credentials credentials = context.getCredentials();
      if (UserGroupInformation.isSecurityEnabled()) {
        Credentials systemCredentials =
            getSystemCredentialsSentFromRM(context);
        if (systemCredentials != null) {
          credentials = systemCredentials;
        }
      }

      FileContext lfs = getLocalFileContext(getConfig());
      // try-with-resources closes the stream on every path and keeps a
      // failure from close() from masking an earlier write failure.
      try (DataOutputStream tokenOut =
          lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE))) {
        LOG.info("Writing credentials to the nmPrivate file "
            + nmPrivateCTokensPath.toString());
        if (LOG.isDebugEnabled()) {
          // The path is passed as a logger argument so that it fills the {}
          // placeholder instead of being appended after a literal "{}".
          LOG.debug("Credentials list in {}: ", nmPrivateCTokensPath);
          for (Token<? extends TokenIdentifier> tk : credentials
              .getAllTokens()) {
            LOG.debug("{} : {}", tk, buildTokenFingerprint(tk));
          }
        }
        if (UserGroupInformation.isSecurityEnabled()) {
          // Copy first so the localizer token is not added to the shared
          // credentials object.
          credentials = new Credentials(credentials);
          LocalizerTokenIdentifier id = secretManager.createIdentifier();
          Token<LocalizerTokenIdentifier> localizerToken =
              new Token<LocalizerTokenIdentifier>(id, secretManager);
          credentials.addToken(id.getKind(), localizerToken);
        }
        credentials.writeTokenStorageToStream(tokenOut);
      }
    }

  }

  /**
   * Computes a loggable fingerprint of a token. The fingerprint cannot be
   * used to recover the secret: it consists of the first 10 bytes of the
   * SHA-256 hash of the token's URL-string encoding, rendered as two hex
   * digits per byte with a single space between bytes.
   *
   * @param tk token
   * @return token fingerprint
   * @throws IOException if there is an I/O error
   */
  @VisibleForTesting
  static String buildTokenFingerprint(Token<? extends TokenIdentifier> tk)
      throws IOException {
    final String hexDigest = DigestUtils.sha256Hex(tk.encodeToUrlString());
    final StringBuilder out = new StringBuilder();
    // 10 bytes == 20 hex characters, consumed two at a time.
    for (int pos = 0; pos < 20; pos += 2) {
      if (pos != 0) {
        out.append(' ');
      }
      out.append(hexDigest, pos, pos + 2);
    }
    return out.toString();
  }

  /**
   * Thread named "CacheCleanup" whose {@link #run()} posts a single
   * {@code CACHE_CLEANUP} localization event to the dispatcher.
   */
  static class CacheCleanup extends Thread {

    private final Dispatcher dispatcher;

    /**
     * @param dispatcher dispatcher that receives the cleanup event
     */
    public CacheCleanup(Dispatcher dispatcher) {
      super("CacheCleanup");
      this.dispatcher = dispatcher;
    }

    /**
     * Sends one {@code CACHE_CLEANUP} event through the dispatcher's event
     * handler.
     */
    @Override
    @SuppressWarnings("unchecked") // dispatcher not typed
    public void run() {
      final LocalizationEvent cleanupEvent =
          new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP);
      dispatcher.getEventHandler().handle(cleanupEvent);
    }

  }

  /**
   * Initializes every local directory reported by the dirs handler.
   *
   * @param lfs file context used to inspect and create directories
   */
  private void initializeLocalDirs(FileContext lfs) {
    final Iterator<String> dirs = dirsHandler.getLocalDirs().iterator();
    while (dirs.hasNext()) {
      initializeLocalDir(lfs, dirs.next());
    }
  }

  /**
   * Makes sure every path returned by
   * {@code getLocalDirsPathPermissionsMap(localDir)} exists and carries the
   * permission mapped to it: missing paths are created (with parents) and
   * differing permissions are reset.
   *
   * @param lfs file context used for all file system operations
   * @param localDir local directory to initialize
   * @throws YarnRuntimeException if a path cannot be inspected, created or
   *         have its permissions changed
   */
  private void initializeLocalDir(FileContext lfs, String localDir) {
    final Map<Path, FsPermission> expectedPerms =
        getLocalDirsPathPermissionsMap(localDir);

    for (Map.Entry<Path, FsPermission> dirAndPerm : expectedPerms.entrySet()) {
      final Path dir = dirAndPerm.getKey();
      final FsPermission wanted = dirAndPerm.getValue();

      // Step 1: look the path up; a missing path leaves 'current' null.
      FileStatus current = null;
      try {
        current = lfs.getFileStatus(dir);
      } catch (FileNotFoundException notThere) {
        // created in step 2
      } catch (IOException ie) {
        final String msg = "Could not get file status for local dir " + dir;
        LOG.warn(msg, ie);
        throw new YarnRuntimeException(msg, ie);
      }

      // Step 2: create the path when it does not exist yet.
      if (current == null) {
        try {
          lfs.mkdir(dir, wanted, true);
          current = lfs.getFileStatus(dir);
        } catch (IOException e) {
          final String msg = "Could not initialize local dir " + dir;
          LOG.warn(msg, e);
          throw new YarnRuntimeException(msg, e);
        }
      }

      // Step 3: reset the permission only when it differs.
      if (current.getPermission().equals(wanted)) {
        continue;
      }
      try {
        lfs.setPermission(dir, wanted);
      } catch (IOException ie) {
        final String msg = "Could not set permissions for local dir " + dir;
        LOG.warn(msg, ie);
        throw new YarnRuntimeException(msg, ie);
      }
    }
  }

  /**
   * Initializes every log directory reported by the dirs handler.
   *
   * @param lfs file context used to create the directories
   */
  private void initializeLogDirs(FileContext lfs) {
    final Iterator<String> dirs = dirsHandler.getLogDirs().iterator();
    while (dirs.hasNext()) {
      initializeLogDir(lfs, dirs.next());
    }
  }

  /**
   * Creates one log directory, including parents. An already existing
   * directory is not an error.
   *
   * @param fs file context used to create the directory
   * @param logDir directory to create
   * @throws YarnRuntimeException if the directory cannot be created
   */
  private void initializeLogDir(FileContext fs, String logDir) {
    final Path logDirPath = new Path(logDir);
    try {
      fs.mkdir(logDirPath, null, true);
    } catch (FileAlreadyExistsException alreadyThere) {
      // nothing to do: the directory is already in place
    } catch (IOException e) {
      final String msg = "Could not initialize log dir " + logDir;
      LOG.warn(msg, e);
      throw new YarnRuntimeException(msg, e);
    }
  }

  /**
   * Cleans up every log directory the dirs handler lists for cleanup. A
   * failure on one directory is logged and does not stop the others.
   *
   * @param fs file context used to inspect the directories
   * @param del deletion service that receives the deletion tasks
   */
  private void cleanupLogDirs(FileContext fs, DeletionService del) {
    final Iterator<String> dirs =
        dirsHandler.getLogDirsForCleanup().iterator();
    while (dirs.hasNext()) {
      final String logDir = dirs.next();
      try {
        cleanupLogDir(fs, del, logDir);
      } catch (IOException e) {
        LOG.warn("failed to cleanup app log dir " + logDir, e);
      }
    }
  }

  /**
   * Cleans up one log directory, if it exists: application log directories
   * are first renamed, then the renamed ones are scheduled for deletion.
   *
   * @param fs file context used to inspect the directory
   * @param del deletion service that receives the deletion tasks
   * @param logDir log directory to clean up
   * @throws IOException if the directory cannot be inspected or renamed
   */
  private void cleanupLogDir(FileContext fs, DeletionService del,
      String logDir) throws IOException {
    final boolean present = fs.util().exists(new Path(logDir));
    if (present) {
      renameAppLogDir(logDir);
      deleteAppLogDir(fs, del, logDir);
    }
  }

  /**
   * Renames every entry of {@code logDir} whose name looks like
   * {@code application_<digits>_<digits>} by appending
   * {@code _DEL_<current time in millis>}. Uses the service's own file
   * context.
   *
   * @param logDir log directory whose application entries are renamed
   * @throws IOException if listing or renaming fails
   */
  private void renameAppLogDir(String logDir) throws IOException {
    final long now = System.currentTimeMillis();
    final RemoteIterator<FileStatus> entries =
        lfs.listStatus(new Path(logDir));
    if (entries == null) {
      return;
    }
    while (entries.hasNext()) {
      final String entryName = entries.next().getPath().getName();
      if (!entryName.matches("^application_\\d+_\\d+$")) {
        continue;
      }
      final Path from = new Path(logDir, entryName);
      final Path to = new Path(logDir, entryName + "_DEL_" + now);
      lfs.rename(from, to);
    }
  }

  /**
   * Schedules deletion of every entry of {@code logDir} whose name looks
   * like {@code application_<digits>_<digits>_DEL_<digits>}.
   *
   * @param fs file context used to list the directory
   * @param del deletion service that receives the deletion tasks
   * @param logDir log directory to scan
   * @throws IOException if listing fails
   */
  private void deleteAppLogDir(FileContext fs, DeletionService del,
      String logDir) throws IOException {
    final RemoteIterator<FileStatus> entries =
        fs.listStatus(new Path(logDir));
    if (entries == null) {
      return;
    }
    while (entries.hasNext()) {
      final FileStatus entry = entries.next();
      final String entryName = entry.getPath().getName();
      if (!entryName.matches("^application_\\d+_\\d+_DEL_\\d+$")) {
        continue;
      }
      LOG.info("delete app log dir," + entryName);
      del.delete(new FileDeletionTask(del, null, entry.getPath(), null));
    }
  }

  /**
   * Cleans up every local directory the dirs handler lists for cleanup.
   *
   * @param lfs file context used for renaming and listing
   * @param del deletion service that receives the deletion tasks
   */
  private void cleanUpLocalDirs(FileContext lfs, DeletionService del) {
    final Iterator<String> dirs =
        dirsHandler.getLocalDirsForCleanup().iterator();
    while (dirs.hasNext()) {
      cleanUpLocalDir(lfs, del, dirs.next());
    }
  }

  /**
   * Cleans up one local directory: the user cache, file cache and NM private
   * sub-directories are renamed with a shared {@code _DEL_<timestamp>}
   * suffix, then the renamed directories are scheduled for deletion. A
   * failure to schedule the deletion is logged and not propagated.
   *
   * @param lfs file context used for renaming and listing
   * @param del deletion service that receives the deletion tasks
   * @param localDir local directory to clean up
   */
  private void cleanUpLocalDir(FileContext lfs, DeletionService del,
      String localDir) {
    long currentTimeStamp = System.currentTimeMillis();
    renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE,
      currentTimeStamp);
    renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE,
      currentTimeStamp);
    renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR,
      currentTimeStamp);
    try {
      deleteLocalDir(lfs, del, localDir);
    } catch (IOException e) {
      // Best effort: keep going, but log the cause so the failure can be
      // diagnosed.
      LOG.warn("Failed to delete localDir: " + localDir, e);
    }
  }

  /**
   * Renames {@code localDir/localSubDir} to
   * {@code localDir/localSubDir_DEL_<currentTimeStamp>}. A missing
   * sub-directory is ignored; any other failure is logged and not
   * propagated.
   *
   * @param lfs file context used for the rename
   * @param localDir parent local directory
   * @param localSubDir name of the sub-directory to rename
   * @param currentTimeStamp timestamp appended to the new name
   */
  private void renameLocalDir(FileContext lfs, String localDir,
      String localSubDir, long currentTimeStamp) {
    try {
      lfs.rename(new Path(localDir, localSubDir), new Path(
          localDir, localSubDir + "_DEL_" + currentTimeStamp));
    } catch (FileNotFoundException ex) {
      // No need to handle this exception
      // localSubDir may not be exist
    } catch (Exception ex) {
      // Best effort: keep going, but log the cause so the failure can be
      // diagnosed.
      LOG.warn("Failed to rename the local file under " +
          localDir + "/" + localSubDir, ex);
    }
  }

  /**
   * Schedules deletion of the renamed ({@code ..._DEL_...}) cache
   * directories found directly under {@code localDir}. Renamed user cache
   * directories are handled per user directory through
   * {@link #cleanUpFilesPerUserDir}; renamed NM private and file cache
   * directories get a single deletion task each. A failure on one entry is
   * logged and does not stop the others.
   *
   * @param lfs file context used for listing
   * @param del deletion service that receives the deletion tasks
   * @param localDir local directory to scan
   * @throws IOException if {@code localDir} cannot be listed
   */
  private void deleteLocalDir(FileContext lfs, DeletionService del,
      String localDir) throws IOException {
    RemoteIterator<FileStatus> fileStatus = lfs.listStatus(new Path(localDir));
    if (fileStatus != null) {
      while (fileStatus.hasNext()) {
        FileStatus status = fileStatus.next();
        try {
          if (status.getPath().getName().matches(".*" +
              ContainerLocalizer.USERCACHE + "_DEL_.*")) {
            LOG.info("usercache path : " + status.getPath().toString());
            cleanUpFilesPerUserDir(lfs, del, status.getPath());
          } else if (status.getPath().getName()
              .matches(".*" + NM_PRIVATE_DIR + "_DEL_.*")
              ||
              status.getPath().getName()
                  .matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) {
            FileDeletionTask deletionTask = new FileDeletionTask(del, null,
                status.getPath(), null);
            del.delete(deletionTask);
          }
        } catch (IOException ex) {
          // Best effort: keep going, but log the cause so the failure can be
          // diagnosed.
          LOG.warn("Failed to delete this local Directory: " +
              status.getPath().getName(), ex);
        }
      }
    }
  }

  /**
   * Schedules deletion of a renamed usercache directory.
   *
   * <p>When the directory has entries, one deletion task is created per
   * entry, using that entry's owner as the task user, and the task for the
   * directory itself is attached to each of them through
   * {@code addDeletionTaskDependency} (presumably so that it runs only after
   * the per-entry tasks; confirm against {@code FileDeletionTask}). When the
   * directory is empty or cannot be listed, the directory task is submitted
   * directly.
   *
   * @param lfs file context used to list the directory
   * @param del deletion service that receives the tasks
   * @param userDirPath the renamed usercache directory
   * @throws IOException if listing or iterating the directory fails
   */
  private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
      Path userDirPath) throws IOException {
    RemoteIterator<FileStatus> entries = lfs.listStatus(userDirPath);
    FileDeletionTask parentDirTask = new FileDeletionTask(del, null,
        userDirPath, new ArrayList<Path>());

    // Nothing inside the directory: remove the directory itself right away.
    if (entries == null || !entries.hasNext()) {
      del.delete(parentDirTask);
      return;
    }

    // Build every per-entry task first, then submit them in listing order.
    List<FileDeletionTask> entryTasks = new ArrayList<FileDeletionTask>();
    while (entries.hasNext()) {
      FileStatus entry = entries.next();
      String entryOwner = entry.getOwner();
      List<Path> targets = new ArrayList<>();
      targets.add(entry.getPath());
      FileDeletionTask entryTask =
          new FileDeletionTask(del, entryOwner, null, targets);
      entryTask.addDeletionTaskDependency(parentDirTask);
      entryTasks.add(entryTask);
    }
    for (FileDeletionTask entryTask : entryTasks) {
      del.delete(entryTask);
    }
  }
  
  /**
   * Checks every local dir reported by the directories handler and tries to
   * repair the ones that fail the check.
   *
   * <p>All directories are checked first; only afterwards is each failing
   * directory re-initialized and checked a second time.
   *
   * @throws YarnRuntimeException if a directory still fails the check after
   *     it has been re-initialized
   */
  @VisibleForTesting
  void checkAndInitializeLocalDirs() {
    List<String> localDirs = dirsHandler.getLocalDirs();
    List<String> dirsNeedingInit = new ArrayList<String>();

    // Phase 1: collect the directories that do not pass the check.
    for (String candidate : localDirs) {
      try {
        checkLocalDir(candidate);
      } catch (YarnRuntimeException e) {
        dirsNeedingInit.add(candidate);
      }
    }

    // Phase 2: re-initialize each failing directory and verify it again.
    for (String failedDir : dirsNeedingInit) {
      LOG.info("Attempting to initialize " + failedDir);
      initializeLocalDir(lfs, failedDir);
      try {
        checkLocalDir(failedDir);
      } catch (YarnRuntimeException e) {
        String msg = "Failed to setup local dir " + failedDir
            + ", which was marked as good.";
        LOG.warn(msg, e);
        throw new YarnRuntimeException(msg, e);
      }
    }
  }

  /**
   * Verifies that each sub-directory expected under {@code localDir} can be
   * stat'ed and carries exactly the expected permission.
   *
   * @param localDir the local directory to verify
   * @return {@code true} when every expected sub-directory passes; failures
   *     are reported by exception, never by returning {@code false}
   * @throws YarnRuntimeException if a sub-directory's status cannot be read
   *     or its permission differs from the expected one
   */
  private boolean checkLocalDir(String localDir) {
    Map<Path, FsPermission> expectedPermissions =
        getLocalDirsPathPermissionsMap(localDir);

    for (Map.Entry<Path, FsPermission> expected
        : expectedPermissions.entrySet()) {
      Path subDir = expected.getKey();
      FsPermission wanted = expected.getValue();

      FileStatus subDirStatus;
      try {
        subDirStatus = lfs.getFileStatus(subDir);
      } catch (Exception e) {
        String msg = "Could not carry out resource dir checks for "
            + localDir + ", which was marked as good";
        LOG.warn(msg, e);
        throw new YarnRuntimeException(msg, e);
      }

      FsPermission actual = subDirStatus.getPermission();
      if (actual.equals(wanted)) {
        continue;
      }
      String msg = "Permissions incorrectly set for dir " + subDir
          + ", should be " + wanted + ", actual value = " + actual;
      LOG.warn(msg);
      throw new YarnRuntimeException(msg);
    }
    return true;
  }

  /**
   * Builds the map of sub-directory paths under {@code localDir} to the
   * permission each one is expected to have once the file context's umask
   * has been applied.
   *
   * <p>The usercache and filecache directories use the default directory
   * permission; the NM private directory uses {@code NM_PRIVATE_PERM}.
   *
   * @param localDir the parent local directory
   * @return a new map with one entry per expected sub-directory
   */
  private Map<Path, FsPermission> getLocalDirsPathPermissionsMap(String localDir) {
    FsPermission sharedDirPerm =
        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
    FsPermission privateDirPerm =
        NM_PRIVATE_PERM.applyUMask(lfs.getUMask());

    Map<Path, FsPermission> expected = new HashMap<>();
    expected.put(new Path(localDir, ContainerLocalizer.USERCACHE),
        sharedDirPerm);
    expected.put(new Path(localDir, ContainerLocalizer.FILECACHE),
        sharedDirPerm);
    expected.put(new Path(localDir, NM_PRIVATE_DIR), privateDirPerm);
    return expected;
  }

  /**
   * Looks up the localized resource for a request in the tracker selected by
   * the request's visibility, the user and the application.
   *
   * @param req the resource request to look up
   * @param user user passed to the tracker lookup
   * @param appId application passed to the tracker lookup
   * @return the tracker's answer for {@code req}, or {@code null} when no
   *     tracker is found
   */
  public LocalizedResource getLocalizedResource(LocalResourceRequest req,
      String user, ApplicationId appId) {
    LocalResourcesTracker resourcesTracker =
        getLocalResourcesTracker(req.getVisibility(), user, appId);
    return resourcesTracker == null
        ? null
        : resourcesTracker.getLocalizedResource(req);
  }
}