ReencryptionHandler.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;
import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker;
import org.apache.hadoop.hdfs.util.RwLockMode;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY;

/**
 * Class for handling re-encrypt EDEK operations.
 * <p>
 * For each EZ, ReencryptionHandler walks the tree in a depth-first order,
 * and submits batches of (files + existing edeks) as re-encryption tasks
 * to a thread pool. Each thread in the pool then contacts the KMS to
 * re-encrypt the edeks. ReencryptionUpdater tracks the tasks and updates
 * file xattrs with the new edeks.
 * <p>
 * File renames are disabled in the EZ that's being re-encrypted. Newly created
 * files will have new edeks, because the edek cache is drained upon the
 * submission of a re-encryption command.
 * <p>
 * It is assumed only 1 ReencryptionHandler will be running, because:
 *   1. The bottleneck of the entire re-encryption appears to be on the KMS.
 *   2. Even with multiple handlers, since updater requires writelock and is
 * single-threaded, the performance gain is limited.
 * <p>
 * This class uses the FSDirectory lock for synchronization.
 */
@InterfaceAudience.Private
public class ReencryptionHandler implements Runnable {

  public static final Logger LOG =
      LoggerFactory.getLogger(ReencryptionHandler.class);

  // 2000 is based on buffer size = 512 * 1024, and SetXAttr op size is
  // 100 - 200 bytes (depending on the xattr value).
  // The buffer size is hard-coded, see outputBufferCapacity from QJM.
  private static final int MAX_BATCH_SIZE_WITHOUT_FLOODING = 2000;

  private final EncryptionZoneManager ezManager;
  private final FSDirectory dir;
  private final long interval;
  private final int reencryptBatchSize;
  private double throttleLimitHandlerRatio;
  private final int reencryptThreadPoolSize;
  // stopwatches for throttling
  private final StopWatch throttleTimerAll = new StopWatch();
  private final StopWatch throttleTimerLocked = new StopWatch();

  private ExecutorCompletionService<ReencryptionTask> batchService;
  private BlockingQueue<Runnable> taskQueue;
  // protected by ReencryptionHandler object lock
  private final Map<Long, ZoneSubmissionTracker> submissions = new HashMap<>();

  // The current batch that the handler is working on. Handler is designed to
  // be single-threaded, see class javadoc for more details.
  private ReencryptionBatch currentBatch;

  private final ReencryptionPendingInodeIdCollector traverser;

  private final ReencryptionUpdater reencryptionUpdater;
  private ExecutorService updaterExecutor;

  // Vars for unit tests.
  private volatile boolean shouldPauseForTesting = false;
  private volatile int pauseAfterNthSubmission = 0;

  /**
   * Stop the re-encryption updater thread, as well as all EDEK re-encryption
   * tasks submitted.
   */
  void stopThreads() {
    assert dir.hasWriteLock();
    synchronized (this) {
      for (ZoneSubmissionTracker zst : submissions.values()) {
        zst.cancelAllTasks();
      }
    }
    if (updaterExecutor != null) {
      updaterExecutor.shutdownNow();
    }
  }

  /**
   * Start the re-encryption updater thread.
   */
  void startUpdaterThread() {
    updaterExecutor = Executors.newSingleThreadExecutor(
        new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("reencryptionUpdaterThread #%d").build());
    updaterExecutor.execute(reencryptionUpdater);
  }

  @VisibleForTesting
  synchronized void pauseForTesting() {
    shouldPauseForTesting = true;
    LOG.info("Pausing re-encrypt handler for testing.");
    notify();
  }

  @VisibleForTesting
  synchronized void resumeForTesting() {
    shouldPauseForTesting = false;
    LOG.info("Resuming re-encrypt handler for testing.");
    notify();
  }

  @VisibleForTesting
  void pauseForTestingAfterNthSubmission(final int count) {
    assert pauseAfterNthSubmission == 0;
    pauseAfterNthSubmission = count;
  }

  @VisibleForTesting
  void pauseUpdaterForTesting() {
    reencryptionUpdater.pauseForTesting();
  }

  @VisibleForTesting
  void resumeUpdaterForTesting() {
    reencryptionUpdater.resumeForTesting();
  }

  @VisibleForTesting
  void pauseForTestingAfterNthCheckpoint(final long zoneId, final int count) {
    reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count);
  }

  ReencryptionHandler(final EncryptionZoneManager ezMgr,
      final Configuration conf) {
    this.ezManager = ezMgr;
    Preconditions.checkNotNull(ezManager.getProvider(),
        "No provider set, cannot re-encrypt");
    this.dir = ezMgr.getFSDirectory();
    this.interval =
        conf.getTimeDuration(DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY,
            DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT,
            TimeUnit.MILLISECONDS);
    Preconditions.checkArgument(interval > 0,
        DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY + " is not positive.");
    this.reencryptBatchSize = conf.getInt(DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY,
        DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT);
    Preconditions.checkArgument(reencryptBatchSize > 0,
        DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY + " is not positive.");
    if (reencryptBatchSize > MAX_BATCH_SIZE_WITHOUT_FLOODING) {
      LOG.warn("Re-encryption batch size is {}. It could cause edit log buffer "
          + "to be full and trigger a logSync within the writelock, greatly "
          + "impacting namenode throughput.", reencryptBatchSize);
    }
    this.throttleLimitHandlerRatio =
        conf.getDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
            DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT);
    LOG.info("Configured throttleLimitHandlerRatio={} for re-encryption",
        throttleLimitHandlerRatio);
    Preconditions.checkArgument(throttleLimitHandlerRatio > 0.0f,
        DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY
            + " is not positive.");
    this.reencryptThreadPoolSize =
        conf.getInt(DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY,
            DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT);

    taskQueue = new LinkedBlockingQueue<>();
    ThreadPoolExecutor threadPool =
        new ThreadPoolExecutor(reencryptThreadPoolSize, reencryptThreadPoolSize,
            60, TimeUnit.SECONDS, taskQueue, new Daemon.DaemonFactory() {
              private final AtomicInteger ind = new AtomicInteger(0);

              @Override
              public Thread newThread(Runnable r) {
                Thread t = super.newThread(r);
                t.setName("reencryption edek Thread-" + ind.getAndIncrement());
                return t;
              }
            }, new ThreadPoolExecutor.CallerRunsPolicy() {

              @Override
              public void rejectedExecution(Runnable runnable,
                  ThreadPoolExecutor e) {
                LOG.info("Execution rejected, executing in current thread");
                super.rejectedExecution(runnable, e);
              }
            });

    threadPool.allowCoreThreadTimeOut(true);
    this.batchService = new ExecutorCompletionService(threadPool);
    reencryptionUpdater =
        new ReencryptionUpdater(dir, batchService, this, conf);
    currentBatch = new ReencryptionBatch(reencryptBatchSize);
    traverser = new ReencryptionPendingInodeIdCollector(dir, this, conf);
  }

  ReencryptionStatus getReencryptionStatus() {
    return ezManager.getReencryptionStatus();
  }

  void cancelZone(final long zoneId, final String zoneName) throws IOException {
    assert dir.hasWriteLock();
    final ZoneReencryptionStatus zs =
        getReencryptionStatus().getZoneStatus(zoneId);
    if (zs == null || zs.getState() == State.Completed) {
      throw new IOException("Zone " + zoneName + " is not under re-encryption");
    }
    zs.cancel();
    removeZoneTrackerStopTasks(zoneId);
  }

  void removeZone(final long zoneId) {
    assert dir.hasWriteLock();
    LOG.info("Removing zone {} from re-encryption.", zoneId);
    removeZoneTrackerStopTasks(zoneId);
    getReencryptionStatus().removeZone(zoneId);
  }

  synchronized private void removeZoneTrackerStopTasks(final long zoneId) {
    final ZoneSubmissionTracker zst = submissions.get(zoneId);
    if (zst != null) {
      zst.cancelAllTasks();
      submissions.remove(zoneId);
    }
  }

  ZoneSubmissionTracker getTracker(final long zoneId) {
    assert dir.hasReadLock();
    return unprotectedGetTracker(zoneId);
  }

  /**
   * Get the tracker without holding the FSDirectory lock.
   * The submissions object is protected by object lock.
   */
  synchronized ZoneSubmissionTracker unprotectedGetTracker(final long zoneId) {
    return submissions.get(zoneId);
  }

  /**
   * Add a dummy tracker (with 1 task that has 0 files to re-encrypt)
   * for the zone. This is necessary to complete the re-encryption in case
   * no file in the entire zone needs re-encryption at all. We cannot simply
   * update zone status and set zone xattrs, because in the handler we only hold
   * readlock, and setting xattrs requires upgrading to a writelock.
   *
   * @param zoneId
   */
  void addDummyTracker(final long zoneId, ZoneSubmissionTracker zst) {
    assert dir.hasReadLock();
    if (zst == null) {
      zst = new ZoneSubmissionTracker();
    }
    zst.setSubmissionDone();

    final Future future = batchService.submit(
        new EDEKReencryptCallable(zoneId, new ReencryptionBatch(), this));
    zst.addTask(future);
    synchronized (this) {
      submissions.put(zoneId, zst);
    }
  }

  /**
   * Main loop. It takes at most 1 zone per scan, and executes until the zone
   * is completed.
   * {@link #reencryptEncryptionZone(long)}.
   */
  @Override
  public void run() {
    LOG.info("Starting up re-encrypt thread with interval={} millisecond.",
        interval);
    while (true) {
      try {
        synchronized (this) {
          wait(interval);
        }
        traverser.checkPauseForTesting();
      } catch (InterruptedException ie) {
        LOG.info("Re-encrypt handler interrupted. Exiting");
        Thread.currentThread().interrupt();
        return;
      }

      final Long zoneId;
      dir.getFSNamesystem().readLock(RwLockMode.FS);
      try {
        zoneId = getReencryptionStatus().getNextUnprocessedZone();
        if (zoneId == null) {
          // empty queue.
          continue;
        }
        LOG.info("Executing re-encrypt commands on zone {}. Current zones:{}",
            zoneId, getReencryptionStatus());
        getReencryptionStatus().markZoneStarted(zoneId);
        resetSubmissionTracker(zoneId);
      } finally {
        dir.getFSNamesystem().readUnlock(RwLockMode.FS, "reEncryptThread");
      }

      try {
        reencryptEncryptionZone(zoneId);
      } catch (RetriableException | SafeModeException re) {
        LOG.info("Re-encryption caught exception, will retry", re);
        getReencryptionStatus().markZoneForRetry(zoneId);
      } catch (IOException ioe) {
        LOG.warn("IOException caught when re-encrypting zone {}", zoneId, ioe);
      } catch (InterruptedException ie) {
        LOG.info("Re-encrypt handler interrupted. Exiting.");
        Thread.currentThread().interrupt();
        return;
      } catch (Throwable t) {
        LOG.error("Re-encrypt handler thread exiting. Exception caught when"
            + " re-encrypting zone {}.", zoneId, t);
        return;
      }
    }
  }

  /**
   * Re-encrypts a zone by recursively iterating all paths inside the zone,
   * in lexicographic order.
   * Files are re-encrypted, and subdirs are processed during iteration.
   *
   * @param zoneId the Zone's id.
   * @throws IOException
   * @throws InterruptedException
   */
  void reencryptEncryptionZone(final long zoneId)
      throws IOException, InterruptedException {
    throttleTimerAll.reset().start();
    throttleTimerLocked.reset();
    final INode zoneNode;
    final ZoneReencryptionStatus zs;

    traverser.readLock();
    try {
      zoneNode = dir.getInode(zoneId);
      // start re-encrypting the zone from the beginning
      if (zoneNode == null) {
        LOG.info("Directory with id {} removed during re-encrypt, skipping",
            zoneId);
        return;
      }
      if (!zoneNode.isDirectory()) {
        LOG.info("Cannot re-encrypt directory with id {} because it's not a"
            + " directory.", zoneId);
        return;
      }

      zs = getReencryptionStatus().getZoneStatus(zoneId);
      assert zs != null;
      // Only costly log FullPathName here once, and use id elsewhere.
      LOG.info("Re-encrypting zone {}(id={})", zoneNode.getFullPathName(),
          zoneId);
      if (zs.getLastCheckpointFile() == null) {
        // new re-encryption
        traverser.traverseDir(zoneNode.asDirectory(), zoneId,
            HdfsFileStatus.EMPTY_NAME,
            new ZoneTraverseInfo(zs.getEzKeyVersionName()));
      } else {
        // resuming from a past re-encryption
        restoreFromLastProcessedFile(zoneId, zs);
      }
      // save the last batch and mark complete
      traverser.submitCurrentBatch(zoneId);
      LOG.info("Submission completed of zone {} for re-encryption.", zoneId);
      reencryptionUpdater.markZoneSubmissionDone(zoneId);
    } finally {
      traverser.readUnlock();
    }
  }

  /**
   * Reset the zone submission tracker for re-encryption.
   * @param zoneId
   */
  synchronized private void resetSubmissionTracker(final long zoneId) {
    ZoneSubmissionTracker zst = submissions.get(zoneId);
    if (zst == null) {
      zst = new ZoneSubmissionTracker();
      submissions.put(zoneId, zst);
    } else {
      zst.reset();
    }
  }

  List<XAttr> completeReencryption(final INode zoneNode) throws IOException {
    assert dir.hasWriteLock();
    assert dir.getFSNamesystem().hasWriteLock(RwLockMode.FS);
    final Long zoneId = zoneNode.getId();
    ZoneReencryptionStatus zs = getReencryptionStatus().getZoneStatus(zoneId);
    assert zs != null;
    LOG.info("Re-encryption completed on zone {}. Re-encrypted {} files,"
            + " failures encountered: {}.", zoneNode.getFullPathName(),
        zs.getFilesReencrypted(), zs.getNumReencryptionFailures());
    synchronized (this) {
      submissions.remove(zoneId);
    }
    return FSDirEncryptionZoneOp
        .updateReencryptionFinish(dir, INodesInPath.fromINode(zoneNode), zs);
  }

  /**
   * Restore the re-encryption from the progress inside ReencryptionStatus.
   * This means start from exactly the lastProcessedFile (LPF), skipping all
   * earlier paths in lexicographic order. Lexicographically-later directories
   * on the LPF parent paths are added to subdirs.
   */
  private void restoreFromLastProcessedFile(final long zoneId,
      final ZoneReencryptionStatus zs)
      throws IOException, InterruptedException {
    final INodeDirectory parent;
    final byte[] startAfter;
    final INodesInPath lpfIIP =
        dir.getINodesInPath(zs.getLastCheckpointFile(), FSDirectory.DirOp.READ);
    parent = lpfIIP.getLastINode().getParent();
    startAfter = lpfIIP.getLastINode().getLocalNameBytes();
    traverser.traverseDir(parent, zoneId, startAfter,
        new ZoneTraverseInfo(zs.getEzKeyVersionName()));
  }

  final class ReencryptionBatch {
    // First file's path, for logging purpose.
    private String firstFilePath;
    private final List<FileEdekInfo> batch;

    ReencryptionBatch() {
      this(reencryptBatchSize);
    }

    ReencryptionBatch(int initialCapacity) {
      batch = new ArrayList<>(initialCapacity);
    }

    void add(final INodeFile inode) throws IOException {
      assert dir.hasReadLock();
      Preconditions.checkNotNull(inode, "INodeFile is null");
      if (batch.isEmpty()) {
        firstFilePath = inode.getFullPathName();
      }
      batch.add(new FileEdekInfo(dir, inode));
    }

    String getFirstFilePath() {
      return firstFilePath;
    }

    boolean isEmpty() {
      return batch.isEmpty();
    }

    int size() {
      return batch.size();
    }

    void clear() {
      batch.clear();
    }

    List<FileEdekInfo> getBatch() {
      return batch;
    }
  }

  /**
   * Simply contacts the KMS for re-encryption. No NN locks held.
   */
  private static class EDEKReencryptCallable
      implements Callable<ReencryptionTask> {
    private final long zoneNodeId;
    private final ReencryptionBatch batch;
    private final ReencryptionHandler handler;

    EDEKReencryptCallable(final long zoneId,
        final ReencryptionBatch currentBatch, final ReencryptionHandler rh) {
      zoneNodeId = zoneId;
      batch = currentBatch;
      handler = rh;
    }

    @Override
    public ReencryptionTask call() {
      LOG.info("Processing batched re-encryption for zone {}, batch size {},"
          + " start:{}", zoneNodeId, batch.size(), batch.getFirstFilePath());
      if (batch.isEmpty()) {
        return new ReencryptionTask(zoneNodeId, 0, batch);
      }
      final StopWatch kmsSW = new StopWatch().start();

      int numFailures = 0;
      String result = "Completed";
      if (!reencryptEdeks()) {
        numFailures += batch.size();
        result = "Failed to";
      }
      LOG.info("{} re-encrypting one batch of {} edeks from KMS,"
              + " time consumed: {}, start: {}.", result,
          batch.size(), kmsSW.stop(), batch.getFirstFilePath());
      return new ReencryptionTask(zoneNodeId, numFailures, batch);
    }

    private boolean reencryptEdeks() {
      // communicate with the kms out of lock
      final List<EncryptedKeyVersion> edeks = new ArrayList<>(batch.size());
      for (FileEdekInfo entry : batch.getBatch()) {
        edeks.add(entry.getExistingEdek());
      }
      // provider already has LoadBalancingKMSClientProvider's reties. It that
      // fails, just fail this callable.
      try {
        handler.ezManager.getProvider().reencryptEncryptedKeys(edeks);
        EncryptionFaultInjector.getInstance().reencryptEncryptedKeys();
      } catch (GeneralSecurityException | IOException ex) {
        LOG.warn("Failed to re-encrypt one batch of {} edeks, start:{}",
            batch.size(), batch.getFirstFilePath(), ex);
        return false;
      }
      int i = 0;
      for (FileEdekInfo entry : batch.getBatch()) {
        assert i < edeks.size();
        entry.setEdek(edeks.get(i++));
      }
      return true;
    }
  }


  /**
   * Called when a new zone is submitted for re-encryption. This will interrupt
   * the background thread if it's waiting for the next
   * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY.
   */
  synchronized void notifyNewSubmission() {
    LOG.debug("Notifying handler for new re-encryption command.");
    this.notify();
  }

  public ReencryptionPendingInodeIdCollector getTraverser() {
    return traverser;
  }

  /**
   * ReencryptionPendingInodeIdCollector which throttle based on configured
   * throttle ratio.
   */
  class ReencryptionPendingInodeIdCollector extends FSTreeTraverser {

    private final ReencryptionHandler reencryptionHandler;

    ReencryptionPendingInodeIdCollector(FSDirectory dir,
        ReencryptionHandler rHandler, Configuration conf) {
      super(dir, conf);
      this.reencryptionHandler = rHandler;
    }

    @Override
    protected void checkPauseForTesting()
        throws InterruptedException {
      assert !dir.hasReadLock();
      assert !dir.getFSNamesystem().hasReadLock(RwLockMode.FS);
      while (shouldPauseForTesting) {
        LOG.info("Sleeping in the re-encrypt handler for unit test.");
        synchronized (reencryptionHandler) {
          if (shouldPauseForTesting) {
            reencryptionHandler.wait(30000);
          }
        }
        LOG.info("Continuing re-encrypt handler after pausing.");
      }
    }

    /**
     * Process an Inode for re-encryption. Add to current batch if it's a file,
     * no-op otherwise.
     *
     * @param inode
     *          the inode
     * @return true if inode is added to currentBatch and should be
     *         re-encrypted. false otherwise: could be inode is not a file, or
     *         inode's edek's key version is not changed.
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public boolean processFileInode(INode inode, TraverseInfo traverseInfo)
        throws IOException, InterruptedException {
      assert dir.hasReadLock();
      if (LOG.isTraceEnabled()) {
        LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
      }
      if (!inode.isFile()) {
        return false;
      }
      FileEncryptionInfo feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(
          dir, INodesInPath.fromINode(inode));
      if (feInfo == null) {
        LOG.warn("File {} skipped re-encryption because it is not encrypted! "
            + "This is very likely a bug.", inode.getId());
        return false;
      }
      if (traverseInfo instanceof ZoneTraverseInfo
          && ((ZoneTraverseInfo) traverseInfo).getEzKeyVerName().equals(
              feInfo.getEzKeyVersionName())) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("File {} skipped re-encryption because edek's key version"
              + " name is not changed.", inode.getFullPathName());
        }
        return false;
      }
      currentBatch.add(inode.asFile());
      return true;
    }

    /**
     * Check whether zone is ready for re-encryption. Throws IOE if it's not. 1.
     * If EZ is deleted. 2. if the re-encryption is canceled. 3. If NN is not
     * active or is in safe mode.
     *
     * @throws IOException
     *           if zone does not exist / is cancelled, or if NN is not ready
     *           for write.
     */
    @Override
    protected void checkINodeReady(long zoneId) throws IOException {
      final ZoneReencryptionStatus zs = getReencryptionStatus().getZoneStatus(
          zoneId);
      if (zs == null) {
        throw new IOException("Zone " + zoneId + " status cannot be found.");
      }
      if (zs.isCanceled()) {
        throw new IOException("Re-encryption is canceled for zone " + zoneId);
      }
      dir.getFSNamesystem().checkNameNodeSafeMode(
          "NN is in safe mode, cannot re-encrypt.");
      // re-encryption should be cancelled when NN goes to standby. Just
      // double checking for sanity.
      dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE);
    }

    /**
     * Submit the current batch to the thread pool.
     *
     * @param zoneId
     *          Id of the EZ INode
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void submitCurrentBatch(final Long zoneId) throws IOException,
        InterruptedException {
      if (currentBatch.isEmpty()) {
        return;
      }
      ZoneSubmissionTracker zst;
      synchronized (ReencryptionHandler.this) {
        zst = submissions.get(zoneId);
        if (zst == null) {
          zst = new ZoneSubmissionTracker();
          submissions.put(zoneId, zst);
        }
        Future future = batchService.submit(new EDEKReencryptCallable(zoneId,
            currentBatch, reencryptionHandler));
        zst.addTask(future);
      }
      LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
          currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
      currentBatch = new ReencryptionBatch(reencryptBatchSize);
      // flip the pause flag if this is nth submission.
      // The actual pause need to happen outside of the lock.
      if (pauseAfterNthSubmission > 0) {
        if (--pauseAfterNthSubmission == 0) {
          shouldPauseForTesting = true;
        }
      }
    }

    /**
     * Throttles the ReencryptionHandler in 3 aspects:
     * 1. Prevents generating more Callables than the CPU could possibly
     * handle.
     * 2. Prevents generating more Callables than the ReencryptionUpdater
     * can handle, under its own throttling.
     * 3. Prevents contending FSN/FSD read locks. This is done based
     * on the DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration.
     * <p>
     * Item 1 and 2 are to control NN heap usage.
     *
     * @throws InterruptedException
     */
    @VisibleForTesting
    @Override
    protected void throttle() throws InterruptedException {
      assert !dir.hasReadLock();
      assert !dir.getFSNamesystem().hasReadLock(RwLockMode.FS);
      final int numCores = Runtime.getRuntime().availableProcessors();
      if (taskQueue.size() >= numCores) {
        LOG.debug("Re-encryption handler throttling because queue size {} is"
            + "larger than number of cores {}", taskQueue.size(), numCores);
        while (taskQueue.size() >= numCores) {
          Thread.sleep(100);
        }
      }

      // 2. if tasks are piling up on the updater, don't create new callables
      // until the queue size goes down.
      final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
      int numTasks = numTasksSubmitted();
      if (numTasks >= maxTasksPiled) {
        LOG.debug("Re-encryption handler throttling because total tasks pending"
            + " re-encryption updater is {}", numTasks);
        while (numTasks >= maxTasksPiled) {
          Thread.sleep(500);
          numTasks = numTasksSubmitted();
        }
      }

      // 3.
      if (throttleLimitHandlerRatio >= 1.0) {
        return;
      }
      final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
          * throttleLimitHandlerRatio);
      final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
            + " throttleTimerAll:{}", expect, actual,
            throttleTimerAll.now(TimeUnit.MILLISECONDS));
      }
      if (expect - actual < 0) {
        // in case throttleLimitHandlerRatio is very small, expect will be 0.
        // so sleepMs should not be calculated from expect, to really meet the
        // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
        // should be 1000 - throttleTimerAll.now()
        final long sleepMs = (long) (actual / throttleLimitHandlerRatio)
            - throttleTimerAll.now(TimeUnit.MILLISECONDS);
        LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
        Thread.sleep(sleepMs);
      }
      throttleTimerAll.reset().start();
      throttleTimerLocked.reset();
    }

    private int numTasksSubmitted() {
      int ret = 0;
      synchronized (ReencryptionHandler.this) {
        for (ZoneSubmissionTracker zst : submissions.values()) {
          ret += zst.getTasks().size();
        }
      }
      return ret;
    }

    @Override
    public boolean shouldSubmitCurrentBatch() {
      return currentBatch.size() >= reencryptBatchSize;
    }

    @Override
    public boolean canTraverseDir(INode inode) throws IOException {
      if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
        // nested EZ, ignore.
        LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
            inode.getFullPathName(), inode.getId());
        return false;
      }
      return true;
    }

    @Override
    protected void readLock() {
      super.readLock();
      throttleTimerLocked.start();
    }

    @Override
    protected void readUnlock() {
      super.readUnlock();
      throttleTimerLocked.stop();
    }
  }

  private static class ZoneTraverseInfo extends TraverseInfo {
    private String ezKeyVerName;

    ZoneTraverseInfo(String ezKeyVerName) {
      this.ezKeyVerName = ezKeyVerName;
    }

    public String getEzKeyVerName() {
      return ezKeyVerName;
    }
  }
}