TestLowRedundancyBlockQueues.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.fail;
/**
* Test {@link LowRedundancyBlocks}.
*/
@ParameterizedClass(name = "{index}: {0}")
@MethodSource("policies")
public class TestLowRedundancyBlockQueues {
/** Erasure coding policy used when this run builds striped block infos. */
private final ErasureCodingPolicy ecPolicy;
/** Counter that hands out the block collection ids set by genBlockInfo. */
private static AtomicLong mockINodeId = new AtomicLong(0);

/**
 * Creates the test instance for a single erasure coding policy.
 * @param policy policy remembered for the striped-block tests
 */
public TestLowRedundancyBlockQueues(ErasureCodingPolicy policy) {
  this.ecPolicy = policy;
}
/**
 * Supplies the arguments for this parameterized class; it is referenced by
 * name from the {@code @MethodSource("policies")} annotation on the class.
 * @return whatever {@link StripedFileTestUtil#getECPolicies()} returns
 */
public static Collection<Object[]> policies() {
  final Collection<Object[]> ecPolicies = StripedFileTestUtil.getECPolicies();
  return ecPolicies;
}
/**
 * Convenience overload of {@link #genBlockInfo(long, boolean)} that always
 * passes {@code false} for the corrupt flag.
 * @param id id of the block to generate
 * @return the generated block info
 */
private BlockInfo genBlockInfo(long id) {
  final boolean corrupt = false;
  return genBlockInfo(id, corrupt);
}
/**
 * Builds a contiguous block info for the given block id, constructed with
 * {@code (short) 3} as its second argument (presumably the replication
 * factor - confirm against {@code BlockInfoContiguous}).
 * @param id id of the block to generate
 * @param isCorruptBlock when true the block is returned without a block
 *        collection id; otherwise it receives the next value of mockINodeId
 * @return the generated block info
 */
private BlockInfo genBlockInfo(long id, boolean isCorruptBlock) {
  final BlockInfo info = new BlockInfoContiguous(new Block(id), (short) 3);
  if (isCorruptBlock) {
    // Deliberately left without a block collection id.
    return info;
  }
  info.setBlockCollectionId(mockINodeId.incrementAndGet());
  return info;
}
/**
 * Builds a striped block info for the given block id using the erasure
 * coding policy of this test run, and sets its byte count.
 * @param id id of the block to generate
 * @param numBytes value passed to {@code setNumBytes}
 * @return the generated striped block info
 */
private BlockInfo genStripedBlockInfo(long id, long numBytes) {
  final BlockInfoStriped striped =
      new BlockInfoStriped(new Block(id), ecPolicy);
  striped.setNumBytes(numBytes);
  return striped;
}
/**
 * Asserts every counter exposed by the given queues against the expected
 * values, including the two totals derived from them (the low redundancy
 * block count and the overall queue size).
 * @param queues queues under test
 * @param lowRedundancyReplicaCount expected getLowRedundancyBlocks()
 * @param corruptReplicaCount expected getCorruptBlocks()
 * @param corruptReplicationOneCount expected getCorruptReplicationOneBlocks()
 * @param lowRedundancyStripedCount expected getLowRedundancyECBlockGroups()
 * @param corruptStripedCount expected getCorruptECBlockGroups()
 * @param highestPriorityReplicatedBlockCount expected
 *        getHighestPriorityReplicatedBlockCount()
 * @param highestPriorityECBlockCount expected getHighestPriorityECBlockCount()
 * @param badlyDistributedBlockCount expected getBadlyDistributedBlocks()
 */
private void verifyBlockStats(LowRedundancyBlocks queues,
    int lowRedundancyReplicaCount, int corruptReplicaCount,
    int corruptReplicationOneCount, int lowRedundancyStripedCount,
    int corruptStripedCount, int highestPriorityReplicatedBlockCount,
    int highestPriorityECBlockCount, int badlyDistributedBlockCount) {
  // Totals derived from the individual expectations.
  final int expectedLowRedundancyTotal =
      lowRedundancyReplicaCount + lowRedundancyStripedCount;
  final int expectedQueueSize = lowRedundancyReplicaCount
      + corruptReplicaCount + lowRedundancyStripedCount + corruptStripedCount;

  // Per-category counters.
  assertEquals(lowRedundancyReplicaCount, queues.getLowRedundancyBlocks(),
      "Low redundancy replica count incorrect!");
  assertEquals(corruptReplicaCount, queues.getCorruptBlocks(),
      "Corrupt replica count incorrect!");
  assertEquals(corruptReplicationOneCount,
      queues.getCorruptReplicationOneBlocks(),
      "Corrupt replica one count incorrect!");
  assertEquals(lowRedundancyStripedCount,
      queues.getLowRedundancyECBlockGroups(),
      "Low redundancy striped blocks count incorrect!");
  assertEquals(corruptStripedCount, queues.getCorruptECBlockGroups(),
      "Corrupt striped blocks count incorrect!");

  // Aggregates.
  assertEquals(expectedLowRedundancyTotal,
      queues.getLowRedundancyBlockCount(),
      "Low Redundancy count incorrect!");
  assertEquals(expectedQueueSize, queues.size(),
      "LowRedundancyBlocks queue size incorrect!");
  assertEquals(badlyDistributedBlockCount,
      queues.getBadlyDistributedBlocks(),
      "Badly Distributed Blocks queue size incorrect!");

  // Highest priority counters.
  assertEquals(highestPriorityReplicatedBlockCount,
      queues.getHighestPriorityReplicatedBlockCount(),
      "Highest priority replicated low redundancy blocks count is incorrect!");
  assertEquals(highestPriorityECBlockCount,
      queues.getHighestPriorityECBlockCount(),
      "Highest priority erasure coded low redundancy blocks count is incorrect!");
}
/**
 * Tests that deleted blocks should not be returned by
 * {@link LowRedundancyBlocks#chooseLowRedundancyBlocks(int, boolean)}.
 * @throws Exception if the test fails unexpectedly
 */
@Test
public void testDeletedBlocks() throws Exception {
  final int blockCount = 5;
  final LowRedundancyBlocks queues = new LowRedundancyBlocks();
  // Queue block ids 0..4. Only id 0 is generated with the corrupt flag set,
  // so it is the one block that carries no block collection id.
  for (int id = 0; id < blockCount; id++) {
    final boolean withoutCollectionId = (id == 0);
    queues.add(genBlockInfo(id, withoutCollectionId), 2, 0, 0, 3);
  }

  // Ask for two blocks: block 0 is expected to be skipped, leaving only
  // block 1 in the list at index 2.
  List<BlockInfo> chosen = queues.chooseLowRedundancyBlocks(2, false).get(2);
  assertEquals(1, chosen.size());
  assertEquals(1, chosen.get(0).getBlockId());

  // The next request continues with block 2.
  chosen = queues.chooseLowRedundancyBlocks(1, false).get(2);
  assertEquals(2, chosen.get(0).getBlockId());

  // Block 3 is still returned by the request that asks for a reset...
  chosen = queues.chooseLowRedundancyBlocks(1, true).get(2);
  assertEquals(3, chosen.get(0).getBlockId());

  // ...and after that reset the following request starts over at block 1.
  chosen = queues.chooseLowRedundancyBlocks(1, false).get(2);
  assertEquals(1, chosen.get(0).getBlockId());
}
/**
 * Checks that a call to
 * {@link LowRedundancyBlocks#chooseLowRedundancyBlocks(int, boolean)} with
 * the reset flag set makes the following call start again from the first
 * queued block.
 * @throws Throwable if something goes wrong
 */
@Test
public void testQueuePositionCanBeReset() throws Throwable {
  final LowRedundancyBlocks queues = new LowRedundancyBlocks();
  // Queue block ids 0..3, all added with identical replica counts.
  for (int id = 0; id < 4; id++) {
    queues.add(genBlockInfo(id), 2, 0, 0, 3);
  }

  // First request: exactly one block, id 0.
  List<BlockInfo> chosen = queues.chooseLowRedundancyBlocks(1, false).get(2);
  assertEquals(1, chosen.size());
  assertEquals(0, chosen.get(0).getBlockId());

  // Second request continues with id 1.
  chosen = queues.chooseLowRedundancyBlocks(1, false).get(2);
  assertEquals(1, chosen.get(0).getBlockId());

  // Third request still yields id 2 but also asks for a reset.
  chosen = queues.chooseLowRedundancyBlocks(1, true).get(2);
  assertEquals(2, chosen.get(0).getBlockId());

  // Because of that reset the next request is back at id 0.
  chosen = queues.chooseLowRedundancyBlocks(1, false).get(2);
  assertEquals(0, chosen.get(0).getBlockId());
}
/**
 * Test that adding blocks with different replication counts puts them
 * into different queues, and that the per-category counters follow both
 * additions and updates.
 * @throws Throwable if something goes wrong
 */
@Test
public void testBlockPriorities() throws Throwable {
  final LowRedundancyBlocks queues = new LowRedundancyBlocks();
  final BlockInfo singleReplicaBlock = genBlockInfo(1);
  final BlockInfo twoReplicaBlock = genBlockInfo(2);
  final BlockInfo veryLowRedundancyBlock = genBlockInfo(3);
  final BlockInfo corruptBlock = genBlockInfo(4);
  final BlockInfo corruptReplOneBlock = genBlockInfo(5);
  final BlockInfo badlyDistributedBlock = genBlockInfo(6);

  // One replica out of three: highest priority.
  assertAdded(queues, singleReplicaBlock, 1, 0, 3);
  assertInLevel(queues, singleReplicaBlock,
      LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
  verifyBlockStats(queues, 1, 0, 0, 0, 0, 1, 0, 0);

  // Adding the very same block again is rejected and changes nothing.
  final boolean addedAgain = queues.add(singleReplicaBlock, 1, 0, 0, 3);
  assertFalse(addedAgain);
  verifyBlockStats(queues, 1, 0, 0, 0, 0, 1, 0, 0);

  // Two replicas out of three: plain low redundancy.
  assertAdded(queues, twoReplicaBlock, 2, 0, 3);
  assertInLevel(queues, twoReplicaBlock,
      LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
  verifyBlockStats(queues, 2, 0, 0, 0, 0, 1, 0, 0);

  // No replicas at all: corrupt queue.
  assertAdded(queues, corruptBlock, 0, 0, 3);
  assertInLevel(queues, corruptBlock,
      LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
  verifyBlockStats(queues, 2, 1, 0, 0, 0, 1, 0, 0);

  // Four replicas out of twenty-five: very low redundancy.
  assertAdded(queues, veryLowRedundancyBlock, 4, 0, 25);
  assertInLevel(queues, veryLowRedundancyBlock,
      LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
  verifyBlockStats(queues, 3, 1, 0, 0, 0, 1, 0, 0);

  // Corrupt block whose expected replica count is 1.
  assertAdded(queues, corruptReplOneBlock, 0, 0, 1);
  verifyBlockStats(queues, 3, 2, 1, 0, 0, 1, 0, 0);

  // Raise its expected replica count from 1 to 3: it no longer counts as a
  // corrupt replication-one block.
  queues.update(corruptReplOneBlock, 0, 0, 0, 3, 0, 2);
  verifyBlockStats(queues, 3, 2, 0, 0, 0, 1, 0, 0);

  // Lower the expected replica count of the other corrupt block to 1.
  queues.update(corruptBlock, 0, 0, 0, 1, 0, -2);
  verifyBlockStats(queues, 3, 2, 1, 0, 0, 1, 0, 0);

  // Drop the very-low-redundancy block to zero replicas out of one: it
  // moves from the low redundancy count to the corrupt counts.
  queues.update(veryLowRedundancyBlock, 0, 0, 0, 1, -4, -24);
  verifyBlockStats(queues, 2, 3, 2, 0, 0, 1, 0, 0);

  // Lower the expected replica count of the single-replica block to 1;
  // it is then expected to be counted as badly distributed.
  queues.update(singleReplicaBlock, 1, 0, 0, 1, 0, 0);
  verifyBlockStats(queues, 2, 3, 2, 0, 0, 0, 0, 1);

  // A block with more replicas than expected goes straight to the badly
  // distributed queue.
  assertAdded(queues, badlyDistributedBlock, 2, 0, 1);
  assertInLevel(queues, badlyDistributedBlock,
      LowRedundancyBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED);
  verifyBlockStats(queues, 3, 3, 2, 0, 0, 0, 0, 2);
}
/**
 * Checks that removing a block with a priority other than the one it is
 * queued under still removes it and decrements the corrupt block count.
 */
@Test
public void testRemoveWithWrongPriority() {
  final LowRedundancyBlocks queues = new LowRedundancyBlocks();
  final BlockInfo block = genBlockInfo(1);

  // Zero replicas out of three puts the block into the corrupt queue.
  assertAdded(queues, block, 0, 0, 3);
  assertInLevel(queues, block, LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
  verifyBlockStats(queues, 0, 1, 0, 0, 0, 0, 0, 0);

  // Remove it while naming a different queue than the one it lives in.
  final int wrongPriority = LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY;
  queues.remove(block, wrongPriority);

  // The corrupt block count must be back at zero.
  verifyBlockStats(queues, 0, 0, 0, 0, 0, 0, 0, 0);
}
/**
 * Runs the striped priority scenario twice with the parity unit count of the
 * current policy: once with a single data unit and once with the data unit
 * count of the policy.
 * @throws Throwable if something goes wrong
 */
@Test
public void testStripedBlockPriorities() throws Throwable {
  final int dataUnits = ecPolicy.getNumDataUnits();
  final int parityUnits = ecPolicy.getNumParityUnits();
  for (int dataBlkNum : new int[] {1, dataUnits}) {
    doTestStripedBlockPriorities(dataBlkNum, parityUnits);
  }
}
/**
 * Adds striped block groups with a growing number of live units, checking
 * the queue each one lands in, and then adds a group with fewer live units
 * than data units, which is expected in the corrupt queue.
 * @param dataBlkNum number of data units used for every block group
 * @param parityBlkNum number of parity units used for every block group
 * @throws Throwable if something goes wrong
 */
private void doTestStripedBlockPriorities(int dataBlkNum, int parityBlkNum)
    throws Throwable {
  final int groupSize = dataBlkNum + parityBlkNum;
  // Widen before multiplying so the product is computed in 64-bit
  // arithmetic rather than overflowing first and being widened afterwards.
  final long numBytes = (long) ecPolicy.getCellSize() * dataBlkNum;
  final LowRedundancyBlocks queues = new LowRedundancyBlocks();
  int numLowRedundancy = 0;
  int numCorrupt = 0;

  // Add low redundancy groups: i counts the live units beyond dataBlkNum.
  for (int i = 0; dataBlkNum + i < groupSize; i++) {
    final BlockInfo block = genStripedBlockInfo(-100 - 100 * i, numBytes);
    assertAdded(queues, block, dataBlkNum + i, 0, groupSize);
    numLowRedundancy++;
    assertEquals(numLowRedundancy, queues.getLowRedundancyBlockCount());
    assertEquals(numLowRedundancy + numCorrupt, queues.size());

    final int expectedLevel;
    if (i == 0) {
      // Exactly dataBlkNum live units: no spare unit is left.
      expectedLevel = LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY;
    } else if (i * 3 < parityBlkNum + 1) {
      expectedLevel = LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY;
    } else {
      expectedLevel = LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY;
    }
    assertInLevel(queues, block, expectedLevel);
    // Only the first group (i == 0) is ever expected at highest priority.
    verifyBlockStats(queues, 0, 0, 0, numLowRedundancy, 0, 0, 1, 0);
  }

  // Nothing corrupt has been queued so far.
  final BlockInfo corruptBlock = genStripedBlockInfo(-10, numBytes);
  assertEquals(numCorrupt, queues.getCorruptBlockSize());
  verifyBlockStats(queues, 0, 0, 0, numLowRedundancy, numCorrupt, 0, 1, 0);

  // One live unit short of dataBlkNum: expected in the corrupt queue.
  assertAdded(queues, corruptBlock, dataBlkNum - 1, 0, groupSize);
  numCorrupt++;
  verifyBlockStats(queues, 0, 0, 0, numLowRedundancy, numCorrupt, 0, 1, 0);
  assertInLevel(queues, corruptBlock,
      LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}
/**
 * Adds the block to the queues and fails the test if the add is rejected.
 * The third argument of {@code LowRedundancyBlocks.add} is always passed
 * as 0 here.
 * @param queues queues to add to
 * @param block block to add
 * @param curReplicas current replica count passed to add
 * @param decommissionedReplicas decommissioned replica count passed to add
 * @param expectedReplicas expected replica count passed to add
 */
private void assertAdded(LowRedundancyBlocks queues,
    BlockInfo block,
    int curReplicas,
    int decommissionedReplicas,
    int expectedReplicas) {
  final boolean added = queues.add(
      block, curReplicas, 0, decommissionedReplicas, expectedReplicas);
  assertTrue(added, "Failed to add " + block);
}
/**
 * Asserts that a block is queued at the given level without needing any
 * extra API: the per-level iterator is walked until an equal block turns
 * up. If the iterator is exhausted first, the test fails.
 *
 * A linear scan is inefficient, but acceptable for a test case.
 * @param queues queues to scan
 * @param block block to look for
 * @param level level to select
 */
private void assertInLevel(LowRedundancyBlocks queues,
    Block block,
    int level) {
  for (Iterator<BlockInfo> it = queues.iterator(level); it.hasNext();) {
    final Block candidate = it.next();
    if (block.equals(candidate)) {
      return;
    }
  }
  fail("Block " + block + " not found in level " + level);
}
/**
 * Adds the same block twice with different replica counts, removes it using
 * {@code LowRedundancyBlocks.LEVEL} as the priority, and checks that the
 * queues no longer contain it.
 */
@Test
public void testRemoveBlockInManyQueues() {
  final LowRedundancyBlocks queues = new LowRedundancyBlocks();
  final BlockInfo block = new BlockInfoContiguous(new Block(), (short) 1024);

  // Two adds of the same block with different replica counts.
  queues.add(block, 2, 0, 1, 3);
  queues.add(block, 0, 0, 0, 3);

  queues.remove(block, LowRedundancyBlocks.LEVEL);

  final boolean stillQueued = queues.contains(block);
  assertFalse(stillQueued, "Should not contain the block.");
}
}