InMemorySCMStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.sharedcachemanager.store;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A thread-safe version of an in-memory SCM store. The thread safety is
 * implemented with two key pieces: (1) at the mapping level a
 * ConcurrentHashMap is used to allow concurrent access to resources and their
 * associated references, and (2) a key-level lock is used to ensure mutual
 * exclusion between any operations that access a resource with the same
 * key. <br>
 * <br>
 * To ensure safe key-level locking, we take the original string key and
 * intern it weakly using Hadoop's <code>StringInterner</code>. This avoids
 * the pitfalls of built-in String interning: the interned strings are only
 * weakly referenced, so they can be garbage collected once they are no longer
 * in use, and there is little risk of other parts of the code accidentally
 * holding the same keys and using them as locks. <br>
 * <br>
 * Resources in the in-memory store are evicted based on a staleness
 * criterion: if a resource is not referenced (i.e. used) for a given period,
 * it is designated stale and is considered evictable.
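 * <br>
 * <br>
 * For illustration only, the key-level locking used throughout this store
 * follows roughly this sketch (the body is a placeholder):
 * <pre>{@code
 * String interned = StringInterner.weakIntern(key);
 * synchronized (interned) {
 *   // all reads and writes of the resource mapped to this key happen here
 * }
 * }</pre>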
 */
@Private
@Evolving
public class InMemorySCMStore extends SCMStore {
  private static final Logger LOG =
      LoggerFactory.getLogger(InMemorySCMStore.class);

  private final Map<String, SharedCacheResource> cachedResources =
      new ConcurrentHashMap<String, SharedCacheResource>();
  private Collection<ApplicationId> initialApps =
      new ArrayList<ApplicationId>();
  private final Object initialAppsLock = new Object();
  private long startTime;
  private int stalenessMinutes;
  private ScheduledExecutorService scheduler;
  private int initialDelayMin;
  private int checkPeriodMin;

  public InMemorySCMStore() {
    super(InMemorySCMStore.class.getName());
  }

  @VisibleForTesting
  public InMemorySCMStore(AppChecker appChecker) {
    super(InMemorySCMStore.class.getName(), appChecker);
  }

  private String intern(String key) {
    return StringInterner.weakIntern(key);
  }

  /**
   * The in-memory store bootstraps itself from the shared cache entries that
   * exist in HDFS.
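   * The shared cache root directory is read from the
   * {@link YarnConfiguration#SHARED_CACHE_ROOT} configuration property.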
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {

    this.startTime = System.currentTimeMillis();
    this.initialDelayMin = getInitialDelay(conf);
    this.checkPeriodMin = getCheckPeriod(conf);
    this.stalenessMinutes = getStalenessPeriod(conf);

    bootstrap(conf);

    ThreadFactory tf =
        new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore")
            .build();
    scheduler = HadoopExecutors.newSingleThreadScheduledExecutor(tf);

    super.serviceInit(conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    // start composed services first
    super.serviceStart();

    // Get initial list of running applications
    LOG.info("Getting the active app list to initialize the in-memory scm store");
    synchronized (initialAppsLock) {
      initialApps = appChecker.getActiveApplications();
    }
    LOG.info(initialApps.size() + " apps recorded as active at this time");

    Runnable task = new AppCheckTask(appChecker);
    scheduler.scheduleAtFixedRate(task, initialDelayMin, checkPeriodMin,
        TimeUnit.MINUTES);
    LOG.info("Scheduled the in-memory scm store app check task to run every "
        + checkPeriodMin + " minutes.");
  }

  @Override
  protected void serviceStop() throws Exception {
    LOG.info("Stopping the " + InMemorySCMStore.class.getSimpleName()
        + " service.");
    if (scheduler != null) {
      LOG.info("Shutting down the background thread.");
      scheduler.shutdownNow();
      try {
        if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
          LOG.warn("Gave up waiting for the app check task to shutdown.");
        }
      } catch (InterruptedException e) {
        LOG.warn(
            "The InMemorySCMStore was interrupted while shutting down the "
                + "app check task.", e);
      }
      LOG.info("The background thread stopped.");
    }
    super.serviceStop();
  }

  private void bootstrap(Configuration conf) throws IOException {
    Map<String, String> initialCachedResources =
        getInitialCachedResources(FileSystem.get(conf), conf);
    LOG.info("Bootstrapping from " + initialCachedResources.size()
        + " cache resources located in the file system");
    Iterator<Map.Entry<String, String>> it =
        initialCachedResources.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, String> e = it.next();
      String key = intern(e.getKey());
      String fileName = e.getValue();
      SharedCacheResource resource = new SharedCacheResource(fileName);
      // we don't hold the lock for this as it is done as part of serviceInit
      cachedResources.put(key, resource);
      // clear out the initial resource to reduce the footprint
      it.remove();
    }
    LOG.info("Bootstrapping complete");
  }

  @VisibleForTesting
  Map<String, String> getInitialCachedResources(FileSystem fs,
      Configuration conf) throws IOException {
    // get the root directory for the shared cache
    String location =
        conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
            YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
    Path root = new Path(location);
    try {
      fs.getFileStatus(root);
    } catch (FileNotFoundException e) {
      String message =
          "The shared cache root directory " + location + " was not found";
      LOG.error(message);
      throw (IOException)new FileNotFoundException(message)
          .initCause(e);
    }

    int nestedLevel = SharedCacheUtil.getCacheDepth(conf);
    // now traverse individual directories and process them
    // the directory structure is specified by the nested level parameter
    // (e.g. 9/c/d/<checksum>/file)
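    // e.g. with a cache depth of 3 the glob spans five path components below
    // the root: the three nested directories, the checksum directory, and
    // the file itself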
    String pattern = SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel+1);

    LOG.info("Querying for all individual cached resource files");
    FileStatus[] entries = fs.globStatus(new Path(root, pattern));
    int numEntries = entries == null ? 0 : entries.length;
    LOG.info("Found " + numEntries + " files: processing for one resource per "
        + "key");

    Map<String, String> initialCachedEntries = new HashMap<String, String>();
    if (entries != null) {
      for (FileStatus entry : entries) {
        Path file = entry.getPath();
        String fileName = file.getName();
        if (entry.isFile()) {
          // get the parent to get the checksum
          Path parent = file.getParent();
          if (parent != null) {
            // the name of the immediate parent directory is the checksum
            String key = parent.getName();
            // make sure we insert only one file per checksum; whichever file
            // is encountered first wins
            if (initialCachedEntries.containsKey(key)) {
              LOG.warn("Key " + key + " is already mapped to file "
                  + initialCachedEntries.get(key) + "; file " + fileName
                  + " will not be added");
            } else {
              initialCachedEntries.put(key, fileName);
            }
          }
        }
      }
    }
    LOG.info("A total of " + initialCachedEntries.size()
        + " files are now mapped");
    return initialCachedEntries;
  }

  /**
   * Adds the given resource to the store, mapping the key to the filename. If
   * an entry already exists for the key, the existing filename is returned
   * instead. The return value represents the state of the store at the time
   * of the query; the entry may change or even be removed once this method
   * returns, and the caller should be prepared to handle that situation.
   * 
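   * For illustration only, a hypothetical caller (the names below are
   * placeholders) might use the return value to detect that a different file
   * already won the race for this key:
   * <pre>{@code
   * String stored = store.addResource(checksum, myFileName);
   * if (!stored.equals(myFileName)) {
   *   // a different file is already cached under this checksum
   * }
   * }</pre>
   *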
   * @return the filename of the newly inserted resource or that of the existing
   *         resource
   */
  @Override
  public String addResource(String key, String fileName) {
    String interned = intern(key);
    synchronized (interned) {
      SharedCacheResource resource = cachedResources.get(interned);
      if (resource == null) {
        resource = new SharedCacheResource(fileName);
        cachedResources.put(interned, resource);
      }
      return resource.getFileName();
    }
  }

  /**
   * Adds the provided resource reference to the cache resource under the key,
   * and updates the access time. If this method returns a non-null value, the
   * caller may safely assume that the resource will not be removed at least
   * until the application in this resource reference has terminated.
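   * <p>
   * For illustration only, a hypothetical caller (the names below are
   * placeholders) could treat a null return as a cache miss:
   * <pre>{@code
   * String fileName = store.addResourceReference(checksum, ref);
   * if (fileName == null) {
   *   // resource is not in the cache; the caller must obtain it elsewhere
   * }
   * }</pre>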
   * 
   * @return the filename of the resource, or null if the resource is not found
   */
  @Override
  public String addResourceReference(String key,
      SharedCacheResourceReference ref) {
    String interned = intern(key);
    synchronized (interned) {
      SharedCacheResource resource = cachedResources.get(interned);
      if (resource == null) { // it's not mapped
        return null;
      }
      resource.addReference(ref);
      resource.updateAccessTime();
      return resource.getFileName();
    }
  }

  /**
   * Returns the resource references currently registered under the cache
   * entry, or an empty collection if there are none. The returned collection
   * is unmodifiable and is a snapshot of the information at the time of the
   * query. The state may change after this query returns, and the caller
   * should be prepared for the situation that some or all of these resource
   * references are no longer relevant.
   * 
   * @return the collection that contains the resource references associated
   *         with the resource; or an empty collection if no resource references
   *         are registered under this resource
   */
  @Override
  public Collection<SharedCacheResourceReference> getResourceReferences(
      String key) {
    String interned = intern(key);
    synchronized (interned) {
      SharedCacheResource resource = cachedResources.get(interned);
      if (resource == null) {
        return Collections.emptySet();
      }
      Set<SharedCacheResourceReference> refs =
          new HashSet<SharedCacheResourceReference>(
              resource.getResourceReferences());
      return Collections.unmodifiableSet(refs);
    }
  }

  /**
   * Removes the provided resource reference from the resource. If the resource
   * does not exist, nothing will be done.
   *
   * @return true if the reference was found and removed, false otherwise
   */
  @Override
  public boolean removeResourceReference(String key,
      SharedCacheResourceReference ref, boolean updateAccessTime) {
    String interned = intern(key);
    synchronized (interned) {
      boolean removed = false;
      SharedCacheResource resource = cachedResources.get(interned);
      if (resource != null) {
        Set<SharedCacheResourceReference> resourceRefs =
            resource.getResourceReferences();
        removed = resourceRefs.remove(ref);
        if (updateAccessTime) {
          resource.updateAccessTime();
        }
      }
      return removed;
    }
  }

  /**
   * Removes the provided collection of resource references from the resource.
   * If the resource does not exist, nothing will be done.
   */
  @Override
  public void removeResourceReferences(String key,
      Collection<SharedCacheResourceReference> refs, boolean updateAccessTime) {
    String interned = intern(key);
    synchronized (interned) {
      SharedCacheResource resource = cachedResources.get(interned);
      if (resource != null) {
        Set<SharedCacheResourceReference> resourceRefs =
            resource.getResourceReferences();
        resourceRefs.removeAll(refs);
        if (updateAccessTime) {
          resource.updateAccessTime();
        }
      }
    }
  }

  /**
   * Cleans the resource references for the given key while holding the
   * key-level lock, so that the cleanup is atomic with respect to other
   * operations on the same resource.
   */
  @Override
  public void cleanResourceReferences(String key) throws YarnException {
    String interned = intern(key);
    synchronized (interned) {
      super.cleanResourceReferences(key);
    }
  }

  /**
   * Removes the given resource from the store. Returns true if the resource
   * was removed, or if it was not present in the first place. Returns false
   * if the resource could not be removed because its resource reference list
   * is not empty.
   */
  @Override
  public boolean removeResource(String key) {
    String interned = intern(key);
    synchronized (interned) {
      SharedCacheResource resource = cachedResources.get(interned);
      if (resource == null) {
        return true;
      }

      if (!resource.getResourceReferences().isEmpty()) {
        return false;
      }
      // no users
      cachedResources.remove(interned);
      return true;
    }
  }

  /**
   * Obtains the access time for a resource. It represents the view of the
   * resource at the time of the query. The value may have been updated at a
   * later point.
   * 
   * @return the access time of the resource if found; -1 if the resource is not
   *         found
   */
  @VisibleForTesting
  long getAccessTime(String key) {
    String interned = intern(key);
    synchronized (interned) {
      SharedCacheResource resource = cachedResources.get(interned);
      return resource == null ? -1 : resource.getAccessTime();
    }
  }

  @Override
  public boolean isResourceEvictable(String key, FileStatus file) {
    synchronized (initialAppsLock) {
      if (initialApps.size() > 0) {
        return false;
      }
    }

    long staleTime =
        System.currentTimeMillis()
            - TimeUnit.MINUTES.toMillis(this.stalenessMinutes);
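    // a resource whose last known use is earlier than staleTime is considered
    // stale and therefore evictable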
    long accessTime = getAccessTime(key);
    if (accessTime == -1) {
      // check modification time
      long modTime = file.getModificationTime();
      // if the modification time is older than the store startup time, use
      // the store startup time as the last point of certainty
      long lastUse = modTime < this.startTime ? this.startTime : modTime;
      return lastUse < staleTime;
    } else {
      // check access time
      return accessTime < staleTime;
    }
  }

  private static int getStalenessPeriod(Configuration conf) {
    int stalenessMinutes =
        conf.getInt(YarnConfiguration.IN_MEMORY_STALENESS_PERIOD_MINS,
            YarnConfiguration.DEFAULT_IN_MEMORY_STALENESS_PERIOD_MINS);
    // a non-positive value is invalid; fail fast
    if (stalenessMinutes <= 0) {
      throw new HadoopIllegalArgumentException("Non-positive staleness value: "
          + stalenessMinutes
          + ". The staleness value must be greater than zero.");
    }
    return stalenessMinutes;
  }

  private static int getInitialDelay(Configuration conf) {
    int initialMinutes =
        conf.getInt(YarnConfiguration.IN_MEMORY_INITIAL_DELAY_MINS,
            YarnConfiguration.DEFAULT_IN_MEMORY_INITIAL_DELAY_MINS);
    // a non-positive value is invalid; fail fast
    if (initialMinutes <= 0) {
      throw new HadoopIllegalArgumentException(
          "Non-positive initial delay value: " + initialMinutes
              + ". The initial delay value must be greater than zero.");
    }
    return initialMinutes;
  }

  private static int getCheckPeriod(Configuration conf) {
    int checkMinutes =
        conf.getInt(YarnConfiguration.IN_MEMORY_CHECK_PERIOD_MINS,
            YarnConfiguration.DEFAULT_IN_MEMORY_CHECK_PERIOD_MINS);
    // a non-positive value is invalid; fail fast
    if (checkMinutes <= 0) {
      throw new HadoopIllegalArgumentException(
          "Non-positive check period value: " + checkMinutes
              + ". The check period value must be greater than zero.");
    }
    return checkMinutes;
  }

  /**
   * Periodic task that checks whether the applications recorded as active at
   * store startup have since finished and removes them from the initial app
   * list; while that list is non-empty, no resource is considered evictable.
   */
  @Private
  @Evolving
  class AppCheckTask implements Runnable {

    private final AppChecker taskAppChecker;

    public AppCheckTask(AppChecker appChecker) {
      this.taskAppChecker = appChecker;
    }

    @Override
    public void run() {
      try {
        LOG.info("Checking the initial app list for finished applications.");
        synchronized (initialAppsLock) {
          if (initialApps.isEmpty()) {
            // we're fine, no-op; there are no active apps that were running at
            // the time of the service start
          } else {
            LOG.info("Looking into " + initialApps.size()
                + " apps to see if they are still active");
            Iterator<ApplicationId> it = initialApps.iterator();
            while (it.hasNext()) {
              ApplicationId id = it.next();
              try {
                if (!taskAppChecker.isApplicationActive(id)) {
                  // remove it from the list
                  it.remove();
                }
              } catch (YarnException e) {
                LOG.warn("Exception while checking the app status;"
                    + " will leave the entry in the list", e);
                // continue
              }
            }
          }
          LOG.info("There are now " + initialApps.size()
              + " entries in the list");
        }
      } catch (Throwable e) {
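        // catch everything so that an escaping exception does not suppress
        // subsequent executions of this scheduled task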
        LOG.error(
            "Unexpected exception thrown during in-memory store app check task."
                + " Rescheduling task.", e);
      }

    }
  }
}