ReencryptionUpdater.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionHandler.ReencryptionBatch;
import org.apache.hadoop.hdfs.util.RwLockMode;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY;

/**
 * Class for finalizing re-encrypt EDEK operations, by updating file xattrs with
 * edeks returned from reencryption.
 * <p>
 * The tasks are submitted by ReencryptionHandler. Completed tasks are taken
 * from the {@link CompletionService} given at construction, their file xattrs
 * are updated while holding the FSN and FSD write locks, and the per-zone
 * re-encryption progress is checkpointed once all earlier tasks of the zone
 * have been processed.
 * <p>
 * It is assumed only 1 Updater will be running, since updating file xattrs
 * requires namespace write lock, and performance gain from multi-threading
 * is limited.
 */
@InterfaceAudience.Private
public final class ReencryptionUpdater implements Runnable {

  public static final Logger LOG =
      LoggerFactory.getLogger(ReencryptionUpdater.class);

  // Test hooks, see pauseForTesting() / pauseForTestingAfterNthCheckpoint().
  private volatile boolean shouldPauseForTesting = false;
  private volatile int pauseAfterNthCheckpoint = 0;
  private volatile long pauseZoneId = 0;

  // Fraction of elapsed time this updater may spend holding the FSN write
  // lock before throttle() makes it sleep; values >= 1.0 disable throttling.
  private double throttleLimitRatio;
  // Total elapsed time since the last throttle() reset.
  private final StopWatch throttleTimerAll = new StopWatch();
  // Time spent holding the FSN write lock since the last throttle() reset.
  private final StopWatch throttleTimerLocked = new StopWatch();

  // Back-off in ms before retrying a task that hit a retriable failure.
  private volatile long faultRetryInterval = 60000;
  private volatile boolean isRunning = false;

  private final FSDirectory dir;
  private final CompletionService<ReencryptionTask> batchService;
  private final ReencryptionHandler handler;

  /**
   * Class to track re-encryption submissions of a single zone. It contains
   * all the submitted futures, and statistics about how far the futures are
   * processed.
   */
  static final class ZoneSubmissionTracker {
    // Set once the submitter reports that every batch has been submitted.
    private boolean submissionDone;
    // Submitted futures in the order they were added; removed once their
    // progress has been checkpointed.
    private final LinkedList<Future> tasks;
    // Number of tasks whose progress has been checkpointed.
    private int numCheckpointed;
    // Number of completed futures of this zone taken by the updater.
    private int numFutureDone;

    ZoneSubmissionTracker() {
      submissionDone = false;
      tasks = new LinkedList<>();
      numCheckpointed = 0;
      numFutureDone = 0;
    }

    void reset() {
      submissionDone = false;
      tasks.clear();
      numCheckpointed = 0;
      numFutureDone = 0;
    }

    LinkedList<Future> getTasks() {
      return tasks;
    }

    /** Requests cancellation (with interruption) of every tracked future. */
    void cancelAllTasks() {
      if (!tasks.isEmpty()) {
        LOG.info("Cancelling {} re-encryption tasks", tasks.size());
        for (Future f : tasks) {
          f.cancel(true);
        }
      }
    }

    void addTask(final Future task) {
      tasks.add(task);
    }

    /** @return true once submission is done and every task is checkpointed. */
    private boolean isCompleted() {
      return submissionDone && tasks.isEmpty();
    }

    void setSubmissionDone() {
      submissionDone = true;
    }
  }

  /**
   * Class representing the task for one batch of a re-encryption command. It
   * also contains statistics about how far this single batch has been executed.
   */
  static final class ReencryptionTask {
    // Id of the encryption zone inode this batch belongs to.
    private final long zoneId;
    // Whether the updater has finished handling this batch.
    private boolean processed = false;
    // Number of files whose encryption info xattr was replaced.
    private int numFilesUpdated = 0;
    // Number of failures recorded for this batch.
    private int numFailures = 0;
    // Path of the last updated file; passed on when recording zone progress.
    private String lastFile = null;
    // The files of this batch together with their edeks.
    private final ReencryptionBatch batch;

    ReencryptionTask(final long id, final int failures,
        final ReencryptionBatch theBatch) {
      zoneId = id;
      numFailures = failures;
      batch = theBatch;
    }
  }

  /**
   * Class that encapsulates re-encryption details of a file. It contains the
   * file inode, stores the initial edek of the file, and the new edek
   * after re-encryption.
   * <p>
   * Assumptions are the object initialization happens when dir lock is held,
   * and inode is valid and is encrypted during initialization.
   * <p>
   * Namespace changes may happen during re-encryption, and if inode is changed
   * the re-encryption is skipped.
   */
  static final class FileEdekInfo {
    private final long inodeId;
    // The edek the file had when this object was created.
    private final EncryptedKeyVersion existingEdek;
    // The re-encrypted edek, set once re-encryption has produced it.
    private EncryptedKeyVersion edek = null;

    FileEdekInfo(FSDirectory dir, INodeFile inode) throws IOException {
      assert dir.hasReadLock();
      Preconditions.checkNotNull(inode, "INodeFile is null");
      inodeId = inode.getId();
      final FileEncryptionInfo fei = FSDirEncryptionZoneOp
          .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
      Preconditions.checkNotNull(fei,
          "FileEncryptionInfo is null for " + inodeId);
      existingEdek = EncryptedKeyVersion
          .createForDecryption(fei.getKeyName(), fei.getEzKeyVersionName(),
              fei.getIV(), fei.getEncryptedDataEncryptionKey());
    }

    long getInodeId() {
      return inodeId;
    }

    EncryptedKeyVersion getExistingEdek() {
      return existingEdek;
    }

    void setEdek(final EncryptedKeyVersion ekv) {
      assert ekv != null;
      edek = ekv;
    }
  }

  /** Makes the updater loop block before processing its next task. */
  @VisibleForTesting
  synchronized void pauseForTesting() {
    shouldPauseForTesting = true;
    LOG.info("Pausing re-encrypt updater for testing.");
    notify();
  }

  /** Releases a pause requested via {@link #pauseForTesting()}. */
  @VisibleForTesting
  synchronized void resumeForTesting() {
    shouldPauseForTesting = false;
    LOG.info("Resuming re-encrypt updater for testing.");
    notify();
  }

  /**
   * Arms a pause that triggers once {@code count} completed futures of zone
   * {@code zoneId} have been taken by the updater.
   */
  @VisibleForTesting
  void pauseForTestingAfterNthCheckpoint(final long zoneId, final int count) {
    assert pauseAfterNthCheckpoint == 0;
    pauseAfterNthCheckpoint = count;
    pauseZoneId = zoneId;
  }

  @VisibleForTesting
  boolean isRunning() {
    return isRunning;
  }

  /**
   * @param fsd     the FSDirectory whose inodes are updated.
   * @param service completion service the handler submits batches to.
   * @param rh      the owning re-encryption handler.
   * @param conf    configuration providing the updater throttle ratio.
   * @throws IllegalArgumentException if the configured ratio is not positive.
   */
  ReencryptionUpdater(final FSDirectory fsd,
      final CompletionService<ReencryptionTask> service,
      final ReencryptionHandler rh, final Configuration conf) {
    dir = fsd;
    batchService = service;
    handler = rh;
    this.throttleLimitRatio =
        conf.getDouble(DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY,
            DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT);
    Preconditions.checkArgument(throttleLimitRatio > 0.0f,
        DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY
            + " is not positive.");
  }

  /**
   * Called by the submission thread to indicate all tasks have been submitted.
   * If this is called but no tasks have been submitted, the re-encryption is
   * considered complete.
   *
   * @param zoneId Id of the zone inode.
   * @throws IOException propagated from the handler.
   * @throws InterruptedException propagated from the handler.
   */
  void markZoneSubmissionDone(final long zoneId)
      throws IOException, InterruptedException {
    final ZoneSubmissionTracker tracker = handler.getTracker(zoneId);
    if (tracker != null && !tracker.getTasks().isEmpty()) {
      tracker.setSubmissionDone();
    } else {
      // Caller thinks submission is done, but no tasks submitted - meaning
      // no files in the EZ need to be re-encrypted. Complete directly.
      handler.addDummyTracker(zoneId, tracker);
    }
  }

  /**
   * Main loop of the updater thread: repeatedly takes a completed task and
   * processes it. IOExceptions and CancellationExceptions are logged and the
   * loop continues; an interrupt or any other Throwable ends the thread.
   */
  @Override
  public void run() {
    isRunning = true;
    throttleTimerAll.start();
    while (true) {
      try {
        // Assuming single-threaded updater.
        takeAndProcessTasks();
      } catch (InterruptedException ie) {
        LOG.warn("Re-encryption updater thread interrupted. Exiting.");
        Thread.currentThread().interrupt();
        isRunning = false;
        return;
      } catch (IOException | CancellationException e) {
        LOG.warn("Re-encryption updater thread exception.", e);
      } catch (Throwable t) {
        LOG.error("Re-encryption updater thread exiting.", t);
        isRunning = false;
        return;
      }
    }
  }

  /**
   * Process a completed ReencryptionTask. Each inode id is resolved to an INode
   * object. Entries are skipped, and removed from the batch so they are not
   * saved later, when the inode is deleted, has no encryption info, or its
   * encryption info no longer matches what was re-encrypted.
   * <p>
   * Only file xattr is updated by this method. Re-encryption progress is not
   * updated.
   *
   * @param zoneNodePath full path of the EZ inode.
   * @param task     the completed task.
   * @throws IOException
   * @throws InterruptedException
   */
  private void processTaskEntries(final String zoneNodePath,
      final ReencryptionTask task) throws IOException, InterruptedException {
    assert dir.hasWriteLock();
    if (!task.batch.isEmpty() && task.numFailures == 0) {
      LOG.debug(
          "Updating file xattrs for re-encrypting zone {}," + " starting at {}",
          zoneNodePath, task.batch.getFirstFilePath());
      final int batchSize = task.batch.size();
      for (Iterator<FileEdekInfo> it = task.batch.getBatch().iterator();
           it.hasNext();) {
        FileEdekInfo entry = it.next();
        // resolve the inode again, and skip if it doesn't exist
        LOG.trace("Updating {} for re-encryption.", entry.getInodeId());
        final INode inode = dir.getInode(entry.getInodeId());
        if (inode == null) {
          LOG.debug("INode {} doesn't exist, skipping re-encrypt.",
              entry.getInodeId());
          // also remove from batch so later it's not saved.
          it.remove();
          continue;
        }

        // Cautiously check file encryption info, and only update if we're sure
        // it's still using the same edek.
        Preconditions.checkNotNull(entry.edek);
        final INodesInPath iip = INodesInPath.fromINode(inode);
        final FileEncryptionInfo fei =
            FSDirEncryptionZoneOp.getFileEncryptionInfo(dir, iip);
        if (fei == null) {
          // getFileEncryptionInfo may return null (FileEdekInfo guards
          // against it too). Skip like the other mismatches instead of
          // letting an NPE escape, which would terminate the updater thread.
          LOG.debug("Inode {} has no file encryption info, skipping"
              + " re-encryption.", entry.getInodeId());
          it.remove();
          continue;
        }
        if (!fei.getKeyName().equals(entry.edek.getEncryptionKeyName())) {
          LOG.debug("Inode {} EZ key changed, skipping re-encryption.",
              entry.getInodeId());
          it.remove();
          continue;
        }
        if (fei.getEzKeyVersionName()
            .equals(entry.edek.getEncryptionKeyVersionName())) {
          LOG.debug(
              "Inode {} EZ key version unchanged, skipping re-encryption.",
              entry.getInodeId());
          it.remove();
          continue;
        }
        if (!Arrays.equals(fei.getEncryptedDataEncryptionKey(),
            entry.existingEdek.getEncryptedKeyVersion().getMaterial())) {
          LOG.debug("Inode {} existing edek changed, skipping re-encryption",
              entry.getInodeId());
          it.remove();
          continue;
        }
        FileEncryptionInfo newFei = new FileEncryptionInfo(fei.getCipherSuite(),
            fei.getCryptoProtocolVersion(),
            entry.edek.getEncryptedKeyVersion().getMaterial(),
            entry.edek.getEncryptedKeyIv(), fei.getKeyName(),
            entry.edek.getEncryptionKeyVersionName());
        FSDirEncryptionZoneOp
            .setFileEncryptionInfo(dir, iip, newFei, XAttrSetFlag.REPLACE);
        task.lastFile = iip.getPath();
        ++task.numFilesUpdated;
      }

      LOG.info("Updated xattrs on {}({}) files in zone {} for re-encryption,"
              + " starting:{}.", task.numFilesUpdated, batchSize,
          zoneNodePath, task.batch.getFirstFilePath());
    }
    task.processed = true;
  }

  /**
   * Iterate tasks for the given zone, and update progress accordingly. The
   * checkpoint indicates all files before it are done re-encryption, so it will
   * be updated to the position where all tasks before are completed.
   *
   * @param zoneNode the EZ inode.
   * @param tracker  the zone submission tracker.
   * @return the list containing the last checkpointed xattr, empty if no
   *   checkpoint happened; if the tracker has completed, the result of
   *   {@code handler.completeReencryption(zoneNode)} instead.
   * @throws ExecutionException
   * @throws IOException
   * @throws InterruptedException
   */
  private List<XAttr> processCheckpoints(final INode zoneNode,
      final ZoneSubmissionTracker tracker)
      throws ExecutionException, IOException, InterruptedException {
    assert dir.hasWriteLock();
    final long zoneId = zoneNode.getId();
    final String zonePath = zoneNode.getFullPathName();
    final ZoneReencryptionStatus status =
        handler.getReencryptionStatus().getZoneStatus(zoneId);
    assert status != null;
    // always start from the beginning, because the checkpoint means all files
    // before it are re-encrypted.
    final LinkedList<Future> tasks = tracker.getTasks();
    final List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
    ListIterator<Future> iter = tasks.listIterator();
    synchronized (handler) {
      while (iter.hasNext()) {
        Future<ReencryptionTask> curr = iter.next();
        if (curr.isCancelled()) {
          break;
        }
        if (!curr.isDone() || !curr.get().processed) {
          // still has earlier tasks not completed, skip here.
          break;
        }
        ReencryptionTask task = curr.get();
        LOG.debug("Updating re-encryption checkpoint with completed task."
            + " last: {} size:{}.", task.lastFile, task.batch.size());
        assert zoneId == task.zoneId;
        try {
          final XAttr xattr = FSDirEncryptionZoneOp
              .updateReencryptionProgress(dir, zoneNode, status, task.lastFile,
                  task.numFilesUpdated, task.numFailures);
          xAttrs.clear();
          xAttrs.add(xattr);
        } catch (IOException ie) {
          LOG.warn("Failed to update re-encrypted progress to xattr" +
                  " for zone {}", zonePath, ie);
          ++task.numFailures;
        }
        ++tracker.numCheckpointed;
        iter.remove();
      }
    }
    if (tracker.isCompleted()) {
      LOG.debug("Removed re-encryption tracker for zone {} because it completed"
              + " with {} tasks.", zonePath, tracker.numCheckpointed);
      return handler.completeReencryption(zoneNode);
    }
    return xAttrs;
  }

  /**
   * Takes the next completed task from the completion service and applies it
   * while holding the FSN write lock. Tasks failing with RetriableException or
   * SafeModeException are retried after {@code faultRetryInterval} ms; any
   * other IOException marks the task processed with one more failure.
   */
  private void takeAndProcessTasks() throws Exception {
    final Future<ReencryptionTask> completed = batchService.take();
    throttle();
    checkPauseForTesting();
    if (completed.isCancelled()) {
      // Ignore canceled zones. The cancellation is edit-logged by the handler.
      LOG.debug("Skipped a canceled re-encryption task");
      return;
    }
    final ReencryptionTask task = completed.get();

    boolean shouldRetry;
    do {
      dir.getFSNamesystem().writeLock(RwLockMode.FS);
      try {
        throttleTimerLocked.start();
        processTask(task);
        shouldRetry = false;
      } catch (RetriableException | SafeModeException re) {
        // Keep retrying until succeed.
        LOG.info("Exception when processing re-encryption task for zone {}, "
                + "retrying...", task.zoneId, re);
        shouldRetry = true;
      } catch (IOException ioe) {
        LOG.warn("Failure processing re-encryption task for zone {}",
            task.zoneId, ioe);
        ++task.numFailures;
        task.processed = true;
        shouldRetry = false;
      } finally {
        dir.getFSNamesystem().writeUnlock(RwLockMode.FS, "reencryptUpdater");
        throttleTimerLocked.stop();
      }
      // logSync regardless, to prevent edit log buffer overflow triggering
      // logSync inside FSN writelock.
      dir.getEditLog().logSync();
      if (shouldRetry) {
        // Back off only after the FSN write lock has been released, so the
        // namesystem is not blocked for the whole retry interval and the
        // sleep is not counted as locked time by the throttler.
        Thread.sleep(faultRetryInterval);
      }
    } while (shouldRetry);
  }

  /**
   * Applies one completed task while holding the FSD write lock. It updates
   * the file xattrs of the batch and advances the zone checkpoint. The batch
   * is then handed to {@code FSDirEncryptionZoneOp.saveFileXAttrsForBatch},
   * and any zone progress xattr is edit-logged. Returns early if the zone
   * inode is gone or its tracker was removed (re-encryption canceled).
   * Invoked by takeAndProcessTasks with the FSN write lock held.
   */
  private void processTask(ReencryptionTask task)
      throws InterruptedException, ExecutionException, IOException {
    final List<XAttr> xAttrs;
    final String zonePath;
    dir.writeLock();
    try {
      handler.getTraverser().checkINodeReady(task.zoneId);
      final INode zoneNode = dir.getInode(task.zoneId);
      if (zoneNode == null) {
        // ez removed.
        return;
      }
      zonePath = zoneNode.getFullPathName();
      LOG.info("Processing returned re-encryption task for zone {}({}), "
              + "batch size {}, start:{}", zonePath, task.zoneId,
          task.batch.size(), task.batch.getFirstFilePath());
      final ZoneSubmissionTracker tracker =
          handler.getTracker(zoneNode.getId());
      if (tracker == null) {
        // re-encryption canceled.
        LOG.info("Re-encryption was canceled.");
        return;
      }
      tracker.numFutureDone++;
      EncryptionFaultInjector.getInstance().reencryptUpdaterProcessOneTask();
      processTaskEntries(zonePath, task);
      EncryptionFaultInjector.getInstance().reencryptUpdaterProcessCheckpoint();
      xAttrs = processCheckpoints(zoneNode, tracker);
    } finally {
      dir.writeUnlock();
    }
    FSDirEncryptionZoneOp.saveFileXAttrsForBatch(dir, task.batch.getBatch());
    if (!xAttrs.isEmpty()) {
      dir.getEditLog().logSetXAttrs(zonePath, xAttrs, false);
    }
  }

  /**
   * Blocks while a test has requested a pause. If a pause after the Nth taken
   * future of {@code pauseZoneId} was armed and that count has been reached,
   * the pause is activated first. Must be called without FSD/FSN write locks.
   */
  private synchronized void checkPauseForTesting() throws InterruptedException {
    assert !dir.hasWriteLock();
    assert !dir.getFSNamesystem().hasWriteLock(RwLockMode.FS);
    if (pauseAfterNthCheckpoint != 0) {
      ZoneSubmissionTracker tracker =
          handler.unprotectedGetTracker(pauseZoneId);
      if (tracker != null) {
        if (tracker.numFutureDone == pauseAfterNthCheckpoint) {
          shouldPauseForTesting = true;
          pauseAfterNthCheckpoint = 0;
        }
      }
    }
    while (shouldPauseForTesting) {
      LOG.info("Sleeping in the re-encryption updater for unit test.");
      wait();
      LOG.info("Continuing re-encryption updater after pausing.");
    }
  }

  /**
   * Throttles the ReencryptionUpdater so it does not monopolize the FSN/FSD
   * write locks. If the time spent holding the FSN write lock exceeds the
   * configured ratio of the elapsed time since the previous call, this sleeps
   * long enough to bring the ratio back down. Both timers are reset afterwards.
   */
  private void throttle() throws InterruptedException {
    if (throttleLimitRatio >= 1.0) {
      return;
    }

    final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
        * throttleLimitRatio);
    final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Re-encryption updater throttling expect: {}, actual: {},"
              + " throttleTimerAll:{}", expect, actual,
          throttleTimerAll.now(TimeUnit.MILLISECONDS));
    }
    if (expect - actual < 0) {
      // in case throttleLimitHandlerRatio is very small, expect will be 0.
      // so sleepMs should not be calculated from expect, to really meet the
      // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
      // should be 1000 - throttleTimerAll.now()
      final long sleepMs =
          (long) (actual / throttleLimitRatio) - throttleTimerAll
              .now(TimeUnit.MILLISECONDS);
      // throttleTimerAll keeps running after expect was computed, so sleepMs
      // can end up zero or negative. Thread.sleep rejects negative values with
      // IllegalArgumentException, which would terminate the updater thread.
      if (sleepMs > 0) {
        LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
        Thread.sleep(sleepMs);
      }
    }
    throttleTimerAll.reset().start();
    throttleTimerLocked.reset();
  }
}