/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.util.RwLockMode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.Whitebox;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

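/**
 * Tests that striped (erasure coded) block reconstruction is rack aware: a
 * block group spanning too few racks gets an internal block copied to a new
 * rack, excess replicas are deleted without reducing rack coverage, and
 * reconstruction triggered by decommission keeps the placement policy
 * satisfied.
 */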
public class TestReconstructStripedBlocksWithRackAwareness {
  public static final Logger LOG = LoggerFactory.getLogger(
      TestReconstructStripedBlocksWithRackAwareness.class);

  static {
    GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.TRACE);
    GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.TRACE);
    GenericTestUtils.setLogLevel(BlockManager.LOG, Level.TRACE);
  }

  private final ErasureCodingPolicy ecPolicy =
      StripedFileTestUtil.getDefaultECPolicy();
  private final int cellSize = ecPolicy.getCellSize();
  private final short dataBlocks = (short) ecPolicy.getNumDataUnits();
  private final short parityBlocks = (short) ecPolicy.getNumParityUnits();
  private final String[] hosts = getHosts(dataBlocks + parityBlocks + 1);
  private final String[] racks =
      getRacks(dataBlocks + parityBlocks + 1, dataBlocks);

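  /** Generate sequential hostnames: host1, host2, ..., hostN. */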
  private static String[] getHosts(int numHosts) {
    String[] hosts = new String[numHosts];
    for (int i = 0; i < hosts.length; i++) {
      hosts[i] = "host" + (i + 1);
    }
    return hosts;
  }

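  /**
   * Assign numHosts hosts to numRacks racks as evenly as possible, giving
   * the first racks one extra host when the division is not exact. For
   * example, getRacks(10, 6) returns
   * /r1, /r1, /r2, /r2, /r3, /r3, /r4, /r4, /r5, /r6.
   */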
  private static String[] getRacks(int numHosts, int numRacks) {
    String[] racks = new String[numHosts];
    int numHostEachRack = numHosts / numRacks;
    int residue = numHosts % numRacks;
    int j = 0;
    for (int i = 1; i <= numRacks; i++) {
      int limit = i <= residue ? numHostEachRack + 1 : numHostEachRack;
      for (int k = 0; k < limit; k++) {
        racks[j++] = "/r" + i;
      }
    }
    assert j == numHosts;
    return racks;
  }

  private MiniDFSCluster cluster;
  private static final HdfsConfiguration conf = new HdfsConfiguration();
  private DistributedFileSystem fs;

  @BeforeClass
  public static void setup() throws Exception {
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
        false);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
  }

  @After
  public void tearDown() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

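  /**
   * Stop the datanode with the given hostname and mark it as dead on the
   * NameNode, returning its properties so that it can be restarted later.
   */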
  private MiniDFSCluster.DataNodeProperties stopDataNode(String hostname)
      throws IOException {
    MiniDFSCluster.DataNodeProperties dnProp = null;
    for (int i = 0; i < cluster.getDataNodes().size(); i++) {
      DataNode dn = cluster.getDataNodes().get(i);
      if (dn.getDatanodeId().getHostName().equals(hostname)) {
        dnProp = cluster.stopDataNode(i);
        cluster.setDataNodeDead(dn.getDatanodeId());
        LOG.info("stop datanode " + dn.getDatanodeId().getHostName());
      }
    }
    return dnProp;
  }

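  /** Return the running DataNode with the given hostname, or null if none. */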
  private DataNode getDataNode(String host) {
    for (DataNode dn : cluster.getDataNodes()) {
      if (dn.getDatanodeId().getHostName().equals(host)) {
        return dn;
      }
    }
    return null;
  }

  /**
   * When all the internal blocks are available but they are not placed on
   * enough racks, the NameNode should avoid decoding-based reconstruction and
   * instead copy an internal block to a new rack.
   *
   * In this test, we first create a scenario where a striped block has all of
   * its internal blocks but they are distributed across fewer than 6 racks.
   * Then we check whether the redundancy monitor correctly schedules the
   * reconstruction work for it.
   */
  @Test
  public void testReconstructForNotEnoughRacks() throws Exception {
    LOG.info("cluster hosts: {}, racks: {}", Arrays.asList(hosts),
        Arrays.asList(racks));
    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
        .numDataNodes(hosts.length).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    fs.enableErasureCodingPolicy(
        StripedFileTestUtil.getDefaultECPolicy().getName());
    fs.setErasureCodingPolicy(new Path("/"),
        StripedFileTestUtil.getDefaultECPolicy().getName());
    FSNamesystem fsn = cluster.getNamesystem();
    BlockManager bm = fsn.getBlockManager();

    MiniDFSCluster.DataNodeProperties lastHost = stopDataNode(
        hosts[hosts.length - 1]);
    final Path file = new Path("/foo");
    // the file's block group is on 9 datanodes but only 5 racks
    DFSTestUtil.createFile(fs, file,
        cellSize * dataBlocks * 2, (short) 1, 0L);
    GenericTestUtils.waitFor(() ->
        bm.numOfUnderReplicatedBlocks() == 0, 100, 30000);
    LOG.info("Created file {}", file);

    final INodeFile fileNode = fsn.getFSDirectory()
        .getINode4Write(file.toString()).asFile();
    BlockInfoStriped blockInfo = (BlockInfoStriped) fileNode.getLastBlock();

    // we should now have 9 internal blocks distributed across 5 racks
    Set<String> rackSet = new HashSet<>();
    Iterator<DatanodeStorageInfo> it = blockInfo.getStorageInfos();
    while (it.hasNext()) {
      DatanodeStorageInfo storage = it.next();
      rackSet.add(storage.getDatanodeDescriptor().getNetworkLocation());
    }
    Assert.assertEquals("rackSet size is wrong: " + rackSet, dataBlocks - 1,
        rackSet.size());

    // restart the stopped datanode
    cluster.restartDataNode(lastHost);
    cluster.waitActive();

    // make sure we have 6 racks again
    NetworkTopology topology = bm.getDatanodeManager().getNetworkTopology();
    LOG.info("topology is: {}", topology);
    Assert.assertEquals(hosts.length, topology.getNumOfLeaves());
    Assert.assertEquals(dataBlocks, topology.getNumOfRacks());

    // pause all the heartbeats so that scheduled reconstruction work stays
    // pending on the DataNodes and can be inspected below
    for (DataNode dn : cluster.getDataNodes()) {
      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
    }

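    // re-scan the blocks so that the striped block spanning too few racks
    // is queued for reconstruction by the redundancy monitor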
    fsn.writeLock(RwLockMode.BM);
    try {
      bm.processMisReplicatedBlocks();
    } finally {
      fsn.writeUnlock(RwLockMode.BM, "testReconstructForNotEnoughRacks");
    }

    // check whether the redundancy monitor correctly schedules the
    // reconstruction work
    boolean scheduled = false;
    for (int i = 0; i < 5; i++) { // retry 5 times
      it = blockInfo.getStorageInfos();
      while (it.hasNext()) {
        DatanodeStorageInfo storage = it.next();
        if (storage != null) {
          DatanodeDescriptor dn = storage.getDatanodeDescriptor();
          Assert.assertEquals("Block to be erasure coded is wrong for datanode:"
              + dn, 0, dn.getNumberOfBlocksToBeErasureCoded());
          if (dn.getNumberOfBlocksToBeReplicated() == 1) {
            scheduled = true;
          }
        }
      }
      if (scheduled) {
        break;
      }
      Thread.sleep(1000);
    }
    Assert.assertTrue(scheduled);
  }

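  /**
   * While host1 is down, its internal block is reconstructed on the
   * previously stopped last host. When host1 comes back and reports its
   * blocks, the block group has an excess replica; excess replica deletion
   * should remove host1's copy so that the block group still spans 6 racks.
   */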
  @Test
  public void testChooseExcessReplicasToDelete() throws Exception {
    cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts)
        .numDataNodes(hosts.length).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    fs.enableErasureCodingPolicy(
        StripedFileTestUtil.getDefaultECPolicy().getName());
    fs.setErasureCodingPolicy(new Path("/"),
        StripedFileTestUtil.getDefaultECPolicy().getName());

    MiniDFSCluster.DataNodeProperties lastHost = stopDataNode(
        hosts[hosts.length - 1]);

    final Path file = new Path("/foo");
    DFSTestUtil.createFile(fs, file,
        cellSize * dataBlocks * 2, (short) 1, 0L);

    // stop host1
    MiniDFSCluster.DataNodeProperties host1 = stopDataNode("host1");
    // bring last host back
    cluster.restartDataNode(lastHost);
    cluster.waitActive();

    // wait for reconstruction to finish
    final short blockNum = (short) (dataBlocks + parityBlocks);
    DFSTestUtil.waitForReplication(fs, file, blockNum, 15 * 1000);

    // restart host1
    cluster.restartDataNode(host1);
    cluster.waitActive();
    for (DataNode dn : cluster.getDataNodes()) {
      if (dn.getDatanodeId().getHostName().equals("host1")) {
        DataNodeTestUtils.triggerBlockReport(dn);
        break;
      }
    }

    // make sure the excess replica is detected, and we delete host1's replica
    // so that we have 6 racks
    DFSTestUtil.waitForReplication(fs, file, blockNum, 15 * 1000);
    LocatedBlocks blks = fs.getClient().getLocatedBlocks(file.toString(), 0);
    LocatedStripedBlock block = (LocatedStripedBlock) blks.getLastLocatedBlock();
    for (DatanodeInfo dn : block.getLocations()) {
      Assert.assertNotEquals("host1", dn.getHostName());
    }
  }

  /**
   * In case we have 10 internal blocks on 5 racks, where 9 of the blocks are
   * live and 1 is on a decommissioning node, make sure the reconstruction
   * happens correctly.
   */
  @Test
  public void testReconstructionWithDecommission() throws Exception {
    final String[] rackNames = getRacks(dataBlocks + parityBlocks + 2,
        dataBlocks);
    final String[] hostNames = getHosts(dataBlocks + parityBlocks + 2);
    // we now have 11 hosts on 6 racks with distribution: 2-2-2-2-2-1
    cluster = new MiniDFSCluster.Builder(conf).racks(rackNames).hosts(hostNames)
        .numDataNodes(hostNames.length).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    fs.enableErasureCodingPolicy(
        StripedFileTestUtil.getDefaultECPolicy().getName());
    fs.setErasureCodingPolicy(new Path("/"),
        StripedFileTestUtil.getDefaultECPolicy().getName());

    final BlockManager bm = cluster.getNamesystem().getBlockManager();
    final DatanodeManager dm = bm.getDatanodeManager();

    // stop h9 and h10 and create a file with 6+3 internal blocks
    MiniDFSCluster.DataNodeProperties h9 =
        stopDataNode(hostNames[hostNames.length - 3]);
    MiniDFSCluster.DataNodeProperties h10 =
        stopDataNode(hostNames[hostNames.length - 2]);
    final Path file = new Path("/foo");
    DFSTestUtil.createFile(fs, file,
        cellSize * dataBlocks * 2, (short) 1, 0L);
    final BlockInfo blockInfo = cluster.getNamesystem().getFSDirectory()
        .getINode(file.toString()).asFile().getLastBlock();

    // bring h9 back
    cluster.restartDataNode(h9);
    cluster.waitActive();

    // stop h11 so that the reconstruction happens
    MiniDFSCluster.DataNodeProperties h11 =
        stopDataNode(hostNames[hostNames.length - 1]);
    boolean recovered = bm.countNodes(blockInfo).liveReplicas() >=
        dataBlocks + parityBlocks;
    for (int i = 0; i < 10 && !recovered; i++) {
      Thread.sleep(1000);
      recovered = bm.countNodes(blockInfo).liveReplicas() >=
          dataBlocks + parityBlocks;
    }
    Assert.assertTrue(recovered);

    // mark h9 as decommissioning
    DataNode datanode9 = getDataNode(hostNames[hostNames.length - 3]);
    Assert.assertNotNull(datanode9);
    final DatanodeDescriptor dn9 = dm.getDatanode(datanode9.getDatanodeId());
    dn9.startDecommission();

    // restart h10 and h11
    cluster.restartDataNode(h10);
    cluster.restartDataNode(h11);
    cluster.waitActive();
    DataNodeTestUtils.triggerBlockReport(
        getDataNode(hostNames[hostNames.length - 1]));

    // with h9 decommissioning, the placement policy is not yet satisfied
    boolean satisfied = bm.isPlacementPolicySatisfied(blockInfo);
    Assert.assertFalse(satisfied);
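    // restart h9's decommission through the DatanodeAdminManager so that
    // the admin monitor tracks it and can complete the decommission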
    final DatanodeAdminManager decomManager =
        (DatanodeAdminManager) Whitebox.getInternalState(
            dm, "datanodeAdminManager");
    cluster.getNamesystem().writeLock(RwLockMode.BM);
    try {
      dn9.stopDecommission();
      decomManager.startDecommission(dn9);
    } finally {
      cluster.getNamesystem().writeUnlock(RwLockMode.BM,
          "testReconstructionWithDecommission");
    }

    // make sure the decommission finishes and the block is on 6 racks
    boolean decommissioned = dn9.isDecommissioned();
    for (int i = 0; i < 10 && !decommissioned; i++) {
      Thread.sleep(1000);
      decommissioned = dn9.isDecommissioned();
    }
    Assert.assertTrue(decommissioned);
    Assert.assertTrue(bm.isPlacementPolicySatisfied(blockInfo));
  }
}