TestReconstructStripedBlocks.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.util.RwLockMode;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestReconstructStripedBlocks {
// Logger for this test class.
public static final Logger LOG = LoggerFactory.getLogger(
TestReconstructStripedBlocks.class);
// Erasure coding policy the tests run against: whatever
// StripedFileTestUtil reports as the default policy.
private final ErasureCodingPolicy ecPolicy =
StripedFileTestUtil.getDefaultECPolicy();
// Cell size of the policy.
private final int cellSize = ecPolicy.getCellSize();
// Number of data units of the policy.
private final short dataBlocks = (short) ecPolicy.getNumDataUnits();
// Number of parity units of the policy.
private final short parityBlocks = (short) ecPolicy.getNumParityUnits();
// Number of internal blocks in one block group (data + parity).
private final short groupSize = (short) (dataBlocks + parityBlocks);
// Block size (four cells) used by the tests that set DFS_BLOCK_SIZE_KEY.
private final int blockSize = 4 * cellSize;
// Mini cluster handle; assigned afresh by each test that uses it.
private MiniDFSCluster cluster;
// Directory and file used by the tests that do not create their own paths.
private final Path dirPath = new Path("/dir");
private Path filePath = new Path(dirPath, "file");
// Default value of the max-replication-streams setting; the tests use it as
// the amount of pending work to queue on a datanode to make it "busy".
private int maxReplicationStreams =
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT;
/**
 * Applies the settings shared by several tests in this class.
 *
 * @param conf configuration that is updated in place
 */
private void initConf(Configuration conf) {
  // A large heartbeat interval keeps pending replication requests sitting in
  // DatanodeDescriptor.replicateBlocks until the test has looked at them.
  final int heartbeatInterval = 100;
  // Lets BlockManager pull all blocks from UnderReplicatedBlocks via
  // chooseUnderReplicatedBlocks in one go.
  final int workMultiplier = 5;

  conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, heartbeatInterval);
  conf.setInt(
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
      workMultiplier);
}
/**
 * Runs the missing-striped-block scenario with one missed datanode and no
 * busy datanode.
 */
@Test
public void testMissingStripedBlock() throws Exception {
  final int missedNodes = 1;
  final int busyNodes = 0;
  doTestMissingStripedBlock(missedNodes, busyNodes);
}
/**
 * Runs the missing-striped-block scenario with one busy datanode, once for
 * every missed-datanode count from 1 up to parityBlocks.
 */
@Test
public void testMissingStripedBlockWithBusyNode() throws Exception {
  final int busyNodes = 1;
  int missedNodes = 1;
  while (missedNodes <= parityBlocks) {
    doTestMissingStripedBlock(missedNodes, busyNodes);
    missedNodes++;
  }
}
/**
* Shared driver for the missing-striped-block tests.
* <p>
* Starts groupSize + 1 datanodes and injects a striped file whose internal
* blocks are placed on the first groupSize datanodes. It then makes
* numOfBusy of the datanodes holding the first block busy, removes the next
* numOfMissed of them, and triggers BlockManager to compute reconstruction
* work (so all reconstruction work is scheduled to the last datanode).
* Finally it verifies the reconstruction work queued on the last datanode.
*
* @param numOfMissed number of datanodes holding internal blocks to remove
* @param numOfBusy number of datanodes holding internal blocks to make busy
* @throws Exception if cluster setup or any step of the scenario fails
*/
private void doTestMissingStripedBlock(int numOfMissed, int numOfBusy)
throws Exception {
Configuration conf = new HdfsConfiguration();
initConf(conf);
// One datanode more than the block group needs; it stays empty and becomes
// the reconstruction target.
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(groupSize + 1)
.build();
try {
cluster.waitActive();
cluster.getFileSystem().enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
final int numBlocks = 4;
DFSTestUtil.createStripedFile(cluster, filePath,
dirPath, numBlocks, 1, true);
// all blocks will be located at first GROUP_SIZE DNs, the last DN is
// empty because of the util function createStripedFile
// make sure the file is complete in NN
final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
.getINode4Write(filePath.toString()).asFile();
assertFalse(fileNode.isUnderConstruction());
assertTrue(fileNode.isStriped());
BlockInfo[] blocks = fileNode.getBlocks();
assertEquals(numBlocks, blocks.length);
// Every block group must be striped, complete, one full stripe long and
// stored on groupSize nodes.
for (BlockInfo blk : blocks) {
assertTrue(blk.isStriped());
assertTrue(blk.isComplete());
assertEquals(cellSize * dataBlocks,
blk.getNumBytes());
final BlockInfoStriped sb = (BlockInfoStriped) blk;
assertEquals(groupSize, sb.numNodes());
}
final BlockManager bm = cluster.getNamesystem().getBlockManager();
BlockInfo firstBlock = fileNode.getBlocks()[0];
DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock);
// make numOfBusy nodes busy
// The index i is shared with the next loop, so the missed nodes are the
// ones that follow the busy nodes in storageInfos.
int i = 0;
for (; i < numOfBusy; i++) {
DatanodeDescriptor busyNode = storageInfos[i].getDatanodeDescriptor();
// Queue maxReplicationStreams + 1 blocks to be replicated on the node.
for (int j = 0; j < maxReplicationStreams + 1; j++) {
BlockManagerTestUtil.addBlockToBeReplicated(busyNode, new Block(j),
new DatanodeStorageInfo[]{storageInfos[0]});
}
}
// make numOfMissed internal blocks missed
for (; i < numOfBusy + numOfMissed; i++) {
DatanodeDescriptor missedNode = storageInfos[i].getDatanodeDescriptor();
// Before removal the node holds one internal block per block group.
assertEquals(numBlocks, missedNode.numBlocks());
bm.getDatanodeManager().removeDatanode(missedNode);
}
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
// Trigger the computation of datanode work so that reconstruction tasks
// get queued.
BlockManagerTestUtil.getComputedDatanodeWork(bm);
// all the reconstruction work will be scheduled on the last DN
DataNode lastDn = cluster.getDataNodes().get(groupSize);
DatanodeDescriptor last =
bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId());
assertEquals("Counting the number of outstanding EC tasks", numBlocks,
last.getNumberOfBlocksToBeErasureCoded());
List<BlockECReconstructionInfo> reconstruction =
last.getErasureCodeCommand(numBlocks);
for (BlockECReconstructionInfo info : reconstruction) {
// Each task targets exactly the last datanode and lists one live block
// index per source datanode.
assertEquals(1, info.getTargetDnInfos().length);
assertEquals(last, info.getTargetDnInfos()[0]);
assertEquals(info.getSourceDnInfos().length,
info.getLiveBlockIndices().length);
if (groupSize - numOfMissed == dataBlocks) {
// It's a QUEUE_HIGHEST_PRIORITY block, so the busy DNs will be chosen
// to make sure we have NUM_DATA_BLOCKS DNs to do reconstruction
// work.
assertEquals(dataBlocks, info.getSourceDnInfos().length);
} else {
// The block has no highest priority, so we don't use the busy DNs as
// sources
assertEquals(groupSize - numOfMissed - numOfBusy,
info.getSourceDnInfos().length);
}
}
BlockManagerTestUtil.updateState(bm);
DFSTestUtil.verifyClientStats(conf, cluster);
} finally {
cluster.shutdown();
}
}
/**
 * Checks that losing a second internal block of a block group does not
 * schedule a second reconstruction task while the first one is pending.
 */
@Test
public void test2RecoveryTasksForSameBlockGroup() throws Exception {
  Configuration conf = new HdfsConfiguration();
  conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
      1000);
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(groupSize + 2)
      .build();
  try {
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    final BlockManager blockManager =
        cluster.getNamesystem().getBlockManager();
    dfs.enableErasureCodingPolicy(
        StripedFileTestUtil.getDefaultECPolicy().getName());
    dfs.getClient().setErasureCodingPolicy("/",
        StripedFileTestUtil.getDefaultECPolicy().getName());

    // Write one full block group of zero bytes and wait until it is
    // reported on groupSize datanodes.
    final Path target = new Path("/test2RecoveryTasksForSameBlockGroup");
    final byte[] content = new byte[dataBlocks * blockSize];
    DFSTestUtil.writeFile(dfs, target, content);
    DFSTestUtil.waitForReplication(dfs, target, groupSize, 5000);
    BlockManagerTestUtil.updateState(blockManager);
    DFSTestUtil.verifyClientStats(conf, cluster);

    final LocatedStripedBlock blockGroup = (LocatedStripedBlock) dfs
        .getClient().getLocatedBlocks(target.toString(), 0).get(0);
    final LocatedBlock[] internalBlocks =
        StripedBlockUtil.parseStripedBlockGroup(blockGroup, cellSize,
            dataBlocks, parityBlocks);

    // Nothing is missing yet, so no task may be scheduled or pending.
    BlockManagerTestUtil.getComputedDatanodeWork(blockManager);
    BlockManagerTestUtil.updateState(blockManager);
    assertEquals(0, getNumberOfBlocksToBeErasureCoded(cluster));
    assertEquals(0, blockManager.getPendingReconstructionBlocksCount());
    DFSTestUtil.verifyClientStats(conf, cluster);

    // Stop the datanodes holding internal blocks 0 and 1, one at a time.
    // The first loss schedules one task; the second must not add another
    // one because the previous task isn't finished.
    for (int idx = 0; idx < 2; idx++) {
      final DatanodeInfo victim = internalBlocks[idx].getLocations()[0];
      cluster.stopDataNode(victim.getName());
      cluster.setDataNodeDead(victim);
      BlockManagerTestUtil.getComputedDatanodeWork(blockManager);
      BlockManagerTestUtil.updateState(blockManager);
      assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster));
      assertEquals(1, blockManager.getPendingReconstructionBlocksCount());
      DFSTestUtil.verifyClientStats(conf, cluster);
    }
  } finally {
    cluster.shutdown();
  }
}
/**
 * Sums getNumberOfBlocksToBeErasureCoded() over the datanode descriptors of
 * all datanodes returned by the cluster.
 *
 * @param cluster cluster whose datanodes are inspected
 * @return the summed count
 * @throws Exception if a datanode descriptor lookup fails
 */
private static int getNumberOfBlocksToBeErasureCoded(MiniDFSCluster cluster)
    throws Exception {
  final DatanodeManager datanodeManager =
      cluster.getNamesystem().getBlockManager().getDatanodeManager();
  int total = 0;
  final Iterator<DataNode> nodes = cluster.getDataNodes().iterator();
  while (nodes.hasNext()) {
    final DatanodeDescriptor descriptor =
        datanodeManager.getDatanode(nodes.next().getDatanodeId());
    total += descriptor.getNumberOfBlocksToBeErasureCoded();
  }
  return total;
}
/**
 * Makes sure the NN can detect the scenario where there are enough
 * internal blocks ({@code >= 9} by default) but a data/parity block is still
 * missing.
 * <p>
 * The test stops one datanode, waits for reconstruction, restarts that
 * datanode, stops a second one, restarts the NameNode and then checks that
 * every internal block index of the last block group is present again.
 *
 * @throws Exception if cluster setup or any step of the scenario fails
 */
@Test
public void testCountLiveReplicas() throws Exception {
  final HdfsConfiguration conf = new HdfsConfiguration();
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(groupSize + 2)
      .build();
  try {
    // Everything after build() runs inside the try so that a failure during
    // setup still shuts the cluster down.
    cluster.waitActive();
    DistributedFileSystem fs = cluster.getFileSystem();
    fs.enableErasureCodingPolicy(
        StripedFileTestUtil.getDefaultECPolicy().getName());
    fs.mkdirs(dirPath);
    fs.setErasureCodingPolicy(dirPath,
        StripedFileTestUtil.getDefaultECPolicy().getName());
    DFSTestUtil.createFile(fs, filePath,
        cellSize * dataBlocks * 2, (short) 1, 0L);

    // stop a dn
    LocatedBlocks blks =
        fs.getClient().getLocatedBlocks(filePath.toString(), 0);
    LocatedStripedBlock block =
        (LocatedStripedBlock) blks.getLastLocatedBlock();
    DatanodeInfo dnToStop = block.getLocations()[0];
    MiniDFSCluster.DataNodeProperties dnProp =
        cluster.stopDataNode(dnToStop.getXferAddr());
    cluster.setDataNodeDead(dnToStop);

    // wait for reconstruction to happen
    DFSTestUtil.waitForReplication(fs, filePath, groupSize, 15 * 1000);

    // bring the dn back: 10 internal blocks now
    cluster.restartDataNode(dnProp);
    cluster.waitActive();
    DFSTestUtil.verifyClientStats(conf, cluster);

    // stop another dn: 9 internal blocks, but only cover 8 real one
    dnToStop = block.getLocations()[1];
    cluster.stopDataNode(dnToStop.getXferAddr());
    cluster.setDataNodeDead(dnToStop);

    // currently namenode is able to track the missing block. but restart NN
    cluster.restartNameNode(true);
    for (DataNode dn : cluster.getDataNodes()) {
      DataNodeTestUtils.triggerBlockReport(dn);
    }

    // Fetch the namesystem only after the restart.
    FSNamesystem fsn = cluster.getNamesystem();
    BlockManager bm = fsn.getBlockManager();

    Thread.sleep(3000); // wait 3 running cycles of redundancy monitor
    for (DataNode dn : cluster.getDataNodes()) {
      DataNodeTestUtils.triggerHeartbeat(dn);
    }

    // check if NN can detect the missing internal block and finish the
    // reconstruction
    StripedFileTestUtil.waitForReconstructionFinished(filePath, fs,
        groupSize);
    boolean reconstructed = false;
    // Poll up to five times, one second apart, for a full set of live
    // replicas of the last block group.
    for (int i = 0; i < 5; i++) {
      NumberReplicas num = null;
      fsn.readLock(RwLockMode.GLOBAL);
      try {
        BlockInfo blockInfo = cluster.getNamesystem().getFSDirectory()
            .getINode4Write(filePath.toString()).asFile().getLastBlock();
        num = bm.countNodes(blockInfo);
      } finally {
        fsn.readUnlock(RwLockMode.GLOBAL, "testCountLiveReplicas");
      }
      if (num.liveReplicas() >= groupSize) {
        reconstructed = true;
        break;
      } else {
        Thread.sleep(1000);
      }
    }
    Assert.assertTrue(
        "Last block group never reached " + groupSize + " live replicas",
        reconstructed);

    // Every internal block index 0..groupSize-1 must be reported.
    blks = fs.getClient().getLocatedBlocks(filePath.toString(), 0);
    block = (LocatedStripedBlock) blks.getLastLocatedBlock();
    BitSet bitSet = new BitSet(groupSize);
    for (byte index : block.getBlockIndices()) {
      bitSet.set(index);
    }
    for (int i = 0; i < groupSize; i++) {
      Assert.assertTrue("Internal block index " + i + " is missing",
          bitSet.get(i));
    }
  } finally {
    cluster.shutdown();
  }
}
/**
 * Creates an XOR-2-1 striped file with two block groups, removes the first
 * datanode that stores the first block group and verifies the low
 * redundancy and pending reconstruction counters before and after the
 * BlockManager computes datanode work.
 *
 * @throws Exception if cluster setup or any step of the scenario fails
 */
@Test(timeout=120000) // 2 min timeout
public void testReconstructionWork() throws Exception {
  Configuration conf = new HdfsConfiguration();
  conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
  conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
      1000);
  conf.setInt(
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
      5);

  ErasureCodingPolicy policy = SystemErasureCodingPolicies.getByID(
      SystemErasureCodingPolicies.XOR_2_1_POLICY_ID);
  Path ecDir = new Path("/ec");
  Path ecFilePath = new Path(ecDir, "ec-file");
  int blockGroups = 2;
  // One datanode more than a block group of this policy needs.
  int totalDataNodes = policy.getNumDataUnits() +
      policy.getNumParityUnits() + 1;

  MiniDFSCluster dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(
      totalDataNodes).build();
  try {
    // create an EC file with 2 block groups
    final DistributedFileSystem fs = dfsCluster.getFileSystem();
    fs.enableErasureCodingPolicy(policy.getName());
    fs.mkdirs(ecDir);
    fs.setErasureCodingPolicy(ecDir, policy.getName());
    DFSTestUtil.createStripedFile(dfsCluster, ecFilePath, ecDir,
        blockGroups, 2, false, policy);

    final BlockManager bm = dfsCluster.getNamesystem().getBlockManager();
    LocatedBlocks lbs = fs.getClient().getNamenode().getBlockLocations(
        ecFilePath.toString(), 0, blockGroups);
    // Use a JUnit assertion: the Java assert keyword is skipped entirely
    // when the JVM runs without -ea.
    assertTrue("First located block is not a striped block group",
        lbs.get(0) instanceof LocatedStripedBlock);
    LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));

    Iterator<DatanodeStorageInfo> storageInfos =
        bm.getStorages(bg.getBlock().getLocalBlock()).iterator();
    DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();

    BlockManagerTestUtil.updateState(bm);
    DFSTestUtil.verifyClientStats(conf, dfsCluster);

    // Remove one of the DataUnit nodes
    bm.getDatanodeManager().removeDatanode(firstDn);

    // Verify low redundancy count matching EC block groups count
    BlockManagerTestUtil.updateState(bm);
    assertEquals(blockGroups, bm.getLowRedundancyECBlockGroups());
    DFSTestUtil.verifyClientStats(conf, dfsCluster);

    // Trigger block group reconstruction
    BlockManagerTestUtil.getComputedDatanodeWork(bm);
    BlockManagerTestUtil.updateState(bm);

    // Verify pending reconstruction count
    assertEquals(blockGroups, getNumberOfBlocksToBeErasureCoded(dfsCluster));
    assertEquals(0, bm.getLowRedundancyECBlockGroups());
    DFSTestUtil.verifyClientStats(conf, dfsCluster);
  } finally {
    dfsCluster.shutdown();
  }
}
/**
 * Writes generated test data to the given file and then calls
 * StripedFileTestUtil.waitBlockGroupsReported for it.
 *
 * @param fs file system to write to
 * @param ecFile path of the file to create
 * @param writeBytes number of bytes to generate and write
 * @return the generated bytes that were written
 * @throws Exception if writing or waiting fails
 */
private byte[] writeStripedFile(DistributedFileSystem fs, Path ecFile,
    int writeBytes) throws Exception {
  final byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes);
  // Write the raw bytes directly. Converting them with new String(bytes)
  // first would depend on the platform default charset, so the file content
  // could differ from the array returned to the caller.
  DFSTestUtil.writeFile(fs, ecFile, bytes);
  StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString());
  return bytes;
}
/**
 * Reconstructs a missing internal block while two datanodes that hold
 * internal blocks with a smaller index are busy, and verifies that the
 * block group gets back to groupSize live replicas without excess replicas.
 *
 * @throws Exception if cluster setup or any step of the scenario fails
 */
@Test
public void testReconstrutionWithBusyBlock1() throws Exception {
  //When the index of busy block is smaller than the missing block
  //[0(busy),1(busy),3,4,5,6,7,8]
  final int busyNodeIndex1 = 0;
  final int busyNodeIndex2 = 1;
  final int deadNodeIndex = 2;

  final Path ecDir = new Path(GenericTestUtils.getRandomizedTempPath());
  final Path ecFile = new Path(ecDir, "testReconstrutionWithBusyBlock1");
  int writeBytes = cellSize * dataBlocks;

  HdfsConfiguration conf = new HdfsConfiguration();
  initConf(conf);
  conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, false);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
      2000);
  // Overrides the heartbeat interval set by initConf.
  conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
      1000);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
      4);
  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(groupSize + 5)
      .build();
  // The cluster is shut down in the finally block so that it does not
  // outlive this test, whether the test passes or fails.
  try {
    cluster.waitActive();
    DistributedFileSystem dfs = cluster.getFileSystem(0);
    dfs.enableErasureCodingPolicy(
        StripedFileTestUtil.getDefaultECPolicy().getName());
    dfs.mkdirs(ecDir);
    dfs.setErasureCodingPolicy(ecDir,
        StripedFileTestUtil.getDefaultECPolicy().getName());
    writeStripedFile(dfs, ecFile, writeBytes);

    // Close the stream once the block locations have been read.
    final List<LocatedBlock> lbs;
    try (HdfsDataInputStream in = (HdfsDataInputStream) dfs.open(ecFile)) {
      lbs = in.getAllBlocks();
    }
    LocatedStripedBlock lsb = (LocatedStripedBlock) lbs.get(0);
    DatanodeInfo[] dnList = lsb.getLocations();
    BlockManager bm = cluster.getNamesystem().getBlockManager();
    BlockInfoStriped blockInfo =
        (BlockInfoStriped) bm.getStoredBlock(
            new Block(lsb.getBlock().getBlockId()));

    //1.Make nodes busy
    for (int busyIndex : new int[] {busyNodeIndex1, busyNodeIndex2}) {
      DatanodeDescriptor busyNode = bm.getDatanodeManager()
          .getDatanode(dnList[busyIndex].getDatanodeUuid());
      for (int j = 0; j < maxReplicationStreams; j++) {
        busyNode.incrementPendingReplicationWithoutTargets();
      }
    }

    //2.Make a node missing
    // Look the datanode up before stopping it.
    DataNode dn = cluster.getDataNode(dnList[deadNodeIndex].getIpcPort());
    cluster.stopDataNode(dnList[deadNodeIndex].getXferAddr());
    cluster.setDataNodeDead(dn.getDatanodeId());

    //3.Whether there is excess replicas or not during the recovery?
    assertEquals(groupSize - 1, bm.countNodes(blockInfo).liveReplicas());

    // Wait until the block group is whole again or any excess/redundant
    // replica shows up; the assertions below tell the two outcomes apart.
    GenericTestUtils.waitFor(
        () -> {
          NumberReplicas replicas = bm.countNodes(blockInfo);
          return replicas.liveReplicas() == groupSize
              || replicas.excessReplicas() >= 1
              || replicas.redundantInternalBlocks() >= 1;
        },
        10, 100000);

    assertEquals(0, bm.countNodes(blockInfo).excessReplicas());
    assertEquals(groupSize, bm.countNodes(blockInfo).liveReplicas());
  } finally {
    cluster.shutdown();
  }
}
/**
 * Stores a striped file under the "COLD" storage policy in a cluster whose
 * ARCHIVE datanodes all share a single rack, stops one datanode of the last
 * block group and verifies that every internal block index is present
 * again after reconstruction.
 *
 * @throws Exception if cluster setup or any step of the scenario fails
 */
@Test
public void testReconstructionWithStorageTypeNotEnough() throws Exception {
  final HdfsConfiguration conf = new HdfsConfiguration();
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);

  // groupSize DISK datanodes followed by groupSize + 2 ARCHIVE datanodes
  // (nine and eleven for the default policy).
  final int numDn = groupSize * 2 + 2;
  StorageType[][] storageTypes = new StorageType[numDn][];
  Arrays.fill(storageTypes, 0, groupSize,
      new StorageType[]{StorageType.DISK, StorageType.DISK});
  Arrays.fill(storageTypes, groupSize, numDn,
      new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE});

  // One rack per DISK datanode (/rack1, /rack2, ...) and a single shared
  // rack (/rack0) for all ARCHIVE datanodes. The array is derived from
  // numDn so it always has one entry per datanode.
  final String[] racks = new String[numDn];
  for (int i = 0; i < numDn; i++) {
    racks[i] = i < groupSize ? "/rack" + (i + 1) : "/rack0";
  }

  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDn)
      .storageTypes(storageTypes)
      .racks(racks)
      .build();
  try {
    // Everything after build() runs inside the try so that a failure during
    // setup still shuts the cluster down.
    cluster.waitActive();
    DistributedFileSystem fs = cluster.getFileSystem();
    fs.enableErasureCodingPolicy(
        StripedFileTestUtil.getDefaultECPolicy().getName());
    fs.mkdirs(dirPath);
    fs.setStoragePolicy(dirPath, "COLD");
    fs.setErasureCodingPolicy(dirPath,
        StripedFileTestUtil.getDefaultECPolicy().getName());
    DFSTestUtil.createFile(fs, filePath,
        cellSize * dataBlocks * 2, (short) 1, 0L);

    // Stop one dn.
    LocatedBlocks blks =
        fs.getClient().getLocatedBlocks(filePath.toString(), 0);
    LocatedStripedBlock block =
        (LocatedStripedBlock) blks.getLastLocatedBlock();
    DatanodeInfo dnToStop = block.getLocations()[0];
    cluster.stopDataNode(dnToStop.getXferAddr());
    cluster.setDataNodeDead(dnToStop);

    // Wait for reconstruction to happen.
    StripedFileTestUtil.waitForReconstructionFinished(filePath, fs,
        groupSize);

    // Every internal block index 0..groupSize-1 must be reported.
    blks = fs.getClient().getLocatedBlocks(filePath.toString(), 0);
    block = (LocatedStripedBlock) blks.getLastLocatedBlock();
    BitSet bitSet = new BitSet(groupSize);
    for (byte index : block.getBlockIndices()) {
      bitSet.set(index);
    }
    for (int i = 0; i < groupSize; i++) {
      Assert.assertTrue("Internal block index " + i + " is missing",
          bitSet.get(i));
    }
  } finally {
    cluster.shutdown();
  }
}
}