TestDatanodeManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.blockmanagement;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.RwLockMode;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.test.Whitebox;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.*;

public class TestDatanodeManager {
  
  public static final Logger LOG =
      LoggerFactory.getLogger(TestDatanodeManager.class);
  
  //The number of times the registration / removal of nodes should happen
  final int NUM_ITERATIONS = 500;

  private static DatanodeManager mockDatanodeManager(
      FSNamesystem fsn, Configuration conf) throws IOException {
    BlockManager bm = Mockito.mock(BlockManager.class);
    Mockito.when(bm.getMaxReplicationStreams()).thenReturn(
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2));
    Mockito.when(bm.getReplicationStreamsHardLimit()).thenReturn(
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 2));
    BlockReportLeaseManager blm = new BlockReportLeaseManager(conf);
    Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm);
    DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
    return dm;
  }

  /**
   * Create an InetSocketAddress for a host:port string
   * @param host a host identifier in host:port format
   * @return a corresponding InetSocketAddress object
   */
  private static InetSocketAddress entry(String host) {
    return HostFileManager.parseEntry("dummy", "dummy", host);
  }

  /**
   * This test checks that if a node is re-registered with a new software
   * version after the heartbeat expiry interval but before the HeartbeatManager
   * has a chance to detect this and remove it, the node's version will still
   * be correctly decremented.
   */
  @Test
  public void testNumVersionsCorrectAfterReregister()
      throws IOException, InterruptedException {
    //Create the DatanodeManager which will be tested
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
    Mockito.when(fsn.hasWriteLock(RwLockMode.BM)).thenReturn(true);
    Configuration conf = new Configuration();
    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 0);
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 10);
    DatanodeManager dm = mockDatanodeManager(fsn, conf);

    String storageID = "someStorageID1";
    String ip = "someIP" + storageID;

    // Register then reregister the same node but with a different version
    for (int i = 0; i <= 1; i++) {
      dm.registerDatanode(new DatanodeRegistration(
          new DatanodeID(ip, "", storageID, 9000, 0, 0, 0),
          null, null, "version" + i));
      if (i == 0) {
        Thread.sleep(25);
      }
    }

    //Verify DatanodeManager has the correct count
    Map<String, Integer> mapToCheck = dm.getDatanodesSoftwareVersions();
    assertNull("should be no more version0 nodes", mapToCheck.get("version0"));
    assertEquals("should be one version1 node",
        mapToCheck.get("version1").intValue(), 1);
  }

  /**
   * This test checks that if a node is re-registered with a different ip, its
   * host2DatanodeMap is correctly updated with the new ip.
   */
  @Test
  public void testHost2NodeMapCorrectAfterReregister()
          throws IOException, InterruptedException {
    //Create the DatanodeManager which will be tested
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
    Mockito.when(fsn.hasWriteLock(RwLockMode.BM)).thenReturn(true);
    Configuration conf = new Configuration();
    DatanodeManager dm = mockDatanodeManager(fsn, conf);

    String storageID = "someStorageID1";
    String ipOld = "someIPOld" + storageID;
    String ipNew = "someIPNew" + storageID;

    dm.registerDatanode(new DatanodeRegistration(
            new DatanodeID(ipOld, "", storageID, 9000, 0, 0, 0),
            null, null, "version"));

    dm.registerDatanode(new DatanodeRegistration(
            new DatanodeID(ipNew, "", storageID, 9000, 0, 0, 0),
            null, null, "version"));

    assertNull("should be no node with old ip", dm.getDatanodeByHost(ipOld));
    assertNotNull("should be a node with new ip", dm.getDatanodeByHost(ipNew));

    storageID = "someStorageID2";
    String hostnameOld = "someHostNameOld" + storageID;
    String hostnameNew = "someHostNameNew" + storageID;

    dm.registerDatanode(new DatanodeRegistration(
            new DatanodeID("ip", hostnameOld, storageID, 9000, 0, 0, 0),
            null, null, "version"));

    dm.registerDatanode(new DatanodeRegistration(
            new DatanodeID("ip", hostnameNew, storageID, 9000, 0, 0, 0),
            null, null, "version"));

    assertNull("should be no node with old hostname", dm.getDatanodeByHostName(hostnameOld));
    assertNotNull("should be a node with new hostname", dm.getDatanodeByHostName(hostnameNew));
  }

  /**
   * This test sends a random sequence of node registrations and node removals
   * to the DatanodeManager (of nodes with different IDs and versions), and
   * checks that the DatanodeManager keeps a correct count of different software
   * versions at all times.
   */
  @Test
  public void testNumVersionsReportedCorrect() throws IOException {
    //Create the DatanodeManager which will be tested
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
    Mockito.when(fsn.hasWriteLock(RwLockMode.BM)).thenReturn(true);
    DatanodeManager dm = mockDatanodeManager(fsn, new Configuration());

    //Seed the RNG with a known value so test failures are easier to reproduce
    Random rng = new Random();
    int seed = rng.nextInt();
    rng = new Random(seed);
    LOG.info("Using seed " + seed + " for testing");

    //A map of the Storage IDs to the DN registration it was registered with
    HashMap <String, DatanodeRegistration> sIdToDnReg =
      new HashMap<String, DatanodeRegistration>();

    for(int i=0; i<NUM_ITERATIONS; ++i) {

      //If true, remove a node for every 3rd time (if there's one)
      if(rng.nextBoolean() && i%3 == 0 && sIdToDnReg.size()!=0) {
        //Pick a random node.
        int randomIndex = rng.nextInt() % sIdToDnReg.size();
        //Iterate to that random position 
        Iterator<Map.Entry<String, DatanodeRegistration>> it =
          sIdToDnReg.entrySet().iterator();
        for(int j=0; j<randomIndex-1; ++j) {
          it.next();
        }
        DatanodeRegistration toRemove = it.next().getValue();
        LOG.info("Removing node " + toRemove.getDatanodeUuid() + " ip " +
        toRemove.getXferAddr() + " version : " + toRemove.getSoftwareVersion());

        //Remove that random node
        dm.removeDatanode(toRemove);
        it.remove();
      }

      // Otherwise register a node. This node may be a new / an old one
      else {
        //Pick a random storageID to register.
        String storageID = "someStorageID" + rng.nextInt(5000);

        DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
        Mockito.when(dr.getDatanodeUuid()).thenReturn(storageID);

        //If this storageID had already been registered before
        if(sIdToDnReg.containsKey(storageID)) {
          dr = sIdToDnReg.get(storageID);
          //Half of the times, change the IP address
          if(rng.nextBoolean()) {
            dr.setIpAddr(dr.getIpAddr() + "newIP");
          }
        } else { //This storageID has never been registered
          //Ensure IP address is unique to storageID
          String ip = "someIP" + storageID;
          Mockito.when(dr.getIpAddr()).thenReturn(ip);
          Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000");
          Mockito.when(dr.getXferPort()).thenReturn(9000);
        }
        //Pick a random version to register with
        Mockito.when(dr.getSoftwareVersion()).thenReturn(
          "version" + rng.nextInt(5));

        LOG.info("Registering node storageID: " + dr.getDatanodeUuid() +
          ", version: " + dr.getSoftwareVersion() + ", IP address: "
          + dr.getXferAddr());

        //Register this random node
        dm.registerDatanode(dr);
        sIdToDnReg.put(storageID, dr);
      }

      //Verify DatanodeManager still has the right count
      Map<String, Integer> mapToCheck = dm.getDatanodesSoftwareVersions();

      //Remove counts from versions and make sure that after removing all nodes
      //mapToCheck is empty
      for(Entry<String, DatanodeRegistration> it: sIdToDnReg.entrySet()) {
        String ver = it.getValue().getSoftwareVersion();
        if(!mapToCheck.containsKey(ver)) {
          throw new AssertionError("The correct number of datanodes of a "
            + "version was not found on iteration " + i);
        }
        mapToCheck.put(ver, mapToCheck.get(ver) - 1);
        if(mapToCheck.get(ver) == 0) {
          mapToCheck.remove(ver);
        }
      }
      for(Entry <String, Integer> entry: mapToCheck.entrySet()) {
        LOG.info("Still in map: " + entry.getKey() + " has "
          + entry.getValue());
      }
      assertEquals("The map of version counts returned by DatanodeManager was"
        + " not what it was expected to be on iteration " + i, 0,
        mapToCheck.size());
    }
  }
  
  @Test (timeout = 100000)
  public void testRejectUnresolvedDatanodes() throws IOException {
    //Create the DatanodeManager which will be tested
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
    Mockito.when(fsn.hasWriteLock(RwLockMode.BM)).thenReturn(true);

    Configuration conf = new Configuration();
    
    //Set configuration property for rejecting unresolved topology mapping
    conf.setBoolean(
        DFSConfigKeys.DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_KEY, true);
    
    //set TestDatanodeManager.MyResolver to be used for topology resolving
    conf.setClass(
        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
        TestDatanodeManager.MyResolver.class, DNSToSwitchMapping.class);
    
    //create DatanodeManager
    DatanodeManager dm = mockDatanodeManager(fsn, conf);

    //storageID to register.
    String storageID = "someStorageID-123";
    
    DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
    Mockito.when(dr.getDatanodeUuid()).thenReturn(storageID);
    
    try {
      //Register this node
      dm.registerDatanode(dr);
      Assert.fail("Expected an UnresolvedTopologyException");
    } catch (UnresolvedTopologyException ute) {
      LOG.info("Expected - topology is not resolved and " +
          "registration is rejected.");
    } catch (Exception e) {
      Assert.fail("Expected an UnresolvedTopologyException");
    }
  }
  
  /**
   * MyResolver class provides resolve method which always returns null 
   * in order to simulate unresolved topology mapping.
   */
  public static class MyResolver implements DNSToSwitchMapping {
    @Override
    public List<String> resolve(List<String> names) {
      return null;
    }

    @Override
    public void reloadCachedMappings() {
    }

    @Override
    public void reloadCachedMappings(List<String> names) {  
    }
  }

  /**
   * This test creates a LocatedBlock with 5 locations, sorts the locations
   * based on the network topology, and ensures the locations are still aligned
   * with the storage ids and storage types.
   */
  @Test
  public void testSortLocatedBlocks() throws IOException, URISyntaxException {
    HelperFunction(null, 0);
  }

  /**
   * Execute a functional topology script and make sure that helper
   * function works correctly
   *
   * @throws IOException
   * @throws URISyntaxException
   */
  @Test
  public void testgoodScript() throws IOException, URISyntaxException {
    HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 0);
  }


  /**
   * Run a broken script and verify that helper function is able to
   * ignore the broken script and work correctly
   *
   * @throws IOException
   * @throws URISyntaxException
   */
  @Test
  public void testBadScript() throws IOException, URISyntaxException {
    HelperFunction("/" + Shell.appendScriptExtension("topology-broken-script"),
        0);
  }

  /**
   * Test with different sorting functions but include datanodes.
   * with provided storage
   * @throws IOException
   * @throws URISyntaxException
   */
  @Test
  public void testWithProvidedTypes() throws IOException, URISyntaxException {
    HelperFunction(null, 1);
    HelperFunction(null, 3);
    HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 1);
    HelperFunction("/" + Shell.appendScriptExtension("topology-script"), 2);
  }

  /**
   * Helper function that tests the DatanodeManagers SortedBlock function
   * we invoke this function with and without topology scripts
   *
   * @param scriptFileName - Script Name or null
   * @param providedStorages - number of provided storages to add
   *
   * @throws URISyntaxException
   * @throws IOException
   */
  public void HelperFunction(String scriptFileName, int providedStorages)
    throws URISyntaxException, IOException {
    // create the DatanodeManager which will be tested
    Configuration conf = new Configuration();
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
    Mockito.when(fsn.hasWriteLock(RwLockMode.BM)).thenReturn(true);
    if (scriptFileName != null && !scriptFileName.isEmpty()) {
      URL shellScript = getClass().getResource(scriptFileName);
      Path resourcePath = Paths.get(shellScript.toURI());
      FileUtil.setExecutable(resourcePath.toFile(), true);
      conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
        resourcePath.toString());
    }
    DatanodeManager dm = mockDatanodeManager(fsn, conf);

    int totalDNs = 6 + providedStorages;

    // register 6 datanodes, each with different storage ID and type
    DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
    String[] storageIDs = new String[totalDNs];
    List<StorageType> storageTypesList = new ArrayList<>(
        Arrays.asList(StorageType.ARCHIVE,
            StorageType.DEFAULT,
            StorageType.DISK,
            StorageType.NVDIMM,
            StorageType.RAM_DISK,
            StorageType.SSD));

    for (int i = 0; i < providedStorages; i++) {
      storageTypesList.add(StorageType.PROVIDED);
    }

    StorageType[] storageTypes = storageTypesList.toArray(
        StorageType.EMPTY_ARRAY);

    for (int i = 0; i < totalDNs; i++) {
      // register new datanode
      String uuid = "UUID-" + i;
      String ip = "IP-" + i;
      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
      Mockito.when(dr.getIpAddr()).thenReturn(ip);
      Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000");
      Mockito.when(dr.getXferPort()).thenReturn(9000);
      Mockito.when(dr.getSoftwareVersion()).thenReturn("version1");
      dm.registerDatanode(dr);

      // get location and storage information
      locs[i] = dm.getDatanode(uuid);
      storageIDs[i] = "storageID-" + i;
    }

    // set first 2 locations as decommissioned
    locs[0].setDecommissioned();
    locs[1].setDecommissioned();

    // create LocatedBlock with above locations
    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
    LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
    List<LocatedBlock> blocks = new ArrayList<>();
    blocks.add(block);

    final String targetIp = locs[5].getIpAddr();

    // sort block locations
    dm.sortLocatedBlocks(targetIp, blocks);

    // check that storage IDs/types are aligned with datanode locs
    DatanodeInfo[] sortedLocs = block.getLocations();
    storageIDs = block.getStorageIDs();
    storageTypes = block.getStorageTypes();
    assertThat(sortedLocs.length, is(totalDNs));
    assertThat(storageIDs.length, is(totalDNs));
    assertThat(storageTypes.length, is(totalDNs));
    for (int i = 0; i < sortedLocs.length; i++) {
      assertThat(((DatanodeInfoWithStorage) sortedLocs[i]).getStorageID(),
        is(storageIDs[i]));
      assertThat(((DatanodeInfoWithStorage) sortedLocs[i]).getStorageType(),
        is(storageTypes[i]));
    }
    // Ensure the local node is first.
    assertThat(sortedLocs[0].getIpAddr(), is(targetIp));
    // Ensure the two decommissioned DNs were moved to the end.
    assertThat(sortedLocs[sortedLocs.length - 1].getAdminState(),
      is(DatanodeInfo.AdminStates.DECOMMISSIONED));
    assertThat(sortedLocs[sortedLocs.length - 2].getAdminState(),
      is(DatanodeInfo.AdminStates.DECOMMISSIONED));
    // check that the StorageType of datanoodes immediately
    // preceding the decommissioned datanodes is PROVIDED
    for (int i = 0; i < providedStorages; i++) {
      assertThat(
          ((DatanodeInfoWithStorage)
              sortedLocs[sortedLocs.length - 3 - i]).getStorageType(),
          is(StorageType.PROVIDED));
    }
  }

  @Test
  public void testGetBlockLocations()
        throws URISyntaxException, IOException {
    // create the DatanodeManager which will be tested
    Configuration conf = new Configuration();
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
    Mockito.when(fsn.hasWriteLock(RwLockMode.BM)).thenReturn(true);
    URL shellScript = getClass().getResource(
        "/" + Shell.appendScriptExtension("topology-script"));
    Path resourcePath = Paths.get(shellScript.toURI());
    FileUtil.setExecutable(resourcePath.toFile(), true);
    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
        resourcePath.toString());
    DatanodeManager dm = mockDatanodeManager(fsn, conf);

    int totalDNs = 5;
    // register 5 datanodes and 2 node per rack
    DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
    String[] storageIDs = new String[totalDNs];
    for (int i = 0; i < totalDNs; i++) {
      // register new datanode
      String uuid = "UUID-" + i;
      String ip = "IP-" + i / 2 + "-" + i;
      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
      Mockito.when(dr.getIpAddr()).thenReturn(ip);
      dm.registerDatanode(dr);

      // get location and storage information
      locs[i] = dm.getDatanode(uuid);
      storageIDs[i] = "storageID-" + i;
    }

    // set first 2 locations as decommissioned
    locs[0].setDecommissioned();
    locs[1].setDecommissioned();

    // create LocatedBlock with above locations
    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
    LocatedBlock block = new LocatedBlock(b, locs);
    List<LocatedBlock> blocks = new ArrayList<>();
    blocks.add(block);

    // test client in cluster
    final String targetIpInCluster = locs[4].getIpAddr();
    dm.sortLocatedBlocks(targetIpInCluster, blocks);
    DatanodeInfo[] sortedLocs = block.getLocations();
    assertEquals(totalDNs, sortedLocs.length);
    // Ensure the local node is first.
    assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
    // Ensure the two decommissioned DNs were moved to the end.
    assertEquals(DatanodeInfo.AdminStates.DECOMMISSIONED,
        sortedLocs[sortedLocs.length -1].getAdminState());
    assertEquals(DatanodeInfo.AdminStates.DECOMMISSIONED,
        sortedLocs[sortedLocs.length - 2].getAdminState());

    // test client not in cluster but same rack with locs[5]
    final String targetIpNotInCluster = locs[4].getIpAddr() + "-client";
    dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
    DatanodeInfo[] sortedLocs2 = block.getLocations();
    assertEquals(totalDNs, sortedLocs2.length);
    // Ensure the local rack is first.
    assertEquals(locs[4].getIpAddr(), sortedLocs2[0].getIpAddr());
  }

  @Test
  public void testGetBlockLocationConsiderLoad()
      throws IOException, URISyntaxException {
    Configuration conf = new Configuration();
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, true);
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
    URL shellScript = getClass().getResource(
        "/" + Shell.appendScriptExtension("topology-script"));
    Path resourcePath = Paths.get(shellScript.toURI());
    FileUtil.setExecutable(resourcePath.toFile(), true);
    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
        resourcePath.toString());
    DatanodeManager dm = mockDatanodeManager(fsn, conf);

    int totalDNs = 5;
    // Register 5 datanodes and 2 nodes per rack with different load.
    DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
    String[] storageIDs = new String[totalDNs];
    for (int i = 0; i < totalDNs; i++) {
      // Register new datanode.
      String uuid = "UUID-" + i;
      String ip = "IP-" + i / 2 + "-" + i;
      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
      Mockito.when(dr.getIpAddr()).thenReturn(ip);
      dm.registerDatanode(dr);

      // Get location and storage information.
      locs[i] = dm.getDatanode(uuid);
      storageIDs[i] = "storageID-" + i;

      // Set load for datanodes.
      locs[i].setXceiverCount(i);
    }

    // Set node 0 decommissioned.
    locs[0].setDecommissioned();

    // Create LocatedBlock with above locations.
    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
    LocatedBlock block = new LocatedBlock(b, locs);
    List<LocatedBlock> blocks = new ArrayList<>();
    blocks.add(block);

    // Test client located at locs[3] in cluster.
    final String targetIpInCluster = locs[3].getIpAddr();
    dm.sortLocatedBlocks(targetIpInCluster, blocks);
    DatanodeInfo[] sortedLocs = block.getLocations();
    assertEquals(totalDNs, sortedLocs.length);
    // Ensure the local node is first.
    assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
    // Ensure the lightweight node is more close when distance is same.
    assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
    assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
    assertEquals(locs[1].getIpAddr(), sortedLocs[2].getIpAddr());
    assertEquals(locs[4].getIpAddr(), sortedLocs[3].getIpAddr());
    // Ensure the two decommissioned DNs were moved to the end.
    assertThat(sortedLocs[4].getAdminState(),
        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
    assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());

    // Test client not in cluster but same rack with locs[3].
    final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
    dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
    DatanodeInfo[] sortedLocs2 = block.getLocations();
    assertEquals(totalDNs, sortedLocs2.length);
    // Ensure the local rack is first and lightweight node is first
    // when distance is same.
    assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
    assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
    assertEquals(locs[1].getIpAddr(), sortedLocs2[2].getIpAddr());
    assertEquals(locs[4].getIpAddr(), sortedLocs2[3].getIpAddr());
    // Ensure the two decommissioned DNs were moved to the end.
    assertThat(sortedLocs[4].getAdminState(),
        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
    assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
  }

  @Test
  public void testGetBlockLocationConsiderLoadWithNodesOfSameDistance()
      throws IOException, URISyntaxException {
    Configuration conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, true);
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
    Mockito.when(fsn.hasWriteLock(RwLockMode.BM)).thenReturn(true);
    URL shellScript = getClass()
        .getResource("/" + Shell.appendScriptExtension("topology-script"));
    Path resourcePath = Paths.get(shellScript.toURI());
    FileUtil.setExecutable(resourcePath.toFile(), true);
    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
        resourcePath.toString());
    DatanodeManager dm = mockDatanodeManager(fsn, conf);

    int totalDNs = 5;
    // Register 5 datanodes and 2 nodes per rack with different load.
    DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
    String[] storageIDs = new String[totalDNs];
    for (int i = 0; i < totalDNs; i++) {
      // Register new datanode.
      String uuid = "UUID-" + i;
      String ip = "IP-" + i / 2 + "-" + i;
      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
      Mockito.when(dr.getIpAddr()).thenReturn(ip);
      dm.registerDatanode(dr);

      // Get location and storage information.
      locs[i] = dm.getDatanode(uuid);
      storageIDs[i] = "storageID-" + i;

      // Set load for datanodes.
      locs[i].setXceiverCount(2);
    }

    // Set node 0 decommissioned.
    locs[0].setDecommissioned();

    // Create LocatedBlock with above locations.
    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
    LocatedBlock block = new LocatedBlock(b, locs);
    List<LocatedBlock> blocks = new ArrayList<>();
    blocks.add(block);

    // Test client not in cluster but same rack with locs[3].
    // Number of iterations to do the test
    int numIterations = 100;

    Set<String> ipSet = new HashSet<>();
    final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
    for (int i = 0; i < numIterations; i++) {
      dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
      DatanodeInfo[] sortedLocs = block.getLocations();
      assertEquals(totalDNs, sortedLocs.length);
      if (!ipSet.contains(sortedLocs[0].getIpAddr())) {
        ipSet.add(sortedLocs[0].getIpAddr());
      }
    }

    // when the two nodes' distance and weight are same, they are same close.
    assertEquals(2, ipSet.size());
  }

  @Test
  public void testGetBlockLocationConsiderStorageType()
      throws IOException, URISyntaxException {
    Configuration conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
        true);
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
    Mockito.when(fsn.hasWriteLock(RwLockMode.BM)).thenReturn(true);
    URL shellScript = getClass()
        .getResource("/" + Shell.appendScriptExtension("topology-script"));
    Path resourcePath = Paths.get(shellScript.toURI());
    FileUtil.setExecutable(resourcePath.toFile(), true);
    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
        resourcePath.toString());
    DatanodeManager dm = mockDatanodeManager(fsn, conf);

    int totalDNs = 5;
    // Register 5 datanodes and 2 nodes per rack with different load.
    DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
    String[] storageIDs = new String[totalDNs];
    List<StorageType> storageTypesList =
        new ArrayList<>(Arrays.asList(StorageType.ARCHIVE, StorageType.DISK,
            StorageType.SSD, StorageType.DEFAULT, StorageType.SSD));
    StorageType[] storageTypes = storageTypesList.toArray(
        StorageType.EMPTY_ARRAY);

    for (int i = 0; i < totalDNs; i++) {
      // Register new datanode.
      String uuid = "UUID-" + i;
      String ip = "IP-" + i / 2 + "-" + i;
      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
      Mockito.when(dr.getIpAddr()).thenReturn(ip);
      dm.registerDatanode(dr);

      // Get location and storage information.
      locs[i] = dm.getDatanode(uuid);
      storageIDs[i] = "storageID-" + i;
    }

    // Set node 0 decommissioned.
    locs[0].setDecommissioned();

    // Create LocatedBlock with above locations.
    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
    LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
    List<LocatedBlock> blocks = new ArrayList<>();
    blocks.add(block);

    // Test client located at locs[3] in cluster.
    final String targetIpInCluster = locs[3].getIpAddr();
    dm.sortLocatedBlocks(targetIpInCluster, blocks);
    DatanodeInfo[] sortedLocs = block.getLocations();
    assertEquals(totalDNs, sortedLocs.length);
    // Ensure the local node is first.
    assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
    // Ensure choose fast storage type node when distance is same.
    assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
    assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
    assertEquals(locs[4].getIpAddr(), sortedLocs[2].getIpAddr());
    assertEquals(locs[1].getIpAddr(), sortedLocs[3].getIpAddr());
    // Ensure the two decommissioned DNs were moved to the end.
    assertThat(sortedLocs[4].getAdminState(),
        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
    assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());

    // Test client not in cluster but same rack with locs[3].
    final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
    dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
    DatanodeInfo[] sortedLocs2 = block.getLocations();
    assertEquals(totalDNs, sortedLocs2.length);
    // Ensure the local rack is first and choose fast storage type node
    // when distance is same.
    assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
    assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
    assertEquals(locs[4].getIpAddr(), sortedLocs2[2].getIpAddr());
    assertEquals(locs[1].getIpAddr(), sortedLocs2[3].getIpAddr());
    // Ensure the two decommissioned DNs were moved to the end.
    assertThat(sortedLocs[4].getAdminState(),
        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
    assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
  }

  @Test
  public void testGetBlockLocationConsiderStorageTypeAndLoad()
      throws IOException, URISyntaxException {
    Configuration conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERSTORAGETYPE_KEY,
        true);
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, true);
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
    Mockito.when(fsn.hasWriteLock(RwLockMode.BM)).thenReturn(true);
    URL shellScript = getClass()
        .getResource("/" + Shell.appendScriptExtension("topology-script"));
    Path resourcePath = Paths.get(shellScript.toURI());
    FileUtil.setExecutable(resourcePath.toFile(), true);
    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
        resourcePath.toString());
    DatanodeManager dm = mockDatanodeManager(fsn, conf);

    int totalDNs = 5;
    // Register 5 datanodes and 2 nodes per rack with different load.
    DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
    String[] storageIDs = new String[totalDNs];
    List<StorageType> storageTypesList =
        new ArrayList<>(Arrays.asList(StorageType.DISK, StorageType.DISK,
            StorageType.DEFAULT, StorageType.SSD, StorageType.SSD));
    StorageType[] storageTypes = storageTypesList.toArray(
        StorageType.EMPTY_ARRAY);

    for (int i = 0; i < totalDNs; i++) {
      // Register new datanode.
      String uuid = "UUID-" + i;
      String ip = "IP-" + i / 2 + "-" + i;
      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
      Mockito.when(dr.getIpAddr()).thenReturn(ip);
      dm.registerDatanode(dr);

      // Get location and storage information.
      locs[i] = dm.getDatanode(uuid);
      storageIDs[i] = "storageID-" + i;

      // Set load for datanodes.
      locs[i].setXceiverCount(i);
    }

    // Set node 0 decommissioned.
    locs[0].setDecommissioned();

    // Create LocatedBlock with above locations.
    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
    LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
    List<LocatedBlock> blocks = new ArrayList<>();
    blocks.add(block);

    // Test client located at locs[3] in cluster.
    final String targetIpInCluster = locs[3].getIpAddr();
    dm.sortLocatedBlocks(targetIpInCluster, blocks);
    DatanodeInfo[] sortedLocs = block.getLocations();
    assertEquals(totalDNs, sortedLocs.length);
    // Ensure the local node is first.
    assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
    // Ensure choose the light weight node between light weight and fast storage
    // type node when distance is same.
    assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
    assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
    assertEquals(locs[1].getIpAddr(), sortedLocs[2].getIpAddr());
    assertEquals(locs[4].getIpAddr(), sortedLocs[3].getIpAddr());
    // Ensure the two decommissioned DNs were moved to the end.
    assertThat(sortedLocs[4].getAdminState(),
        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
    assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());

    // Test client not in cluster but same rack with locs[3].
    final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
    dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
    DatanodeInfo[] sortedLocs2 = block.getLocations();
    assertEquals(totalDNs, sortedLocs2.length);
    // Ensure the local rack is first and choose the light weight node between
    // light weight and fast storage type node when distance is same.
    assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
    assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
    assertEquals(locs[1].getIpAddr(), sortedLocs2[2].getIpAddr());
    assertEquals(locs[4].getIpAddr(), sortedLocs2[3].getIpAddr());
    // Ensure the two decommissioned DNs were moved to the end.
    assertThat(sortedLocs[4].getAdminState(),
        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
    assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
  }

  /**
   * Test whether removing a host from the includes list without adding it to
   * the excludes list will exclude it from data node reports.
   */
  @Test
  public void testRemoveIncludedNode() throws IOException {
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);

    // Set the write lock so that the DatanodeManager can start
    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
    Mockito.when(fsn.hasWriteLock(RwLockMode.BM)).thenReturn(true);

    DatanodeManager dm = mockDatanodeManager(fsn, new Configuration());
    HostFileManager hm = new HostFileManager();
    HostSet noNodes = new HostSet();
    HostSet oneNode = new HostSet();
    HostSet twoNodes = new HostSet();
    DatanodeRegistration dr1 = new DatanodeRegistration(
      new DatanodeID("127.0.0.1", "127.0.0.1", "someStorageID-123",
          12345, 12345, 12345, 12345),
      new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE),
      new ExportedBlockKeys(), "test");
    DatanodeRegistration dr2 = new DatanodeRegistration(
      new DatanodeID("127.0.0.1", "127.0.0.1", "someStorageID-234",
          23456, 23456, 23456, 23456),
      new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE),
      new ExportedBlockKeys(), "test");

    twoNodes.add(entry("127.0.0.1:12345"));
    twoNodes.add(entry("127.0.0.1:23456"));
    oneNode.add(entry("127.0.0.1:23456"));

    hm.refresh(twoNodes, noNodes);
    Whitebox.setInternalState(dm, "hostConfigManager", hm);

    // Register two data nodes to simulate them coming up.
    // We need to add two nodes, because if we have only one node, removing it
    // will cause the includes list to be empty, which means all hosts will be
    // allowed.
    dm.registerDatanode(dr1);
    dm.registerDatanode(dr2);

    // Make sure that both nodes are reported
    List<DatanodeDescriptor> both =
        dm.getDatanodeListForReport(HdfsConstants.DatanodeReportType.ALL);

    // Sort the list so that we know which one is which
    Collections.sort(both);

    Assert.assertEquals("Incorrect number of hosts reported",
        2, both.size());
    Assert.assertEquals("Unexpected host or host in unexpected position",
        "127.0.0.1:12345", both.get(0).getInfoAddr());
    Assert.assertEquals("Unexpected host or host in unexpected position",
        "127.0.0.1:23456", both.get(1).getInfoAddr());

    // Remove one node from includes, but do not add it to excludes.
    hm.refresh(oneNode, noNodes);

    // Make sure that only one node is still reported
    List<DatanodeDescriptor> onlyOne =
        dm.getDatanodeListForReport(HdfsConstants.DatanodeReportType.ALL);

    Assert.assertEquals("Incorrect number of hosts reported",
        1, onlyOne.size());
    Assert.assertEquals("Unexpected host reported",
        "127.0.0.1:23456", onlyOne.get(0).getInfoAddr());

    // Remove all nodes from includes
    hm.refresh(noNodes, noNodes);

    // Check that both nodes are reported again
    List<DatanodeDescriptor> bothAgain =
        dm.getDatanodeListForReport(HdfsConstants.DatanodeReportType.ALL);

    // Sort the list so that we know which one is which
    Collections.sort(bothAgain);

    Assert.assertEquals("Incorrect number of hosts reported",
        2, bothAgain.size());
    Assert.assertEquals("Unexpected host or host in unexpected position",
        "127.0.0.1:12345", bothAgain.get(0).getInfoAddr());
    Assert.assertEquals("Unexpected host or host in unexpected position",
        "127.0.0.1:23456", bothAgain.get(1).getInfoAddr());
  }

  /**
   * Verify the correctness of pending recovery process.
   *
   * @param numReplicationBlocks the number of replication blocks in the queue.
   * @param numEcBlocksToBeReplicated the number of EC blocks to be replicated in the queue.
   * @param numBlocksToBeErasureCoded number of EC blocks to be erasure coded in the queue.
   * @param maxTransfers the maxTransfer value.
   * @param maxTransfersHardLimit the maxTransfer hard limit value.
   * @param numReplicationTasks the number of replication tasks polled from the queue.
   * @param numECTasksToBeReplicated the number of EC tasks to be replicated polled from the queue.
   * @param numECTasksToBeErasureCoded the number of EC tasks to be erasure coded polled from
   *                                   the queue.
   * @param isDecommissioning if the node is in the decommissioning process.
   *
   * @throws IOException
   */
  private void verifyPendingRecoveryTasks(
      int numReplicationBlocks, int numEcBlocksToBeReplicated, int numBlocksToBeErasureCoded,
      int maxTransfers, int maxTransfersHardLimit, int numReplicationTasks,
      int numECTasksToBeReplicated, int numECTasksToBeErasureCoded, boolean isDecommissioning)
      throws IOException {
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
    Mockito.when(fsn.hasWriteLock(RwLockMode.BM)).thenReturn(true);
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, maxTransfers);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
        maxTransfersHardLimit);
    DatanodeManager dm = Mockito.spy(mockDatanodeManager(fsn, conf));

    DatanodeDescriptor nodeInfo = Mockito.mock(DatanodeDescriptor.class);
    Mockito.when(nodeInfo.isRegistered()).thenReturn(true);
    Mockito.when(nodeInfo.getStorageInfos())
        .thenReturn(new DatanodeStorageInfo[0]);
    Mockito.when(nodeInfo.isDecommissionInProgress())
        .thenReturn(isDecommissioning);

    if (numReplicationBlocks > 0) {
      Mockito.when(nodeInfo.getNumberOfReplicateBlocks())
          .thenReturn(numReplicationBlocks);

      List<BlockTargetPair> tasks =
          Collections.nCopies(
              Math.min(numReplicationTasks, numReplicationBlocks),
              new BlockTargetPair(null, null));
      Mockito.when(nodeInfo.getReplicationCommand(numReplicationTasks))
          .thenReturn(tasks);
    }

    if (numEcBlocksToBeReplicated > 0) {
      Mockito.when(nodeInfo.getNumberOfECBlocksToBeReplicated())
              .thenReturn(numEcBlocksToBeReplicated);

      List<BlockTargetPair> ecReplicatedTasks =
              Collections.nCopies(
                      Math.min(numECTasksToBeReplicated, numEcBlocksToBeReplicated),
                      new BlockTargetPair(null, null));
      Mockito.when(nodeInfo.getECReplicatedCommand(numECTasksToBeReplicated))
              .thenReturn(ecReplicatedTasks);
    }

    if (numBlocksToBeErasureCoded > 0) {
      Mockito.when(nodeInfo.getNumberOfBlocksToBeErasureCoded())
          .thenReturn(numBlocksToBeErasureCoded);

      List<BlockECReconstructionInfo> tasks =
          Collections.nCopies(numECTasksToBeErasureCoded, null);
      Mockito.when(nodeInfo.getErasureCodeCommand(numECTasksToBeErasureCoded))
          .thenReturn(tasks);
    }

    DatanodeRegistration dnReg = Mockito.mock(DatanodeRegistration.class);
    Mockito.when(dm.getDatanode(dnReg)).thenReturn(nodeInfo);
    DatanodeCommand[] cmds = dm.handleHeartbeat(
        dnReg, new StorageReport[1], "bp-123", 0, 0, 10, 0, 0, null,
        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);

    long expectedNumCmds = Arrays.stream(
        new int[]{numReplicationTasks + numECTasksToBeReplicated, numECTasksToBeErasureCoded})
        .filter(x -> x > 0)
        .count();
    assertEquals(expectedNumCmds, cmds.length);

    int idx = 0;
    if (numReplicationTasks > 0 || numECTasksToBeReplicated > 0) {
      assertTrue(cmds[idx] instanceof BlockCommand);
      BlockCommand cmd = (BlockCommand) cmds[0];
      assertEquals(numReplicationTasks + numECTasksToBeReplicated, cmd.getBlocks().length);
      assertEquals(numReplicationTasks + numECTasksToBeReplicated, cmd.getTargets().length);
      idx++;
    }

    if (numECTasksToBeErasureCoded > 0) {
      assertTrue(cmds[idx] instanceof BlockECReconstructionCommand);
      BlockECReconstructionCommand cmd =
          (BlockECReconstructionCommand) cmds[idx];
      assertEquals(numECTasksToBeErasureCoded, cmd.getECTasks().size());
    }

    Mockito.verify(nodeInfo).getReplicationCommand(numReplicationTasks);
    Mockito.verify(nodeInfo).getECReplicatedCommand(numECTasksToBeReplicated);
    Mockito.verify(nodeInfo).getErasureCodeCommand(numECTasksToBeErasureCoded);
  }

  @Test
  public void testPendingRecoveryTasks() throws IOException {
    // Tasks are slitted according to the ratio between queue lengths.
    verifyPendingRecoveryTasks(20, 0, 20, 20, 30, 10, 0, 10, false);
    verifyPendingRecoveryTasks(40, 0, 10, 20, 30, 16, 0, 4, false);

    // Approximately load tasks if the ratio between queue length is large.
    verifyPendingRecoveryTasks(400, 0, 1, 20, 30, 20, 0, 1, false);

    // Tasks use dfs.namenode.replication.max-streams-hard-limit for decommissioning node
    verifyPendingRecoveryTasks(20, 10, 10, 20, 40, 10, 10, 5, true);
  }

  @Test
  public void testNetworkTopologyInstantiation() throws Exception {
    // case 1, dfs.use.dfs.network.topology=true, use the default
    // DFSNetworkTopology impl.
    Configuration conf1 = new HdfsConfiguration();
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
    DatanodeManager dm1 = mockDatanodeManager(fsn, conf1);
    assertEquals(DFSNetworkTopology.class, dm1.getNetworkTopology().getClass());

    // case 2, dfs.use.dfs.network.topology=false, use the default
    // NetworkTopology impl.
    Configuration conf2 = new HdfsConfiguration();
    conf2.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, false);
    DatanodeManager dm2 = mockDatanodeManager(fsn, conf2);
    assertEquals(NetworkTopology.class, dm2.getNetworkTopology()
        .getClass());

    // case 3, dfs.use.dfs.network.topology=false, and specify the
    // net.topology.impl property.
    Configuration conf3 = new HdfsConfiguration();
    conf3.setClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
        MockDfsNetworkTopology.class, NetworkTopology.class);
    conf3.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, false);
    DatanodeManager dm3 = mockDatanodeManager(fsn, conf3);
    assertEquals(MockDfsNetworkTopology.class, dm3.getNetworkTopology()
        .getClass());

    // case 4, dfs.use.dfs.network.topology=true, and specify the
    // dfs.net.topology.impl property.
    Configuration conf4 = new HdfsConfiguration();
    conf4.setClass(DFSConfigKeys.DFS_NET_TOPOLOGY_IMPL_KEY,
            MockDfsNetworkTopology.class, NetworkTopology.class);
    conf4.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, true);
    DatanodeManager dm4 = mockDatanodeManager(fsn, conf4);
    assertEquals(MockDfsNetworkTopology.class, dm4.getNetworkTopology()
        .getClass());
  }

  /**
   * A NetworkTopology implementation for test.
   *
   */
  public static class MockDfsNetworkTopology extends DFSNetworkTopology {
    public MockDfsNetworkTopology(){
      super();
    }
  }

  @Test
  public void testComputeReconstructedTaskNum() throws IOException {
    verifyComputeReconstructedTaskNum(100, 100, 150, 250, 100);
    verifyComputeReconstructedTaskNum(200, 100000, 200000, 300000, 400000);
    verifyComputeReconstructedTaskNum(1000000, 100, 150, 250, 100);
    verifyComputeReconstructedTaskNum(14000000, 200, 200, 400, 200);

  }
  public void verifyComputeReconstructedTaskNum(int xmitsInProgress, int numReplicationBlocks,
      int maxTransfers, int numECTasksToBeReplicated, int numBlocksToBeErasureCoded)
      throws IOException {
    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
    Mockito.when(fsn.hasWriteLock(RwLockMode.BM)).thenReturn(true);
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, maxTransfers);
    DatanodeManager dm = Mockito.spy(mockDatanodeManager(fsn, conf));

    DatanodeDescriptor nodeInfo = Mockito.mock(DatanodeDescriptor.class);
    Mockito.when(nodeInfo.isRegistered()).thenReturn(true);
    Mockito.when(nodeInfo.getStorageInfos()).thenReturn(new DatanodeStorageInfo[0]);

    Mockito.when(nodeInfo.getNumberOfReplicateBlocks()).thenReturn(numReplicationBlocks);
    Mockito.when(nodeInfo.getNumberOfECBlocksToBeReplicated()).thenReturn(numECTasksToBeReplicated);
    Mockito.when(nodeInfo.getNumberOfBlocksToBeErasureCoded())
        .thenReturn(numBlocksToBeErasureCoded);

    // Create an ArgumentCaptor to capture the counts for numReplicationTasks,
    // numEcReplicatedTasks,numECReconstructedTasks.
    ArgumentCaptor<Integer> captor = ArgumentCaptor.forClass(Integer.class);
    Mockito.when(nodeInfo.getErasureCodeCommand(ArgumentMatchers.anyInt()))
        .thenReturn(Collections.nCopies(0, null));
    Mockito.when(nodeInfo.getReplicationCommand(ArgumentMatchers.anyInt()))
        .thenReturn(Collections.nCopies(0, null));
    Mockito.when(nodeInfo.getECReplicatedCommand(ArgumentMatchers.anyInt()))
        .thenReturn(Collections.nCopies(0, null));

    DatanodeRegistration nodeReg = Mockito.mock(DatanodeRegistration.class);
    Mockito.when(dm.getDatanode(nodeReg)).thenReturn(nodeInfo);


    dm.handleHeartbeat(nodeReg, new StorageReport[1], "bp-123", 0, 0,
        10, xmitsInProgress, 0, null, SlowPeerReports.EMPTY_REPORT,
        SlowDiskReports.EMPTY_REPORT);

    Mockito.verify(nodeInfo).getReplicationCommand(captor.capture());
    int numReplicationTasks = captor.getValue();

    Mockito.verify(nodeInfo).getECReplicatedCommand(captor.capture());
    int numEcReplicatedTasks = captor.getValue();

    Mockito.verify(nodeInfo).getErasureCodeCommand(captor.capture());
    int numECReconstructedTasks = captor.getValue();

    // Verify that when DN xmitsInProgress exceeds maxTransfers,
    // the number of tasks should be <= 0.
    if (xmitsInProgress >= maxTransfers) {
      assertTrue(numReplicationTasks <= 0);
      assertTrue(numEcReplicatedTasks <= 0);
      assertTrue(numECReconstructedTasks <= 0);
    } else {
      assertTrue(numReplicationTasks >= 0);
      assertTrue(numEcReplicatedTasks >= 0);
      assertTrue(numECReconstructedTasks >= 0);
    }
  }
}