DatanodeDescriptor.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

import org.apache.hadoop.classification.VisibleForTesting;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.net.DFSTopologyNodeImpl;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.util.EnumCounters;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class extends the DatanodeInfo class with ephemeral information (e.g.
 * health, capacity, what blocks are associated with the Datanode) that is
 * private to the Namenode, i.e. this class is not exposed to clients.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DatanodeDescriptor extends DatanodeInfo {
  public static final Logger LOG =
      LoggerFactory.getLogger(DatanodeDescriptor.class);
  public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
  private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min

  /** Block and targets pair */
  @InterfaceAudience.Private
  @InterfaceStability.Evolving
  public static class BlockTargetPair {
    public final Block block;
    public final DatanodeStorageInfo[] targets;    

    BlockTargetPair(Block block, DatanodeStorageInfo[] targets) {
      this.block = block;
      this.targets = targets;
    }
  }

  /** A queue of block-related work items (e.g. {@link BlockTargetPair}). */
  private static class BlockQueue<E> {
    private final Queue<E> blockq = new LinkedList<>();

    /** Size of the queue */
    synchronized int size() {return blockq.size();}

    /** Enqueue */
    synchronized boolean offer(E e) { 
      return blockq.offer(e);
    }

    /** Dequeue */
    synchronized List<E> poll(int numBlocks) {
      if (numBlocks <= 0 || blockq.isEmpty()) {
        return null;
      }

      List<E> results = new ArrayList<>();
      for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
        results.add(blockq.poll());
      }
      return results;
    }

    /**
     * Returns <code>true</code> if the queue contains the specified element.
     */
    synchronized boolean contains(E e) {
      return blockq.contains(e);
    }

    synchronized void clear() {
      blockq.clear();
    }
  }

  /**
   * A list of CachedBlock objects on this datanode.
   */
  public static class CachedBlocksList extends IntrusiveCollection<CachedBlock> {
    public enum Type {
      PENDING_CACHED,
      CACHED,
      PENDING_UNCACHED
    }

    private final DatanodeDescriptor datanode;

    private final Type type;

    CachedBlocksList(DatanodeDescriptor datanode, Type type) {
      this.datanode = datanode;
      this.type = type;
    }

    public DatanodeDescriptor getDatanode() {
      return datanode;
    }

    public Type getType() {
      return type;
    }
  }

  // Stores status of a node that is leaving service (decommissioning or
  // entering maintenance). If the node is not leaving service, do not use
  // this object for anything.
  private final LeavingServiceStatus leavingServiceStatus =
      new LeavingServiceStatus();

  protected final Map<String, DatanodeStorageInfo> storageMap =
      new HashMap<>();

  /**
   * The blocks which we want to cache on this DataNode.
   */
  private final CachedBlocksList pendingCached = 
      new CachedBlocksList(this, CachedBlocksList.Type.PENDING_CACHED);

  /**
   * The blocks which we know are cached on this datanode.
   * This list is updated by periodic cache reports.
   */
  private final CachedBlocksList cached = 
      new CachedBlocksList(this, CachedBlocksList.Type.CACHED);

  /**
   * The blocks which we want to uncache on this DataNode.
   */
  private final CachedBlocksList pendingUncached = 
      new CachedBlocksList(this, CachedBlocksList.Type.PENDING_UNCACHED);

  /**
   * The time when the last batch of caching directives was sent, in
   * monotonic milliseconds.
   */
  private long lastCachingDirectiveSentTimeMs;

  // isAlive == heartbeats.contains(this)
  // This is an optimization, because contains takes O(n) time on ArrayList
  private boolean isAlive = false;
  private boolean needKeyUpdate = false;
  private boolean forceRegistration = false;

  // A system administrator can tune the balancer bandwidth parameter
  // (dfs.datanode.balance.bandwidthPerSec) dynamically by calling
  // "dfsadmin -setBalancerBandwidth <newbandwidth>", at which point the
  // following 'bandwidth' variable gets updated with the new value for each
  // node. Once the heartbeat command is issued to update the value on the
  // specified datanode, this value will be set back to 0.
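  // For example, "hdfs dfsadmin -setBalancerBandwidth 52428800" asks the
  // datanodes to cap balancer traffic at roughly 50 MB/s (the value is given
  // in bytes per second).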
  private long bandwidth;

  /** A queue of blocks to be replicated by this datanode */
  private final BlockQueue<BlockTargetPair> replicateBlocks =
      new BlockQueue<>();
  /** A queue of EC blocks to be replicated by this datanode. */
  private final BlockQueue<BlockTargetPair> ecBlocksToBeReplicated =
      new BlockQueue<>();
  /** A queue of EC blocks to be erasure coded by this datanode. */
  private final BlockQueue<BlockECReconstructionInfo> ecBlocksToBeErasureCoded =
      new BlockQueue<>();
  /** A queue of blocks to be recovered by this datanode */
  private final BlockQueue<BlockInfo> recoverBlocks = new BlockQueue<>();
  /** A set of blocks to be invalidated by this datanode */
  private final LightWeightHashSet<Block> invalidateBlocks =
      new LightWeightHashSet<>();

  /* Variables for maintaining the number of blocks scheduled to be written to
   * this datanode, per storage type. This count is approximate and might be
   * slightly bigger in case of errors (e.g. the datanode does not report if an
   * error occurs while writing the block).
   */
  private EnumCounters<StorageType> currApproxBlocksScheduled
      = new EnumCounters<>(StorageType.class);
  private EnumCounters<StorageType> prevApproxBlocksScheduled
      = new EnumCounters<>(StorageType.class);
  private long lastBlocksScheduledRollTime = 0;
  private int volumeFailures = 0;
  private VolumeFailureSummary volumeFailureSummary = null;
  
  /**
   * When set to true, the node is not in the include list and is not allowed
   * to communicate with the namenode.
   */
  private boolean disallowed = false;

  // The number of replication work pending before targets are determined
  private int pendingReplicationWithoutTargets = 0;

  // Heartbeat processing can use this to tell whether this is the first
  // heartbeat since the DataNode (re)registered.
  private boolean heartbeatedSinceRegistration = false;

  /** The number of volumes that can be written to. */
  private int numVolumesAvailable = 0;

  /**
   * DatanodeDescriptor constructor
   * @param nodeID id of the data node
   */
  public DatanodeDescriptor(DatanodeID nodeID) {
    super(nodeID);
    setLastUpdate(Time.now());
    setLastUpdateMonotonic(Time.monotonicNow());
  }

  /**
   * DatanodeDescriptor constructor
   * @param nodeID id of the data node
   * @param networkLocation location of the data node in network
   */
  public DatanodeDescriptor(DatanodeID nodeID, 
                            String networkLocation) {
    super(nodeID, networkLocation);
    setLastUpdate(Time.now());
    setLastUpdateMonotonic(Time.monotonicNow());
  }

  public CachedBlocksList getPendingCached() {
    return pendingCached;
  }

  public CachedBlocksList getCached() {
    return cached;
  }

  public CachedBlocksList getPendingUncached() {
    return pendingUncached;
  }

  public boolean isAlive() {
    return isAlive;
  }

  public void setAlive(boolean isAlive) {
    this.isAlive = isAlive;
  }

  public synchronized boolean needKeyUpdate() {
    return needKeyUpdate;
  }

  public synchronized void setNeedKeyUpdate(boolean needKeyUpdate) {
    this.needKeyUpdate = needKeyUpdate;
  }

  public LeavingServiceStatus getLeavingServiceStatus() {
    return leavingServiceStatus;
  }

  @VisibleForTesting
  public boolean isHeartbeatedSinceRegistration() {
   return heartbeatedSinceRegistration;
  }

  @VisibleForTesting
  public DatanodeStorageInfo getStorageInfo(String storageID) {
    synchronized (storageMap) {
      return storageMap.get(storageID);
    }
  }

  @VisibleForTesting
  public DatanodeStorageInfo[] getStorageInfos() {
    synchronized (storageMap) {
      final Collection<DatanodeStorageInfo> storages = storageMap.values();
      return storages.toArray(new DatanodeStorageInfo[storages.size()]);
    }
  }

  public EnumSet<StorageType> getStorageTypes() {
    EnumSet<StorageType> storageTypes = EnumSet.noneOf(StorageType.class);
    for (DatanodeStorageInfo dsi : getStorageInfos()) {
      storageTypes.add(dsi.getStorageType());
    }
    return storageTypes;
  }

  public StorageReport[] getStorageReports() {
    final DatanodeStorageInfo[] infos = getStorageInfos();
    final StorageReport[] reports = new StorageReport[infos.length];
    for(int i = 0; i < infos.length; i++) {
      reports[i] = infos[i].toStorageReport();
    }
    return reports;
  }

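  /**
   * @return true if any non-PROVIDED storage on this datanode is still marked
   *         as having stale block contents
   */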
  boolean hasStaleStorages() {
    synchronized (storageMap) {
      for (DatanodeStorageInfo storage : storageMap.values()) {
        if (StorageType.PROVIDED.equals(storage.getStorageType())) {
          // Verifying that a PROVIDED storage participated in this heartbeat
          // would require passing the DatanodeDescriptor into the check,
          // e.g. storageInfo.verifyBlockReportId(this, curBlockReportId)
          continue;
        }
        if (storage.areBlockContentsStale()) {
          return true;
        }
      }
      return false;
    }
  }

  public void resetBlocks() {
    updateStorageStats(this.getStorageReports(), 0L, 0L, 0, 0, null);
    synchronized (invalidateBlocks) {
      this.invalidateBlocks.clear();
    }
    this.volumeFailures = 0;
    // pendingCached, cached, and pendingUncached are protected by the
    // FSN lock.
    this.pendingCached.clear();
    this.cached.clear();
    this.pendingUncached.clear();
  }
  
  public void clearBlockQueues() {
    synchronized (invalidateBlocks) {
      this.invalidateBlocks.clear();
    }
    this.recoverBlocks.clear();
    this.replicateBlocks.clear();
    this.ecBlocksToBeReplicated.clear();
    this.ecBlocksToBeErasureCoded.clear();
    // pendingCached, cached, and pendingUncached are protected by the
    // FSN lock.
    this.pendingCached.clear();
    this.cached.clear();
    this.pendingUncached.clear();
  }

  public int numBlocks() {
    int blocks = 0;
    for (DatanodeStorageInfo entry : getStorageInfos()) {
      blocks += entry.numBlocks();
    }
    return blocks;
  }

  /**
   * Updates stats from datanode heartbeat.
   */
  void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
      long cacheUsed, int xceiverCount, int volFailures,
      VolumeFailureSummary volumeFailureSummary) {
    updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
        volFailures, volumeFailureSummary);
    heartbeatedSinceRegistration = true;
  }

  /**
   * Process a datanode heartbeat or initialize stats.
   */
  void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
      long cacheUsed, int xceiverCount, int volFailures,
      VolumeFailureSummary volumeFailureSummary) {
    updateStorageStats(reports, cacheCapacity, cacheUsed, xceiverCount,
        volFailures, volumeFailureSummary);
    setLastUpdate(Time.now());
    setLastUpdateMonotonic(Time.monotonicNow());
    rollBlocksScheduled(getLastUpdateMonotonic());
  }

  private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
      long cacheUsed, int xceiverCount, int volFailures,
      VolumeFailureSummary volumeFailureSummary) {
    long totalCapacity = 0;
    long totalRemaining = 0;
    long totalBlockPoolUsed = 0;
    long totalDfsUsed = 0;
    long totalNonDfsUsed = 0;
    Set<String> visitedMount = new HashSet<>();
    Set<DatanodeStorageInfo> failedStorageInfos = null;
    int volumesAvailable = 0;

    // Decide if we should check for any missing StorageReport and mark it as
    // failed. There are different scenarios.
    // 1. When DN is running, a storage failed. Given the current DN
    //    implementation doesn't add recovered storage back to its storage list
    //    until DN restart, we can assume volFailures won't decrease
    //    during the current DN registration session.
    //    When volumeFailures == this.volumeFailures, it implies there is no
    //    state change. No need to check for failed storage. This is an
    //    optimization.  Recent versions of the DataNode report a
    //    VolumeFailureSummary containing the date/time of the last volume
    //    failure.  If that's available, then we check that instead for greater
    //    accuracy.
    // 2. After a DN restarts, volFailures might not increase, yet it is
    //    possible we still have new failed storage. For example, admins reduce
    //    available storages in the configuration. Another corner case is that
    //    the failed volumes might change after restart; a) there is one good
    //    storage A, one restored good storage B, so there is one element in
    //    storageReports and that is A. b) A failed. c) Before the DN sends a
    //    HB to the NN to indicate A has failed, the DN restarts. d) After the
    //    DN restarts, storageReports has one element, which is B.
    final boolean checkFailedStorages;
    if (volumeFailureSummary != null && this.volumeFailureSummary != null) {
      checkFailedStorages = volumeFailureSummary.getLastVolumeFailureDate() >
          this.volumeFailureSummary.getLastVolumeFailureDate();
    } else {
      checkFailedStorages = (volFailures > this.volumeFailures) ||
          !heartbeatedSinceRegistration;
    }

    if (checkFailedStorages) {
      if (this.volumeFailures != volFailures) {
        LOG.info("Number of failed storages changes from {} to {}",
            this.volumeFailures, volFailures);
      }
      synchronized (storageMap) {
        failedStorageInfos =
            new HashSet<>(storageMap.values());
      }
    }

    setCacheCapacity(cacheCapacity);
    setCacheUsed(cacheUsed);
    setXceiverCount(xceiverCount);
    this.volumeFailures = volFailures;
    this.volumeFailureSummary = volumeFailureSummary;
    for (StorageReport report : reports) {

      DatanodeStorageInfo storage = null;
      synchronized (storageMap) {
        storage =
            storageMap.get(report.getStorage().getStorageID());
      }
      if (checkFailedStorages) {
        failedStorageInfos.remove(storage);
      }

      storage.receivedHeartbeat(report);
      // skip accounting for capacity of PROVIDED storages!
      if (StorageType.PROVIDED.equals(storage.getStorageType())) {
        continue;
      }

      totalCapacity += report.getCapacity();
      totalRemaining += report.getRemaining();
      totalBlockPoolUsed += report.getBlockPoolUsed();
      totalDfsUsed += report.getDfsUsed();
      String mount = report.getMount();
      // For volumes on the same mount,
      // ignore duplicated volumes for nonDfsUsed.
      if (mount == null || mount.isEmpty()) {
        totalNonDfsUsed += report.getNonDfsUsed();
      } else {
        if (!visitedMount.contains(mount)) {
          totalNonDfsUsed += report.getNonDfsUsed();
          visitedMount.add(mount);
        }
      }
      if (report.getRemaining() > 0 && storage.getState() != State.FAILED) {
        volumesAvailable += 1;
      }
    }
    this.numVolumesAvailable = volumesAvailable;

    // Update total metrics for the node.
    setCapacity(totalCapacity);
    setRemaining(totalRemaining);
    setBlockPoolUsed(totalBlockPoolUsed);
    setDfsUsed(totalDfsUsed);
    setNonDfsUsed(totalNonDfsUsed);
    if (checkFailedStorages) {
      updateFailedStorage(failedStorageInfos);
    }
    long storageMapSize;
    synchronized (storageMap) {
      storageMapSize = storageMap.size();
    }
    if (storageMapSize != reports.length) {
      pruneStorageMap(reports);
    }
  }

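  /**
   * Add the given storage to this descriptor's storage map if no storage with
   * the same storage ID is registered yet; if one is, it is expected to be
   * the same object.
   */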
  void injectStorage(DatanodeStorageInfo s) {
    synchronized (storageMap) {
      DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
      if (null == storage) {
        LOG.info("Adding new storage ID {} for DN {}", s.getStorageID(),
            getXferAddr());
        DFSTopologyNodeImpl parent = null;
        if (getParent() instanceof DFSTopologyNodeImpl) {
          parent = (DFSTopologyNodeImpl) getParent();
        }
        StorageType type = s.getStorageType();
        if (!hasStorageType(type) && parent != null) {
          // we are about to add a type this node currently does not have,
          // inform the parent that a new type is added to this datanode
          parent.childAddStorage(getName(), type);
        }
        storageMap.put(s.getStorageID(), s);
      } else {
        assert storage == s : "found " + storage + " expected " + s;
      }
    }
  }

  /**
   * Remove stale storages from storageMap. We must not remove any storages
   * as long as they have associated block replicas.
   */
  private void pruneStorageMap(final StorageReport[] reports) {
    synchronized (storageMap) {
      LOG.debug("Number of storages reported in heartbeat={};"
              + " Number of storages in storageMap={}", reports.length,
          storageMap.size());

      HashMap<String, DatanodeStorageInfo> excessStorages;

      // Init excessStorages with all known storages.
      excessStorages = new HashMap<>(storageMap);

      // Remove storages that the DN reported in the heartbeat.
      for (final StorageReport report : reports) {
        excessStorages.remove(report.getStorage().getStorageID());
      }

      // For each remaining storage, remove it if there are no associated
      // blocks.
      for (final DatanodeStorageInfo storageInfo : excessStorages.values()) {
        if (storageInfo.numBlocks() == 0) {
          DatanodeStorageInfo info =
              storageMap.remove(storageInfo.getStorageID());
          if (!hasStorageType(info.getStorageType())) {
            // we removed a storage, and as a result there is no more storage
            // of this type; inform the parent about this.
            if (getParent() instanceof DFSTopologyNodeImpl) {
              ((DFSTopologyNodeImpl) getParent()).childRemoveStorage(getName(),
                  info.getStorageType());
            }
          }
          LOG.info("Removed storage {} from DataNode {}", storageInfo, this);
        } else {
          // This can occur until all block reports are received.
          LOG.debug("Deferring removal of stale storage {} with {} blocks",
              storageInfo, storageInfo.numBlocks());
        }
      }
    }
  }

  private void updateFailedStorage(
      Set<DatanodeStorageInfo> failedStorageInfos) {
    for (DatanodeStorageInfo storageInfo : failedStorageInfos) {
      if (storageInfo.getState() != DatanodeStorage.State.FAILED) {
        LOG.info("{} failed.", storageInfo);
        storageInfo.setState(DatanodeStorage.State.FAILED);
      }
    }
  }

  private static class BlockIterator implements Iterator<BlockInfo> {
    private int index = 0;
    private final List<Iterator<BlockInfo>> iterators;
    
    private BlockIterator(final int startBlock,
                          final DatanodeStorageInfo... storages) {
      if(startBlock < 0) {
        throw new IllegalArgumentException(
            "Illegal value startBlock = " + startBlock);
      }
      List<Iterator<BlockInfo>> iterators = new ArrayList<>();
      int s = startBlock;
      int sumBlocks = 0;
      for (DatanodeStorageInfo e : storages) {
        int numBlocks = e.numBlocks();
        sumBlocks += numBlocks;
        if(sumBlocks <= startBlock) {
          s -= numBlocks;
        } else {
          iterators.add(e.getBlockIterator());
        }
      }
      this.iterators = Collections.unmodifiableList(iterators);
      // skip to the storage containing startBlock
      for(; s > 0 && hasNext(); s--) {
        next();
      }
    }

    @Override
    public boolean hasNext() {
      update();
      return index < iterators.size() && iterators.get(index).hasNext();
    }

    @Override
    public BlockInfo next() {
      update();
      return iterators.get(index).next();
    }
    
    @Override
    public void remove() {
      throw new UnsupportedOperationException("Remove unsupported.");
    }
    
    private void update() {
      while(index < iterators.size() - 1 && !iterators.get(index).hasNext()) {
        index++;
      }
    }
  }

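  /**
   * Get iterator over all blocks on this datanode, across all its storages.
   */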
  Iterator<BlockInfo> getBlockIterator() {
    return getBlockIterator(0);
  }

  /**
   * Get iterator, which starts iterating from the specified block.
   */
  Iterator<BlockInfo> getBlockIterator(final int startBlock) {
    return new BlockIterator(startBlock, getStorageInfos());
  }

  /**
   * Get iterator, which starts iterating from the specified block and storages.
   *
   * @param startBlock index of the block at which iteration starts
   * @param storageInfos the storages to iterate over
   */
  Iterator<BlockInfo> getBlockIterator(
      final int startBlock, final DatanodeStorageInfo[] storageInfos) {
    return new BlockIterator(startBlock, storageInfos);
  }

  @VisibleForTesting
  public void incrementPendingReplicationWithoutTargets() {
    pendingReplicationWithoutTargets++;
  }

  @VisibleForTesting
  public void decrementPendingReplicationWithoutTargets() {
    pendingReplicationWithoutTargets--;
  }

  /**
   * Store block replication work.
   */
  @VisibleForTesting
  public void addBlockToBeReplicated(Block block,
      DatanodeStorageInfo[] targets) {
    assert(block != null && targets != null && targets.length > 0);
    replicateBlocks.offer(new BlockTargetPair(block, targets));
  }

  /**
   * Store replication work for an erasure-coded block.
   */
  @VisibleForTesting
  public void addECBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
    assert (block != null && targets != null && targets.length > 0);
    ecBlocksToBeReplicated.offer(new BlockTargetPair(block, targets));
  }

  /**
   * Store block erasure coding work.
   */
  void addBlockToBeErasureCoded(ExtendedBlock block,
      DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
      byte[] liveBlockIndices, byte[] excludeReconstrutedIndices, ErasureCodingPolicy ecPolicy) {
    assert (block != null && sources != null && sources.length > 0);
    BlockECReconstructionInfo task = new BlockECReconstructionInfo(block,
        sources, targets, liveBlockIndices, excludeReconstrutedIndices, ecPolicy);
    ecBlocksToBeErasureCoded.offer(task);
    BlockManager.LOG.debug("Adding block reconstruction task " + task + " to "
        + getName() + ", current queue size is " + ecBlocksToBeErasureCoded.size());
  }

  /**
   * Store block recovery work.
   */
  void addBlockToBeRecovered(BlockInfo block) {
    if(recoverBlocks.contains(block)) {
      // this prevents adding the same block twice to the recovery queue
      BlockManager.LOG.info(block + " is already in the recovery queue");
      return;
    }
    recoverBlocks.offer(block);
  }

  /**
   * Store block invalidation work.
   */
  void addBlocksToBeInvalidated(List<Block> blocklist) {
    assert(blocklist != null && blocklist.size() > 0);
    synchronized (invalidateBlocks) {
      for(Block blk : blocklist) {
        invalidateBlocks.add(blk);
      }
    }
  }

  /**
   * The number of work items that are pending to be replicated.
   */
  int getNumberOfBlocksToBeReplicated() {
    return pendingReplicationWithoutTargets + replicateBlocks.size()
        + ecBlocksToBeReplicated.size();
  }

  /**
   * The number of work items that are pending to be reconstructed.
   */
  @VisibleForTesting
  public int getNumberOfBlocksToBeErasureCoded() {
    return ecBlocksToBeErasureCoded.size();
  }

  /**
   * The number of EC block work items that are pending to be replicated.
   */
  @VisibleForTesting
  public int getNumberOfECBlocksToBeReplicated() {
    return ecBlocksToBeReplicated.size();
  }

  @VisibleForTesting
  public int getNumberOfReplicateBlocks() {
    return replicateBlocks.size();
  }

  List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
    return replicateBlocks.poll(maxTransfers);
  }

  List<BlockTargetPair> getECReplicatedCommand(int maxTransfers) {
    return ecBlocksToBeReplicated.poll(maxTransfers);
  }

  public List<BlockECReconstructionInfo> getErasureCodeCommand(
      int maxTransfers) {
    return ecBlocksToBeErasureCoded.poll(maxTransfers);
  }

  public BlockInfo[] getLeaseRecoveryCommand(int maxTransfers) {
    List<BlockInfo> blocks = recoverBlocks.poll(maxTransfers);
    if (blocks == null) {
      return null;
    }
    return blocks.toArray(new BlockInfo[blocks.size()]);
  }

  /**
   * Remove up to the specified number of blocks from the invalidation queue
   * and return them.
   */
  public Block[] getInvalidateBlocks(int maxblocks) {
    synchronized (invalidateBlocks) {
      Block[] deleteList = invalidateBlocks.pollToArray(new Block[Math.min(
          invalidateBlocks.size(), maxblocks)]);
      return deleteList.length == 0 ? null : deleteList;
    }
  }

  @VisibleForTesting
  public boolean containsInvalidateBlock(Block block) {
    synchronized (invalidateBlocks) {
      return invalidateBlocks.contains(block);
    }
  }

  /**
   * Find whether the datanode contains good storage of the given type to
   * place a block of size <code>blockSize</code>.
   *
   * <p>Currently the datanode only cares about the storage type; the first
   * storage of the given type that is seen is returned.
   *
   * @param t requested storage type
   * @param blockSize requested block size
   * @param minBlocksForWrite the minimum number of blocks of the given size
   *                          that the storage must have room for
   */
  public DatanodeStorageInfo chooseStorage4Block(StorageType t,
      long blockSize, int minBlocksForWrite) {
    final long requiredSize = blockSize * minBlocksForWrite;
    final long scheduledSize = blockSize * getBlocksScheduled(t);
    long remaining = 0;
    DatanodeStorageInfo storage = null;
    for (DatanodeStorageInfo s : getStorageInfos()) {
      if (s.getState() == State.NORMAL && s.getStorageType() == t) {
        if (storage == null) {
          storage = s;
        }
        long r = s.getRemaining();
        if (r >= requiredSize) {
          remaining += r;
        }
      }
    }
    if (requiredSize > remaining - scheduledSize) {
      BlockPlacementPolicy.LOG.debug(
          "The node {} does not have enough {} space (required={},"
          + " scheduled={}, remaining={}).",
          this, t, requiredSize, scheduledSize, remaining);
      return null;
    }
    return storage;
  }

  /**
   * @return Approximate number of blocks currently scheduled to be written 
   * to the given storage type of this datanode.
   */
  public int getBlocksScheduled(StorageType t) {
    return (int)(currApproxBlocksScheduled.get(t)
        + prevApproxBlocksScheduled.get(t));
  }

  /**
   * @return Approximate number of blocks currently scheduled to be written 
   * to this datanode.
   */
  public int getBlocksScheduled() {
    return (int)(currApproxBlocksScheduled.sum()
        + prevApproxBlocksScheduled.sum());
  }

  /** Increment the number of blocks scheduled. */
  void incrementBlocksScheduled(StorageType t) {
    currApproxBlocksScheduled.add(t, 1);
  }
  
  /** Decrement the number of blocks scheduled. */
  void decrementBlocksScheduled(StorageType t) {
    if (prevApproxBlocksScheduled.get(t) > 0) {
      prevApproxBlocksScheduled.subtract(t, 1);
    } else if (currApproxBlocksScheduled.get(t) > 0) {
      currApproxBlocksScheduled.subtract(t, 1);
    } 
    // it's OK if both counters are zero.
  }
  
  /** Adjusts curr and prev number of blocks scheduled every few minutes. */
  private void rollBlocksScheduled(long now) {
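    // Every BLOCKS_SCHEDULED_ROLL_INTERVAL, move the counts accumulated in the
    // current window into 'prev' and start a fresh 'curr' window. Since
    // getBlocksScheduled() sums both windows, the estimate covers roughly the
    // last 10 to 20 minutes of scheduled writes.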
    if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) {
      prevApproxBlocksScheduled.set(currApproxBlocksScheduled);
      currApproxBlocksScheduled.reset();
      lastBlocksScheduledRollTime = now;
    }
  }
  
  @Override
  public int hashCode() {
    // Super implementation is sufficient
    return super.hashCode();
  }

  @Override
  public boolean equals(Object obj) {
    // Sufficient to use super equality as datanodes are uniquely identified
    // by DatanodeID
    return (this == obj) || super.equals(obj);
  }

  /** Leaving service status. */
  public class LeavingServiceStatus {
    private int underReplicatedBlocks;
    private int underReplicatedBlocksInOpenFiles;
    private int outOfServiceOnlyReplicas;
    private LightWeightHashSet<Long> underReplicatedOpenFiles =
        new LightWeightLinkedSet<>();
    private long startTime;
    
    synchronized void set(int lowRedundancyBlocksInOpenFiles,
        LightWeightHashSet<Long> underRepInOpenFiles,
        int underRepBlocks, int outOfServiceOnlyRep) {
      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
        return;
      }
      underReplicatedOpenFiles = underRepInOpenFiles;
      underReplicatedBlocks = underRepBlocks;
      underReplicatedBlocksInOpenFiles = lowRedundancyBlocksInOpenFiles;
      outOfServiceOnlyReplicas = outOfServiceOnlyRep;
    }

    /** @return the number of under-replicated blocks */
    public synchronized int getUnderReplicatedBlocks() {
      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
        return 0;
      }
      return underReplicatedBlocks;
    }
    /** @return the number of blocks with out-of-service-only replicas */
    public synchronized int getOutOfServiceOnlyReplicas() {
      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
        return 0;
      }
      return outOfServiceOnlyReplicas;
    }
    /** @return the number of under-replicated blocks in open files */
    public synchronized int getUnderReplicatedInOpenFiles() {
      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
        return 0;
      }
      return underReplicatedBlocksInOpenFiles;
    }
    /** @return the collection of under-replicated blocks in open files */
    public synchronized LightWeightHashSet<Long> getOpenFiles() {
      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
        return new LightWeightLinkedSet<>();
      }
      return underReplicatedOpenFiles;
    }
    /** Set start time */
    public synchronized void setStartTime(long time) {
      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
        return;
      }
      startTime = time;
    }
    /** @return start time */
    public synchronized long getStartTime() {
      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
        return 0;
      }
      return startTime;
    }
  }  // End of class LeavingServiceStatus

  /**
   * Set the flag to indicate if this datanode is disallowed from communicating
   * with the namenode.
   */
  public void setDisallowed(boolean flag) {
    disallowed = flag;
  }
  /** Is the datanode disallowed from communicating with the namenode? */
  public boolean isDisallowed() {
    return disallowed;
  }

  /**
   * @return number of failed volumes in the datanode.
   */
  public int getVolumeFailures() {
    return volumeFailures;
  }

  /**
   * Returns info about volume failures.
   *
   * @return info about volume failures, possibly null
   */
  public VolumeFailureSummary getVolumeFailureSummary() {
    return volumeFailureSummary;
  }

  /**
   * Return the number of volumes that can be written to.
   * @return the number of volumes that can be written to.
   */
  public int getNumVolumesAvailable() {
    return numVolumesAvailable;
  }

  /**
   * @param nodeReg DatanodeID to update registration for.
   */
  @Override
  public void updateRegInfo(DatanodeID nodeReg) {
    super.updateRegInfo(nodeReg);
    
    // must re-process IBR after re-registration
    for(DatanodeStorageInfo storage : getStorageInfos()) {
      if (storage.getStorageType() != StorageType.PROVIDED) {
        storage.setBlockReportCount(0);
      }
    }
    heartbeatedSinceRegistration = false;
    forceRegistration = false;
  }

  /**
   * @return balancer bandwidth in bytes per second for this datanode
   */
  public synchronized long getBalancerBandwidth() {
    return this.bandwidth;
  }

  /**
   * @param bandwidth balancer bandwidth in bytes per second for this datanode
   */
  public synchronized void setBalancerBandwidth(long bandwidth) {
    this.bandwidth = bandwidth;
  }

  @Override
  public String dumpDatanode() {
    StringBuilder sb = new StringBuilder(super.dumpDatanode());
    int repl = replicateBlocks.size();
    if (repl > 0) {
      sb.append(" ").append(repl).append(" blocks to be replicated;");
    }
    int ecRepl = ecBlocksToBeReplicated.size();
    if (ecRepl > 0) {
      sb.append(" ").append(ecRepl).append(" ec blocks to be replicated;");
    }
    int ec = ecBlocksToBeErasureCoded.size();
    if(ec > 0) {
      sb.append(" ").append(ec).append(" blocks to be erasure coded;");
    }
    int inval = invalidateBlocks.size();
    if (inval > 0) {
      sb.append(" ").append(inval).append(" blocks to be invalidated;");      
    }
    int recover = recoverBlocks.size();
    if (recover > 0) {
      sb.append(" ").append(recover).append(" blocks to be recovered;");
    }
    return sb.toString();
  }

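  /**
   * Look up the {@link DatanodeStorageInfo} for the reported storage, creating
   * a new entry if the storage ID is unknown, or updating its state and type
   * if the report differs from what is currently recorded.
   *
   * @param s storage reported by the datanode
   * @return the storage info tracked by this descriptor
   */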
  DatanodeStorageInfo updateStorage(DatanodeStorage s) {
    synchronized (storageMap) {
      DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
      DFSTopologyNodeImpl parent = null;
      if (getParent() instanceof DFSTopologyNodeImpl) {
        parent = (DFSTopologyNodeImpl) getParent();
      }

      if (storage == null) {
        LOG.info("Adding new storage ID {} for DN {}", s.getStorageID(),
            getXferAddr());
        StorageType type = s.getStorageType();
        if (!hasStorageType(type) && parent != null) {
          // we are about to add a type this node currently does not have,
          // inform the parent that a new type is added to this datanode
          parent.childAddStorage(getName(), s.getStorageType());
        }
        storage = new DatanodeStorageInfo(this, s);
        storageMap.put(s.getStorageID(), storage);
      } else if (storage.getState() != s.getState() ||
                 storage.getStorageType() != s.getStorageType()) {
        // For backwards compatibility, make sure that the type and
        // state are updated. Some reports from older datanodes do
        // not include these fields so we may have assumed defaults.
        StorageType oldType = storage.getStorageType();
        StorageType newType = s.getStorageType();
        if (oldType != newType && !hasStorageType(newType) && parent != null) {
          // we are about to add a type this node currently does not have
          // inform the parent that a new type is added to this datanode
          // if old == new, nothing's changed. don't bother
          parent.childAddStorage(getName(), newType);
        }
        storage.updateFromStorage(s);
        storageMap.put(storage.getStorageID(), storage);
        if (oldType != newType && !hasStorageType(oldType) && parent != null) {
          // there is no more old type storage on this datanode, inform parent
          // about this change.
          parent.childRemoveStorage(getName(), oldType);
        }
      }
      return storage;
    }
  }

  /**
   * @return   The time at which we last sent caching directives to this 
   *           DataNode, in monotonic milliseconds.
   */
  public long getLastCachingDirectiveSentTimeMs() {
    return this.lastCachingDirectiveSentTimeMs;
  }

  /**
   * @param time  The time at which we last sent caching directives to this 
   *              DataNode, in monotonic milliseconds.
   */
  public void setLastCachingDirectiveSentTimeMs(long time) {
    this.lastCachingDirectiveSentTimeMs = time;
  }
  
  /**
   * @return true if at least one block report has been received from every
   *         storage of this datanode
   */
  public boolean checkBlockReportReceived() {
    if (this.getStorageInfos().length == 0) {
      return false;
    }
    for (DatanodeStorageInfo storageInfo : this.getStorageInfos()) {
      if (storageInfo.getBlockReportCount() == 0) {
        return false;
      }
    }
    return true;
  }

  public void setForceRegistration(boolean force) {
    forceRegistration = force;
  }

  public boolean isRegistered() {
    return isAlive() && !forceRegistration;
  }

  public boolean hasStorageType(StorageType type) {
    for (DatanodeStorageInfo dnStorage : getStorageInfos()) {
      if (dnStorage.getStorageType() == type) {
        return true;
      }
    }
    return false;
  }
}