DiskBalancer.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi
    .FsVolumeReferences;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus
    .DiskBalancerWorkEntry;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Worker class for Disk Balancer.
 * <p>
 * Here is the high-level logic executed by this class. Users can submit disk
 * balancing plans using submitPlan calls. After a set of sanity checks the plan
 * is admitted and put into workMap.
 * <p>
 * executePlan launches a thread that picks up work from workMap and hands
 * it over to the BlockMover#copyBlocks function.
 * <p>
 * Constraints:
 * <p>
 * Only one plan can be executing on a datanode at any given time. This is
 * ensured by checking the future handle of the worker thread in submitPlan.
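 * <p>
 * A minimal usage sketch (identifiers are illustrative; planId is the SHA-1
 * hex digest of the plan JSON):
 * <pre>{@code
 *   DiskBalancer balancer = new DiskBalancer(datanodeUuid, conf, blockMover);
 *   balancer.submitPlan(planId, 1, "plan.json", planJson, false);
 *   DiskBalancerWorkStatus status = balancer.queryWorkStatus();
 * }</pre>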
 */
@InterfaceAudience.Private
public class DiskBalancer {

  @VisibleForTesting
  public static final Logger LOG = LoggerFactory.getLogger(DiskBalancer
      .class);
  private final FsDatasetSpi<?> dataset;
  private final String dataNodeUUID;
  private final BlockMover blockMover;
  private final ReentrantLock lock;
  private final ConcurrentHashMap<VolumePair, DiskBalancerWorkItem> workMap;
  private volatile boolean isDiskBalancerEnabled = false;
  private ExecutorService scheduler;
  private Future future;
  private String planID;
  private String planFile;
  private DiskBalancerWorkStatus.Result currentResult;
  private long bandwidth;
  private volatile long planValidityInterval;
  private final Configuration config;

  /**
   * Constructs a Disk Balancer object. This object takes care of reading a
   * NodePlan and executing it against a set of volumes.
   *
   * @param dataNodeUUID - Data node UUID
   * @param conf         - Hdfs Config
   * @param blockMover   - Object that supports moving blocks.
   */
  public DiskBalancer(String dataNodeUUID,
                      Configuration conf, BlockMover blockMover) {
    this.config = conf;
    this.currentResult = Result.NO_PLAN;
    this.blockMover = blockMover;
    this.dataset = this.blockMover.getDataset();
    this.dataNodeUUID = dataNodeUUID;
    scheduler = Executors.newSingleThreadExecutor();
    lock = new ReentrantLock();
    workMap = new ConcurrentHashMap<>();
    this.planID = "";  // to keep protobuf happy.
    this.planFile = "";  // to keep protobuf happy.
    this.isDiskBalancerEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_DISK_BALANCER_ENABLED,
        DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT);
    this.bandwidth = conf.getInt(
        DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT,
        DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT_DEFAULT);
    this.planValidityInterval = conf.getTimeDuration(
        DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
        DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS);
  }

  /**
   * Shuts down disk balancer services.
   */
  public void shutdown() {
    lock.lock();
    boolean needShutdown = false;
    try {
      this.isDiskBalancerEnabled = false;
      this.currentResult = Result.NO_PLAN;
      if ((this.future != null) && (!this.future.isDone())) {
        this.currentResult = Result.PLAN_CANCELLED;
        this.blockMover.setExitFlag();
        scheduler.shutdown();
        needShutdown = true;
      }
    } finally {
      lock.unlock();
    }
    // no need to hold lock while shutting down executor.
    if (needShutdown) {
      shutdownExecutor();
    }
  }

  /**
   * Shuts down the executor.
   */
  private void shutdownExecutor() {
    final int secondsToWait = 10;
    try {
      if (!scheduler.awaitTermination(secondsToWait, TimeUnit.SECONDS)) {
        scheduler.shutdownNow();
        if (!scheduler.awaitTermination(secondsToWait, TimeUnit.SECONDS)) {
          LOG.error("Disk Balancer : Scheduler did not terminate.");
        }
      }
    } catch (InterruptedException ex) {
      scheduler.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Takes a client-submitted plan and converts it into a set of work items that
   * can be executed by the blockMover.
   *
   * @param planId      - A SHA-1 of the plan string
   * @param planVersion - version of the plan string - for future use.
   * @param planFileName    - Plan file name
   * @param planData    - Plan data in json format
   * @param force       - Skip some validations and execute the plan file.
   * @throws DiskBalancerException
   */
  public void submitPlan(String planId, long planVersion, String planFileName,
                         String planData, boolean force)
          throws DiskBalancerException {
    lock.lock();
    try {
      checkDiskBalancerEnabled();
      if ((this.future != null) && (!this.future.isDone())) {
        LOG.error("Disk Balancer - Executing another plan (Plan File: {}, Plan ID: {}), " +
            "submitPlan failed.", planFile, planID);
        throw new DiskBalancerException("Executing another plan",
            DiskBalancerException.Result.PLAN_ALREADY_IN_PROGRESS);
      }
      NodePlan nodePlan = verifyPlan(planId, planVersion, planData, force);
      createWorkPlan(nodePlan);
      this.planID = planId;
      this.planFile = planFileName;
      this.currentResult = Result.PLAN_UNDER_PROGRESS;
      executePlan();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Get FsVolume by volume UUID.
   * @param fsDataset - Dataset to search for the volume.
   * @param volUuid   - Storage ID of the volume.
   * @return FsVolumeSpi
   */
  private static FsVolumeSpi getFsVolume(final FsDatasetSpi<?> fsDataset,
      final String volUuid) {
    FsVolumeSpi fsVolume = null;
    try (FsVolumeReferences volumeReferences =
           fsDataset.getFsVolumeReferences()) {
      for (int i = 0; i < volumeReferences.size(); i++) {
        if (volumeReferences.get(i).getStorageID().equals(volUuid)) {
          fsVolume = volumeReferences.get(i);
          break;
        }
      }
    } catch (IOException e) {
      LOG.warn("Disk Balancer - Error when closing volume references: ", e);
    }
    return fsVolume;
  }

  /**
   * Returns the current work status of a previously submitted Plan.
   *
   * @return DiskBalancerWorkStatus.
   * @throws DiskBalancerException
   */
  public DiskBalancerWorkStatus queryWorkStatus() throws DiskBalancerException {
    lock.lock();
    try {
      checkDiskBalancerEnabled();
      // if we had a plan in progress, check if it is finished.
      if (this.currentResult == Result.PLAN_UNDER_PROGRESS &&
          this.future != null &&
          this.future.isDone()) {
        this.currentResult = Result.PLAN_DONE;
      }

      DiskBalancerWorkStatus status =
          new DiskBalancerWorkStatus(this.currentResult, this.planID,
                  this.planFile);
      for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
          workMap.entrySet()) {
        DiskBalancerWorkEntry workEntry = new DiskBalancerWorkEntry(
            entry.getKey().getSourceVolBasePath(),
            entry.getKey().getDestVolBasePath(),
            entry.getValue());
        status.addWorkEntry(workEntry);
      }
      return status;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Cancels a running plan.
   *
   * @param planID - Hash of the plan to cancel.
   * @throws DiskBalancerException
   */
  public void cancelPlan(String planID) throws DiskBalancerException {
    lock.lock();
    boolean needShutdown = false;
    try {
      checkDiskBalancerEnabled();
      if (this.planID == null ||
          !this.planID.equals(planID) ||
          this.planID.isEmpty()) {
        LOG.error("Disk Balancer - No such plan. Cancel plan failed. PlanID: " +
            planID);
        throw new DiskBalancerException("No such plan.",
            DiskBalancerException.Result.NO_SUCH_PLAN);
      }
      if (!this.future.isDone()) {
        this.currentResult = Result.PLAN_CANCELLED;
        this.blockMover.setExitFlag();
        scheduler.shutdown();
        needShutdown = true;
      }
    } finally {
      lock.unlock();
    }
    // no need to hold lock while shutting down executor.
    if (needShutdown) {
      shutdownExecutor();
    }
  }

  /**
   * Returns a volume ID to Volume base path map.
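   * <p>
   * The returned JSON maps each storage ID to its volume base path, for
   * example (values illustrative):
   * <pre>{@code
   *   {"DS-volume-1":"/data/1", "DS-volume-2":"/data/2"}
   * }</pre>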
   *
   * @return Json string of the volume map.
   * @throws DiskBalancerException
   */
  public String getVolumeNames() throws DiskBalancerException {
    lock.lock();
    try {
      checkDiskBalancerEnabled();
      return JsonUtil.toJsonString(getStorageIDToVolumeBasePathMap());
    } catch (DiskBalancerException ex) {
      throw ex;
    } catch (IOException e) {
      throw new DiskBalancerException("Internal error, Unable to " +
          "create JSON string.", e,
          DiskBalancerException.Result.INTERNAL_ERROR);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns the current bandwidth.
   *
   * @return the configured maximum disk bandwidth in MB/s.
   * @throws DiskBalancerException
   */
  public long getBandwidth() throws DiskBalancerException {
    lock.lock();
    try {
      checkDiskBalancerEnabled();
      return this.bandwidth;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Throws if Disk balancer is disabled.
   *
   * @throws DiskBalancerException
   */
  private void checkDiskBalancerEnabled()
      throws DiskBalancerException {
    if (!isDiskBalancerEnabled) {
      throw new DiskBalancerException("Disk Balancer is not enabled.",
          DiskBalancerException.Result.DISK_BALANCER_NOT_ENABLED);
    }
  }

  /**
   * Enables or disables the disk balancer.
   *
   * @param diskBalancerEnabled
   *          true to enable the disk balancer, false to disable it.
   */
  public void setDiskBalancerEnabled(boolean diskBalancerEnabled) {
    isDiskBalancerEnabled = diskBalancerEnabled;
  }

  /**
   * Returns the value indicating if diskBalancer is enabled.
   *
   * @return boolean.
   */
  @VisibleForTesting
  public boolean isDiskBalancerEnabled() {
    return isDiskBalancerEnabled;
  }

  /**
   * Sets the maximum amount of time a disk balancer plan is valid.
   *
   * @param planValidityInterval - maximum amount of time in milliseconds.
   */
  public void setPlanValidityInterval(long planValidityInterval) {
    this.config.setTimeDuration(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
        planValidityInterval, TimeUnit.MILLISECONDS);
    this.planValidityInterval = planValidityInterval;
  }

  /**
   * Gets maximum amount of time disk balancer plan is valid.
   *
   * @return the maximum amount of time in milliseconds.
   */
  @VisibleForTesting
  public long getPlanValidityInterval() {
    return planValidityInterval;
  }

  /**
   * Gets maximum amount of time disk balancer plan is valid in config.
   *
   * @return the maximum amount of time in milliseconds.
   */
  @VisibleForTesting
  public long getPlanValidityIntervalInConfig() {
    return config.getTimeDuration(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
        DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
  }

  /**
   * Verifies that user provided plan is valid.
   *
   * @param planID      - SHA-1 of the plan.
   * @param planVersion - Version of the plan, for future use.
   * @param plan        - Plan String in Json.
   * @param force       - Skip verifying when the plan was generated.
   * @return a NodePlan Object.
   * @throws DiskBalancerException
   */
  private NodePlan verifyPlan(String planID, long planVersion, String plan,
                              boolean force) throws DiskBalancerException {

    Preconditions.checkState(lock.isHeldByCurrentThread());
    verifyPlanVersion(planVersion);
    NodePlan nodePlan = verifyPlanHash(planID, plan);
    if (!force) {
      verifyTimeStamp(nodePlan);
    }
    verifyNodeUUID(nodePlan);
    return nodePlan;
  }

  /**
   * Verifies the plan version is something that we support.
   *
   * @param planVersion - Long version.
   * @throws DiskBalancerException
   */
  private void verifyPlanVersion(long planVersion)
      throws DiskBalancerException {
    if ((planVersion < DiskBalancerConstants.DISKBALANCER_MIN_VERSION) ||
        (planVersion > DiskBalancerConstants.DISKBALANCER_MAX_VERSION)) {
      LOG.error("Disk Balancer - Invalid plan version.");
      throw new DiskBalancerException("Invalid plan version.",
          DiskBalancerException.Result.INVALID_PLAN_VERSION);
    }
  }

  /**
   * Verifies that plan matches the SHA-1 provided by the client.
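   * <p>
   * For example, a client can compute a matching ID for a plan string as:
   * <pre>{@code
   *   String planID = DigestUtils.sha1Hex(plan.getBytes(StandardCharsets.UTF_8));
   * }</pre>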
   *
   * @param planID - SHA-1 hex string of the plan.
   * @param plan   - Plan String
   * @return parsed NodePlan.
   * @throws DiskBalancerException
   */
  private NodePlan verifyPlanHash(String planID, String plan)
      throws DiskBalancerException {
    final long sha1Length = 40;
    if (plan == null || plan.length() == 0) {
      LOG.error("Disk Balancer -  Invalid plan.");
      throw new DiskBalancerException("Invalid plan.",
          DiskBalancerException.Result.INVALID_PLAN);
    }

    if ((planID == null) ||
        (planID.length() != sha1Length) ||
        !DigestUtils.sha1Hex(plan.getBytes(StandardCharsets.UTF_8))
            .equalsIgnoreCase(planID)) {
      LOG.error("Disk Balancer - Invalid plan hash.");
      throw new DiskBalancerException("Invalid or mis-matched hash.",
          DiskBalancerException.Result.INVALID_PLAN_HASH);
    }

    try {
      return NodePlan.parseJson(plan);
    } catch (IOException ex) {
      throw new DiskBalancerException("Parsing plan failed.", ex,
          DiskBalancerException.Result.MALFORMED_PLAN);
    }
  }

  /**
   * Verifies that this plan is not older than the configured plan validity
   * interval.
   *
   * @param plan - Node Plan
   */
  private void verifyTimeStamp(NodePlan plan) throws DiskBalancerException {
    long now = Time.now();
    long planTime = plan.getTimeStamp();

    if ((planTime + planValidityInterval) < now) {
      String planValidity = config.get(
          DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
          DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT);
      if (planValidity.matches("[0-9]+")) {
        planValidity += "ms";
      }
      String errorString = "Plan was generated more than " + planValidity
          + " ago";
      LOG.error("Disk Balancer - " + errorString);
      throw new DiskBalancerException(errorString,
          DiskBalancerException.Result.OLD_PLAN_SUBMITTED);
    }
  }

  /**
   * Verify Node UUID.
   *
   * @param plan - Node Plan
   */
  private void verifyNodeUUID(NodePlan plan) throws DiskBalancerException {
    if ((plan.getNodeUUID() == null) ||
        !plan.getNodeUUID().equals(this.dataNodeUUID)) {
      LOG.error("Disk Balancer - Plan was generated for another node.");
      throw new DiskBalancerException(
          "Plan was generated for another node.",
          DiskBalancerException.Result.DATANODE_ID_MISMATCH);
    }
  }

  /**
   * Converts a node plan into DiskBalancerWorkItems that the Datanode can execute.
   *
   * @param plan - Node Plan
   */
  private void createWorkPlan(NodePlan plan) throws DiskBalancerException {
    Preconditions.checkState(lock.isHeldByCurrentThread());

    // Clean up any residual work in the map.
    workMap.clear();
    Map<String, String> storageIDToVolBasePathMap =
        getStorageIDToVolumeBasePathMap();

    for (Step step : plan.getVolumeSetPlans()) {
      String sourceVolUuid = step.getSourceVolume().getUuid();
      String destVolUuid = step.getDestinationVolume().getUuid();

      String sourceVolBasePath = storageIDToVolBasePathMap.get(sourceVolUuid);
      if (sourceVolBasePath == null) {
        final String errMsg = "Disk Balancer - Unable to find source volume: "
            + step.getSourceVolume().getPath() + ". SubmitPlan failed.";
        LOG.error(errMsg);
        throw new DiskBalancerException(errMsg,
            DiskBalancerException.Result.INVALID_VOLUME);
      }

      String destVolBasePath = storageIDToVolBasePathMap.get(destVolUuid);
      if (destVolBasePath == null) {
        final String errMsg = "Disk Balancer - Unable to find dest volume: "
            + step.getDestinationVolume().getPath() + ". SubmitPlan failed.";
        LOG.error(errMsg);
        throw new DiskBalancerException(errMsg,
            DiskBalancerException.Result.INVALID_VOLUME);
      }
      VolumePair volumePair = new VolumePair(sourceVolUuid,
          sourceVolBasePath, destVolUuid, destVolBasePath);
      createWorkPlan(volumePair, step);
    }
  }

  /**
   * Returns volume UUID to volume base path map.
   *
   * @return Map
   * @throws DiskBalancerException
   */
  private Map<String, String> getStorageIDToVolumeBasePathMap()
      throws DiskBalancerException {
    Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
    // Get volumes snapshot so no need to acquire dataset lock.
    try (FsDatasetSpi.FsVolumeReferences references = dataset.
        getFsVolumeReferences()) {
      for (int ndx = 0; ndx < references.size(); ndx++) {
        FsVolumeSpi vol = references.get(ndx);
        storageIDToVolBasePathMap.put(vol.getStorageID(),
            vol.getBaseURI().getPath());
      }
    } catch (IOException ex) {
      LOG.error("Disk Balancer - Internal Error.", ex);
      throw new DiskBalancerException("Internal error", ex,
          DiskBalancerException.Result.INTERNAL_ERROR);
    }
    return storageIDToVolBasePathMap;
  }

  /**
   * Starts executing the plan on a worker thread; the thread exits when the
   * plan is done executing.
   */
  private void executePlan() {
    Preconditions.checkState(lock.isHeldByCurrentThread());
    this.blockMover.setRunnable();
    if (this.scheduler.isShutdown()) {
      this.scheduler = Executors.newSingleThreadExecutor();
    }

    this.future = scheduler.submit(new Runnable() {
      @Override
      public void run() {
        Thread.currentThread().setName("DiskBalancerThread");
        LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}",
            planFile, planID);
        for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
            workMap.entrySet()) {
          blockMover.setRunnable();
          blockMover.copyBlocks(entry.getKey(), entry.getValue());
        }
      }
    });
  }

  /**
   * Inserts a work item into the work map.
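   * <p>
   * Steps that reference the same volume pair are merged: for example, two
   * steps moving 10 GB and 5 GB between the same source and destination
   * become a single 15 GB work item.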
   * @param volumePair - VolumePair
   * @param step - Move Step
   */
  private void createWorkPlan(final VolumePair volumePair, Step step)
      throws DiskBalancerException {
    if (volumePair.getSourceVolUuid().equals(volumePair.getDestVolUuid())) {
      final String errMsg = "Disk Balancer - Source and destination volumes " +
          "are same: " + volumePair.getSourceVolUuid();
      LOG.warn(errMsg);
      throw new DiskBalancerException(errMsg,
          DiskBalancerException.Result.INVALID_MOVE);
    }
    long bytesToMove = step.getBytesToMove();
    // If the plan contains more than one step for the
    // same VolumePair, we merge them into one work order.
    if (workMap.containsKey(volumePair)) {
      bytesToMove += workMap.get(volumePair).getBytesToCopy();
    }

    DiskBalancerWorkItem work = new DiskBalancerWorkItem(bytesToMove, 0);

    // all these values can be zero, if so we will use
    // values from configuration.
    work.setBandwidth(step.getBandwidth());
    work.setTolerancePercent(step.getTolerancePercent());
    work.setMaxDiskErrors(step.getMaxDiskErrors());
    workMap.put(volumePair, work);
  }

  /**
   * BlockMover supports moving blocks across Volumes.
   */
  public interface BlockMover {
    /**
     * Copies blocks from a set of volumes.
     *
     * @param pair - Source and Destination Volumes.
     * @param item - Work item tracking the number of bytes to move.
     */
    void copyBlocks(VolumePair pair, DiskBalancerWorkItem item);

    /**
     * Begin the actual copy operations. This is useful in testing.
     */
    void setRunnable();

    /**
     * Tells copyBlocks to exit from the copy routine.
     */
    void setExitFlag();

    /**
     * Returns a pointer to the current dataset we are operating against.
     *
     * @return FsDatasetSpi
     */
    FsDatasetSpi getDataset();

    /**
     * Returns time when this plan started executing.
     *
     * @return Start time in milliseconds.
     */
    long getStartTime();

    /**
     * Number of seconds elapsed.
     *
     * @return time in seconds
     */
    long getElapsedSeconds();

  }

  /**
   * Holds the source and destination volume UUIDs and their base paths
   * that the disk balancer operates against.
   */
  public static class VolumePair {
    private final String sourceVolUuid;
    private final String destVolUuid;
    private final String sourceVolBasePath;
    private final String destVolBasePath;

    /**
     * Constructs a volume pair.
     * @param sourceVolUuid     - Source Volume
     * @param sourceVolBasePath - Source Volume Base Path
     * @param destVolUuid       - Destination Volume
     * @param destVolBasePath   - Destination Volume Base Path
     */
    public VolumePair(final String sourceVolUuid,
        final String sourceVolBasePath, final String destVolUuid,
        final String destVolBasePath) {
      this.sourceVolUuid = sourceVolUuid;
      this.sourceVolBasePath = sourceVolBasePath;
      this.destVolUuid = destVolUuid;
      this.destVolBasePath = destVolBasePath;
    }

    /**
     * Gets source volume UUID.
     *
     * @return UUID String
     */
    public String getSourceVolUuid() {
      return sourceVolUuid;
    }

    /**
     * Gets source volume base path.
     * @return String
     */
    public String getSourceVolBasePath() {
      return sourceVolBasePath;
    }

    /**
     * Gets destination volume UUID.
     *
     * @return UUID String
     */
    public String getDestVolUuid() {
      return destVolUuid;
    }

    /**
     * Gets destination volume base path.
     *
     * @return String
     */
    public String getDestVolBasePath() {
      return destVolBasePath;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      VolumePair that = (VolumePair) o;
      return sourceVolUuid.equals(that.sourceVolUuid)
          && sourceVolBasePath.equals(that.sourceVolBasePath)
          && destVolUuid.equals(that.destVolUuid)
          && destVolBasePath.equals(that.destVolBasePath);
    }

    @Override
    public int hashCode() {
      final int primeNum = 31;
      final List<String> volumeStrList = Arrays.asList(sourceVolUuid,
          sourceVolBasePath, destVolUuid, destVolBasePath);
      int result = 1;
      for (String str : volumeStrList) {
        result = (result * primeNum) + str.hashCode();
      }
      return result;
    }
  }

  /**
   * Actual DataMover class for DiskBalancer.
   */
  public static class DiskBalancerMover implements BlockMover {
    private final FsDatasetSpi dataset;
    private long diskBandwidth;
    private long blockTolerance;
    private long maxDiskErrors;
    private int poolIndex;
    private AtomicBoolean shouldRun;
    private long startTime;
    private long secondsElapsed;

    /**
     * Constructs diskBalancerMover.
     *
     * @param dataset Dataset
     * @param conf    Configuration
     */
    public DiskBalancerMover(FsDatasetSpi dataset, Configuration conf) {
      this.dataset = dataset;
      shouldRun = new AtomicBoolean(false);

      this.diskBandwidth = conf.getLong(
          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT,
          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT_DEFAULT);

      this.blockTolerance = conf.getLong(
          DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE,
          DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE_DEFAULT);

      this.maxDiskErrors = conf.getLong(
          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS,
          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT);

      // Since these are user-provided values, make sure they are sane
      // or ignore faulty values.
      if (this.diskBandwidth <= 0) {
        LOG.debug("Found 0 or less as max disk throughput, ignoring config " +
            "value. value : " + diskBandwidth);
        diskBandwidth =
            DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THROUGHPUT_DEFAULT;
      }

      if (this.blockTolerance <= 0) {
        LOG.debug("Found 0 or less for block tolerance value, ignoring config" +
            "value. value : " + blockTolerance);
        blockTolerance =
            DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE_DEFAULT;

      }

      if (this.maxDiskErrors < 0) {
        LOG.debug("Found  less than 0 for maxDiskErrors value, ignoring " +
            "config value. value : " + maxDiskErrors);
        maxDiskErrors =
            DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT;
      }
    }

    /**
     * Sets the DiskBalancerMover copyBlocks loop into a runnable state.
     */
    @Override
    public void setRunnable() {
      this.shouldRun.set(true);
    }

    /**
     * Signals copy block to exit.
     */
    @Override
    public void setExitFlag() {
      this.shouldRun.set(false);
    }

    /**
     * Returns the shouldRun boolean flag.
     */
    public boolean shouldRun() {
      return this.shouldRun.get();
    }

    /**
     * Checks if a given block is small enough to fit within the remaining
     * bytes we still need to copy, including the tolerance allowance.
     *
     * @param blockSize - block length in bytes
     * @param item      - Work item
     * @return true if this block meets our criteria, false otherwise.
     */
    private boolean isLessThanNeeded(long blockSize,
                                     DiskBalancerWorkItem item) {
      long bytesToCopy = item.getBytesToCopy() - item.getBytesCopied();
      bytesToCopy = bytesToCopy +
          ((bytesToCopy * getBlockTolerancePercentage(item)) / 100);
      return blockSize <= bytesToCopy;
    }

    /**
     * Returns the block tolerance from the work item, or the default if the
     * item does not specify one.
     *
     * @param item - DiskBalancerWorkItem
     * @return tolerance percentage.
     */
    private long getBlockTolerancePercentage(DiskBalancerWorkItem item) {
      return item.getTolerancePercent() <= 0 ? this.blockTolerance :
          item.getTolerancePercent();
    }

    /**
     * Inflates bytesCopied by the tolerance percentage and compares it to the
     * copy target. This allows us to stop copying once we are close enough.
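     * <p>
     * For example, with a copy target of 100 bytes and a 10 percent
     * tolerance, copying 92 bytes is treated as done (92 + 9 = 101, which
     * exceeds 100).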
     *
     * @param item DiskBalancerWorkItem
     * @return false if we need to copy more, true if we are done.
     */
    private boolean isCloseEnough(DiskBalancerWorkItem item) {
      long temp = item.getBytesCopied() +
          ((item.getBytesCopied() * getBlockTolerancePercentage(item)) / 100);
      return item.getBytesToCopy() < temp;
    }

    /**
     * Returns disk bandwidth associated with this plan, if none is specified
     * returns the global default.
     *
     * @param item DiskBalancerWorkItem.
     * @return bandwidth in MB/s.
     */
    private long getDiskBandwidth(DiskBalancerWorkItem item) {
      return item.getBandwidth() <= 0 ? this.diskBandwidth : item
          .getBandwidth();
    }

    /**
     * Computes the sleep delay needed based on the block that just got copied.
     * We copy in burst mode, that is, we let the copy proceed at full
     * throttle. Once a copy is done, we compute how many bytes have been
     * transferred and average it over the user-specified bandwidth. In other
     * words, this code implements a poor man's token bucket algorithm for
     * traffic shaping.
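     * <p>
     * For example, at a 10 MB/s limit, a 100 MB copy should take about
     * 10,000 ms; if the copy itself took 2,000 ms, we sleep for roughly the
     * remaining 8,000 ms.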
     *
     * @param bytesCopied - number of bytes copied.
     * @param timeUsed    in milliseconds
     * @param item        DiskBalancerWorkItem
     * @return sleep delay in Milliseconds.
     */
    @VisibleForTesting
    public long computeDelay(long bytesCopied, long timeUsed,
                              DiskBalancerWorkItem item) {

      // we had an overflow, ignore this reading and continue.
      if (timeUsed == 0) {
        return 0;
      }
      final int megaByte = 1024 * 1024;
      if (bytesCopied < megaByte) {
        return 0;
      }
      long bytesInMB = bytesCopied / megaByte;

      // converting disk bandwidth in MB/millisec
      float bandwidth = getDiskBandwidth(item) / 1000f;
      float delay = ((long) (bytesInMB / bandwidth) - timeUsed);
      return (delay <= 0) ? 0 : (long) delay;
    }

    /**
     * Returns maximum errors to tolerate for the specific plan or the default.
     *
     * @param item - DiskBalancerWorkItem
     * @return maximum error counts to tolerate.
     */
    private long getMaxError(DiskBalancerWorkItem item) {
      return item.getMaxDiskErrors() <= 0 ? this.maxDiskErrors :
          item.getMaxDiskErrors();
    }

    /**
     * Gets the next block that we can copy, returns null if we cannot find a
     * block that fits our parameters or if we have run out of blocks.
     *
     * @param iter Block Iter
     * @param item - Work item
     * @return Extended block or null if no copyable block is found.
     */
    private ExtendedBlock getBlockToCopy(FsVolumeSpi.BlockIterator iter,
                                         DiskBalancerWorkItem item) {
      while (!iter.atEnd() && item.getErrorCount() <= getMaxError(item)) {
        try {
          ExtendedBlock block = iter.nextBlock();
          if (null == block) {
            LOG.info("NextBlock call returned null. No valid block to copy. {}",
                item.toJson());
            return null;
          }
          // A valid block is a finalized block; we iterate until we get
          // a finalized block.
          if (!this.dataset.isValidBlock(block)) {
            continue;
          }
          // We don't look for the best, we just do first fit
          if (isLessThanNeeded(block.getNumBytes(), item)) {
            return block;
          }
        } catch (IOException e) {
          item.incErrorCount();
        }
      }
      if (item.getErrorCount() > getMaxError(item)) {
        item.setErrMsg("Error count exceeded.");
        LOG.info("Maximum error count exceeded. Error count: {} Max error:{} ",
            item.getErrorCount(), item.getMaxDiskErrors());
      }
      return null;
    }

    /**
     * Opens all Block pools on a given volume.
     *
     * @param source    Source
     * @param poolIters List of PoolIters to maintain.
     */
    private void openPoolIters(FsVolumeSpi source, List<FsVolumeSpi
        .BlockIterator> poolIters) {
      Preconditions.checkNotNull(source);
      Preconditions.checkNotNull(poolIters);

      for (String blockPoolID : source.getBlockPoolList()) {
        poolIters.add(source.newBlockIterator(blockPoolID,
            "DiskBalancerSource"));
      }
    }

    /**
     * Returns the next block that we copy from all the block pools. This
     * function looks across all block pools to find the next block to copy.
     *
     * @param poolIters - List of BlockIterators
     * @param item      - Work item being executed.
     * @return ExtendedBlock.
     */
    ExtendedBlock getNextBlock(List<FsVolumeSpi.BlockIterator> poolIters,
                               DiskBalancerWorkItem item) {
      Preconditions.checkNotNull(poolIters);
      int currentCount = 0;
      ExtendedBlock block = null;
      while (block == null && currentCount < poolIters.size()) {
        currentCount++;
        int index = poolIndex++ % poolIters.size();
        FsVolumeSpi.BlockIterator currentPoolIter = poolIters.get(index);
        block = getBlockToCopy(currentPoolIter, item);
      }

      if (block == null) {
        try {
          item.setErrMsg("No source blocks found to move.");
          LOG.error("No movable source blocks found. {}", item.toJson());
        } catch (IOException e) {
          LOG.error("Unable to get json from Item.");
        }
      }
      return block;
    }

    /**
     * Close all Pool Iters.
     *
     * @param poolIters List of BlockIters
     */
    private void closePoolIters(List<FsVolumeSpi.BlockIterator> poolIters) {
      Preconditions.checkNotNull(poolIters);
      for (FsVolumeSpi.BlockIterator iter : poolIters) {
        try {
          iter.close();
        } catch (IOException ex) {
          LOG.error("Error closing a block pool iter. ex: ", ex);
        }
      }
    }

    /**
     * Copies blocks from a set of volumes.
     *
     * @param pair - Source and Destination Volumes.
     * @param item - Work item tracking the number of bytes to move.
     */
    @Override
    public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
      String sourceVolUuid = pair.getSourceVolUuid();
      String destVolUuid = pair.getDestVolUuid();

      // When any of the DiskBalancerWorkItem volumes are not
      // available, return after setting error in item.
      FsVolumeSpi source = getFsVolume(this.dataset, sourceVolUuid);
      if (source == null) {
        final String errMsg = "Disk Balancer - Unable to find source volume: "
            + pair.getSourceVolBasePath();
        LOG.error(errMsg);
        item.setErrMsg(errMsg);
        return;
      }
      FsVolumeSpi dest = getFsVolume(this.dataset, destVolUuid);
      if (dest == null) {
        final String errMsg = "Disk Balancer - Unable to find dest volume: "
            + pair.getDestVolBasePath();
        LOG.error(errMsg);
        item.setErrMsg(errMsg);
        return;
      }

      if (source.isTransientStorage() || dest.isTransientStorage()) {
        final String errMsg = "Disk Balancer - Unable to support " +
                "transient storage type.";
        LOG.error(errMsg);
        item.setErrMsg(errMsg);
        return;
      }

      List<FsVolumeSpi.BlockIterator> poolIters = new LinkedList<>();
      startTime = Time.now();
      item.setStartTime(startTime);
      secondsElapsed = 0;

      try {
        openPoolIters(source, poolIters);
        if (poolIters.size() == 0) {
          LOG.error("No block pools found on volume. volume : {}. Exiting.",
              source.getBaseURI());
          return;
        }

        while (shouldRun()) {
          try {

            // Check for the max error count constraint.
            if (item.getErrorCount() > getMaxError(item)) {
              LOG.error("Exceeded the max error count. source {}, dest: {} " +
                      "error count: {}", source.getBaseURI(),
                  dest.getBaseURI(), item.getErrorCount());
              break;
            }

            // Check for the block tolerance constraint.
            if (isCloseEnough(item)) {
              LOG.info("Copy from {} to {} done. copied {} bytes and {} " +
                      "blocks.",
                  source.getBaseURI(), dest.getBaseURI(),
                  item.getBytesCopied(), item.getBlocksCopied());
              this.setExitFlag();
              continue;
            }

            ExtendedBlock block = getNextBlock(poolIters, item);
            // we are not able to find any blocks to copy.
            if (block == null) {
              LOG.error("No source blocks, exiting the copy. Source: {}, " +
                  "Dest:{}", source.getBaseURI(), dest.getBaseURI());
              this.setExitFlag();
              continue;
            }

            // Check if someone told us to exit; treat this as an interruption
            // point for the thread, since both getNextBlock and
            // moveBlockAcrossVolumes can take some time.
            if (!shouldRun()) {
              continue;
            }

            long timeUsed;
            // There is a race condition here, but we will get an IOException
            // if dest has no space, which we handle anyway.
            if (dest.getAvailable() > item.getBytesToCopy()) {
              long begin = System.nanoTime();
              this.dataset.moveBlockAcrossVolumes(block, dest);
              long now = System.nanoTime();
              timeUsed = (now - begin) > 0 ? now - begin : 0;
            } else {

              // Technically it is possible for us to find a smaller block and
              // make another copy, but opting for the safer choice of just
              // exiting here.
              LOG.error("Destination volume: {} does not have enough space to" +
                  " accommodate a block. Block Size: {} Exiting from" +
                  " copyBlocks.", dest.getBaseURI(), block.getNumBytes());
              break;
            }

            LOG.debug("Moved block with size {} from  {} to {}",
                block.getNumBytes(), source.getBaseURI(),
                dest.getBaseURI());

            // Check for the max throughput constraint.
            // We sleep here to keep the promise that we will not
            // copy more than the max MB/sec. We sleep enough time
            // to make sure that our promise holds on average.
            // Because we sleep, if a shutdown or cancel call comes in
            // we exit via an InterruptedException.
            Thread.sleep(computeDelay(block.getNumBytes(), TimeUnit.NANOSECONDS
                .toMillis(timeUsed), item));

            // We delay updating the info to avoid confusing the user.
            // This way we report the copy only if it is under the
            // throughput threshold.
            item.incCopiedSoFar(block.getNumBytes());
            item.incBlocksCopied();
            secondsElapsed = TimeUnit.MILLISECONDS.toSeconds(Time.now() -
                startTime);
            item.setSecondsElapsed(secondsElapsed);
          } catch (IOException ex) {
            LOG.error("Exception while trying to copy blocks. error: ", ex);
            item.incErrorCount();
          } catch (InterruptedException e) {
            LOG.error("Copy Block Thread interrupted, exiting the copy.");
            Thread.currentThread().interrupt();
            item.incErrorCount();
            this.setExitFlag();
          } catch (RuntimeException ex) {
            // Exit on any runtime exception.
            LOG.error("Got an unexpected Runtime Exception ", ex);
            item.incErrorCount();
            this.setExitFlag();
          }
        }
      } finally {
        // Close all Iters.
        closePoolIters(poolIters);
      }
    }

    /**
     * Returns a pointer to the current dataset we are operating against.
     *
     * @return FsDatasetSpi
     */
    @Override
    public FsDatasetSpi getDataset() {
      return dataset;
    }

    /**
     * Returns time when this plan started executing.
     *
     * @return Start time in milliseconds.
     */
    @Override
    public long getStartTime() {
      return startTime;
    }

    /**
     * Number of seconds elapsed.
     *
     * @return time in seconds
     */
    @Override
    public long getElapsedSeconds() {
      return secondsElapsed;
    }
  }
}