/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.AddBlockFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.util.RwLockMode;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class TestReplicationPolicy extends BaseReplicationPolicyTest {

  private static final String filename = "/dummyfile.txt";
  // The interval for marking a datanode as stale.
  private static final long staleInterval =
      DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
  private static AtomicLong mockINodeId = new AtomicLong(0);
  @Rule
  public ExpectedException exception = ExpectedException.none();

  public TestReplicationPolicy(String blockPlacementPolicyClassName) {
    this.blockPlacementPolicy = blockPlacementPolicyClassName;
  }

  @Parameterized.Parameters
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] {
        { BlockPlacementPolicyDefault.class.getName() },
        { BlockPlacementPolicyWithUpgradeDomain.class.getName() } });
  }

  private void updateHeartbeatForExtraStorage(long capacity,
      long dfsUsed, long remaining, long blockPoolUsed) {
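    // dataNodes[5] is the only datanode with a second storage (added in
    // getDatanodeDescriptors); update that storage's utilization and re-send
    // the heartbeat so the new storage report takes effect.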
    DatanodeDescriptor dn = dataNodes[5];
    dn.getStorageInfos()[1].setUtilizationForTesting(
        capacity, dfsUsed, remaining, blockPoolUsed);
    dn.updateHeartbeat(
        BlockManagerTestUtil.getStorageReportsForDatanode(dn),
        0L, 0L, 0, 0, null);
  }

  private void resetHeartbeatForStorages() {
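    // Give every datanode ample free space again on its default storage.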
    for (int i=0; i < dataNodes.length; i++) {
      updateHeartbeatWithUsage(dataNodes[i],
          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L,
          0, 0);
    }
    // No available space in the extra storage of dataNodes[5]
    updateHeartbeatForExtraStorage(0L, 0L, 0L, 0L);
  }

  @Override
  DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
    final String[] racks = {
        "/d1/r1",
        "/d1/r1",
        "/d1/r2",
        "/d1/r2",
        "/d2/r3",
        "/d2/r3"};
    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
    // create an extra storage for dn5.
    DatanodeStorage extraStorage = new DatanodeStorage(
        storages[5].getStorageID() + "-extra", DatanodeStorage.State.NORMAL,
        StorageType.DEFAULT);
    BlockManagerTestUtil.updateStorage(storages[5].getDatanodeDescriptor(),
        extraStorage);
    return DFSTestUtil.toDatanodeDescriptor(storages);
  }

  /**
   * Test whether the remaining space per storage is individually
   * considered.
   */
  @Test
  public void testChooseNodeWithMultipleStorages1() throws Exception {
    updateHeartbeatWithUsage(dataNodes[5],
        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        (2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L,
        0L, 0L, 0, 0);

    updateHeartbeatForExtraStorage(
        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        (2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L);

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(1, dataNodes[5],
        new ArrayList<DatanodeStorageInfo>(), null);
    assertEquals(1, targets.length);
    assertEquals(storages[4], targets[0]);

    resetHeartbeatForStorages();
  }

  /**
   * Test whether all storages on the datanode are considered while
   * choosing target to place block.
   */
  @Test
  public void testChooseNodeWithMultipleStorages2() throws Exception {
    updateHeartbeatWithUsage(dataNodes[5],
        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        (2*HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE)/3, 0L,
        0L, 0L, 0, 0);

    updateHeartbeatForExtraStorage(
        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(1, dataNodes[5],
        new ArrayList<DatanodeStorageInfo>(), null);
    assertEquals(1, targets.length);
    assertEquals(dataNodes[5], targets[0].getDatanodeDescriptor());

    resetHeartbeatForStorages();
  }

  /**
   * In this testcase, the client is dataNodes[0]. So the 1st replica should be
   * placed on dataNodes[0], the 2nd replica should be placed on a
   * different rack, and the 3rd should be placed on a different node
   * of the rack chosen for the 2nd replica.
   * The only exception is when <i>numOfReplicas</i> is 2:
   * the 1st is on dataNodes[0] and the 2nd is on a different rack.
   * @throws Exception
   */
  @Test
  public void testChooseTarget1() throws Exception {
    updateHeartbeatWithUsage(dataNodes[0],
        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        0L, 0L, 4, 0); // overloaded

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0);
    assertEquals(targets.length, 0);
    
    targets = chooseTarget(1);
    assertEquals(targets.length, 1);
    assertEquals(storages[0], targets[0]);
    
    targets = chooseTarget(2);
    assertEquals(targets.length, 2);
    assertEquals(storages[0], targets[0]);
    assertFalse(isOnSameRack(targets[0], targets[1]));
    
    targets = chooseTarget(3);
    assertEquals(targets.length, 3);
    assertEquals(storages[0], targets[0]);
    assertFalse(isOnSameRack(targets[0], targets[1]));
    assertTrue(isOnSameRack(targets[1], targets[2]));

    targets = chooseTarget(4);
    assertEquals(targets.length, 4);
    assertEquals(storages[0], targets[0]);
    assertTrue(isOnSameRack(targets[1], targets[2]) ||
               isOnSameRack(targets[2], targets[3]));
    assertFalse(isOnSameRack(targets[0], targets[2]));

    resetHeartbeatForStorages();
  }

  /**
   * In this testcase, the client is dataNodes[0], but dataNodes[1] is
   * not allowed to be chosen. So the 1st replica should be
   * placed on dataNodes[0], the 2nd replica should be placed on a different
   * rack, the 3rd should be on same rack as the 2nd replica, and the rest
   * should be placed on a third rack.
   * @throws Exception
   */
  @Test
  public void testChooseTarget2() throws Exception { 
    Set<Node> excludedNodes;
    DatanodeStorageInfo[] targets;
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();

    excludedNodes = new HashSet<>();
    excludedNodes.add(dataNodes[1]);
    targets = chooseTarget(0, chosenNodes, excludedNodes);
    assertEquals(targets.length, 0);
    
    excludedNodes.clear();
    chosenNodes.clear();
    excludedNodes.add(dataNodes[1]); 
    targets = chooseTarget(1, chosenNodes, excludedNodes);
    assertEquals(targets.length, 1);
    assertEquals(storages[0], targets[0]);
    
    excludedNodes.clear();
    chosenNodes.clear();
    excludedNodes.add(dataNodes[1]); 
    targets = chooseTarget(2, chosenNodes, excludedNodes);
    assertEquals(targets.length, 2);
    assertEquals(storages[0], targets[0]);

    assertFalse(isOnSameRack(targets[0], targets[1]));
    
    excludedNodes.clear();
    chosenNodes.clear();
    excludedNodes.add(dataNodes[1]); 
    targets = chooseTarget(3, chosenNodes, excludedNodes);
    assertEquals(targets.length, 3);
    assertEquals(storages[0], targets[0]);

    assertFalse(isOnSameRack(targets[0], targets[1]));
    assertTrue(isOnSameRack(targets[1], targets[2]));
    
    excludedNodes.clear();
    chosenNodes.clear();
    excludedNodes.add(dataNodes[1]); 
    targets = chooseTarget(4, chosenNodes, excludedNodes);
    assertEquals(targets.length, 4);
    assertEquals(storages[0], targets[0]);

    for(int i=1; i<4; i++) {
      assertFalse(isOnSameRack(targets[0], targets[i]));
    }
    assertTrue(isOnSameRack(targets[1], targets[2]) ||
               isOnSameRack(targets[2], targets[3]));
    assertFalse(isOnSameRack(targets[1], targets[3]));

    excludedNodes.clear();
    chosenNodes.clear();
    excludedNodes.add(dataNodes[1]); 
    chosenNodes.add(storages[2]);
    targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
        excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
        null);
    System.out.println("targets=" + Arrays.asList(targets));
    assertEquals(2, targets.length);
    // make sure that the chosen node is among the returned targets.
    int i = 0;
    for (; i < targets.length && !storages[2].equals(targets[i]); i++);
    assertTrue(i < targets.length);
  }

  /**
   * In this testcase, client is dataNodes[0], but dataNodes[0] is not qualified
   * to be chosen. So the 1st replica should be placed on dataNodes[1], 
   * the 2nd replica should be placed on a different rack,
   * the 3rd replica should be placed on the same rack as the 2nd replica,
   * and the rest should be placed on the third rack.
   * @throws Exception
   */
  @Test
  public void testChooseTarget3() throws Exception {
    // make data node 0 unqualified to be chosen
    updateHeartbeatWithUsage(dataNodes[0],
        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        (HdfsServerConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L,
        0L, 0L, 0, 0); // no space
        
    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0);
    assertEquals(targets.length, 0);
    
    targets = chooseTarget(1);
    assertEquals(targets.length, 1);
    assertEquals(storages[1], targets[0]);
    
    targets = chooseTarget(2);
    assertEquals(targets.length, 2);
    assertEquals(storages[1], targets[0]);
    assertFalse(isOnSameRack(targets[0], targets[1]));
    
    targets = chooseTarget(3);
    assertEquals(targets.length, 3);
    assertEquals(storages[1], targets[0]);
    assertTrue(isOnSameRack(targets[1], targets[2]));
    assertFalse(isOnSameRack(targets[0], targets[1]));
    
    targets = chooseTarget(4);
    assertEquals(targets.length, 4);
    assertEquals(storages[1], targets[0]);
    for(int i=1; i<4; i++) {
      assertFalse(isOnSameRack(targets[0], targets[i]));
    }
    assertTrue(isOnSameRack(targets[1], targets[2]) ||
               isOnSameRack(targets[2], targets[3]));
    assertFalse(isOnSameRack(targets[1], targets[3]));

    resetHeartbeatForStorages();
  }
  
  /**
   * In this testcase, the client is dataNodes[0], but none of the nodes on
   * rack 1 is qualified to be chosen. So the 1st replica should be placed on
   * either rack 2 or rack 3.
   * The 2nd replica should be placed on a different rack,
   * and the 3rd replica should be placed on the same rack as the 1st replica.
   * @throws Exception
   */
  @Test
  public void testChooseTarget4() throws Exception {
    // make data nodes 0 & 1 unqualified to be chosen: not enough disk space
    for(int i=0; i<2; i++) {
      updateHeartbeatWithUsage(dataNodes[i],
          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          (HdfsServerConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }
      
    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0);
    assertEquals(targets.length, 0);
    
    targets = chooseTarget(1);
    assertEquals(targets.length, 1);
    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
    
    targets = chooseTarget(2);
    assertEquals(targets.length, 2);
    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
    assertFalse(isOnSameRack(targets[0], targets[1]));
    
    targets = chooseTarget(3);
    assertEquals(targets.length, 3);
    for(int i=0; i<3; i++) {
      assertFalse(isOnSameRack(targets[i], dataNodes[0]));
    }
    assertTrue(isOnSameRack(targets[0], targets[1]) ||
               isOnSameRack(targets[1], targets[2]));
    assertFalse(isOnSameRack(targets[0], targets[2]));

    resetHeartbeatForStorages();
  }

  /**
   * In this testcase, the client is a node outside of the file system.
   * So the 1st replica can be placed on any node.
   * The 2nd replica should be placed on a different rack,
   * and the 3rd replica should be placed on the same rack as the 2nd replica.
   * @throws Exception
   */
  @Test
  public void testChooseTarget5() throws Exception {
    DatanodeDescriptor writerDesc =
      DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r4");

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0, writerDesc);
    assertEquals(targets.length, 0);

    targets = chooseTarget(1, writerDesc);
    assertEquals(targets.length, 1);

    targets = chooseTarget(2, writerDesc);
    assertEquals(targets.length, 2);
    assertFalse(isOnSameRack(targets[0], targets[1]));

    targets = chooseTarget(3, writerDesc);
    assertEquals(targets.length, 3);
    assertTrue(isOnSameRack(targets[1], targets[2]));
    assertFalse(isOnSameRack(targets[0], targets[1]));
  }

  /**
   * In this testcase, there are enough nodes in total, but only
   * one rack is actually available.
   * @throws Exception
   */
  @Test
  public void testChooseTarget6() throws Exception {
    DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo(
        "DS-xxxx", "7.7.7.7", "/d2/r3", "host7");
    DatanodeDescriptor newDn = storage.getDatanodeDescriptor();
    Set<Node> excludedNodes;
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();

    excludedNodes = new HashSet<>();
    excludedNodes.add(dataNodes[0]);
    excludedNodes.add(dataNodes[1]);
    excludedNodes.add(dataNodes[2]);
    excludedNodes.add(dataNodes[3]);

    DatanodeStorageInfo[] targets;
    // Only two nodes available in a rack. Try picking two nodes. Only one
    // should return.
    targets = chooseTarget(2, chosenNodes, excludedNodes);
    assertEquals(1, targets.length);

    // Make three nodes available in a rack.
    final BlockManager bm = namenode.getNamesystem().getBlockManager();
    bm.getDatanodeManager().getNetworkTopology().add(newDn);
    bm.getDatanodeManager().getHeartbeatManager().addDatanode(newDn);
    updateHeartbeatWithUsage(newDn,
        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);

    // Try picking three nodes. Only two should return.
    excludedNodes.clear();
    excludedNodes.add(dataNodes[0]);
    excludedNodes.add(dataNodes[1]);
    excludedNodes.add(dataNodes[2]);
    excludedNodes.add(dataNodes[3]);
    chosenNodes.clear();
    try {
      targets = chooseTarget(3, chosenNodes, excludedNodes);
      assertEquals(2, targets.length);
    } finally {
      bm.getDatanodeManager().getNetworkTopology().remove(newDn);
    }
    resetHeartbeatForStorages();
  }


  /**
   * In this testcase, it tries to choose more targets than available nodes and
   * checks the result, with stale node avoidance on the write path enabled.
   * @throws Exception
   */
  @Test
  public void testChooseTargetWithMoreThanAvailableNodesWithStaleness()
      throws Exception {
    try {
      namenode.getNamesystem().getBlockManager().getDatanodeManager()
        .setNumStaleNodes(dataNodes.length);
      testChooseTargetWithMoreThanAvailableNodes();
    } finally {
      namenode.getNamesystem().getBlockManager().getDatanodeManager()
        .setNumStaleNodes(0);
    }
  }
  
  /**
   * In this testcase, it tries to choose more targets than available nodes and
   * checks the result.
   * @throws Exception
   */
  @Test
  public void testChooseTargetWithMoreThanAvailableNodes() throws Exception {
    // make data nodes 0 & 1 unqualified to be chosen: not enough disk space
    for(int i=0; i<2; i++) {
      updateHeartbeatWithUsage(dataNodes[i],
          2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
          (HdfsServerConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
    }
    
    final LogVerificationAppender appender = new LogVerificationAppender();
    final Logger logger = Logger.getRootLogger();
    logger.addAppender(appender);
    
    // try to choose dataNodes.length targets, which is more than the number
    // of nodes actually available.
    DatanodeStorageInfo[] targets = chooseTarget(dataNodes.length);
    assertEquals(targets.length, dataNodes.length - 2);

    final List<LoggingEvent> log = appender.getLog();
    assertNotNull(log);
    assertFalse(log.size() == 0);
    final LoggingEvent lastLogEntry = log.get(log.size() - 1);
    
    assertTrue(Level.WARN.isGreaterOrEqual(lastLogEntry.getLevel()));
    // Replicas are supposed to be placed on every node, but two data nodes
    // are not available for placing replicas, so here we expect a shortfall of 2
    assertTrue(((String)lastLogEntry.getMessage()).contains("in need of 2"));

    resetHeartbeatForStorages();
  }

  private boolean containsWithinRange(DatanodeStorageInfo target,
      DatanodeDescriptor[] nodes, int startIndex, int endIndex) {
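    // Returns true if the target storage's datanode is one of
    // nodes[startIndex..endIndex] (both bounds inclusive).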
    assert startIndex >= 0 && startIndex < nodes.length;
    assert endIndex >= startIndex && endIndex < nodes.length;
    for (int i = startIndex; i <= endIndex; i++) {
      if (nodes[i].equals(target.getDatanodeDescriptor())) {
        return true;
      }
    }
    return false;
  }
  
  private boolean containsWithinRange(DatanodeDescriptor target,
      DatanodeStorageInfo[] nodes, int startIndex, int endIndex) {
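    // Mirror of the overload above: returns true if the target datanode owns
    // one of nodes[startIndex..endIndex] (both bounds inclusive).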
    assert startIndex >= 0 && startIndex < nodes.length;
    assert endIndex >= startIndex && endIndex < nodes.length;
    for (int i = startIndex; i <= endIndex; i++) {
      if (nodes[i].getDatanodeDescriptor().equals(target)) {
        return true;
      }
    }
    return false;
  }

  @Test
  public void testChooseTargetWithStaleNodes() throws Exception {
    // Set dataNodes[0] as stale
    DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[0], -(staleInterval + 1));
    namenode.getNamesystem().getBlockManager()
      .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
    assertTrue(namenode.getNamesystem().getBlockManager()
        .getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
    DatanodeStorageInfo[] targets;
    // We set dataNodes[0] as stale, so dataNodes[1] should be chosen since
    // it is on the same rack as dataNodes[0] (the writer)
    targets = chooseTarget(1);
    assertEquals(targets.length, 1);
    assertEquals(storages[1], targets[0]);

    Set<Node> excludedNodes = new HashSet<>();
    excludedNodes.add(dataNodes[1]);
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
    targets = chooseTarget(1, chosenNodes, excludedNodes);
    assertEquals(targets.length, 1);
    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
    
    // reset
    DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[0], 0);
    namenode.getNamesystem().getBlockManager()
      .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
  }

  /**
   * In this testcase, we set 3 nodes (dataNodes[0] ~ dataNodes[2]) as stale,
   * and when the number of replicas is less than or equal to 3, all the healthy
   * datanodes should be returned by the chooseTarget method. When the number 
   * of replicas is 4, a stale node should be included.
   * 
   * @throws Exception
   */
  @Test
  public void testChooseTargetWithHalfStaleNodes() throws Exception {
    // Set dataNodes[0], dataNodes[1], and dataNodes[2] as stale
    for (int i = 0; i < 3; i++) {
      DFSTestUtil
          .resetLastUpdatesWithOffset(dataNodes[i], -(staleInterval + 1));
    }
    namenode.getNamesystem().getBlockManager()
      .getDatanodeManager().getHeartbeatManager().heartbeatCheck();

    DatanodeStorageInfo[] targets = chooseTarget(0);
    assertEquals(targets.length, 0);

    // Since we have 6 datanodes total, stale nodes should
    // not be returned until we ask for more than 3 targets
    targets = chooseTarget(1);
    assertEquals(targets.length, 1);
    assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));

    targets = chooseTarget(2);
    assertEquals(targets.length, 2);
    assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
    assertFalse(containsWithinRange(targets[1], dataNodes, 0, 2));

    targets = chooseTarget(3);
    assertEquals(targets.length, 3);
    assertTrue(containsWithinRange(targets[0], dataNodes, 3, 5));
    assertTrue(containsWithinRange(targets[1], dataNodes, 3, 5));
    assertTrue(containsWithinRange(targets[2], dataNodes, 3, 5));

    targets = chooseTarget(4);
    assertEquals(targets.length, 4);
    assertTrue(containsWithinRange(dataNodes[3], targets, 0, 3));
    assertTrue(containsWithinRange(dataNodes[4], targets, 0, 3));
    assertTrue(containsWithinRange(dataNodes[5], targets, 0, 3));

    for (int i = 0; i < dataNodes.length; i++) {
      DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
    }
    namenode.getNamesystem().getBlockManager()
      .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
  }

  @Test
  public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.setBoolean(
        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
    String[] hosts = new String[]{"host1", "host2", "host3", 
                                  "host4", "host5", "host6"};
    String[] racks = new String[]{"/d1/r1", "/d1/r1", "/d1/r2", 
                                  "/d1/r2", "/d2/r3", "/d2/r3"};
    MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(conf).racks(racks)
        .hosts(hosts).numDataNodes(hosts.length).build();
    miniCluster.waitActive();
    
    try {
      // Step 1. Mark two datanodes as stale and check whether the
      // avoidStaleDataNodesForWrite calculation is correct.
      // First stop the heartbeats of host1 and host2
      for (int i = 0; i < 2; i++) {
        DataNode dn = miniCluster.getDataNodes().get(i);
        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
        DatanodeDescriptor dnDes = miniCluster.getNameNode().getNamesystem()
            .getBlockManager().getDatanodeManager()
            .getDatanode(dn.getDatanodeId());
        DFSTestUtil.resetLastUpdatesWithOffset(dnDes, -(staleInterval + 1));
      }
      // Instead of waiting, explicitly call heartbeatCheck to
      // let the heartbeat manager detect stale nodes
      miniCluster.getNameNode().getNamesystem().getBlockManager()
          .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
      int numStaleNodes = miniCluster.getNameNode().getNamesystem()
          .getBlockManager().getDatanodeManager().getNumStaleNodes();
      assertEquals(numStaleNodes, 2);
      assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
          .getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
      // Call chooseTarget
      DatanodeDescriptor staleNodeInfo = miniCluster.getNameNode()
          .getNamesystem().getBlockManager().getDatanodeManager()
          .getDatanode(miniCluster.getDataNodes().get(0).getDatanodeId());
      BlockPlacementPolicy replicator = miniCluster.getNameNode()
          .getNamesystem().getBlockManager().getBlockPlacementPolicy();
      DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 3,
          staleNodeInfo, new ArrayList<DatanodeStorageInfo>(), false, null,
          BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY,
          null);

      assertEquals(targets.length, 3);
      assertFalse(isOnSameRack(targets[0], staleNodeInfo));
      
      // Step 2. Set more than half of the datanodes as stale
      for (int i = 0; i < 4; i++) {
        DataNode dn = miniCluster.getDataNodes().get(i);
        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
        DatanodeDescriptor dnDesc = miniCluster.getNameNode().getNamesystem().getBlockManager()
            .getDatanodeManager().getDatanode(dn.getDatanodeId());
        DFSTestUtil.resetLastUpdatesWithOffset(dnDesc, -(staleInterval + 1));
      }
      // Explicitly call heartbeatCheck
      miniCluster.getNameNode().getNamesystem().getBlockManager()
          .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
      numStaleNodes = miniCluster.getNameNode().getNamesystem()
          .getBlockManager().getDatanodeManager().getNumStaleNodes();
      assertEquals(numStaleNodes, 4);
      // According to our strategy, stale datanodes will be included for writing
      // to avoid hotspots
      assertFalse(miniCluster.getNameNode().getNamesystem().getBlockManager()
          .getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
      // Call chooseTarget
      targets = replicator.chooseTarget(filename, 3, staleNodeInfo,
          new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
          TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
      assertEquals(targets.length, 3);
      assertTrue(isOnSameRack(targets[0], staleNodeInfo));
      
      // Step 3. Set 2 stale datanodes back to healthy nodes;
      // 2 stale nodes remain
      for (int i = 2; i < 4; i++) {
        DataNode dn = miniCluster.getDataNodes().get(i);
        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
        DatanodeDescriptor dnDesc = miniCluster.getNameNode().getNamesystem()
            .getBlockManager().getDatanodeManager()
            .getDatanode(dn.getDatanodeId());
        DFSTestUtil.resetLastUpdatesWithOffset(dnDesc, 0);
      }
      // Explicitly call heartbeatCheck
      miniCluster.getNameNode().getNamesystem().getBlockManager()
          .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
      numStaleNodes = miniCluster.getNameNode().getNamesystem()
          .getBlockManager().getDatanodeManager().getNumStaleNodes();
      assertEquals(numStaleNodes, 2);
      assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
          .getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
      // Call chooseTarget
      targets = chooseTarget(3, staleNodeInfo);
      assertEquals(targets.length, 3);
      assertFalse(isOnSameRack(targets[0], staleNodeInfo));
    } finally {
      miniCluster.shutdown();
    }
  }
  
  /**
   * This testcase tests re-replication when dataNodes[0] is already chosen.
   * So the 1st replica can be placed on a random rack.
   * The 2nd replica should be placed on a different node on the same rack as
   * the 1st replica. The 3rd replica can be placed randomly.
   * @throws Exception
   */
  @Test
  public void testRereplicate1() throws Exception {
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
    chosenNodes.add(storages[0]);
    DatanodeStorageInfo[] targets;
    
    targets = chooseTarget(0, chosenNodes);
    assertEquals(targets.length, 0);
    
    targets = chooseTarget(1, chosenNodes);
    assertEquals(targets.length, 1);
    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
    
    targets = chooseTarget(2, chosenNodes);
    assertEquals(targets.length, 2);
    assertTrue(isOnSameRack(targets[0], dataNodes[0]));
    assertFalse(isOnSameRack(targets[0], targets[1]));
    
    targets = chooseTarget(3, chosenNodes);
    assertEquals(targets.length, 3);
    assertTrue(isOnSameRack(targets[0], dataNodes[0]));
    assertFalse(isOnSameRack(targets[0], targets[2]));
  }

  /**
   * This testcase tests re-replication, 
   * when dataNodes[0] and dataNodes[1] are already chosen.
   * So the 1st replica should be placed on a rack other than rack 1.
   * The remaining replicas can be placed randomly.
   * @throws Exception
   */
  @Test
  public void testRereplicate2() throws Exception {
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
    chosenNodes.add(storages[0]);
    chosenNodes.add(storages[1]);

    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0, chosenNodes);
    assertEquals(targets.length, 0);
    
    targets = chooseTarget(1, chosenNodes);
    assertEquals(targets.length, 1);
    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
    
    targets = chooseTarget(2, chosenNodes);
    assertEquals(targets.length, 2);
    assertFalse(isOnSameRack(targets[0], dataNodes[0]));
    assertFalse(isOnSameRack(targets[1], dataNodes[0]));
  }

  /**
   * This testcase tests re-replication, 
   * when dataNodes[0] and dataNodes[2] are already chosen.
   * So the 1st replica should be placed on the rack where the writer resides.
   * The remaining replicas can be placed randomly.
   * @throws Exception
   */
  @Test
  public void testRereplicate3() throws Exception {
    List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
    chosenNodes.add(storages[0]);
    chosenNodes.add(storages[2]);
    
    DatanodeStorageInfo[] targets;
    targets = chooseTarget(0, chosenNodes);
    assertEquals(targets.length, 0);
    
    targets = chooseTarget(1, chosenNodes);
    assertEquals(targets.length, 1);
    assertTrue(isOnSameRack(targets[0], dataNodes[0]));
    assertFalse(isOnSameRack(targets[0], dataNodes[2]));
    
    targets = chooseTarget(1, dataNodes[2], chosenNodes);
    assertEquals(targets.length, 1);
    assertTrue(isOnSameRack(targets[0], dataNodes[2]));
    assertFalse(isOnSameRack(targets[0], dataNodes[0]));

    targets = chooseTarget(2, chosenNodes);
    assertEquals(targets.length, 2);
    assertTrue(isOnSameRack(targets[0], dataNodes[0]));
    
    targets = chooseTarget(2, dataNodes[2], chosenNodes);
    assertEquals(targets.length, 2);
    assertTrue(isOnSameRack(targets[0], dataNodes[2]));
  }

  private BlockInfo genBlockInfo(long id) {
    return genBlockInfo(id, false);
  }

  private BlockInfo genBlockInfo(long id, boolean isBlockCorrupted) {
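    // Replication factor 3; a mock block collection id is assigned only when
    // the block is healthy, so it looks like a block that belongs to a file.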
    BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3);
    if (!isBlockCorrupted) {
      bInfo.setBlockCollectionId(mockINodeId.incrementAndGet());
    }
    return bInfo;
  }

  /**
   * Test that high priority blocks are processed before low priority
   * blocks.
   */
  @Test(timeout = 60000)
  public void testReplicationWithPriority() throws Exception {
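    // The redundancy interval is configured to 1 second below; this constant
    // is the matching wait time in milliseconds passed to Thread.sleep.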
    int DFS_NAMENODE_REPLICATION_INTERVAL = 1000;
    int HIGH_PRIORITY = 0;
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
        .format(true).build();
    try {
      cluster.waitActive();
      final LowRedundancyBlocks neededReconstruction = cluster.getNameNode()
          .getNamesystem().getBlockManager().neededReconstruction;
      for (int i = 0; i < 100; i++) {
        // Adding the blocks directly to normal priority

        neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
            nextLong(), true), 2, 0, 0, 3);
      }
      // Let's wait for the replication interval so the processing of normal
      // priority blocks starts
      Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
      
      // Adding the block directly to high priority list
      neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
          nextLong(), true), 1, 0, 0, 3);

      // Let's wait for the replication interval
      Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);

      // Check that replication completed successfully. There is no need to wait
      // until all 100 normal blocks are processed.
      assertFalse("Not able to clear the element from high priority list",
          neededReconstruction.iterator(HIGH_PRIORITY).hasNext());
    } finally {
      cluster.shutdown();
    }
  }
  
  /**
   * Test that chooseLowRedundancyBlocks processes blocks based on priority.
   */
  @Test
  public void testChooseLowRedundancyBlocks() throws Exception {
    LowRedundancyBlocks lowRedundancyBlocks = new LowRedundancyBlocks();

    for (int i = 0; i < 5; i++) {
      // Adding QUEUE_HIGHEST_PRIORITY block
      lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
          nextLong()), 1, 0, 0, 3);

      // Adding QUEUE_VERY_LOW_REDUNDANCY block
      lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
          nextLong()), 2, 0, 0, 7);

      // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
      lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
          nextLong()), 6, 0, 0, 6);

      // Adding QUEUE_LOW_REDUNDANCY block
      lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
          nextLong()), 5, 0, 0, 6);

      // Adding QUEUE_WITH_CORRUPT_BLOCKS block
      lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
          nextLong()), 0, 0, 0, 3);
    }

    // Choose 6 blocks from lowRedundancyBlocks. Then it should pick 5 blocks
    // from QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_LOW_REDUNDANCY.
    List<List<BlockInfo>> chosenBlocks =
        lowRedundancyBlocks.chooseLowRedundancyBlocks(6);
    assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0);

    // Choose 10 blocks from lowRedundancyBlocks. Then it should pick 4 blocks
    // from QUEUE_VERY_LOW_REDUNDANCY, 5 blocks from QUEUE_LOW_REDUNDANCY and 1
    // block from QUEUE_REPLICAS_BADLY_DISTRIBUTED.
    chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(10);
    assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0);

    // Adding QUEUE_HIGHEST_PRIORITY
    lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
        nextLong()), 0, 1, 0, 3);

    // Choose 10 blocks from lowRedundancyBlocks. Then it should pick 1 block
    // from QUEUE_HIGHEST_PRIORITY, 4 blocks from
    // QUEUE_REPLICAS_BADLY_DISTRIBUTED
    chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(10);
    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4);

    // Since the end of all lists has been reached,
    // picking should start again from the beginning.
    // Choose 7 blocks from lowRedundancyBlocks. Then it should pick 6 blocks
    // from QUEUE_HIGHEST_PRIORITY, 1 block from QUEUE_VERY_LOW_REDUNDANCY.
    chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(7);
    assertTheChosenBlocks(chosenBlocks, 6, 1, 0, 0, 0);
  }
  
  /** Asserts that the chosen blocks match the expected counts per priority queue. */
  private void assertTheChosenBlocks(
      List<List<BlockInfo>> chosenBlocks, int... expectedSizes) {
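    // chosenBlocks is indexed by priority queue; any queue beyond the
    // returned list must have an expected size of zero.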
    int i = 0;
    for(; i < chosenBlocks.size(); i++) {
      assertEquals("Not returned the expected number for i=" + i,
          expectedSizes[i], chosenBlocks.get(i).size());
    }
    for(; i < expectedSizes.length; i++) {
      assertEquals("Expected size is non-zero for i=" + i, 0, expectedSizes[i]);
    }
  }
  
  /**
   * Test that chooseReplicaToDelete chooses replicas based on
   * block locality and free space.
   */
  @Test
  public void testChooseReplicaToDelete() throws Exception {
    List<DatanodeStorageInfo> replicaList = new ArrayList<>();
    final Map<String, List<DatanodeStorageInfo>> rackMap
        = new HashMap<String, List<DatanodeStorageInfo>>();

    storages[0].setRemainingForTests(4*1024*1024);
    dataNodes[0].setRemaining(calculateRemaining(dataNodes[0]));
    replicaList.add(storages[0]);

    storages[1].setRemainingForTests(3*1024*1024);
    dataNodes[1].setRemaining(calculateRemaining(dataNodes[1]));
    replicaList.add(storages[1]);

    storages[2].setRemainingForTests(2*1024*1024);
    dataNodes[2].setRemaining(calculateRemaining(dataNodes[2]));
    replicaList.add(storages[2]);

    // Even though storages[4] has the most space, storages[5] has the least
    // remaining space, so it should be chosen when a block is deleted.
    storages[4].setRemainingForTests(100 * 1024 * 1024);
    storages[5].setRemainingForTests(512 * 1024);
    dataNodes[5].setRemaining(calculateRemaining(dataNodes[5]));
    replicaList.add(storages[5]);

    // Refresh the last update time for all the datanodes
    for (int i = 0; i < dataNodes.length; i++) {
      DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
    }

    List<DatanodeStorageInfo> first = new ArrayList<>();
    List<DatanodeStorageInfo> second = new ArrayList<>();
    replicator.splitNodesWithRack(replicaList, replicaList, rackMap, first,
        second);
    // storages[0] and storages[1] are in the first set since their rack has two
    // replica nodes, while storages[2] and storages[5] are in the second set.
    assertEquals(2, first.size());
    assertEquals(2, second.size());
    List<StorageType> excessTypes = new ArrayList<>();
    {
      // test returning null
      excessTypes.add(StorageType.SSD);
      assertNull(((BlockPlacementPolicyDefault) replicator)
          .chooseReplicaToDelete(first, second, excessTypes, rackMap));
    }
    excessTypes.add(StorageType.DEFAULT);
    DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) replicator)
        .chooseReplicaToDelete(first, second, excessTypes, rackMap);
    // Among all storages, storages[5] has the least free space
    assertEquals(chosen, storages[5]);

    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
    assertEquals(2, first.size());
    assertEquals(1, second.size());
    // Within the first set, storages[1] has less free space
    excessTypes.add(StorageType.DEFAULT);
    chosen = ((BlockPlacementPolicyDefault) replicator).chooseReplicaToDelete(
        first, second, excessTypes, rackMap);
    assertEquals(chosen, storages[1]);
  }

  /**
   * Test that chooseReplicaToDelete works with the EC / striped block
   * placement policy.
   */
  @Test
  public void testStripedChooseReplicaToDelete() throws Exception {
    List<DatanodeStorageInfo> replicaList = new ArrayList<>();
    List<DatanodeStorageInfo> candidate = new ArrayList<>();
    final Map<String, List<DatanodeStorageInfo>> rackMap
        = new HashMap<String, List<DatanodeStorageInfo>>();

    replicaList.add(storages[0]);
    replicaList.add(storages[1]);
    replicaList.add(storages[2]);
    replicaList.add(storages[4]);

    candidate.add(storages[0]);
    candidate.add(storages[2]);
    candidate.add(storages[4]);

    // Refresh the last update time for all the datanodes
    for (int i = 0; i < dataNodes.length; i++) {
      DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
    }

    List<DatanodeStorageInfo> first = new ArrayList<>();
    List<DatanodeStorageInfo> second = new ArrayList<>();
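    // Use the block placement policy for striped (EC) blocks.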
    BlockPlacementPolicy policy = getStriptedPolicy();
    policy.splitNodesWithRack(replicaList, candidate, rackMap, first,
        second);
    // storages[0] is in the first set since its rack has two replica nodes,
    // while storages[2] and storages[4] are in the second set.
    assertEquals(1, first.size());
    assertEquals(2, second.size());
    List<StorageType> excessTypes = new ArrayList<>();
    excessTypes.add(StorageType.DEFAULT);
    DatanodeStorageInfo chosen = ((BlockPlacementPolicyDefault) policy)
        .chooseReplicaToDelete(first, second, excessTypes, rackMap);
    // Among all storages, storages[0] is in the rack that has two replicas
    assertEquals(chosen, storages[0]);
    policy.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
    assertEquals(0, first.size());
    assertEquals(2, second.size());

    // Within the second set, storages[2] should be the next one to be deleted.
    excessTypes.add(StorageType.DEFAULT);
    chosen = ((BlockPlacementPolicyDefault) policy).chooseReplicaToDelete(
        first, second, excessTypes, rackMap);
    assertEquals(chosen, storages[2]);
    policy.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
    assertEquals(0, first.size());
    assertEquals(1, second.size());

    chosen = ((BlockPlacementPolicyDefault) policy).chooseReplicaToDelete(
        first, second, excessTypes, rackMap);
    assertNull(chosen);
  }

  private long calculateRemaining(DatanodeDescriptor dataNode) {
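    // Sum the remaining space reported by every storage of this datanode.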
    long sum = 0;
    for (DatanodeStorageInfo storageInfo: dataNode.getStorageInfos()){
      sum += storageInfo.getRemaining();
    }
    return sum;
  }

  @Test
  public void testChooseReplicasToDelete() throws Exception {
    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
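    // Rack layout from getDatanodeDescriptors: storages[0..1] are on /d1/r1,
    // storages[2..3] on /d1/r2, and storages[4..5] on /d2/r3.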
    nonExcess.add(storages[0]);
    nonExcess.add(storages[1]);
    nonExcess.add(storages[2]);
    nonExcess.add(storages[3]);
    List<DatanodeStorageInfo> excessReplicas;
    BlockStoragePolicySuite POLICY_SUITE = BlockStoragePolicySuite
        .createDefaultSuite();
    BlockStoragePolicy storagePolicy = POLICY_SUITE.getDefaultPolicy();
    DatanodeStorageInfo excessSSD = DFSTestUtil.createDatanodeStorageInfo(
        "Storage-excess-SSD-ID", "localhost",
        storages[0].getDatanodeDescriptor().getNetworkLocation(),
        "foo.com", StorageType.SSD, null);
    updateHeartbeatWithUsage(excessSSD.getDatanodeDescriptor(),
        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
        2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0,
        0);

    // use delete hint case.

    DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor();
    List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3,
        DatanodeStorageInfo.toStorageTypes(nonExcess));
    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
        excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
    assertEquals(1, excessReplicas.size());
    assertTrue(excessReplicas.contains(storages[0]));

    // Excess type deletion

    DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo(
        "Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(),
        "foo.com", StorageType.ARCHIVE, null);
    nonExcess.add(excessStorage);
    excessTypes = storagePolicy.chooseExcess((short) 3,
        DatanodeStorageInfo.toStorageTypes(nonExcess));
    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
        excessTypes, storages[3].getDatanodeDescriptor(), null);
    assertTrue(excessReplicas.contains(excessStorage));

    // The block was initially created on excessSSD(rack r1),
    // storages[4](rack r3) and storages[5](rack r3) with
    // ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 3.
    // Right after balancer moves the block from storages[5] to
    // storages[3](rack r2), the application changes the storage policy from
    // ONESSD_STORAGE_POLICY_NAME to HOT_STORAGE_POLICY_ID. In this case,
    // we should be able to delete excessSSD since the remaining
    // storages ({storages[3]}, {storages[4], storages[5]})
    // are on different racks (r2, r3).
    nonExcess.clear();
    nonExcess.add(excessSSD);
    nonExcess.add(storages[3]);
    nonExcess.add(storages[4]);
    nonExcess.add(storages[5]);
    excessTypes = storagePolicy.chooseExcess((short) 3,
        DatanodeStorageInfo.toStorageTypes(nonExcess));
    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
        excessTypes, storages[3].getDatanodeDescriptor(),
        storages[5].getDatanodeDescriptor());
    assertEquals(1, excessReplicas.size());
    assertTrue(excessReplicas.contains(excessSSD));

    // Similar to above, but after policy change and before deletion,
    // the replicas are located on excessSSD(rack r1), storages[1](rack r1),
    // storages[2](rack r2) and storages[3](rack r2). Replication factor = 3.
    // In this case, we should be able to delete excessSSD since the remaining
    // storages ({storages[1]} , {storages[2], storages[3]})
    // are on different racks (r1, r2).
    nonExcess.clear();
    nonExcess.add(excessSSD);
    nonExcess.add(storages[1]);
    nonExcess.add(storages[2]);
    nonExcess.add(storages[3]);
    excessTypes = storagePolicy.chooseExcess((short) 3,
        DatanodeStorageInfo.toStorageTypes(nonExcess));
    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
        excessTypes, storages[1].getDatanodeDescriptor(),
        storages[3].getDatanodeDescriptor());
    assertEquals(1, excessReplicas.size());
    assertTrue(excessReplicas.contains(excessSSD));

    // Similar to above, but after policy change and before deletion,
    // the replicas are located on excessSSD(rack r1), storages[2](rack r2)
    // Replication factor = 1. We should be able to delete excessSSD.
    nonExcess.clear();
    nonExcess.add(excessSSD);
    nonExcess.add(storages[2]);
    excessTypes = storagePolicy.chooseExcess((short) 1,
        DatanodeStorageInfo.toStorageTypes(nonExcess));
    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 1,
        excessTypes, storages[2].getDatanodeDescriptor(), null);
    assertEquals(1, excessReplicas.size());
    assertTrue(excessReplicas.contains(excessSSD));

    // The block was initially created on excessSSD(rack r1),
    // storages[4](rack r3) and storages[5](rack r3) with
    // ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 2.
    // In this case, no replica can be chosen as the excessive replica by
    // chooseReplicasToDelete because if the SSD storage is deleted,
    // the remaining storages[4] and storages[5] are the same rack (r3),
    // violating block placement policy (i.e. the number of racks >= 2).
    // TODO BlockPlacementPolicyDefault should be able to rebalance the replicas
    // and then delete excessSSD.
    nonExcess.clear();
    nonExcess.add(excessSSD);
    nonExcess.add(storages[4]);
    nonExcess.add(storages[5]);
    excessTypes = storagePolicy.chooseExcess((short) 2,
        DatanodeStorageInfo.toStorageTypes(nonExcess));
    excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 2,
        excessTypes, null, null);
    assertEquals(0, excessReplicas.size());
  }

  @Test
  public void testUseDelHint() throws Exception {
    List<StorageType> excessTypes = new ArrayList<>();
    excessTypes.add(StorageType.ARCHIVE);
    BlockPlacementPolicyDefault policyDefault =
        (BlockPlacementPolicyDefault) replicator;
    // no delHint
    assertFalse(policyDefault.useDelHint(null, null, null, null, null));
    // delHint storage type is not an excess type
    assertFalse(policyDefault.useDelHint(storages[0], null, null, null,
        excessTypes));
    // check if removing delHint reduces the number of racks
    List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
    moreThanOne.add(storages[0]);
    moreThanOne.add(storages[1]);
    List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
    exactlyOne.add(storages[3]);
    exactlyOne.add(storages[5]);

    excessTypes.add(StorageType.DEFAULT);
    assertTrue(policyDefault.useDelHint(storages[0], null, moreThanOne,
        exactlyOne, excessTypes));
    // the added node adds a new rack
    assertTrue(policyDefault.useDelHint(storages[3], storages[5], moreThanOne,
        exactlyOne, excessTypes));
    // removing delHint reduces the number of racks;
    assertFalse(policyDefault.useDelHint(storages[3], storages[0], moreThanOne,
        exactlyOne, excessTypes));
    assertFalse(policyDefault.useDelHint(storages[3], null, moreThanOne,
        exactlyOne, excessTypes));
  }

  @Test
  public void testIsMovable() throws Exception {
    List<DatanodeInfo> candidates = new ArrayList<>();

    // after the move, the number of racks remains 2.
    candidates.add(dataNodes[0]);
    candidates.add(dataNodes[1]);
    candidates.add(dataNodes[2]);
    candidates.add(dataNodes[3]);
    assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[3]));

    // after the move, the number of racks remains 3.
    candidates.clear();
    candidates.add(dataNodes[0]);
    candidates.add(dataNodes[1]);
    candidates.add(dataNodes[2]);
    candidates.add(dataNodes[4]);
    assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[1]));

    // after the move, the number of racks changes from 2 to 3.
    candidates.clear();
    candidates.add(dataNodes[0]);
    candidates.add(dataNodes[1]);
    candidates.add(dataNodes[2]);
    candidates.add(dataNodes[4]);
    assertTrue(replicator.isMovable(candidates, dataNodes[0], dataNodes[4]));

    // the move would have reduced the number of racks from 3 to 2.
    candidates.clear();
    candidates.add(dataNodes[0]);
    candidates.add(dataNodes[2]);
    candidates.add(dataNodes[3]);
    candidates.add(dataNodes[4]);
    assertFalse(replicator.isMovable(candidates, dataNodes[0], dataNodes[3]));
  }

  /**
   * This testcase tests whether the default value returned by
   * DFSUtil.getInvalidateWorkPctPerIteration() is positive, 
   * and whether an IllegalArgumentException will be thrown 
   * when 0.0f is retrieved
   */
  @Test
  public void testGetInvalidateWorkPctPerIteration() {
    Configuration conf = new Configuration();
    float blocksInvalidateWorkPct = DFSUtil
        .getInvalidateWorkPctPerIteration(conf);
    assertTrue(blocksInvalidateWorkPct > 0);

    conf.set(DFSConfigKeys.DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION,
        "0.5f");
    blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
    assertEquals(blocksInvalidateWorkPct, 0.5f, blocksInvalidateWorkPct * 1e-7);
    
    conf.set(DFSConfigKeys.
        DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION, "1.0f");
    blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
    assertEquals(blocksInvalidateWorkPct, 1.0f, blocksInvalidateWorkPct * 1e-7);
    
    conf.set(DFSConfigKeys.
        DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION, "0.0f");
    exception.expect(IllegalArgumentException.class);
    blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
  }
  
  /**
   * This testcase tests whether an IllegalArgumentException 
   * will be thrown when a negative value is retrieved by 
   * DFSUtil#getInvalidateWorkPctPerIteration
   */
  @Test
  public void testGetInvalidateWorkPctPerIteration_NegativeValue() {
    Configuration conf = new Configuration();
    float blocksInvalidateWorkPct = DFSUtil
        .getInvalidateWorkPctPerIteration(conf);
    assertTrue(blocksInvalidateWorkPct > 0);
    
    conf.set(DFSConfigKeys.
        DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION, "-0.5f");
    exception.expect(IllegalArgumentException.class);
    blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
  }
  
  /**
   * This testcase tests whether an IllegalArgumentException 
   * will be thrown when a value greater than 1 is retrieved by 
   * DFSUtil#getInvalidateWorkPctPerIteration
   */
  @Test
  public void testGetInvalidateWorkPctPerIteration_GreaterThanOne() {
    Configuration conf = new Configuration();
    float blocksInvalidateWorkPct = DFSUtil
        .getInvalidateWorkPctPerIteration(conf);
    assertTrue(blocksInvalidateWorkPct > 0);
    
    conf.set(DFSConfigKeys.
        DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION, "1.5f");
    exception.expect(IllegalArgumentException.class);
    blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
  }

  /**
   * This testcase tests whether the value returned by
   * DFSUtil.getReplWorkMultiplier() is positive,
   * and whether an IllegalArgumentException will be thrown 
   * when a non-positive value is retrieved
   */
  @Test
  public void testGetReplWorkMultiplier() {
    Configuration conf = new Configuration();
    int blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
    assertTrue(blocksReplWorkMultiplier > 0);

    conf.set(DFSConfigKeys.
        DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,"3");
    blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
    assertEquals(blocksReplWorkMultiplier, 3);
    
    conf.set(DFSConfigKeys.
        DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,"-1");
    exception.expect(IllegalArgumentException.class);
    blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
  }

  @Test(timeout = 60000)
  public void testUpdateDoesNotCauseSkippedReplication() {
    LowRedundancyBlocks lowRedundancyBlocks = new LowRedundancyBlocks();

    BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
    BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
    BlockInfo block3 = genBlockInfo(ThreadLocalRandom.current().nextLong());

    // Adding QUEUE_VERY_LOW_REDUNDANCY block
    final int block1CurReplicas = 2;
    final int block1ExpectedReplicas = 7;
    lowRedundancyBlocks.add(block1, block1CurReplicas, 0, 0,
        block1ExpectedReplicas);

    // Adding QUEUE_VERY_LOW_REDUNDANCY block
    lowRedundancyBlocks.add(block2, 2, 0, 0, 7);

    // Adding QUEUE_LOW_REDUNDANCY block
    lowRedundancyBlocks.add(block3, 2, 0, 0, 6);

    List<List<BlockInfo>> chosenBlocks;

    // Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
    // from QUEUE_VERY_LOW_REDUNDANCY.
    chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
    assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);

    // Increasing the replica count will move the block down a
    // priority. This simulates a replica being completed in between checks.
    lowRedundancyBlocks.update(block1, block1CurReplicas+1, 0, 0,
        block1ExpectedReplicas, 1, 0);

    // Choose 1 block from lowRedundancyBlocks. It should pick the remaining
    // block from QUEUE_VERY_LOW_REDUNDANCY; the update above must not cause
    // it to be skipped over.
    chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
    assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);

    // Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
    // from QUEUE_LOW_REDUNDANCY.
    chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
    assertTheChosenBlocks(chosenBlocks, 0, 0, 1, 0, 0);
  }

  @Test(timeout = 60000)
  public void testAddStoredBlockDoesNotCauseSkippedReplication()
      throws IOException {
    FSNamesystem mockNS = mock(FSNamesystem.class);
    when(mockNS.hasWriteLock()).thenReturn(true);
    when(mockNS.hasReadLock()).thenReturn(true);
    when(mockNS.hasWriteLock(RwLockMode.GLOBAL)).thenReturn(true);
    when(mockNS.hasReadLock(RwLockMode.GLOBAL)).thenReturn(true);
    when(mockNS.hasWriteLock(RwLockMode.BM)).thenReturn(true);
    when(mockNS.hasReadLock(RwLockMode.BM)).thenReturn(true);
    when(mockNS.hasWriteLock(RwLockMode.FS)).thenReturn(true);
    when(mockNS.hasReadLock(RwLockMode.FS)).thenReturn(true);
    BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
    LowRedundancyBlocks lowRedundancyBlocks = bm.neededReconstruction;

    BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
    BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());

    // Adding QUEUE_HIGHEST_PRIORITY block (no live replicas)
    lowRedundancyBlocks.add(block1, 0, 0, 1, 1);

    // Adding QUEUE_HIGHEST_PRIORITY block (no live replicas)
    lowRedundancyBlocks.add(block2, 0, 0, 1, 1);

    List<List<BlockInfo>> chosenBlocks;

    // Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
    // from QUEUE_HIGHEST_PRIORITY.
    chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);

    // Add this block collection to the BlockManager, so that when we add the
    // block under construction, the BlockManager will realize the expected
    // replication has been achieved and remove it from the low redundancy
    // queue.
    BlockInfoContiguous info = new BlockInfoContiguous(block1, (short) 1);
    info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION, null);
    info.setBlockCollectionId(1000L);

    final INodeFile file = TestINodeFile.createINodeFile(1000L);
    when(mockNS.getBlockCollection(1000L)).thenReturn(file);
    bm.addBlockCollection(info, file);

    // Adding this block will increase its current replication, and that will
    // remove it from the queue.
    bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
        ReplicaState.FINALIZED), storages[0]);

    // Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
    // from QUEUE_HIGHEST_PRIORITY.
    // This block remains and should not be skipped over.
    chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
  }

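  /**
   * Verifies that converting the last block of a file back to the
   * under-construction state via
   * BlockManager#convertLastBlockToUnderConstruction does not cause the
   * remaining queued block to be skipped when blocks are subsequently chosen
   * for reconstruction.
   */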
  @Test(timeout = 60000)
  public void
      testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
          throws IOException {
    Namesystem mockNS = mock(Namesystem.class);
    when(mockNS.hasWriteLock()).thenReturn(true);
    when(mockNS.hasWriteLock(RwLockMode.BM)).thenReturn(true);

    BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
    LowRedundancyBlocks lowRedundancyBlocks = bm.neededReconstruction;

    long blkID1 = ThreadLocalRandom.current().nextLong();
    if (blkID1 < 0) {
      blkID1 *= -1;
    }
    long blkID2 = ThreadLocalRandom.current().nextLong();
    if (blkID2 < 0) {
      blkID2 *= -1;
    }

    BlockInfo block1 = genBlockInfo(blkID1);
    BlockInfo block2 = genBlockInfo(blkID2);

    // Adding QUEUE_HIGHEST_PRIORITY block (no live replicas)
    lowRedundancyBlocks.add(block1, 0, 0, 1, 1);

    // Adding QUEUE_HIGHEST_PRIORITY block (no live replicas)
    lowRedundancyBlocks.add(block2, 0, 0, 1, 1);

    List<List<BlockInfo>> chosenBlocks;

    // Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
    // from QUEUE_HIGHEST_PRIORITY.
    chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);

    final BlockInfoContiguous info = new BlockInfoContiguous(block1, (short) 1);
    final BlockCollection mbc = mock(BlockCollection.class);
    when(mbc.getId()).thenReturn(1000L);
    when(mbc.getLastBlock()).thenReturn(info);
    when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1);
    when(mbc.isUnderConstruction()).thenReturn(true);
    ContentSummary cs = mock(ContentSummary.class);
    when(cs.getLength()).thenReturn((long)1);
    when(mbc.computeContentSummary(bm.getStoragePolicySuite())).thenReturn(cs);
    info.setBlockCollectionId(1000);
    bm.addBlockCollection(info, mbc);

    DatanodeStorageInfo[] storageAry = {new DatanodeStorageInfo(
        dataNodes[0], new DatanodeStorage("s1"))};
    info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
        storageAry);
    DatanodeStorageInfo storage = mock(DatanodeStorageInfo.class);
    DatanodeDescriptor dn = mock(DatanodeDescriptor.class);
    when(dn.isDecommissioned()).thenReturn(true);
    when(storage.getState()).thenReturn(DatanodeStorage.State.NORMAL);
    when(storage.getDatanodeDescriptor()).thenReturn(dn);
    when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
    when(storage.addBlock(any(BlockInfo.class)))
        .thenReturn(DatanodeStorageInfo.AddBlockResult.ADDED);
    info.addStorage(storage, info);

    BlockInfo lastBlk = mbc.getLastBlock();
    when(mbc.getLastBlock()).thenReturn(lastBlk, info);

    bm.convertLastBlockToUnderConstruction(mbc, 0L);

    // Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
    // from QUEUE_HIGHEST_PRIORITY.
    // This block remains and should not be skipped over.
    chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
  }

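  /**
   * Verifies that changing a block's replication factor via
   * BlockManager#setReplication does not cause the remaining queued block to
   * be skipped when blocks are subsequently chosen for reconstruction.
   */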
  @Test(timeout = 60000)
  public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
      throws IOException {
    Namesystem mockNS = mock(Namesystem.class);
    when(mockNS.hasReadLock()).thenReturn(true);

    BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
    LowRedundancyBlocks lowRedundancyBlocks = bm.neededReconstruction;

    BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
    BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());

    // Adding QUEUE_HIGHEST_PRIORITY block (no live replicas)
    lowRedundancyBlocks.add(block1, 0, 0, 1, 1);

    // Adding QUEUE_HIGHEST_PRIORITY block (no live replicas)
    lowRedundancyBlocks.add(block2, 0, 0, 1, 1);

    List<List<BlockInfo>> chosenBlocks;

    // Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
    // from QUEUE_HIGHEST_PRIORITY.
    chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);

    bm.setReplication((short)0, (short)1, block1);

    // Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
    // from QUEUE_HIGHEST_PRIORITY.
    // This block remains and should not be skipped over.
    chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
  }

  /**
   * In this testcase, two favored nodes (dataNodes[0] and dataNodes[1]) are
   * passed in.
   *
   * Both favored nodes should be chosen as targets, and the remaining targets
   * should then fall back to the default block placement policy: the third
   * target on the local writer's rack, the fourth target on a remote rack,
   * and the fifth on the same rack as the second.
   *
   * @throws Exception
   */
  @Test
  public void testChooseExcessReplicaApartFromFavoredNodes() throws Exception {
    DatanodeStorageInfo[] targets;
    List<DatanodeDescriptor> expectedTargets =
        new ArrayList<DatanodeDescriptor>();
    expectedTargets.add(dataNodes[0]);
    expectedTargets.add(dataNodes[1]);
    expectedTargets.add(dataNodes[2]);
    expectedTargets.add(dataNodes[4]);
    expectedTargets.add(dataNodes[5]);
    List<DatanodeDescriptor> favouredNodes =
        new ArrayList<DatanodeDescriptor>();
    favouredNodes.add(dataNodes[0]);
    favouredNodes.add(dataNodes[1]);
    targets = chooseTarget(5, dataNodes[2], null, favouredNodes);
    assertEquals(5, targets.length);
    for (int i = 0; i < targets.length; i++) {
      assertTrue("Target should be a part of Expected Targets",
          expectedTargets.contains(targets[i].getDatanodeDescriptor()));
    }
  }

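  /**
   * Verifies that favored nodes are still honored as targets when
   * preferLocalNode is disabled on the default placement policy.
   */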
  @Test
  public void testChooseFromFavoredNodesWhenPreferLocalSetToFalse() {
    ((BlockPlacementPolicyDefault) replicator).setPreferLocalNode(false);
    try {
      DatanodeStorageInfo[] targets;
      List<DatanodeDescriptor> expectedTargets = new ArrayList<>();
      expectedTargets.add(dataNodes[0]);
      expectedTargets.add(dataNodes[2]);
      List<DatanodeDescriptor> favouredNodes = new ArrayList<>();
      favouredNodes.add(dataNodes[0]);
      favouredNodes.add(dataNodes[2]);
      targets = chooseTarget(2, dataNodes[3], null,
          favouredNodes);
      assertEquals(2, targets.length);
      for (int i = 0; i < targets.length; i++) {
        assertTrue("Target should be a part of Expected Targets",
            expectedTargets.contains(targets[i].getDatanodeDescriptor()));
      }
    } finally {
      ((BlockPlacementPolicyDefault) replicator).setPreferLocalNode(true);
    }
  }

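  /**
   * Convenience overload that delegates to the flag-accepting chooseTarget
   * variant with no AddBlockFlags.
   */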
  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
      DatanodeDescriptor writer, Set<Node> excludedNodes,
      List<DatanodeDescriptor> favoredNodes) {
    return chooseTarget(numOfReplicas, writer, excludedNodes,
        favoredNodes, null);
  }

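  /**
   * Invokes the replicator with the test's default block size and storage
   * policy, using the given excluded nodes, favored nodes and
   * block-allocation flags.
   */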
  private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
    DatanodeDescriptor writer, Set<Node> excludedNodes,
        List<DatanodeDescriptor> favoredNodes, EnumSet<AddBlockFlag> flags) {
    return replicator.chooseTarget(filename, numOfReplicas, writer,
        excludedNodes, BLOCK_SIZE, favoredNodes,
        TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, flags);
  }

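  /**
   * Verifies that the NO_LOCAL_WRITE flag keeps the writer's own datanode out
   * of the chosen targets when enough other nodes are available.
   */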
  @Test
  public void testAvoidLocalWrite() throws IOException {
    DatanodeDescriptor writer = dataNodes[2];
    EnumSet<AddBlockFlag> flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE);
    DatanodeStorageInfo[] targets;
    targets = chooseTarget(5, writer, null, null, flags);
    for (DatanodeStorageInfo info : targets) {
      assertNotEquals(info.getDatanodeDescriptor(), writer);
    }
  }

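  /**
   * Verifies that NO_LOCAL_WRITE is only a hint: when the requested replica
   * count requires all available datanodes, the writer's own node is still
   * used as a target.
   */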
  @Test
  public void testAvoidLocalWriteNoEnoughNodes() throws IOException {
    DatanodeDescriptor writer = dataNodes[2];
    EnumSet<AddBlockFlag> flags = EnumSet.of(AddBlockFlag.NO_LOCAL_WRITE);
    DatanodeStorageInfo[] targets;
    targets = chooseTarget(6, writer, null, null, flags);
    assertEquals(6, targets.length);
    boolean found = false;
    for (DatanodeStorageInfo info : targets) {
      if (info.getDatanodeDescriptor().equals(writer)) {
        found = true;
      }
    }
    assertTrue(found);
  }

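  /**
   * Verifies excludeNodeByLoad both with the default cluster-wide xceiver
   * average and with per-storage-type load checking enabled via
   * DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY.
   */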
  @Test
  public void testMaxLoad() {
    FSClusterStats statistics = mock(FSClusterStats.class);
    DatanodeDescriptor node = mock(DatanodeDescriptor.class);

    when(statistics.getInServiceXceiverAverage()).thenReturn(0.0);
    when(node.getXceiverCount()).thenReturn(1);

    final Configuration conf = new Configuration();
    final Class<? extends BlockPlacementPolicy> replicatorClass = conf
        .getClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
            DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
            BlockPlacementPolicy.class);
    BlockPlacementPolicy bpp = ReflectionUtils.
        newInstance(replicatorClass, conf);
    assertTrue(bpp instanceof BlockPlacementPolicyDefault);

    BlockPlacementPolicyDefault bppd = (BlockPlacementPolicyDefault) bpp;
    bppd.initialize(conf, statistics, null, null);
    assertFalse(bppd.excludeNodeByLoad(node));

    when(statistics.getInServiceXceiverAverage()).thenReturn(1.0);
    when(node.getXceiverCount()).thenReturn(10);
    assertTrue(bppd.excludeNodeByLoad(node));

    // Enable load check per storage type.
    conf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
        true);
    bppd.initialize(conf, statistics, null, null);
    Map<StorageType, StorageTypeStats> storageStats = new HashMap<>();
    StorageTypeStats diskStorageTypeStats =
        new StorageTypeStats(StorageType.DISK);

    // Set the total in-service xceiver count to 500 for DISK.
    diskStorageTypeStats.setDataNodesInServiceXceiverCount(50, 10);
    storageStats.put(StorageType.DISK, diskStorageTypeStats);

    // Set the total in-service xceiver count to 900 for ARCHIVE.
    StorageTypeStats archiveStorageTypeStats =
        new StorageTypeStats(StorageType.ARCHIVE);
    archiveStorageTypeStats.setDataNodesInServiceXceiverCount(10, 90);
    storageStats.put(StorageType.ARCHIVE, archiveStorageTypeStats);

    when(statistics.getStorageTypeStats()).thenReturn(storageStats);
    when(node.getXceiverCount()).thenReturn(29);
    when(node.getStorageTypes()).thenReturn(EnumSet.of(StorageType.DISK));
    when(statistics.getInServiceXceiverAverage()).thenReturn(0.0);
    // Added for sanity: there are 100 datanodes in total, so the cluster-wide
    // average xceiver count is (500 + 900) / 100 = 14.
    when(statistics.getInServiceXceiverAverage()).thenReturn(14.0);
    when(node.getXceiverCount()).thenReturn(100);

    assertFalse(bppd.excludeNodeByLoad(node));
  }

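  /**
   * Verifies that when no datanode provides the storage type required by the
   * COLD storage policy, no targets are chosen and the
   * NO_REQUIRED_STORAGE_TYPE chosen-failure reason is logged.
   */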
  @Test
  public void testChosenFailureForStorageType() {
    final LogVerificationAppender appender = new LogVerificationAppender();
    final Logger logger = Logger.getRootLogger();
    logger.addAppender(appender);

    DatanodeStorageInfo[] targets = replicator.chooseTarget(filename, 1,
        dataNodes[0], new ArrayList<DatanodeStorageInfo>(), false, null,
        BLOCK_SIZE, TestBlockStoragePolicy.POLICY_SUITE.getPolicy(
            HdfsConstants.StoragePolicy.COLD.value()), null);
    assertEquals(0, targets.length);
    assertNotEquals(0,
        appender.countLinesWithMessage("NO_REQUIRED_STORAGE_TYPE"));
  }

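  /**
   * Verifies that once all datanodes have reported fresh heartbeats (so no
   * node is stale), the DatanodeManager does not ask writers to avoid stale
   * datanodes.
   */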
  @Test
  public void testReduceChooseTimesIfNOStaleNode() {
    for(int i = 0; i < 6; i++) {
      updateHeartbeatWithUsage(dataNodes[i],
          2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L,
          (HdfsServerConstants.MIN_BLOCKS_FOR_WRITE - 1) * BLOCK_SIZE,
          0L, 0L, 0L, 0, 0);
    }
    assertFalse(dnManager.shouldAvoidStaleDataNodesForWrite());
    resetHeartbeatForStorages();
  }

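  /**
   * Verifies that when every datanode reports insufficient remaining space,
   * no targets are chosen and the NOT_ENOUGH_STORAGE_SPACE chosen-failure
   * reason is logged.
   */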
  @Test
  public void testChosenFailureForNotEnoughStorageSpace() {
    final LogVerificationAppender appender = new LogVerificationAppender();
    final Logger logger = Logger.getRootLogger();
    logger.addAppender(appender);

    // Set each datanode's remaining storage space to 1 * BLOCK_SIZE.
    for(int i = 0; i < dataNodes.length; i++) {
      updateHeartbeatWithUsage(dataNodes[i], BLOCK_SIZE, 0L, BLOCK_SIZE,
          0L, 0L, 0L, 0, 0);
    }

    // Set the minimum number of blocks required by chooseStorage4Block to 2.
    replicator.setMinBlocksForWrite(2);
    DatanodeStorageInfo[] targets = chooseTarget(1, dataNodes[1],
        new ArrayList<DatanodeStorageInfo>(), null);
    assertEquals(0, targets.length);
    assertNotEquals(0,
        appender.countLinesWithMessage("NOT_ENOUGH_STORAGE_SPACE"));

    resetHeartbeatForStorages();
  }
}