/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.hdfs.util.RwLockMode;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
public class TestBlocksWithNotEnoughRacks {
public static final Logger LOG =
LoggerFactory.getLogger(TestBlocksWithNotEnoughRacks.class);
static {
GenericTestUtils.setLogLevel(FSNamesystem.LOG, Level.TRACE);
GenericTestUtils.setLogLevel(LOG, Level.TRACE);
}
/*
* Return a configuration object with low timeouts for testing and
* a topology script set (which enables rack awareness).
*/
private Configuration getConf() {
Configuration conf = new HdfsConfiguration();
// Lower the heartbeat interval so the NN quickly learns of dead or
// decommissioned DNs and quickly issues replication and invalidation
// commands (as replies to heartbeats).
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
// Have the NN RedundancyMonitor compute the reconstruction and
// invalidation commands to send DNs every second.
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
// Time out pending reconstructions after a second so the NN quickly
// re-schedules replicas that were scheduled but never reported.
conf.setInt(DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 1);
// The DNs report blocks every second.
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
// Indicates we have multiple racks
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz");
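// (MiniDFSCluster maps each DN to the rack passed to its builder via
// StaticMapping, so the script named here should never actually run;
// any non-empty value is enough for the NN to treat the cluster as
// multi-rack.)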
return conf;
}
/*
 * Creates a block with all replicas on the same rack. The block is
 * sufficiently replicated but violates the rack placement policy.
 * Adds an additional datanode on a new rack; the block should be
 * replicated to the new rack.
 */
@Test
public void testSufficientlyReplBlocksUsesNewRack() throws Exception {
Configuration conf = getConf();
final short REPLICATION_FACTOR = 3;
final Path filePath = new Path("/testFile");
// All datanodes are on the same rack
String[] racks = {"/rack1", "/rack1", "/rack1"};
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build();
try {
// Create a file with one block with a replication factor of 3
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
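// DFSTestUtil.waitForReplication(cluster, block, racks, replicas,
// neededReplicas) blocks until the block spans the given number of
// racks with the given number of live replicas and the given number
// still queued for reconstruction (here: 1 rack, 3 replicas, 0 queued).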
DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
// Add a new datanode on a different rack
String[] newRacks = {"/rack2"};
cluster.startDataNodes(conf, 1, true, null, newRacks);
cluster.waitActive();
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
}
}
/*
 * Like the previous test, but the block starts with a single replica,
 * so unlike the previous test it does not start off violating the
 * rack placement policy. Increasing the replication factor should
 * place the new replica on the second rack.
 */
@Test
public void testSufficientlySingleReplBlockUsesNewRack() throws Exception {
Configuration conf = getConf();
short REPLICATION_FACTOR = 1;
final Path filePath = new Path("/testFile");
String[] racks = {"/rack1", "/rack1", "/rack1", "/rack2"};
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build();
final FSNamesystem ns = cluster.getNameNode().getNamesystem();
try {
// Create a file with one block with a replication factor of 1
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
REPLICATION_FACTOR = 2;
NameNodeAdapter.setReplication(ns, "/testFile", REPLICATION_FACTOR);
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
}
}
/*
 * Initialize a cluster with datanodes on two different racks and shut
 * down all datanodes on one rack. Now create a file with a single block.
 * Even though the block is sufficiently replicated, it violates the
 * replica placement policy. Now restart the datanode stopped earlier.
 * Run fsck with the -replicate option to schedule replication of such
 * mis-replicated blocks and verify that it works as expected.
 */
@Test
public void testMisReplicatedBlockUsesNewRack() throws Exception {
Configuration conf = getConf();
conf.setInt("dfs.namenode.heartbeat.recheck-interval", 500);
final short replicationFactor = 3;
final Path filePath = new Path("/testFile");
// Datanodes are spread across two racks
String[] racks = new String[]{"/rack1", "/rack1", "/rack1", "/rack2"};
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build()) {
cluster.waitActive();
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration reg = InternalDataNodeTestUtils.
getDNRegistrationForBP(cluster.getDataNodes().get(3), poolId);
// Shutdown datanode on rack2 and wait for it to be marked dead
cluster.stopDataNode(3);
DFSTestUtil.waitForDatanodeState(cluster, reg.getDatanodeUuid(),
false, 20000);
// Create a file with one block with a replication factor of 3
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, replicationFactor, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
DFSTestUtil.waitReplication(cluster.getFileSystem(), filePath,
replicationFactor);
// Add a datanode on rack2 and wait for it to be recognized as alive by the NN
cluster.startDataNodes(conf, 1, true,
null, new String[]{"/rack2"});
cluster.waitActive();
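// A block that is sufficiently replicated but mis-placed is not
// re-queued for reconstruction just because a new rack appears; it
// takes an explicit trigger such as the fsck -replicate call below.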
try {
DFSTestUtil.waitForReplication(cluster, b, 2, replicationFactor, 0);
fail("NameNode should not have fixed the mis-replicated blocks" +
" automatically.");
} catch (TimeoutException e) {
// Expected.
}
String fsckOp = DFSTestUtil.runFsck(conf, 0, true, filePath.toString(),
"-replicate");
LOG.info("fsck response {}", fsckOp);
assertTrue(fsckOp.contains(
"/testFile: Replica placement policy is violated"));
assertTrue(fsckOp.contains(" Block should be additionally replicated" +
" on 1 more rack(s). Total number of racks in the cluster: 2"));
assertTrue(fsckOp.contains(" Blocks queued for replication:\t1"));
try {
DFSTestUtil.waitForReplication(cluster, b, 2, replicationFactor, 0);
} catch (TimeoutException e) {
fail("NameNode should have fixed the mis-replicated blocks as a" +
" result of fsck command.");
}
}
}
/*
* Creates a block with all datanodes on the same rack. Add additional
* datanodes on a different rack and increase the replication factor,
* making sure there are enough replicas across racks. If the previous
* test passes this one should too, however this test may pass when
* the previous one fails because the replication code is explicitly
* triggered by setting the replication factor.
*/
@Test
public void testUnderReplicatedUsesNewRacks() throws Exception {
Configuration conf = getConf();
short REPLICATION_FACTOR = 3;
final Path filePath = new Path("/testFile");
// All datanodes are on the same rack
String[] racks = {"/rack1", "/rack1", "/rack1", "/rack1", "/rack1"};
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build();
final FSNamesystem ns = cluster.getNameNode().getNamesystem();
try {
// Create a file with one block
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
// Add new datanodes on a different rack and increase the replication
// factor so the block is under-replicated, and make sure at least one
// of the hosts on the new rack is used.
String[] newRacks = {"/rack2", "/rack2"};
cluster.startDataNodes(conf, 2, true, null, newRacks);
REPLICATION_FACTOR = 5;
NameNodeAdapter.setReplication(ns, "/testFile", REPLICATION_FACTOR);
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
}
}
/*
 * Test that a block with a corrupt replica is re-replicated across
 * racks.
 */
@Test
public void testCorruptBlockRereplicatedAcrossRacks() throws Exception {
Configuration conf = getConf();
short REPLICATION_FACTOR = 2;
int fileLen = 512;
final Path filePath = new Path("/testFile");
// Datanodes are spread across two racks
String[] racks = {"/rack1", "/rack1", "/rack2", "/rack2"};
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build();
final FSNamesystem ns = cluster.getNameNode().getNamesystem();
try {
// Create a file with one block with a replication factor of 2
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, fileLen, REPLICATION_FACTOR, 1L);
final byte[] fileContent = DFSTestUtil.readFileAsBytes(fs, filePath);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
// Corrupt a replica of the block
int dnToCorrupt = DFSTestUtil.firstDnWithBlock(cluster, b);
cluster.corruptReplica(dnToCorrupt, b);
// Restart the datanode so its blocks are re-scanned and the corrupt
// replica is detected.
cluster.restartDataNode(dnToCorrupt);
// Wait for the namenode to notice the corrupt replica
DFSTestUtil.waitCorruptReplicas(fs, ns, filePath, b, 1);
// The rack policy is still respected
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
// Ensure all replicas are valid (the corrupt replica may not
// have been cleaned up yet).
for (int i = 0; i < racks.length; i++) {
byte[] blockContent = cluster.readBlockOnDataNodeAsBytes(i, b);
if (blockContent != null && i != dnToCorrupt) {
assertArrayEquals("Corrupt replica", fileContent, blockContent);
}
}
} finally {
cluster.shutdown();
}
}
/*
* Reduce the replication factor of a file, making sure that the only
* cross rack replica is not removed when deleting replicas.
*/
@Test
public void testReduceReplFactorRespectsRackPolicy() throws Exception {
Configuration conf = getConf();
short REPLICATION_FACTOR = 3;
final Path filePath = new Path("/testFile");
String[] racks = {"/rack1", "/rack1", "/rack2", "/rack2"};
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build();
final FSNamesystem ns = cluster.getNameNode().getNamesystem();
try {
// Create a file with one block
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
// Decrease the replication factor and make sure the deleted replica
// was not the one that lived on the rack with only one replica, i.e.
// we should still span 2 racks after reducing the repl factor.
REPLICATION_FACTOR = 2;
NameNodeAdapter.setReplication(ns, "/testFile", REPLICATION_FACTOR);
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
}
}
/*
 * Test that when a block is replicated because a replica is lost due
 * to host failure, the rack policy is preserved.
 */
@Test
public void testReplDueToNodeFailRespectsRackPolicy() throws Exception {
Configuration conf = getConf();
short REPLICATION_FACTOR = 3;
final Path filePath = new Path("/testFile");
// The last two datanodes are on a different rack
String[] racks = {"/rack1", "/rack1", "/rack1", "/rack2", "/rack2"};
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build();
final FSNamesystem ns = cluster.getNameNode().getNamesystem();
final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
try {
// Create a file with one block with a replication factor of 3
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
// Make the last datanode look like it failed to heartbeat by
// stopping it and calling removeDatanode.
ArrayList<DataNode> datanodes = cluster.getDataNodes();
int idx = datanodes.size() - 1;
DataNode dataNode = datanodes.get(idx);
DatanodeID dnId = dataNode.getDatanodeId();
cluster.stopDataNode(idx);
dm.removeDatanode(dnId);
// The block should still have sufficient # replicas, across racks.
// The last node may not have contained a replica, but if it did
// it should have been replicated within the same rack.
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
// Fail the last datanode again; it's also on rack2, so all remaining
// replicas are on a single rack.
datanodes = cluster.getDataNodes();
idx = datanodes.size() - 1;
dataNode = datanodes.get(idx);
dnId = dataNode.getDatanodeId();
cluster.stopDataNode(idx);
dm.removeDatanode(dnId);
// Make sure we have enough live replicas even though we are short one
// rack. The cluster now has only 1 rack, so we just make sure we
// still have 3 replicas.
DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
}
}
/*
 * Test that when the excess replicas of a block are reduced due to a
 * node re-joining the cluster, the rack policy is not violated.
 */
@Test
public void testReduceReplFactorDueToRejoinRespectsRackPolicy()
throws Exception {
Configuration conf = getConf();
short REPLICATION_FACTOR = 2;
final Path filePath = new Path("/testFile");
// Last datanode is on a different rack
String[] racks = {"/rack1", "/rack1", "/rack2"};
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build();
final FSNamesystem ns = cluster.getNameNode().getNamesystem();
final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
try {
// Create a file with one block
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
// Make the last (cross rack) datanode look like it failed
// to heartbeat by stopping it and calling removeDatanode.
ArrayList<DataNode> datanodes = cluster.getDataNodes();
assertEquals(3, datanodes.size());
DataNode dataNode = datanodes.get(2);
DatanodeID dnId = dataNode.getDatanodeId();
cluster.stopDataNode(2);
dm.removeDatanode(dnId);
// The block gets re-replicated to another datanode so it has a
// sufficient # replicas, but not across racks, so there should
// be 1 rack.
DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
// Start the "failed" datanode, which has a replica, so the block is
// now over-replicated and therefore a replica should be removed, but
// not from the restarted datanode, as that would violate the rack
// policy.
String[] rack2 = {"/rack2"};
cluster.startDataNodes(conf, 1, true, null, rack2);
cluster.waitActive();
// The block now has sufficient # replicas, across racks
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
}
}
/*
* Test that rack policy is still respected when blocks are replicated
* due to node decommissioning.
*/
@Test
public void testNodeDecomissionRespectsRackPolicy() throws Exception {
Configuration conf = getConf();
short REPLICATION_FACTOR = 2;
final Path filePath = new Path("/testFile");
HostsFileWriter hostsFileWriter = new HostsFileWriter();
hostsFileWriter.initialize(conf, "temp/decommission");
// Four datanodes on two racks
String[] racks = {"/rack1", "/rack1", "/rack2", "/rack2"};
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build();
final FSNamesystem ns = cluster.getNameNode().getNamesystem();
try {
// Create a file with one block
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
// Decommission one of the hosts with the block; this should cause
// the block to get replicated to another host on the same rack,
// otherwise the rack policy is violated.
BlockLocation[] locs = fs.getFileBlockLocations(
fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
String name = locs[0].getNames()[0];
hostsFileWriter.initExcludeHost(name);
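// refreshNodes makes the DatanodeManager re-read the include/exclude
// files, which starts decommissioning the newly excluded host.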
ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
DFSTestUtil.waitForDecommission(fs, name);
// Check the block still has sufficient # replicas across racks
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
hostsFileWriter.cleanup();
}
}
/*
* Test that rack policy is still respected when blocks are replicated
* due to node decommissioning, when the blocks are over-replicated.
*/
@Test
public void testNodeDecomissionWithOverreplicationRespectsRackPolicy()
throws Exception {
Configuration conf = getConf();
short REPLICATION_FACTOR = 5;
final Path filePath = new Path("/testFile");
HostsFileWriter hostsFileWriter = new HostsFileWriter();
hostsFileWriter.initialize(conf, "temp/decommission");
// All hosts are on two racks, with only one host on /rack2
String[] racks = {"/rack1", "/rack2", "/rack1", "/rack1", "/rack1"};
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build();
final FSNamesystem ns = cluster.getNameNode().getNamesystem();
try {
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
// Lower the replication factor so the blocks are over replicated
REPLICATION_FACTOR = 2;
fs.setReplication(filePath, REPLICATION_FACTOR);
// Decommission one of the hosts with the block that is not the lone
// host on rack2 (if we decommissioned that host it would be
// impossible to respect the rack policy).
BlockLocation[] locs = fs.getFileBlockLocations(
fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
for (String top : locs[0].getTopologyPaths()) {
if (!top.startsWith("/rack2")) {
String name = top.substring("/rack1".length()+1);
hostsFileWriter.initExcludeHost(name);
ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
DFSTestUtil.waitForDecommission(fs, name);
break;
}
}
// Check the block still has sufficient # replicas across racks,
// i.e. we didn't remove the lone replica on /rack2.
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
hostsFileWriter.cleanup();
}
}
@Test
public void testMultipleReplicasScheduledForUpgradeDomain() throws Exception {
Configuration conf = getConf();
final short replicationFactor = 3;
final Path filePath = new Path("/testFile");
conf.set("dfs.block.replicator.classname",
"org.apache.hadoop.hdfs.server.blockmanagement." +
"BlockPlacementPolicyWithUpgradeDomain");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(6).build();
cluster.waitClusterUp();
List<DatanodeDescriptor> dnDescriptors = getDnDescriptors(cluster);
try {
// Create a file with one block with a replication factor of 3
// No upgrade domains are set.
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, replicationFactor, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
BlockManager bm = cluster.getNamesystem().getBlockManager();
BlockInfo storedBlock = bm.getStoredBlock(b.getLocalBlock());
// The block is sufficiently replicated, so no reconstruction work
// should be scheduled (scheduleReconstruction returns null).
BlockReconstructionWork work = scheduleReconstruction(
cluster.getNamesystem(), storedBlock, 2);
assertNull(work);
// Set the upgradeDomain to "3" for the 3 nodes hosting the block.
// Then alternately set the remaining 3 nodes to an upgradeDomain of
// "0" or "1", giving a total of 3 upgrade domains.
for (int i = 0; i < storedBlock.getReplication(); i++) {
storedBlock.getDatanode(i).setUpgradeDomain("3");
}
int udInd = 0;
for (DatanodeDescriptor d : dnDescriptors) {
if (d.getUpgradeDomain() == null) {
d.setUpgradeDomain(Integer.toString(udInd % 2));
udInd++;
}
}
// Now the scheduled work is non-null and 2 extra targets are needed
work = scheduleReconstruction(
cluster.getNamesystem(), storedBlock, 2);
assertEquals(2, work.getAdditionalReplRequired());
// Add the block to the replication queue and ensure it is replicated
// correctly.
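// (Argument order assumed: LowRedundancyBlocks.add(block, curReplicas,
// readOnlyReplicas, outOfServiceReplicas, expectedReplicas). The block
// has enough replicas, but they span too few upgrade domains, so
// reconstruction should place replicas on additional domains.)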
bm.neededReconstruction.add(storedBlock, 3, 0, 0, replicationFactor);
DFSTestUtil.waitForReplication(cluster, b, 1, replicationFactor, 0, 3);
} finally {
cluster.shutdown();
}
}
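/*
 * Schedules reconstruction work for a block while holding the
 * FSNamesystem write lock, mirroring the locking the NN's
 * RedundancyMonitor uses when it computes reconstruction work.
 */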
static BlockReconstructionWork scheduleReconstruction(
FSNamesystem fsn, BlockInfo block, int priority) {
fsn.writeLock(RwLockMode.BM);
try {
return fsn.getBlockManager().scheduleReconstruction(block, priority);
} finally {
fsn.writeUnlock(RwLockMode.BM, "scheduleReconstruction");
}
}
@Test
public void testUnderReplicatedRespectsRacksAndUpgradeDomain()
throws Exception {
Configuration conf = getConf();
final short replicationFactor = 3;
final Path filePath = new Path("/testFile");
conf.set("dfs.block.replicator.classname",
"org.apache.hadoop.hdfs.server.blockmanagement." +
"BlockPlacementPolicyWithUpgradeDomain");
// All hosts are on two racks
String[] racks = {"/r1", "/r1", "/r1", "/r2", "/r2", "/r2"};
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(6).racks(racks).build();
cluster.waitClusterUp();
List<DatanodeDescriptor> dnDescriptors = getDnDescriptors(cluster);
for (int i = 0; i < dnDescriptors.size(); i++) {
dnDescriptors.get(i).setUpgradeDomain(Integer.toString(i % 3));
}
try {
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, (short)1, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
fs.setReplication(filePath, replicationFactor);
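// The trailing 3 is the required number of distinct upgrade domains
// (the 6-arg overload of waitForReplication).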
DFSTestUtil.waitForReplication(cluster, b, 2, replicationFactor, 0, 3);
} finally {
cluster.shutdown();
}
}
private List<DatanodeDescriptor> getDnDescriptors(MiniDFSCluster cluster)
throws IOException {
List<DatanodeDescriptor> dnDesc = new ArrayList<>();
DatanodeManager dnManager = cluster.getNamesystem().getBlockManager()
.getDatanodeManager();
for (DataNode dn : cluster.getDataNodes()) {
DatanodeDescriptor d = dnManager.getDatanode(dn.getDatanodeUuid());
if (d == null) {
throw new IOException("DatanodeDescriptor not found for DN " +
dn.getDatanodeUuid());
}
dnDesc.add(d);
}
return dnDesc;
}
}