/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.BitSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class tests the decommissioning of datanodes that store striped blocks.
 */
public class TestDecommissionWithStriped {
  private static final Logger LOG = LoggerFactory
      .getLogger(TestDecommissionWithStriped.class);

  // heartbeat interval in seconds
  private static final int HEARTBEAT_INTERVAL = 1;
  // block report in msec
  private static final int BLOCKREPORT_INTERVAL_MSEC = 1000;
  // replication interval
  private static final int NAMENODE_REPLICATION_INTERVAL = 1;

  private int replicationStreamsHardLimit =
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT;

  private Path decommissionDir;
  private Path hostsFile;
  private Path excludeFile;
  private LocalFileSystem localFileSys;

  @Rule
  public TemporaryFolder baseDir = new TemporaryFolder();

  private Configuration conf;
  private MiniDFSCluster cluster;
  private DistributedFileSystem dfs;
  private final ErasureCodingPolicy ecPolicy =
      StripedFileTestUtil.getDefaultECPolicy();
  private int numDNs;
  private final int cellSize = ecPolicy.getCellSize();
  private final int dataBlocks = ecPolicy.getNumDataUnits();
  private final int parityBlocks = ecPolicy.getNumParityUnits();
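  // Each internal block holds 4 cells; a full block group spans
  // blockSize * dataBlocks bytes.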
  private final int blockSize = cellSize * 4;
  private final int blockGroupSize = blockSize * dataBlocks;
  private final Path ecDir = new Path("/" + this.getClass().getSimpleName());

  private FSNamesystem fsn;
  private BlockManager bm;
  private DFSClient client;

  protected Configuration createConfiguration() {
    return new HdfsConfiguration();
  }

  @Before
  public void setup() throws IOException {
    conf = createConfiguration();
    // Set up the hosts/exclude files.
    localFileSys = FileSystem.getLocal(conf);
    localFileSys.setWorkingDirectory(new Path(baseDir.getRoot().getPath()));
    Path workingDir = localFileSys.getWorkingDirectory();
    decommissionDir = new Path(workingDir, "work-dir/decommission");
    hostsFile = new Path(decommissionDir, "hosts");
    excludeFile = new Path(decommissionDir, "exclude");
    writeConfigFile(hostsFile, null);
    writeConfigFile(excludeFile, null);

    // Setup conf
    conf.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath());
    conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
        2000);
    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
        BLOCKREPORT_INTERVAL_MSEC);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
        4);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
        NAMENODE_REPLICATION_INTERVAL);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
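    // Use a striped read buffer smaller than one cell, presumably to force
    // reconstruction reads to span multiple buffers per cell.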
    conf.setInt(
        DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
        cellSize - 1);
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
        false);

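    // dataBlocks + parityBlocks datanodes can hold one full block group; the
    // extra 5 give the tests spare nodes to stop or decommission and spare
    // targets for reconstruction.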
    numDNs = dataBlocks + parityBlocks + 5;
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
    cluster.waitActive();
    dfs = cluster.getFileSystem(0);
    fsn = cluster.getNamesystem();
    bm = fsn.getBlockManager();
    client = getDfsClient(cluster.getNameNode(0), conf);

    dfs.enableErasureCodingPolicy(
        StripedFileTestUtil.getDefaultECPolicy().getName());
    dfs.mkdirs(ecDir);
    dfs.setErasureCodingPolicy(ecDir,
        StripedFileTestUtil.getDefaultECPolicy().getName());
  }

  @After
  public void teardown() throws IOException {
    cleanupFile(localFileSys, decommissionDir);
    if (cluster != null) {
      cluster.shutdown();
      cluster = null;
    }
  }

  @Test(timeout = 120000)
  public void testFileFullBlockGroup() throws Exception {
    LOG.info("Starting test testFileFullBlockGroup");
    testDecommission(blockSize * dataBlocks, 9, 1, "testFileFullBlockGroup");
  }

  @Test(timeout = 120000)
  public void testFileMultipleBlockGroups() throws Exception {
    LOG.info("Starting test testFileMultipleBlockGroups");
    int writeBytes = 2 * blockSize * dataBlocks;
    testDecommission(writeBytes, 9, 1, "testFileMultipleBlockGroups");
  }

  @Test(timeout = 120000)
  public void testFileSmallerThanOneCell() throws Exception {
    LOG.info("Starting test testFileSmallerThanOneCell");
    testDecommission(cellSize - 1, 4, 1, "testFileSmallerThanOneCell");
  }

  @Test(timeout = 120000)
  public void testFileSmallerThanOneStripe() throws Exception {
    LOG.info("Starting test testFileSmallerThanOneStripe");
    testDecommission(cellSize * 2, 5, 1, "testFileSmallerThanOneStripe");
  }

  @Test(timeout = 120000)
  public void testDecommissionTwoNodes() throws Exception {
    LOG.info("Starting test testDecommissionTwoNodes");
    testDecommission(blockSize * dataBlocks, 9, 2, "testDecommissionTwoNodes");
  }

  @Test(timeout = 120000)
  public void testDecommissionWithURBlockForSameBlockGroup() throws Exception {
    LOG.info("Starting test testDecommissionWithURBlockForSameBlockGroup");

    final Path ecFile = new Path(ecDir,
        "testDecommissionWithURBlockForSameBlockGroup");
    int writeBytes = cellSize * dataBlocks * 2;
    writeStripedFile(dfs, ecFile, writeBytes);
    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());

    final List<DatanodeInfo> decommisionNodes = new ArrayList<DatanodeInfo>();
    LocatedBlock lb = dfs.getClient().getLocatedBlocks(ecFile.toString(), 0)
        .get(0);
    DatanodeInfo[] dnLocs = lb.getLocations();
    assertEquals(dataBlocks + parityBlocks, dnLocs.length);
    int decommNodeIndex = dataBlocks - 1;
    int stopNodeIndex = 1;

    // add the nodes which will be decommissioning
    decommisionNodes.add(dnLocs[decommNodeIndex]);

    // stop excess dns to avoid immediate reconstruction.
    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
    List<DataNodeProperties> stoppedDns = new ArrayList<>();
    for (DatanodeInfo liveDn : info) {
      boolean usedNode = false;
      for (DatanodeInfo datanodeInfo : dnLocs) {
        if (liveDn.getXferAddr().equals(datanodeInfo.getXferAddr())) {
          usedNode = true;
          break;
        }
      }
      if (!usedNode) {
        DataNode dn = cluster.getDataNode(liveDn.getIpcPort());
        stoppedDns.add(cluster.stopDataNode(liveDn.getXferAddr()));
        cluster.setDataNodeDead(dn.getDatanodeId());
        LOG.info("stop datanode " + dn.getDatanodeId().getHostName());
      }
    }
    DataNode dn = cluster.getDataNode(dnLocs[stopNodeIndex].getIpcPort());
    cluster.stopDataNode(dnLocs[stopNodeIndex].getXferAddr());
    cluster.setDataNodeDead(dn.getDatanodeId());
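    // The node at stopNodeIndex stays down (only the excess nodes are
    // restarted later), so reduce the expected live datanode count by one.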
    numDNs = numDNs - 1;

    // Decommission node in a new thread. Verify that node is decommissioned.
    final CountDownLatch decomStarted = new CountDownLatch(1);
    Thread decomTh = new Thread() {
      public void run() {
        try {
          decomStarted.countDown();
          decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED);
        } catch (Exception e) {
          LOG.error("Exception while decommissioning", e);
          Assert.fail("Shouldn't throw exception!");
        }
      };
    };
    int deadDecommissioned = fsn.getNumDecomDeadDataNodes();
    int liveDecommissioned = fsn.getNumDecomLiveDataNodes();
    decomTh.start();
    decomStarted.await(5, TimeUnit.SECONDS);
    Thread.sleep(3000); // grace period to trigger decommissioning call
    // start datanode so that decommissioning live node will be finished
    for (DataNodeProperties dnp : stoppedDns) {
      cluster.restartDataNode(dnp);
      LOG.info("Restarted stopped datanode {} to trigger block reconstruction",
          dnp.datanode);
    }
    cluster.waitActive();

    LOG.info("Waiting to finish decommissioning node:{}", decommisionNodes);
    decomTh.join(20000); // waiting 20secs to finish decommission
    LOG.info("Finished decommissioning node:{}", decommisionNodes);

    assertEquals(deadDecommissioned, fsn.getNumDecomDeadDataNodes());
    assertEquals(liveDecommissioned + decommisionNodes.size(),
        fsn.getNumDecomLiveDataNodes());

    // Ensure the decommissioned datanode is not automatically shut down
    assertEquals("All datanodes must be alive", numDNs,
        client.datanodeReport(DatanodeReportType.LIVE).length);

    assertNull(checkFile(dfs, ecFile, 9, decommisionNodes, numDNs));
    StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommisionNodes,
        null, blockGroupSize);
    cleanupFile(dfs, ecFile);
  }

  /**
   * DN decommission should not reconstruct the block stored on a busy DN.
   * @throws Exception
   */
  @Test(timeout = 120000)
  public void testDecommissionWithBusyNode() throws Exception {
    byte busyDNIndex = 1;
    byte decommisionDNIndex = 0;
    //1. create EC file
    final Path ecFile = new Path(ecDir, "testDecommissionWithBusyNode");
    int writeBytes = cellSize * dataBlocks;
    writeStripedFile(dfs, ecFile, writeBytes);
    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);

    //2. make one DN busy
    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
        .getINode4Write(ecFile.toString()).asFile();
    BlockInfo firstBlock = fileNode.getBlocks()[0];
    DatanodeStorageInfo[] dnStorageInfos = bm.getStorages(firstBlock);
    DatanodeDescriptor busyNode =
        dnStorageInfos[busyDNIndex].getDatanodeDescriptor();
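    // Saturate the node's pending-replication counter up to the replication
    // streams hard limit so the namenode treats it as too busy to act as a
    // reconstruction source.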
    for (int j = 0; j < replicationStreamsHardLimit; j++) {
      busyNode.incrementPendingReplicationWithoutTargets();
    }

    //3. decommission one node
    List<DatanodeInfo> decommisionNodes = new ArrayList<>();
    decommisionNodes.add(
        dnStorageInfos[decommisionDNIndex].getDatanodeDescriptor());
    decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED);
    assertEquals(decommisionNodes.size(), fsn.getNumDecomLiveDataNodes());

    //4. wait for the decommissioning node's block to be reconstructed
    Thread.sleep(3000);
    DatanodeStorageInfo[] newDnStorageInfos = bm.getStorages(firstBlock);
    Assert.assertEquals("Busy DN shouldn't be reconstructed",
        dnStorageInfos[busyDNIndex].getStorageID(),
        newDnStorageInfos[busyDNIndex].getStorageID());

    //5. check the decommissioning DN's block index; it should appear twice,
    //   since the block has been reconstructed on another node
    LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
        ecFile.toString(), 0, writeBytes);
    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
    int decommissionBlockIndexCount = 0;
    for (byte index : bg.getBlockIndices()) {
      if (index == decommisionDNIndex) {
        decommissionBlockIndexCount++;
      }
    }

    Assert.assertEquals("Decommission DN block should be reconstructed", 2,
        decommissionBlockIndexCount);

    FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes);
    Assert.assertTrue("Checksum mismatches!",
        fileChecksum1.equals(fileChecksum2));
  }

  /**
   * In some cases, decommission may generate a parity block whose content is
   * all zeros.
   * @throws Exception
   */
  @Test(timeout = 120000)
  public void testDecommission2NodeWithBusyNode() throws Exception {
    byte busyDNIndex = 6;
    byte decommissionDNIndex = 6;
    byte decommissionDNIndex2 = 8;
    //1. create EC file
    final Path ecFile = new Path(ecDir, "testDecommission2NodeWithBusyNode");
    int writeBytes = cellSize * dataBlocks;
    writeStripedFile(dfs, ecFile, writeBytes);

    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);

    //2. make one DN busy
    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
        .getINode4Write(ecFile.toString()).asFile();
    BlockInfo firstBlock = fileNode.getBlocks()[0];
    DatanodeStorageInfo[] dnStorageInfos = bm.getStorages(firstBlock);
    DatanodeDescriptor busyNode = dnStorageInfos[busyDNIndex]
        .getDatanodeDescriptor();
    for (int j = 0; j < replicationStreamsHardLimit; j++) {
      busyNode.incrementPendingReplicationWithoutTargets();
    }

    //3. start decommissioning two nodes
    List<DatanodeInfo> decommissionNodes = new ArrayList<>();
    decommissionNodes.add(dnStorageInfos[decommissionDNIndex]
        .getDatanodeDescriptor());
    decommissionNodes.add(dnStorageInfos[decommissionDNIndex2]
        .getDatanodeDescriptor());
    decommissionNode(0, decommissionNodes, AdminStates.DECOMMISSION_INPROGRESS);

    //4. wait for the non-busy decommissioning block to be reconstructed
    //   (9 - 2 + 1 = 8 live replicas)
    GenericTestUtils.waitFor(
        () -> bm.countNodes(firstBlock).liveReplicas() >= 8,
        100, 60000);

    //5. release the busy DN so the remaining decommissioning (busy) block can
    //   be reconstructed
    busyNode.decrementPendingReplicationWithoutTargets();

    //6. wait for both nodes to finish decommissioning
    decommissionNode(0, decommissionNodes, AdminStates.DECOMMISSIONED);

    //7. Busy DN shouldn't be reconstructed
    DatanodeStorageInfo[] newDnStorageInfos = bm.getStorages(firstBlock);
    Assert.assertEquals("Busy DN shouldn't be reconstructed",
        dnStorageInfos[busyDNIndex].getStorageID(),
        newDnStorageInfos[busyDNIndex].getStorageID());

    //8. check the file checksum
    FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes);
    Assert.assertEquals("Checksum mismatches!", fileChecksum1, fileChecksum2);

    //9. check the data is correct
    StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommissionNodes,
        null, blockGroupSize);
  }

  /**
   * Tests that the file checksum can still be computed after the decommission
   * operation.
   *
   * Below is the block indices list after the decommission. ' represents the
   * decommissioned node's index.
   *
   * 0, 2, 3, 4, 5, 6, 7, 8, 1, 1'
   *
   * Note that this list contains a duplicated block index and does not
   * maintain any particular order.
   */
  @Test(timeout = 120000)
  public void testFileChecksumAfterDecommission() throws Exception {
    LOG.info("Starting test testFileChecksumAfterDecommission");

    final Path ecFile = new Path(ecDir, "testFileChecksumAfterDecommission");
    int writeBytes = cellSize * dataBlocks;
    writeStripedFile(dfs, ecFile, writeBytes);
    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);

    final List<DatanodeInfo> decommisionNodes = new ArrayList<DatanodeInfo>();
    LocatedBlock lb = dfs.getClient().getLocatedBlocks(ecFile.toString(), 0)
        .get(0);
    DatanodeInfo[] dnLocs = lb.getLocations();
    assertEquals(dataBlocks + parityBlocks, dnLocs.length);
    int decommNodeIndex = 1;

    // add the node which will be decommissioning
    decommisionNodes.add(dnLocs[decommNodeIndex]);
    decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED);
    assertEquals(decommisionNodes.size(), fsn.getNumDecomLiveDataNodes());
    assertNull(checkFile(dfs, ecFile, 9, decommisionNodes, numDNs));
    StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommisionNodes,
        null, blockGroupSize);

    // verify checksum
    FileChecksum fileChecksum2 = dfs.getFileChecksum(ecFile, writeBytes);
    LOG.info("fileChecksum1:" + fileChecksum1);
    LOG.info("fileChecksum2:" + fileChecksum2);

    Assert.assertTrue("Checksum mismatches!",
        fileChecksum1.equals(fileChecksum2));
  }

  /**
   * Test decommission when the DN is marked as busy.
   * @throws Exception
   */
  @Test(timeout = 120000)
  public void testBusyAfterDecommissionNode() throws Exception {
    int busyDNIndex = 0;
    //1. create EC file.
    final Path ecFile = new Path(ecDir, "testBusyAfterDecommissionNode");
    int writeBytes = cellSize * dataBlocks;
    writeStripedFile(dfs, ecFile, writeBytes);
    Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
    FileChecksum fileChecksum1 = dfs.getFileChecksum(ecFile, writeBytes);

    //2. make one DN busy.
    final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
        .getINode4Write(ecFile.toString()).asFile();
    BlockInfo firstBlock = fileNode.getBlocks()[0];
    DatanodeStorageInfo[] dnStorageInfos = bm.getStorages(firstBlock);
    DatanodeDescriptor busyNode =
        dnStorageInfos[busyDNIndex].getDatanodeDescriptor();
    for (int j = 0; j < replicationStreamsHardLimit; j++) {
      busyNode.incrementPendingReplicationWithoutTargets();
    }

    //3. decommission one node.
    List<DatanodeInfo> decommisionNodes = new ArrayList<>();
    decommisionNodes.add(busyNode);
    decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSION_INPROGRESS);

    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
    bm.getDatanodeManager().fetchDatanodes(live, null, false);
    int liveDecommissioning = 0;
    for (DatanodeDescriptor node : live) {
      liveDecommissioning += node.isDecommissionInProgress() ? 1 : 0;
    }
    assertEquals(decommisionNodes.size(), liveDecommissioning);

    //4. wait for the block to be reported as low redundancy; no
    //   reconstruction should be scheduled because the decommissioning DN is
    //   busy.
    GenericTestUtils.waitFor(() -> bm.getLowRedundancyBlocksCount() == 1,
        100, 3000);

    int blocksScheduled = 0;
    final List<DatanodeDescriptor> dnList = new ArrayList<>();
    fsn.getBlockManager().getDatanodeManager().fetchDatanodes(dnList, null,
        false);
    for (DatanodeDescriptor dn : dnList) {
      blocksScheduled += dn.getBlocksScheduled();
    }
    assertEquals(0, blocksScheduled);
    assertEquals(0, bm.getPendingReconstructionBlocksCount());
    assertEquals(1, bm.getLowRedundancyBlocksCount());
  }

  private void testDecommission(int writeBytes, int storageCount,
      int decomNodeCount, String filename) throws Exception {
    Path ecFile = new Path(ecDir, filename);
    writeStripedFile(dfs, ecFile, writeBytes);
    List<DatanodeInfo> decommisionNodes = getDecommissionDatanode(dfs, ecFile,
        writeBytes, decomNodeCount);

    int deadDecommissioned = fsn.getNumDecomDeadDataNodes();
    int liveDecommissioned = fsn.getNumDecomLiveDataNodes();
    List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
        .getAllBlocks();

    // prepare expected block index and token list.
    List<HashMap<DatanodeInfo, Byte>> locToIndexList = new ArrayList<>();
    List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList =
        new ArrayList<>();
    prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList);

    // Decommission node. Verify that node is decommissioned.
    decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED);

    assertEquals(deadDecommissioned, fsn.getNumDecomDeadDataNodes());
    assertEquals(liveDecommissioned + decommisionNodes.size(),
        fsn.getNumDecomLiveDataNodes());

    // Ensure the decommissioned datanode is not automatically shut down
    DFSClient client = getDfsClient(cluster.getNameNode(0), conf);
    assertEquals("All datanodes must be alive", numDNs,
        client.datanodeReport(DatanodeReportType.LIVE).length);

    assertNull(checkFile(dfs, ecFile, storageCount, decommisionNodes, numDNs));
    StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommisionNodes,
        null, blockGroupSize);

    assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList);

    cleanupFile(dfs, ecFile);
  }

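  /**
   * Record, for each located block, the block index and block token seen at
   * every datanode location so they can be re-checked after the namenode has
   * sorted the locations.
   */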
  private void prepareBlockIndexAndTokenList(List<LocatedBlock> lbs,
      List<HashMap<DatanodeInfo, Byte>> locToIndexList,
      List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList) {
    for (LocatedBlock lb : lbs) {
      HashMap<DatanodeInfo, Byte> locToIndex = new HashMap<DatanodeInfo, Byte>();
      locToIndexList.add(locToIndex);

      HashMap<DatanodeInfo, Token<BlockTokenIdentifier>> locToToken =
          new HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>();
      locToTokenList.add(locToToken);

      DatanodeInfo[] di = lb.getLocations();
      LocatedStripedBlock stripedBlk = (LocatedStripedBlock) lb;
      for (int i = 0; i < di.length; i++) {
        locToIndex.put(di[i], stripedBlk.getBlockIndices()[i]);
        locToToken.put(di[i], stripedBlk.getBlockTokens()[i]);
      }
    }
  }

  /**
   * Verify block index and token values. After the namenode sorts the
   * locations, the block indices and block tokens must still correspond to
   * the same datanodes.
   */
  private void assertBlockIndexAndTokenPosition(List<LocatedBlock> lbs,
      List<HashMap<DatanodeInfo, Byte>> locToIndexList,
      List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList) {
    for (int i = 0; i < lbs.size(); i++) {
      LocatedBlock lb = lbs.get(i);
      LocatedStripedBlock stripedBlk = (LocatedStripedBlock) lb;
      HashMap<DatanodeInfo, Byte> locToIndex = locToIndexList.get(i);
      HashMap<DatanodeInfo, Token<BlockTokenIdentifier>> locToToken =
          locToTokenList.get(i);
      DatanodeInfo[] di = lb.getLocations();
      for (int j = 0; j < di.length; j++) {
        Assert.assertEquals("Block index value mismatches after sorting",
            (byte) locToIndex.get(di[j]), stripedBlk.getBlockIndices()[j]);
        Assert.assertEquals("Block token value mismatches after sorting",
            locToToken.get(di[j]), stripedBlk.getBlockTokens()[j]);
      }
    }
  }

  private List<DatanodeInfo> getDecommissionDatanode(DistributedFileSystem dfs,
      Path ecFile, int writeBytes, int decomNodeCount) throws IOException {
    ArrayList<DatanodeInfo> decommissionedNodes = new ArrayList<>();
    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
    BlockLocation[] fileBlockLocations = dfs.getFileBlockLocations(ecFile, 0,
        writeBytes);
    for (String dnName : fileBlockLocations[0].getNames()) {
      for (DatanodeInfo dn : info) {
        if (dnName.equals(dn.getXferAddr())) {
          decommissionedNodes.add(dn);
        }
        if (decommissionedNodes.size() >= decomNodeCount) {
          return decommissionedNodes;
        }
      }
    }
    return decommissionedNodes;
  }

  /* Get DFSClient to the namenode */
  private static DFSClient getDfsClient(NameNode nn, Configuration conf)
      throws IOException {
    return new DFSClient(nn.getNameNodeAddress(), conf);
  }

  private byte[] writeStripedFile(DistributedFileSystem fs, Path ecFile,
      int writeBytes) throws Exception {
    byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes);
    DFSTestUtil.writeFile(fs, ecFile, new String(bytes));
    StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString());

    StripedFileTestUtil.checkData(fs, ecFile, writeBytes,
        new ArrayList<DatanodeInfo>(), null, blockGroupSize);
    return bytes;
  }

  private void writeConfigFile(Path name, List<String> nodes)
      throws IOException {
    // delete if it already exists
    if (localFileSys.exists(name)) {
      localFileSys.delete(name, true);
    }

    try (FSDataOutputStream stm = localFileSys.create(name)) {
      if (nodes != null) {
        for (String node: nodes) {
          stm.writeBytes(node);
          stm.writeBytes("\n");
        }
      }
    }
  }

  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
    assertTrue(fileSys.exists(name));
    fileSys.delete(name, true);
    assertTrue(!fileSys.exists(name));
  }

  /*
   * Decommission the given datanodes on the namenode at nnIndex and wait for
   * each of them to reach the given {@code waitForState}.
   */
  private void decommissionNode(int nnIndex,
      List<DatanodeInfo> decommissionedNodes, AdminStates waitForState)
          throws IOException {
    DFSClient client = getDfsClient(cluster.getNameNode(nnIndex), conf);
    DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);

    // write nodename into the exclude file.
    ArrayList<String> excludeNodes = new ArrayList<String>();
    for (DatanodeInfo dn : decommissionedNodes) {
      boolean nodeExists = false;
      for (DatanodeInfo dninfo : info) {
        if (dninfo.getDatanodeUuid().equals(dn.getDatanodeUuid())) {
          nodeExists = true;
          break;
        }
      }
      assertTrue("Datanode: " + dn + " is not LIVE", nodeExists);
      excludeNodes.add(dn.getName());
      LOG.info("Decommissioning node: " + dn.getName());
    }
    writeConfigFile(excludeFile, excludeNodes);
    refreshNodes(cluster.getNamesystem(nnIndex), conf);
    for (DatanodeInfo dn : decommissionedNodes) {
      DatanodeInfo ret = NameNodeAdapter
          .getDatanode(cluster.getNamesystem(nnIndex), dn);
      waitNodeState(ret, waitForState);
    }
  }

  private static void refreshNodes(final FSNamesystem ns,
      final Configuration conf) throws IOException {
    ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
  }

  /*
   * Wait till the node reaches the given admin state.
   */
  private void waitNodeState(DatanodeInfo node, AdminStates state) {
    boolean done = state == node.getAdminState();
    while (!done) {
      LOG.info("Waiting for node " + node + " to change state to " + state
          + " current state: " + node.getAdminState());
      try {
        Thread.sleep(HEARTBEAT_INTERVAL * 500);
      } catch (InterruptedException e) {
        // nothing
      }
      done = state == node.getAdminState();
    }
    LOG.info("node " + node + " reached the state " + state);
  }

  /**
   * Verify that the number of replicas is as expected for each block in the
   * given file. For blocks with a decommissioned node, verify that their
   * replication is 1 more than what is specified. For blocks without
   * decommissioned nodes, verify their replication is equal to what is
   * specified.
   *
   * @param decommissionedNodes
   *          - if empty, there is no decommissioned node for this file.
   * @return - null if no failure found, else an error message string.
   */
  private static String checkFile(FileSystem fileSys, Path name, int repl,
      List<DatanodeInfo> decommissionedNodes, int numDatanodes)
          throws IOException {
    boolean isNodeDown = decommissionedNodes.size() > 0;
    // need a raw stream
    assertTrue("Not HDFS:" + fileSys.getUri(),
        fileSys instanceof DistributedFileSystem);
    HdfsDataInputStream dis = (HdfsDataInputStream) fileSys.open(name);
    Collection<LocatedBlock> dinfo = dis.getAllBlocks();
    for (LocatedBlock blk : dinfo) { // for each block
      int hasdown = 0;
      DatanodeInfo[] nodes = blk.getLocations();
      for (int j = 0; j < nodes.length; j++) { // for each replica
        LOG.info("Block Locations size={}, locs={}, j={}", nodes.length,
            nodes[j].toString(), j);
        boolean found = false;
        for (DatanodeInfo datanodeInfo : decommissionedNodes) {
          // check against decommissioned list
          if (isNodeDown
              && nodes[j].getXferAddr().equals(datanodeInfo.getXferAddr())) {
            found = true;
            hasdown++;
            // Downnode must actually be decommissioned
            if (!nodes[j].isDecommissioned()) {
              return "For block " + blk.getBlock() + " replica on " + nodes[j]
                  + " is given as downnode, " + "but is not decommissioned";
            }
            // Decommissioned node (if any) should only be last node in list.
            if (j < repl) {
              return "For block " + blk.getBlock() + " decommissioned node "
                  + nodes[j] + " was not last node in list: " + (j + 1) + " of "
                  + nodes.length;
            }
            LOG.info("Block " + blk.getBlock() + " replica on " + nodes[j]
                + " is decommissioned.");
          }
        }
        // Non-downnodes must not be decommissioned
        if (!found && nodes[j].isDecommissioned()) {
          return "For block " + blk.getBlock() + " replica on " + nodes[j]
              + " is unexpectedly decommissioned";
        }
      }

      LOG.info("Block " + blk.getBlock() + " has " + hasdown
          + " decommissioned replica.");
      if (Math.min(numDatanodes, repl + hasdown) != nodes.length) {
        return "Wrong number of replicas for block " + blk.getBlock() + ": "
            + nodes.length + ", expected "
            + Math.min(numDatanodes, repl + hasdown);
      }
    }
    return null;
  }

  /**
   * Simulate two nodes (dn0, dn1) being decommissioned: dn0's replication
   * succeeds first while dn1's fails, and the decommissions still complete.
   */
  @Test (timeout = 120000)
  public void testDecommissionWithFailedReplicating() throws Exception {

    // Write ec file.
    Path ecFile = new Path(ecDir, "firstReplicationFailedFile");
    int writeBytes = cellSize * 6;
    writeStripedFile(dfs, ecFile, writeBytes);

    // Get 2 nodes of ec block and set them in decommission.
    // The 2 nodes are not in pendingNodes of DatanodeAdminManager.
    List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
        .getAllBlocks();
    LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0);
    DatanodeInfo[] dnList = blk.getLocations();
    DatanodeDescriptor dn0 = bm.getDatanodeManager()
        .getDatanode(dnList[0].getDatanodeUuid());
    dn0.startDecommission();
    DatanodeDescriptor dn1 = bm.getDatanodeManager()
        .getDatanode(dnList[1].getDatanodeUuid());
    dn1.startDecommission();

    assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager()
        .getNumPendingNodes());

    // Replicate dn0 block to another dn
    // Simulate that dn0 replicates in success, dn1 replicates in failure.
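    // For a striped block, an internal block's id is the block group id plus
    // its block index, so this rebuilds the id of dn0's internal block.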
    final byte blockIndex = blk.getBlockIndices()[0];
    final Block targetBlk = new Block(blk.getBlock().getBlockId() + blockIndex,
        cellSize, blk.getBlock().getGenerationStamp());
    DatanodeInfo extraDn = getDatanodeOutOfTheBlock(blk);
    DatanodeDescriptor target = bm.getDatanodeManager()
        .getDatanode(extraDn.getDatanodeUuid());
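    // Queue replication of dn0's internal block to the extra node; nothing is
    // queued for dn1, which simulates dn1's replication failing.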
    dn0.addECBlockToBeReplicated(targetBlk,
        new DatanodeStorageInfo[] {target.getStorageInfos()[0]});

    // dn0 replicates in success
    GenericTestUtils.waitFor(
        () -> dn0.getNumberOfReplicateBlocks() == 0,
        100, 60000);
    GenericTestUtils.waitFor(
        () -> {
          Iterator<DatanodeStorageInfo> it =
              bm.getStoredBlock(targetBlk).getStorageInfos();
          while(it.hasNext()) {
            if (it.next().getDatanodeDescriptor().equals(target)) {
              return true;
            }
          }
          return false;
        },
        100, 60000);

    // There are 8 live replicas
    BlockInfoStriped blockInfo =
        (BlockInfoStriped)bm.getStoredBlock(
            new Block(blk.getBlock().getBlockId()));
    assertEquals(8, bm.countNodes(blockInfo).liveReplicas());

    // Add the 2 nodes to pendingNodes of DatanodeAdminManager
    bm.getDatanodeManager().getDatanodeAdminManager()
        .getPendingNodes().add(dn0);
    bm.getDatanodeManager().getDatanodeAdminManager()
        .getPendingNodes().add(dn1);

    waitNodeState(dn0, AdminStates.DECOMMISSIONED);
    waitNodeState(dn1, AdminStates.DECOMMISSIONED);

    // There are 9 live replicas
    assertEquals(9, bm.countNodes(blockInfo).liveReplicas());

    // After dn0 and dn1 are decommissioned, all internal blocks (0-8) are
    // still present on non-decommissioned nodes
    Iterator<DatanodeStorageInfo> it = blockInfo.getStorageInfos();
    BitSet indexBitSet = new BitSet(9);
    while(it.hasNext()) {
      DatanodeStorageInfo storageInfo = it.next();
      if(storageInfo.getDatanodeDescriptor().equals(dn0)
          || storageInfo.getDatanodeDescriptor().equals(dn1)) {
        // Skip decommissioned nodes
        continue;
      }
      byte index = blockInfo.getStorageBlockIndex(storageInfo);
      indexBitSet.set(index);
    }
    for (int i = 0; i < 9; ++i) {
      assertTrue(indexBitSet.get(i));
    }
  }

  /**
   * Get a Datanode which does not contain the block.
   */
  private DatanodeInfo getDatanodeOutOfTheBlock(LocatedStripedBlock blk)
      throws Exception {
    DatanodeInfo[] allDnInfos = client.datanodeReport(DatanodeReportType.LIVE);
    DatanodeInfo[] blkDnInfos = blk.getLocations();
    for (DatanodeInfo dnInfo : allDnInfos) {
      boolean in = false;
      for (DatanodeInfo blkDnInfo : blkDnInfos) {
        if (blkDnInfo.equals(dnInfo)) {
          in = true;
        }
      }
      if(!in) {
        return dnInfo;
      }
    }
    return null;
  }

  @Test (timeout = 120000)
  public void testDecommissionWithMissingBlock() throws Exception {
    // Write ec file.
    Path ecFile = new Path(ecDir, "missingOneInternalBLockFile");
    int writeBytes = cellSize * 6;
    writeStripedFile(dfs, ecFile, writeBytes);

    final List<DatanodeInfo> decommisionNodes = new ArrayList<DatanodeInfo>();
    LocatedBlock lb = dfs.getClient().getLocatedBlocks(ecFile.toString(), 0)
        .get(0);
    LocatedStripedBlock lsb = (LocatedStripedBlock)lb;
    DatanodeInfo[] dnLocs = lsb.getLocations();
    BlockInfoStriped blockInfo =
        (BlockInfoStriped)bm.getStoredBlock(
            new Block(lsb.getBlock().getBlockId()));

    assertEquals(dataBlocks + parityBlocks, dnLocs.length);
    int decommNodeIndex = 1;
    int numDecommission = 4;
    int stopNodeIndex = 0;

    // Add the 4 nodes, and set the 4 nodes decommissioning.
    // So that they are decommissioning at the same time
    for (int i = decommNodeIndex; i < numDecommission + decommNodeIndex; ++i) {
      decommisionNodes.add(dnLocs[i]);
      DatanodeDescriptor dn = bm.getDatanodeManager()
          .getDatanode(dnLocs[i].getDatanodeUuid());
      dn.startDecommission();
    }
    GenericTestUtils.waitFor(
        () -> bm.countNodes(blockInfo).decommissioning() == numDecommission,
        100, 10000);

    // The namenode has not started handling these decommissioning nodes yet
    assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager()
        .getNumPendingNodes());

    // Replicate dn1 block to another dn
    // so that one of the 4 decommissioning replicas gets a live copy.
    final byte blockIndex = lsb.getBlockIndices()[decommNodeIndex];
    final Block targetBlk = new Block(lsb.getBlock().getBlockId() + blockIndex,
        cellSize, lsb.getBlock().getGenerationStamp());
    DatanodeInfo extraDn = getDatanodeOutOfTheBlock(lsb);
    DatanodeDescriptor target = bm.getDatanodeManager()
        .getDatanode(extraDn.getDatanodeUuid());
    DatanodeDescriptor dnStartIndexDecommission = bm.getDatanodeManager()
        .getDatanode(dnLocs[decommNodeIndex].getDatanodeUuid());
    dnStartIndexDecommission.addECBlockToBeReplicated(targetBlk,
        new DatanodeStorageInfo[] {target.getStorageInfos()[0]});

    // Wait for replication success.
    GenericTestUtils.waitFor(
        () -> {
          Iterator<DatanodeStorageInfo> it =
              bm.getStoredBlock(targetBlk).getStorageInfos();
          while(it.hasNext()) {
            if (it.next().getDatanodeDescriptor().equals(target)) {
              return true;
            }
          }
          return false;
        },
        100, 60000);

    // Reopen ecFile, get the new locations.
    lb = dfs.getClient().getLocatedBlocks(ecFile.toString(), 0)
        .get(0);
    lsb = (LocatedStripedBlock)lb;
    DatanodeInfo[] newDnLocs = lsb.getLocations();

    // The block group now reports 10 locations (b1 has an extra live copy).
    assertEquals(10, newDnLocs.length);

    // Stop the datanode at stopNodeIndex (dn0)
    // so that its internal block goes missing
    DataNode dn = cluster.getDataNode(dnLocs[stopNodeIndex].getIpcPort());
    cluster.stopDataNode(dnLocs[stopNodeIndex].getXferAddr());
    cluster.setDataNodeDead(dn.getDatanodeId());

    // So far, there are 4 decommissioning nodes, 1 replica has been
    // replicated, and 1 replica is missing. countNodes() sees 8 internal
    // blocks: 5 live and 3 decommissioning.
    assertEquals(5, bm.countNodes(blockInfo).liveReplicas());
    assertEquals(3, bm.countNodes(blockInfo).decommissioning());

    // Handle decommission nodes in a new thread.
    // Verify that nodes are decommissioned.
    final CountDownLatch decomStarted = new CountDownLatch(1);
    new Thread(
        () -> {
          try {
            decomStarted.countDown();
            decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED);
          } catch (Exception e) {
            LOG.error("Exception while decommissioning", e);
            Assert.fail("Shouldn't throw exception!");
          }
        }).start();
    decomStarted.await(5, TimeUnit.SECONDS);

    // Wake up the pending-reconstruction timer thread so the block is
    // rescheduled for reconstruction.
    BlockManagerTestUtil.wakeupPendingReconstructionTimerThread(bm);

    // Wait for decommissioning
    GenericTestUtils.waitFor(
        // Wait until there are 9 live replicas after decommission.
        () -> bm.countNodes(blockInfo).liveReplicas() == 9,
        100, 60000);

    StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommisionNodes,
        null, blockGroupSize);
    cleanupFile(dfs, ecFile);
  }

  @Test (timeout = 120000)
  public void testCountNodes() throws Exception {
    // Write ec file.
    Path ecFile = new Path(ecDir, "testCountNodes");
    int writeBytes = cellSize * 6;
    writeStripedFile(dfs, ecFile, writeBytes);

    List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
        .getAllBlocks();
    LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0);
    DatanodeInfo[] dnList = blk.getLocations();
    DatanodeDescriptor dn0 = bm.getDatanodeManager()
        .getDatanode(dnList[0].getDatanodeUuid());
    dn0.startDecommission();

    // Replicate dn0 block to another dn
    final byte blockIndex = blk.getBlockIndices()[0];
    final Block targetBlk = new Block(blk.getBlock().getBlockId() + blockIndex,
        cellSize, blk.getBlock().getGenerationStamp());
    DatanodeInfo extraDn = getDatanodeOutOfTheBlock(blk);
    DatanodeDescriptor target = bm.getDatanodeManager()
        .getDatanode(extraDn.getDatanodeUuid());
    dn0.addECBlockToBeReplicated(targetBlk,
        new DatanodeStorageInfo[] {target.getStorageInfos()[0]});

    // dn0 replicates in success
    GenericTestUtils.waitFor(
        () -> dn0.getNumberOfReplicateBlocks() == 0,
        100, 60000);
    GenericTestUtils.waitFor(
        () -> {
          Iterator<DatanodeStorageInfo> it =
              bm.getStoredBlock(targetBlk).getStorageInfos();
          while(it.hasNext()) {
            if (it.next().getDatanodeDescriptor().equals(target)) {
              return true;
            }
          }
          return false;
        },
        100, 60000);

    // There are 9 live replicas, 0 decommissioning replicas.
    BlockInfoStriped blockInfo =
        (BlockInfoStriped)bm.getStoredBlock(
            new Block(blk.getBlock().getBlockId()));
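    // Find the two storages holding internal block b0: the decommissioning
    // copy on dn0 and the live copy on the replication target.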
    Iterator<BlockInfoStriped.StorageAndBlockIndex> it =
        blockInfo.getStorageAndIndexInfos().iterator();
    DatanodeStorageInfo decommissioningStorage = null;
    DatanodeStorageInfo liveStorage = null;
    while(it.hasNext()) {
      BlockInfoStriped.StorageAndBlockIndex si = it.next();
      if(si.getStorage().getDatanodeDescriptor().equals(dn0)) {
        decommissioningStorage = si.getStorage();
      }
      if(si.getStorage().getDatanodeDescriptor().equals(target)) {
        liveStorage = si.getStorage();
      }
    }
    assertNotNull(decommissioningStorage);
    assertNotNull(liveStorage);

    // Adjust internal block locations
    // [b0(decommissioning), b1, b2, b3, b4, b5, b6, b7, b8, b0(live)] changed
    // to [b0(live), b1, b2, b3, b4, b5, b6, b7, b8, b0(decommissioning)]
    BlockManagerTestUtil.removeStorage(blockInfo, decommissioningStorage);
    BlockManagerTestUtil.addStorage(blockInfo, liveStorage, targetBlk);
    BlockManagerTestUtil.addStorage(blockInfo, decommissioningStorage,
        targetBlk);
    assertEquals(0, bm.countNodes(blockInfo).decommissioning());
    assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
    cleanupFile(dfs, ecFile);
  }

  /**
   * Test recovery for an EC block whose storage array contains the internal
   * blocks {b0, b1, b2, b3, null, b5, b6, b7, b8, b0, b1, b2, b3}:
   * array[0]{b0} is decommissioning, array[1-3]{b1, b2, b3} are
   * decommissioned, array[4] is null, and array[5-12]{b[5-8], b[0-3]} are
   * live.
   */
  @Test (timeout = 120000)
  public void testRecoveryWithDecommission() throws Exception {
    final Path ecFile = new Path(ecDir, "testRecoveryWithDecommission");
    int writeBytes = cellSize * dataBlocks;
    byte[] originBytesArray = writeStripedFile(dfs, ecFile, writeBytes);
    List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
        .getAllBlocks();
    LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0);
    DatanodeInfo[] dnList = blk.getLocations();
    BlockInfoStriped blockInfo =
        (BlockInfoStriped)bm.getStoredBlock(
            new Block(blk.getBlock().getBlockId()));

    // Decommission datanode dn0, which contains internal block b0.
    // The aim is to add the storage info of the replicated b0 as storages[9]
    // of the EC block.
    List<DatanodeInfo> decommissionedNodes = new ArrayList<>();
    decommissionedNodes.add(dnList[0]);
    decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);

    // Now storages of ec block are (b0{decommissioned}, b[1-8]{live},
    // b0{live})
    assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
    assertEquals(1, bm.countNodes(blockInfo).decommissioned());

    int decommissionNodesNum = 4;

    // Decommission the nodes containing internal blocks b[0-3]
    // (dn0 has already been decommissioned)
    for (int i = 1; i < decommissionNodesNum; i++) {
      decommissionedNodes.add(dnList[i]);
    }
    decommissionNode(0, decommissionedNodes, AdminStates.DECOMMISSIONED);

    // Now storages of ec block are (b[0-3]{decommissioned}, b[4-8]{live},
    // b0{live}, b[1-3]{live})
    // There are 9 live and 4 decommissioned internal blocks
    assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
    assertEquals(4, bm.countNodes(blockInfo).decommissioned());

    // There are no reconstruction tasks
    assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager()
        .getNumPendingNodes());
    assertEquals(0, bm.getUnderReplicatedNotMissingBlocks());

    // Put dn0 back into decommissioning
    // so that the block on dn0 can be used as a reconstruction source
    DatanodeDescriptor dn0 = bm.getDatanodeManager()
        .getDatanode(dnList[0].getDatanodeUuid());
    dn0.startDecommission();

    // Stop the datanode containing b4
    DataNode dn = cluster.getDataNode(
        dnList[decommissionNodesNum].getIpcPort());
    cluster.stopDataNode(dnList[decommissionNodesNum].getXferAddr());
    cluster.setDataNodeDead(dn.getDatanodeId());

    // Now storages of ec block are (b[0]{decommissioning},
    // b[1-3]{decommissioned}, null, b[5-8]{live}, b0{live}, b[1-3]{live})
    // There are 8 live and 1 decommissioning internal blocks
    // Wait for reconstruction EC block.
    GenericTestUtils.waitFor(
        () -> bm.countNodes(blockInfo).liveReplicas() == 9,
        100, 10000);

    byte[] readBytesArray = new byte[writeBytes];
    StripedFileTestUtil.verifyPread(dfs, ecFile, writeBytes,
        originBytesArray, readBytesArray, ecPolicy);
    cleanupFile(dfs, ecFile);
  }
}