TestReplicationPolicyWithUpgradeDomain.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.net.Node;
import org.junit.Test;
public class TestReplicationPolicyWithUpgradeDomain
extends BaseReplicationPolicyTest {
public TestReplicationPolicyWithUpgradeDomain() {
this.blockPlacementPolicy =
BlockPlacementPolicyWithUpgradeDomain.class.getName();
}
/**
 * Builds a nine-datanode topology: three racks ({@code /d1/r1},
 * {@code /d1/r2}, {@code /d1/r3}) with three datanodes each, where the
 * datanodes are assigned upgrade domains "1", "2" and "3" cyclically in
 * index order, so every rack holds one datanode per upgrade domain.
 *
 * <p>As a side effect the inherited {@code storages} field is replaced with
 * the storages created for those racks.
 *
 * @param conf configuration; not used by this implementation
 * @return the datanode descriptors derived from the created storages
 */
@Override
DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
  final String[] rackOfNode = {
      "/d1/r1", "/d1/r1", "/d1/r1",
      "/d1/r2", "/d1/r2", "/d1/r2",
      "/d1/r3", "/d1/r3", "/d1/r3"};
  storages = DFSTestUtil.createDatanodeStorageInfos(rackOfNode);
  final DatanodeDescriptor[] descriptors =
      DFSTestUtil.toDatanodeDescriptor(storages);
  int index = 0;
  for (DatanodeDescriptor descriptor : descriptors) {
    // Domain ids cycle 1, 2, 3, so each rack gets one node per domain.
    descriptor.setUpgradeDomain(Integer.toString((index % 3) + 1));
    index++;
  }
  return descriptors;
}
/**
 * Verify the targets are chosen to honor both
 * rack and upgrade domain policies when the number of replicas is
 * 0, 1, 2, 3, 4 respectively.
 *
 * <p>The heartbeat of {@code dataNodes[0]} is modified for the duration of
 * the test and restored in a {@code finally} block, so a failing assertion
 * cannot leave the modified heartbeat behind.
 *
 * @throws Exception if updating the heartbeat or choosing targets fails
 */
@Test
public void testChooseTarget1() throws Exception {
  // NOTE(review): the only argument that differs from the restoring call in
  // the finally block is the second-to-last one (4 here, 0 there) --
  // presumably a load/xceiver count; confirm against
  // BaseReplicationPolicyTest#updateHeartbeatWithUsage.
  updateHeartbeatWithUsage(dataNodes[0],
      2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L,
      HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L,
      0L, 0L, 4, 0);
  try {
    DatanodeStorageInfo[] targets;

    targets = chooseTarget(0);
    assertEquals(0, targets.length);

    targets = chooseTarget(1);
    assertEquals(1, targets.length);
    assertEquals(storages[0], targets[0]);

    targets = chooseTarget(2);
    assertEquals(2, targets.length);
    assertEquals(storages[0], targets[0]);
    assertFalse(isOnSameRack(targets[0], targets[1]));
    assertEquals(2, getUpgradeDomains(targets).size());

    targets = chooseTarget(3);
    assertEquals(3, targets.length);
    assertEquals(storages[0], targets[0]);
    assertFalse(isOnSameRack(targets[0], targets[1]));
    assertTrue(isOnSameRack(targets[1], targets[2]));
    assertEquals(3, getUpgradeDomains(targets).size());

    targets = chooseTarget(4);
    assertEquals(4, targets.length);
    assertEquals(storages[0], targets[0]);
    assertTrue(isOnSameRack(targets[1], targets[2]) ||
        isOnSameRack(targets[2], targets[3]));
    assertFalse(isOnSameRack(targets[0], targets[2]));
    // The topology built by getDatanodeDescriptors defines only three
    // upgrade domains, so four replicas are expected to span all three.
    assertEquals(3, getUpgradeDomains(targets).size());
  } finally {
    // Restore the heartbeat even when an assertion above fails.
    updateHeartbeatWithUsage(dataNodes[0],
        2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L,
        HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L,
        0L, 0L, 0, 0);
  }
}
/**
 * Verify the rack and upgrade domain policies when excludeNodes are
 * specified.
 *
 * <p>The per-scenario notes assume the inherited {@code dataNodes} and
 * {@code storages} arrays are the ones produced by
 * {@code getDatanodeDescriptors}: index {@code i} is on rack
 * {@code /d1/r(i / 3 + 1)} with upgrade domain {@code i % 3 + 1}.
 *
 * @throws Exception if choosing targets fails
 */
@Test
public void testChooseTargetWithExcludeNodes() throws Exception {
  Set<Node> excludedNodes = new HashSet<>();
  List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
  DatanodeStorageInfo[] targets;

  // 3 replicas, one node excluded.
  excludedNodes.add(dataNodes[4]);
  targets = chooseTarget(3, chosenNodes, excludedNodes);
  assertEquals(3, targets.length);
  assertEquals(storages[0], targets[0]);
  assertEquals(2, getRacks(targets).size());
  assertEquals(3, getUpgradeDomains(targets).size());

  // 3 replicas, two nodes excluded.
  excludedNodes.clear();
  chosenNodes.clear();
  excludedNodes.add(dataNodes[4]);
  excludedNodes.add(dataNodes[8]);
  targets = chooseTarget(3, chosenNodes, excludedNodes);
  assertEquals(3, targets.length);
  assertEquals(storages[0], targets[0]);
  assertEquals(2, getRacks(targets).size());
  assertEquals(3, getUpgradeDomains(targets).size());

  // 3 replicas, three nodes excluded: the exact targets are pinned.
  excludedNodes.clear();
  chosenNodes.clear();
  excludedNodes.add(dataNodes[4]);
  excludedNodes.add(dataNodes[5]);
  excludedNodes.add(dataNodes[8]);
  targets = chooseTarget(3, chosenNodes, excludedNodes);
  assertEquals(3, targets.length);
  assertEquals(storages[0], targets[0]);
  assertEquals(storages[2], targets[1]);
  assertEquals(storages[7], targets[2]);

  // 4 replicas, one node excluded.
  excludedNodes.clear();
  chosenNodes.clear();
  excludedNodes.add(dataNodes[4]);
  targets = chooseTarget(4, chosenNodes, excludedNodes);
  assertEquals(4, targets.length);
  assertEquals(storages[0], targets[0]);
  assertTrue(getRacks(targets).size() >= 2);
  assertEquals(3, getUpgradeDomains(targets).size());

  // 4 replicas, two nodes excluded.
  excludedNodes.clear();
  chosenNodes.clear();
  excludedNodes.add(dataNodes[4]);
  excludedNodes.add(dataNodes[8]);
  targets = chooseTarget(4, chosenNodes, excludedNodes);
  assertEquals(4, targets.length);
  assertEquals(storages[0], targets[0]);
  assertTrue(getRacks(targets).size() >= 2);
  assertEquals(3, getUpgradeDomains(targets).size());

  // One additional replica requested with storages[2] already chosen and
  // dataNodes[1] excluded. Two targets are expected back.
  // NOTE(review): presumably the boolean argument asks for the already
  // chosen nodes to be included in the result, which would explain the
  // expected length of 2 -- confirm against the chooseTarget signature.
  excludedNodes.clear();
  chosenNodes.clear();
  excludedNodes.add(dataNodes[1]);
  chosenNodes.add(storages[2]);
  targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes,
      true, excludedNodes, BLOCK_SIZE,
      TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
  // The chosen targets are reported only if the assertion fails.
  assertEquals("targets=" + Arrays.asList(targets), 2, targets.length);
}
/**
 * Test the scenario where there are not enough eligible datanodes to satisfy
 * the policy, so fewer targets than requested are returned.
 *
 * <p>Three replicas are requested while {@code dataNodes[4]},
 * {@code dataNodes[5]}, {@code dataNodes[7]} and {@code dataNodes[8]} are
 * excluded; only two targets are expected. Assuming the topology built by
 * {@code getDatanodeDescriptors}, the remaining nodes outside rack
 * {@code /d1/r1} ({@code dataNodes[3]} and {@code dataNodes[6]}) both share
 * upgrade domain 1 with {@code dataNodes[0]} -- presumably the reason a
 * third target cannot be found.
 *
 * @throws Exception if choosing targets fails
 */
@Test
public void testChooseTargetWithoutEnoughReplica() throws Exception {
  Set<Node> excludedNodes = new HashSet<>();
  List<DatanodeStorageInfo> chosenNodes = new ArrayList<>();
  excludedNodes.add(dataNodes[4]);
  excludedNodes.add(dataNodes[5]);
  excludedNodes.add(dataNodes[7]);
  excludedNodes.add(dataNodes[8]);

  DatanodeStorageInfo[] targets =
      chooseTarget(3, chosenNodes, excludedNodes);

  assertEquals(2, targets.length);
  assertEquals(storages[0], targets[0]);
  assertTrue("unexpected second target: " + targets[1],
      targets[1].equals(storages[1]) || targets[1].equals(storages[2]));
}
/**
 * Test block placement verification.
 *
 * <p>Each scenario hands a fixed set of replica storages to
 * {@code replicator.verifyBlockPlacement} and checks whether the returned
 * status reports the placement policy as satisfied. The per-scenario
 * comments give the number of distinct upgrade domains and racks covered by
 * the replicas.
 *
 * @throws Exception if building the located block or verifying fails
 */
@Test
public void testVerifyBlockPlacement() throws Exception {
  final ExtendedBlock block =
      new ExtendedBlock("fake-pool", new Block(12345L));
  BlockPlacementStatus status;

  // 2 upgrade domains (not enough), 2 racks (enough)
  status = placementStatusOf(block, storages[0], storages[1], storages[4]);
  assertFalse(status.isPlacementPolicySatisfied());

  // 3 upgrade domains (enough), 2 racks (enough)
  status = placementStatusOf(block, storages[0], storages[1], storages[5]);
  assertTrue(status.isPlacementPolicySatisfied());

  // 3 upgrade domains (enough), 1 rack (not enough)
  status = placementStatusOf(block, storages[0], storages[1], storages[2]);
  assertFalse(status.isPlacementPolicySatisfied());
  assertFalse(status.getErrorDescription().contains("upgrade domain"));

  // 2 upgrade domains (not enough), 3 racks (enough)
  status = placementStatusOf(block, storages[0], storages[5], storages[8]);
  assertFalse(status.isPlacementPolicySatisfied());
  assertTrue(status.getErrorDescription().contains("upgrade domain"));

  // 3 upgrade domains (enough), 3 racks (enough)
  status = placementStatusOf(block, storages[0], storages[4], storages[8]);
  assertTrue(status.isPlacementPolicySatisfied());

  // 3 upgrade domains (enough), 3 racks (enough), 4 replicas
  status = placementStatusOf(block,
      storages[0], storages[1], storages[5], storages[8]);
  assertTrue(status.isPlacementPolicySatisfied());

  // 2 upgrade domains (not enough), 3 racks (enough), 4 replicas
  status = placementStatusOf(block,
      storages[0], storages[3], storages[5], storages[8]);
  assertFalse(status.isPlacementPolicySatisfied());
}

/**
 * Wraps the given replica storages in a located block and asks the
 * replicator to verify their placement, using the number of replicas passed
 * in as the second argument of {@code verifyBlockPlacement}.
 *
 * @param block the block the replicas belong to
 * @param replicas storages holding the replicas
 * @return the placement status reported by the replicator
 * @throws Exception declared to mirror the calling test; propagates anything
 *     thrown while building or verifying the located block
 */
private BlockPlacementStatus placementStatusOf(ExtendedBlock block,
    DatanodeStorageInfo... replicas) throws Exception {
  final LocatedBlock located =
      BlockManager.newLocatedBlock(block, replicas, 0, false);
  return replicator.verifyBlockPlacement(located.getLocations(),
      replicas.length);
}
/**
 * Verify the correct replica is chosen to satisfy both rack and upgrade
 * domain policy.
 *
 * <p>Every scenario has one or more replicas beyond the expected replication
 * factor of 3 and checks which storages
 * {@code replicator.chooseReplicasToDelete} returns. Rack and upgrade domain
 * values quoted in the comments assume the topology built by
 * {@code getDatanodeDescriptors}.
 *
 * @throws Exception if choosing the replicas to delete fails
 */
@Test
public void testChooseReplicasToDelete() throws Exception {
  Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
  nonExcess.add(storages[0]);
  nonExcess.add(storages[1]);
  nonExcess.add(storages[2]);
  nonExcess.add(storages[3]);
  List<DatanodeStorageInfo> excessReplicas;
  BlockStoragePolicySuite policySuite =
      BlockStoragePolicySuite.createDefaultSuite();
  BlockStoragePolicy storagePolicy = policySuite.getDefaultPolicy();

  // delete hint accepted.
  // NOTE(review): the last two arguments of chooseReplicasToDelete are
  // presumably the newly added node and the delete hint; confirm against
  // the BlockPlacementPolicy API.
  DatanodeDescriptor delHintNode = storages[0].getDatanodeDescriptor();
  List<StorageType> excessTypes = storagePolicy.chooseExcess((short) 3,
      DatanodeStorageInfo.toStorageTypes(nonExcess));
  excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
      excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
  assertEquals(1, excessReplicas.size());
  assertTrue(excessReplicas.contains(storages[0]));

  // delete hint rejected because deleting storages[1] would leave only
  // two upgrade domains.
  delHintNode = storages[1].getDatanodeDescriptor();
  excessTypes = storagePolicy.chooseExcess((short) 3,
      DatanodeStorageInfo.toStorageTypes(nonExcess));
  excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
      excessTypes, storages[3].getDatanodeDescriptor(), delHintNode);
  assertEquals(1, excessReplicas.size());
  assertTrue(excessReplicas.contains(storages[0]));

  // no delete hint, case 1
  nonExcess.clear();
  nonExcess.add(storages[0]);
  nonExcess.add(storages[1]);
  nonExcess.add(storages[4]);
  nonExcess.add(storages[8]);
  excessTypes = storagePolicy.chooseExcess((short) 3,
      DatanodeStorageInfo.toStorageTypes(nonExcess));
  excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
      excessTypes, storages[8].getDatanodeDescriptor(), null);
  assertEquals(1, excessReplicas.size());
  assertTrue(excessReplicas.contains(storages[1]));

  // no delete hint, case 2
  nonExcess.clear();
  nonExcess.add(storages[0]);
  nonExcess.add(storages[1]);
  nonExcess.add(storages[4]);
  nonExcess.add(storages[5]);
  excessTypes = storagePolicy.chooseExcess((short) 3,
      DatanodeStorageInfo.toStorageTypes(nonExcess));
  excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
      excessTypes, storages[8].getDatanodeDescriptor(), null);
  assertEquals(1, excessReplicas.size());
  assertTrue(excessReplicas.contains(storages[1]) ||
      excessReplicas.contains(storages[4]));

  // No delete hint, different excess type deletion.
  // The extra ARCHIVE storage reuses the network location and upgrade
  // domain of delHintNode, which still refers to storages[1]'s datanode.
  nonExcess.clear();
  nonExcess.add(storages[0]);
  nonExcess.add(storages[1]);
  nonExcess.add(storages[2]);
  nonExcess.add(storages[3]);
  DatanodeStorageInfo excessStorage = DFSTestUtil.createDatanodeStorageInfo(
      "Storage-excess-ID", "localhost", delHintNode.getNetworkLocation(),
      "foo.com", StorageType.ARCHIVE, delHintNode.getUpgradeDomain());
  nonExcess.add(excessStorage);
  excessTypes = storagePolicy.chooseExcess((short) 3,
      DatanodeStorageInfo.toStorageTypes(nonExcess));
  excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
      excessTypes, storages[3].getDatanodeDescriptor(), null);
  assertEquals(2, excessReplicas.size());
  assertTrue(excessReplicas.contains(storages[0]));
  assertTrue(excessReplicas.contains(excessStorage));

  // Test SSD related deletion. With different rack settings here, but
  // similar to TestReplicationPolicy#testChooseReplicasToDelete.
  // The block was initially created on excessSSD(rack r1, UD 4),
  // storages[7](rack r3, UD 2) and storages[8](rack r3, UD 3) with
  // ONESSD_STORAGE_POLICY_NAME storage policy. Replication factor = 3.
  // Right after balancer moves the block from storages[7] to
  // storages[3](rack r2, UD 1), the application changes the storage policy
  // from ONESSD_STORAGE_POLICY_NAME to HOT_STORAGE_POLICY_ID. In this case,
  // we should be able to delete excessSSD since the remaining
  // storages ({storages[3]}, {storages[7], storages[8]})
  // are on different racks (r2, r3) and different UDs (1, 2, 3).
  DatanodeStorageInfo excessSSD = DFSTestUtil.createDatanodeStorageInfo(
      "Storage-excess-SSD-ID", "localhost",
      storages[0].getDatanodeDescriptor().getNetworkLocation(), "foo.com",
      StorageType.SSD, null);
  DatanodeStorageInfo[] ssds = {excessSSD};
  DatanodeDescriptor[] ssdNodes = DFSTestUtil.toDatanodeDescriptor(ssds);
  // The storage was created without an upgrade domain; assign UD 4 here.
  ssdNodes[0].setUpgradeDomain("4");
  nonExcess.clear();
  nonExcess.add(excessSSD);
  nonExcess.add(storages[3]);
  nonExcess.add(storages[7]);
  nonExcess.add(storages[8]);
  excessTypes = storagePolicy.chooseExcess((short) 3,
      DatanodeStorageInfo.toStorageTypes(nonExcess));
  excessReplicas = replicator.chooseReplicasToDelete(nonExcess, nonExcess, 3,
      excessTypes, storages[3].getDatanodeDescriptor(),
      storages[7].getDatanodeDescriptor());
  assertEquals(1, excessReplicas.size());
  assertTrue(excessReplicas.contains(excessSSD));
}
/**
 * Exercises {@code replicator.isMovable} over five replica layouts and
 * checks whether each proposed move is accepted. The per-scenario comments
 * give the rack and upgrade domain counts before and after the move.
 *
 * <p>NOTE(review): the second and third arguments are presumably the source
 * and the target of the move; confirm against the placement policy API.
 *
 * @throws Exception if the movability check fails unexpectedly
 */
@Test
public void testIsMovable() throws Exception {
  // after the move, the number of racks changes from 1 to 2.
  // and number of upgrade domains remains 3.
  assertTrue(replicator.isMovable(
      datanodesAt(0, 1, 2, 3), dataNodes[0], dataNodes[3]));

  // the move would have changed the number of racks from 1 to 2.
  // and the number of UDs from 3 to 2.
  assertFalse(replicator.isMovable(
      datanodesAt(0, 1, 2, 4), dataNodes[0], dataNodes[4]));

  // after the move, the number of racks remains 2.
  // the number of UDs remains 3.
  assertTrue(replicator.isMovable(
      datanodesAt(0, 4, 5, 6), dataNodes[0], dataNodes[6]));

  // after the move, the number of racks remains 2.
  // the number of UDs remains 2.
  assertTrue(replicator.isMovable(
      datanodesAt(0, 1, 3, 4), dataNodes[0], dataNodes[4]));

  // the move would have changed the number of racks from 2 to 3.
  // and the number of UDs from 2 to 1.
  assertFalse(replicator.isMovable(
      datanodesAt(0, 3, 4, 6), dataNodes[4], dataNodes[6]));
}

/**
 * Collects the entries of {@code dataNodes} at the given indices into a new
 * mutable list, preserving the order of the indices.
 *
 * @param indices positions in {@code dataNodes} to include
 * @return a fresh list holding the selected datanodes
 */
private List<DatanodeInfo> datanodesAt(int... indices) {
  final List<DatanodeInfo> selected = new ArrayList<>();
  for (int position : indices) {
    selected.add(dataNodes[position]);
  }
  return selected;
}
/**
 * Collects the distinct upgrade domains of the datanodes that own the given
 * storages.
 *
 * @param nodes storages to inspect
 * @return the upgrade domain values, one entry per distinct value
 */
private Set<String> getUpgradeDomains(DatanodeStorageInfo[] nodes) {
  final Set<String> domains = new HashSet<>();
  for (int i = 0; i < nodes.length; i++) {
    final DatanodeDescriptor owner = nodes[i].getDatanodeDescriptor();
    domains.add(owner.getUpgradeDomain());
  }
  return domains;
}
/**
 * Collects the distinct network locations (racks) of the datanodes that own
 * the given storages.
 *
 * @param nodes storages to inspect
 * @return the network locations, one entry per distinct value
 */
private Set<String> getRacks(DatanodeStorageInfo[] nodes) {
  final Set<String> distinctRacks = new HashSet<>();
  for (int i = 0; i < nodes.length; i++) {
    distinctRacks.add(nodes[i].getDatanodeDescriptor().getNetworkLocation());
  }
  return distinctRacks;
}
}