/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.hdfs.util.RwLockMode;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * FSTreeTraverser traverses a directory tree recursively and processes the
 * files in batches.
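 *
 * <p>A minimal sketch of a concrete traverser (hypothetical class and field
 * names, for illustration only; real implementations supply actual batching,
 * throttling and readiness checks):
 * <pre>{@code
 * class FileBatchTraverser extends FSTreeTraverser {
 *   private final List<INode> currentBatch = new ArrayList<>();
 *
 *   FileBatchTraverser(FSDirectory dir, Configuration conf) {
 *     super(dir, conf);
 *   }
 *
 *   protected boolean processFileInode(INode inode, TraverseInfo info) {
 *     if (!inode.isFile()) {
 *       return false; // not a file; the caller recurses into directories
 *     }
 *     currentBatch.add(inode);
 *     return true;
 *   }
 *
 *   protected boolean shouldSubmitCurrentBatch() {
 *     return currentBatch.size() >= 1000; // illustrative threshold
 *   }
 *
 *   protected void submitCurrentBatch(Long startId) {
 *     // hand the batch off for processing, then start a new one
 *     currentBatch.clear();
 *   }
 *
 *   protected boolean canTraverseDir(INode inode) {
 *     return true; // this sketch filters out no directories
 *   }
 *
 *   protected void checkINodeReady(long startId) throws IOException {
 *     // verify the start inode still exists; throw an IOException if not
 *   }
 *
 *   protected void throttle() throws InterruptedException {
 *     // no throttling in this sketch
 *   }
 *
 *   protected void checkPauseForTesting() throws InterruptedException {
 *     // no test hooks in this sketch
 *   }
 * }
 * }</pre>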
 */
@InterfaceAudience.Private
public abstract class FSTreeTraverser {

  public static final Logger LOG = LoggerFactory
      .getLogger(FSTreeTraverser.class);

  private final FSDirectory dir;

  private final long readLockReportingThresholdMs;

  private final Timer timer;

  public FSTreeTraverser(FSDirectory dir, Configuration conf) {
    this.dir = dir;
    this.readLockReportingThresholdMs = conf.getLong(
        DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY,
        DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT);
    timer = new Timer();
  }

  public FSDirectory getFSDirectory() {
    return dir;
  }

  /**
   * Iterate through all files directly inside parent, and recurse down into
   * directories. The listing is done in batches, and can optionally start
   * after a given position. The inode tree is iterated in a depth-first
   * fashion, but instead of holding all traversed {@link INodeDirectory}s
   * in memory, only the path components down to the current inode are kept.
   * This reduces memory consumption.
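   *
   * <p>For example (an illustrative sketch of the cursor layout): if the
   * start inode is {@code /zone} and the traversal should resume after
   * {@code /zone/a/b/f}, the cursor holds the local name bytes of each
   * level, top-down:
   * <pre>{@code
   * startAfters = [ "a", "b", "f" ]
   * }</pre>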
   *
   * @param parent
   *          The parent directory inode to traverse.
   * @param startId
   *          Id of the start inode.
   * @param startAfter
   *          Local name of the file under parent after which the traversal
   *          should resume.
   * @param traverseInfo
   *          info which may be required for processing the children.
   * @throws IOException
   * @throws InterruptedException
   */
  protected void traverseDir(final INodeDirectory parent, final long startId,
      byte[] startAfter, final TraverseInfo traverseInfo)
      throws IOException, InterruptedException {
    List<byte[]> startAfters = new ArrayList<>();
    if (parent == null) {
      return;
    }
    INode curr = parent;
    // construct startAfters all the way up to the start inode.
    startAfters.add(startAfter);
    while (curr.getId() != startId) {
      startAfters.add(0, curr.getLocalNameBytes());
      curr = curr.getParent();
    }
    curr = traverseDirInt(startId, parent, startAfters, traverseInfo);
    while (!startAfters.isEmpty()) {
      if (curr == null) {
        // lock was reacquired, re-resolve path.
        curr = resolvePaths(startId, startAfters);
      }
      curr = traverseDirInt(startId, curr, startAfters, traverseInfo);
    }
  }

  /**
   * Iterates through the parent directory, adding direct child files to the
   * current batch. If the batch size meets the configured threshold, the
   * current batch is submitted for processing.
   * <p>
   * The locks may be released and reacquired while a batch is being
   * submitted.
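   *
   * <p>In outline (an illustrative sketch, not the exact code path):
   * <pre>{@code
   * for each child of parent after startAfter:
   *   if processFileInode(child) added it to the batch:
   *     if shouldSubmitCurrentBatch():
   *       drop locks; submitCurrentBatch(startId); throttle(); retake locks
   *       re-resolve parent; return null if it was deleted or replaced
   *   else if child is a directory and canTraverseDir(child):
   *     descend one level and return
   * }</pre>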
   *
   * @param startId
   *          Id of the start inode.
   * @param curr
   *          The directory inode to iterate over.
   * @param startAfters
   *          Cursor of the traversal, as a list of local name bytes.
   * @param traverseInfo
   *          info which may be required for processing the children.
   * @return The inode which was just processed, if the lock was held through
   *         the entire process; null if the lock was released.
   * @throws IOException
   * @throws InterruptedException
   */
  protected INode traverseDirInt(final long startId, INode curr,
      List<byte[]> startAfters, final TraverseInfo traverseInfo)
      throws IOException, InterruptedException {
    assert dir.hasReadLock();
    assert dir.getFSNamesystem().hasReadLock(RwLockMode.FS);
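    // Track how long the read lock has been held continuously; it is
    // dropped and reacquired once readLockReportingThresholdMs is exceeded.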
    long lockStartTime = timer.monotonicNow();
    Preconditions.checkNotNull(curr, "Current inode can't be null");
    checkINodeReady(startId);
    final INodeDirectory parent = curr.isDirectory() ? curr.asDirectory()
        : curr.getParent();
    ReadOnlyList<INode> children = parent
        .getChildrenList(Snapshot.CURRENT_STATE_ID);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Traversing directory {}", parent.getFullPathName());
    }

    final byte[] startAfter = startAfters.get(startAfters.size() - 1);
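    // Whether the read lock was dropped at any point during this iteration;
    // if so, null is returned so the caller re-resolves the path.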
    boolean lockReleased = false;
    for (int i = INodeDirectory.nextChild(children, startAfter); i < children
        .size(); ++i) {
      final INode inode = children.get(i);
      if (!processFileInode(inode, traverseInfo)) {
        // inode wasn't processed. Recurse down if it's a dir,
        // skip otherwise.
        if (!inode.isDirectory()) {
          continue;
        }

        if (!canTraverseDir(inode)) {
          continue;
        }
        // add 1 level to the depth-first search.
        curr = inode;
        if (!startAfters.isEmpty()) {
          startAfters.remove(startAfters.size() - 1);
          startAfters.add(curr.getLocalNameBytes());
        }
        startAfters.add(HdfsFileStatus.EMPTY_NAME);
        return lockReleased ? null : curr;
      }
      if (shouldSubmitCurrentBatch()) {
        final byte[] currentStartAfter = inode.getLocalNameBytes();
        final String parentPath = parent.getFullPathName();
        lockReleased = true;
        readUnlock();
        submitCurrentBatch(startId);
        try {
          throttle();
          checkPauseForTesting();
        } finally {
          readLock();
          lockStartTime = timer.monotonicNow();
        }
        checkINodeReady(startId);

        // Things could have changed when the lock was released.
        // Re-resolve the parent inode.
        FSPermissionChecker pc = dir.getPermissionChecker();
        INode newParent = dir
            .resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
            .getLastINode();
        if (newParent == null || !newParent.equals(parent)) {
          // parent dir is deleted or recreated. We're done.
          return null;
        }
        children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
        // -1 to counter the ++ on the for loop
        i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
      }
      if ((timer.monotonicNow()
          - lockStartTime) > readLockReportingThresholdMs) {
        readUnlock();
        try {
          throttle();
        } finally {
          readLock();
          lockStartTime = timer.monotonicNow();
        }
      }
    }
    // Successfully finished this dir; adjust the cursor one level up, so
    // the traversal resumes after this dir.
    startAfters.remove(startAfters.size() - 1);
    if (!startAfters.isEmpty()) {
      startAfters.remove(startAfters.size() - 1);
      startAfters.add(curr.getLocalNameBytes());
    }
    curr = curr.getParent();
    return lockReleased ? null : curr;
  }

  /**
   * Resolve the traversal cursor to an inode.
   * <p>
   * The parent of the lowest-level startAfter is returned. If some level in
   * the middle of startAfters has changed, the parent of the lowest
   * unchanged level is returned.
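   *
   * <p>For example (illustrative): with {@code startAfters = ["a", "b",
   * "f"]}, directory {@code b} is returned if levels "a" and "b" still
   * resolve. If "b" was deleted, the cursor is truncated to
   * {@code ["a", "b"]} and directory {@code a} is returned, so traversal
   * resumes after the entry named "b" under "a".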
   *
   * @param startId
   *          Id of the start inode.
   * @param startAfters
   *          the cursor, represented by a list of path bytes.
   * @return the parent inode corresponding to the startAfters, or null if the
   *         furthest parent is deleted.
   */
  private INode resolvePaths(final long startId, List<byte[]> startAfters)
      throws IOException {
    // If the read lock was reacquired, we need to resolve the paths again
    // in case things have changed. If our cursor file/dir has changed,
    // continue from the next one.
    INode zoneNode = dir.getInode(startId);
    if (zoneNode == null) {
      throw new FileNotFoundException(
          "Start inode " + startId + " is deleted.");
    }
    INodeDirectory parent = zoneNode.asDirectory();
    for (int i = 0; i < startAfters.size(); ++i) {
      if (i == startAfters.size() - 1) {
        // last startAfter does not need to be resolved, since search for
        // nextChild will cover that automatically.
        break;
      }
      INode curr = parent.getChild(startAfters.get(i),
          Snapshot.CURRENT_STATE_ID);
      if (curr == null) {
        // inode at this level has changed. Update startAfters to point to
        // the next entry at the parent level, dropping any startAfters at
        // lower levels. The stale name at this level is kept as the search
        // key for nextChild.
        while (startAfters.size() > i + 1) {
          startAfters.remove(startAfters.size() - 1);
        }
        break;
      }
      parent = curr.asDirectory();
    }
    return parent;
  }

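  /**
   * Acquires the FSNamesystem read lock first, then the FSDirectory read
   * lock.
   */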
  protected void readLock() {
    dir.getFSNamesystem().readLock(RwLockMode.FS);
    dir.readLock();
  }

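  /**
   * Releases the locks taken by {@link #readLock()}, in reverse order.
   */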
  protected void readUnlock() {
    dir.readUnlock();
    dir.getFSNamesystem().readUnlock(RwLockMode.FS, "FSTreeTraverser");
  }

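  /**
   * Test hook allowing the traversal to be paused.
   *
   * @throws InterruptedException
   */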
  protected abstract void checkPauseForTesting() throws InterruptedException;

  /**
   * Process an inode. Add it to the current batch if it's a file; no-op
   * otherwise.
   *
   * @param inode
   *          the inode
   * @return true if the inode was added to the current batch and should be
   *         processed next; false otherwise (e.g. the inode is not a file).
   * @throws IOException
   * @throws InterruptedException
   */
  protected abstract boolean processFileInode(INode inode,
      TraverseInfo traverseInfo) throws IOException, InterruptedException;

  /**
   * Check whether the current batch can be submitted for processing.
   *
   * @return true if the batch size meets the condition, otherwise false.
   */
  protected abstract boolean shouldSubmitCurrentBatch();

  /**
   * Check whether the inode is ready for traversal. Throws an IOException
   * if it's not.
   *
   * @param startId
   *          Id of the start inode.
   * @throws IOException
   */
  protected abstract void checkINodeReady(long startId) throws IOException;

  /**
   * Submit the current batch for processing.
   *
   * @param startId
   *          Id of the start inode.
   * @throws IOException
   * @throws InterruptedException
   */
  protected abstract void submitCurrentBatch(Long startId)
      throws IOException, InterruptedException;

  /**
   * Throttles the FSTreeTraverser.
   *
   * @throws InterruptedException
   */
  protected abstract void throttle() throws InterruptedException;

  /**
   * Check whether the directory is traversable.
   *
   * @param inode
   *          Dir inode
   * @return true if the dir is traversable, otherwise false.
   * @throws IOException
   */
  protected abstract boolean canTraverseDir(INode inode) throws IOException;

  /**
   * Represents additional information required for the traversal. Subclasses
   * may extend it to pass state to {@link #processFileInode}.
   */
  public static class TraverseInfo {

  }
}