TestLowRedundancyBlockQueues.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

/**
 * Test {@link LowRedundancyBlocks}.
 */
@RunWith(Parameterized.class)
public class TestLowRedundancyBlockQueues {

  private final ErasureCodingPolicy ecPolicy;
  private static AtomicLong mockINodeId = new AtomicLong(0);

  public TestLowRedundancyBlockQueues(ErasureCodingPolicy policy) {
    ecPolicy = policy;
  }

  @Parameterized.Parameters(name = "{index}: {0}")
  public static Collection<Object[]> policies() {
    return StripedFileTestUtil.getECPolicies();
  }

  private BlockInfo genBlockInfo(long id) {
    return genBlockInfo(id, false);
  }

  private BlockInfo genBlockInfo(long id, boolean isCorruptBlock) {
    BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3);
    if (!isCorruptBlock) {
      bInfo.setBlockCollectionId(mockINodeId.incrementAndGet());
    }
    return bInfo;
  }

  private BlockInfo genStripedBlockInfo(long id, long numBytes) {
    BlockInfoStriped sblk =  new BlockInfoStriped(new Block(id), ecPolicy);
    sblk.setNumBytes(numBytes);
    return sblk;
  }

  private void verifyBlockStats(LowRedundancyBlocks queues,
      int lowRedundancyReplicaCount, int corruptReplicaCount,
      int corruptReplicationOneCount, int lowRedundancyStripedCount,
      int corruptStripedCount, int highestPriorityReplicatedBlockCount,
      int highestPriorityECBlockCount, int badlyDistributedBlockCount) {
    assertEquals("Low redundancy replica count incorrect!",
        lowRedundancyReplicaCount, queues.getLowRedundancyBlocks());
    assertEquals("Corrupt replica count incorrect!",
        corruptReplicaCount, queues.getCorruptBlocks());
    assertEquals("Corrupt replica one count incorrect!",
        corruptReplicationOneCount,
        queues.getCorruptReplicationOneBlocks());
    assertEquals("Low redundancy striped blocks count incorrect!",
        lowRedundancyStripedCount, queues.getLowRedundancyECBlockGroups());
    assertEquals("Corrupt striped blocks count incorrect!",
        corruptStripedCount, queues.getCorruptECBlockGroups());
    assertEquals("Low Redundancy count incorrect!",
        lowRedundancyReplicaCount + lowRedundancyStripedCount,
        queues.getLowRedundancyBlockCount());
    assertEquals("LowRedundancyBlocks queue size incorrect!",
        (lowRedundancyReplicaCount + corruptReplicaCount +
        lowRedundancyStripedCount + corruptStripedCount), queues.size());
    assertEquals("Badly Distributed Blocks queue size incorrect!",
        badlyDistributedBlockCount, queues.getBadlyDistributedBlocks());
    assertEquals("Highest priority replicated low redundancy " +
            "blocks count is incorrect!",
        highestPriorityReplicatedBlockCount,
        queues.getHighestPriorityReplicatedBlockCount());
    assertEquals("Highest priority erasure coded low redundancy " +
            "blocks count is incorrect!",
        highestPriorityECBlockCount,
        queues.getHighestPriorityECBlockCount());
  }

  /**
   * Tests that deleted blocks should not be returned by
   * {@link LowRedundancyBlocks#chooseLowRedundancyBlocks(int, boolean)}.
   * @throws Exception
   */
  @Test
  public void testDeletedBlocks() throws Exception {
    int numBlocks = 5;
    LowRedundancyBlocks queues = new LowRedundancyBlocks();
    // create 5 blockinfos. The first one is corrupt.
    for (int ind = 0; ind < numBlocks; ind++) {
      BlockInfo blockInfo = genBlockInfo(ind, ind == 0);
      queues.add(blockInfo, 2, 0, 0, 3);
    }
    List<List<BlockInfo>> blocks;
    // Get two blocks from the queue, but we should only get one because first
    // block is deleted.
    blocks = queues.chooseLowRedundancyBlocks(2, false);

    assertEquals(1, blocks.get(2).size());
    assertEquals(1, blocks.get(2).get(0).getBlockId());

    // Get the next blocks - should be ID 2
    blocks = queues.chooseLowRedundancyBlocks(1, false);
    assertEquals(2, blocks.get(2).get(0).getBlockId());

    // Get the next block, but also reset this time - should be ID 3 returned
    blocks = queues.chooseLowRedundancyBlocks(1, true);
    assertEquals(3, blocks.get(2).get(0).getBlockId());

    // Get one more block and due to resetting the queue it will be block id 1
    blocks = queues.chooseLowRedundancyBlocks(1, false);
    assertEquals(1, blocks.get(2).get(0).getBlockId());
  }

  @Test
  public void testQueuePositionCanBeReset() throws Throwable {
    LowRedundancyBlocks queues = new LowRedundancyBlocks();
    for (int i=0; i< 4; i++) {
      BlockInfo block = genBlockInfo(i);
      queues.add(block, 2, 0, 0, 3);
    }
    List<List<BlockInfo>> blocks;
    // Get one block from the queue - should be block ID 0 returned
    blocks = queues.chooseLowRedundancyBlocks(1, false);
    assertEquals(1, blocks.get(2).size());
    assertEquals(0, blocks.get(2).get(0).getBlockId());

    // Get the next blocks - should be ID 1
    blocks = queues.chooseLowRedundancyBlocks(1, false);
    assertEquals(1, blocks.get(2).get(0).getBlockId());

    // Get the next block, but also reset this time - should be ID 2 returned
    blocks = queues.chooseLowRedundancyBlocks(1, true);
    assertEquals(2, blocks.get(2).get(0).getBlockId());

    // Get one more block and due to resetting the queue it will be block id 0
    blocks = queues.chooseLowRedundancyBlocks(1, false);
    assertEquals(0, blocks.get(2).get(0).getBlockId());
  }

  /**
   * Test that adding blocks with different replication counts puts them
   * into different queues.
   * @throws Throwable if something goes wrong
   */
  @Test
  public void testBlockPriorities() throws Throwable {
    LowRedundancyBlocks queues = new LowRedundancyBlocks();
    BlockInfo block1 = genBlockInfo(1);
    BlockInfo block2 = genBlockInfo(2);
    BlockInfo block_very_low_redundancy = genBlockInfo(3);
    BlockInfo block_corrupt = genBlockInfo(4);
    BlockInfo block_corrupt_repl_one = genBlockInfo(5);
    BlockInfo blockBadlyDistributed = genBlockInfo(6);

    // Add a block with a single entry
    assertAdded(queues, block1, 1, 0, 3);
    assertInLevel(queues, block1, LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
    verifyBlockStats(queues, 1, 0, 0, 0, 0, 1, 0, 0);

    // Repeated additions fail
    assertFalse(queues.add(block1, 1, 0, 0, 3));
    verifyBlockStats(queues, 1, 0, 0, 0, 0, 1, 0, 0);

    // Add a second block with two replicas
    assertAdded(queues, block2, 2, 0, 3);
    assertInLevel(queues, block2, LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
    verifyBlockStats(queues, 2, 0, 0, 0, 0, 1, 0, 0);

    // Now try to add a block that is corrupt
    assertAdded(queues, block_corrupt, 0, 0, 3);
    assertInLevel(queues, block_corrupt,
                  LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
    verifyBlockStats(queues, 2, 1, 0, 0, 0, 1, 0, 0);

    // Insert a very insufficiently redundancy block
    assertAdded(queues, block_very_low_redundancy, 4, 0, 25);
    assertInLevel(queues, block_very_low_redundancy,
                  LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
    verifyBlockStats(queues, 3, 1, 0, 0, 0, 1, 0, 0);

    // Insert a corrupt block with replication factor 1
    assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
    verifyBlockStats(queues, 3, 2, 1, 0, 0, 1, 0, 0);

    // Bump up the expected count for corrupt replica one block from 1 to 3
    queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2);
    verifyBlockStats(queues, 3, 2, 0, 0, 0, 1, 0, 0);

    // Reduce the expected replicas to 1
    queues.update(block_corrupt, 0, 0, 0, 1, 0, -2);
    verifyBlockStats(queues, 3, 2, 1, 0, 0, 1, 0, 0);
    queues.update(block_very_low_redundancy, 0, 0, 0, 1, -4, -24);
    verifyBlockStats(queues, 2, 3, 2, 0, 0, 1, 0, 0);

    // Reduce the expected replicas to 1 for block1
    queues.update(block1, 1, 0, 0, 1, 0, 0);
    // expect 1 badly distributed block
    verifyBlockStats(queues, 2, 3, 2, 0, 0, 0, 0, 1);

    // insert a block with too many replicas to make badly distributed
    assertAdded(queues, blockBadlyDistributed, 2, 0, 1);
    assertInLevel(queues, blockBadlyDistributed,
        LowRedundancyBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED);
    verifyBlockStats(queues, 3, 3, 2, 0, 0, 0, 0, 2);
  }

  @Test
  public void testRemoveWithWrongPriority() {
    final LowRedundancyBlocks queues = new LowRedundancyBlocks();
    final BlockInfo corruptBlock = genBlockInfo(1);
    assertAdded(queues, corruptBlock, 0, 0, 3);
    assertInLevel(queues, corruptBlock,
        LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
    verifyBlockStats(queues, 0, 1, 0, 0, 0, 0, 0, 0);

    // Remove with wrong priority
    queues.remove(corruptBlock, LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
    // Verify the number of corrupt block is decremented
    verifyBlockStats(queues, 0, 0, 0, 0, 0, 0, 0, 0);
  }

  @Test
  public void testStripedBlockPriorities() throws Throwable {
    int dataBlkNum = ecPolicy.getNumDataUnits();
    int parityBlkNUm = ecPolicy.getNumParityUnits();
    doTestStripedBlockPriorities(1, parityBlkNUm);
    doTestStripedBlockPriorities(dataBlkNum, parityBlkNUm);
  }

  private void doTestStripedBlockPriorities(int dataBlkNum, int parityBlkNum)
      throws Throwable {
    int groupSize = dataBlkNum + parityBlkNum;
    long numBytes = ecPolicy.getCellSize() * dataBlkNum;
    LowRedundancyBlocks queues = new LowRedundancyBlocks();
    int numUR = 0;
    int numCorrupt = 0;

    // add low redundancy blocks
    for (int i = 0; dataBlkNum + i < groupSize; i++) {
      BlockInfo block = genStripedBlockInfo(-100 - 100 * i, numBytes);
      assertAdded(queues, block, dataBlkNum + i, 0, groupSize);
      numUR++;
      assertEquals(numUR, queues.getLowRedundancyBlockCount());
      assertEquals(numUR + numCorrupt, queues.size());
      if (i == 0) {
        assertInLevel(queues, block,
            LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
      } else if (i * 3 < parityBlkNum + 1) {
        assertInLevel(queues, block,
            LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
      } else {
        assertInLevel(queues, block,
            LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
      }
      verifyBlockStats(queues, 0, 0, 0, numUR, 0, 0, 1, 0);
    }

    // add a corrupted block
    BlockInfo block_corrupt = genStripedBlockInfo(-10, numBytes);
    assertEquals(numCorrupt, queues.getCorruptBlockSize());
    verifyBlockStats(queues, 0, 0, 0, numUR, numCorrupt, 0, 1, 0);

    assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
    numCorrupt++;
    verifyBlockStats(queues, 0, 0, 0, numUR, numCorrupt, 0, 1, 0);

    assertInLevel(queues, block_corrupt,
        LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
  }

  private void assertAdded(LowRedundancyBlocks queues,
                           BlockInfo block,
                           int curReplicas,
                           int decommissionedReplicas,
                           int expectedReplicas) {
    assertTrue("Failed to add " + block,
               queues.add(block,
                          curReplicas, 0,
                          decommissionedReplicas,
                          expectedReplicas));
  }

  /**
   * Determine whether or not a block is in a level without changing the API.
   * Instead get the per-level iterator and run though it looking for a match.
   * If the block is not found, an assertion is thrown.
   *
   * This is inefficient, but this is only a test case.
   * @param queues queues to scan
   * @param block block to look for
   * @param level level to select
   */
  private void assertInLevel(LowRedundancyBlocks queues,
                             Block block,
                             int level) {
    final Iterator<BlockInfo> bi = queues.iterator(level);
    while (bi.hasNext()) {
      Block next = bi.next();
      if (block.equals(next)) {
        return;
      }
    }
    fail("Block " + block + " not found in level " + level);
  }

  @Test
  public void testRemoveBlockInManyQueues() {
    LowRedundancyBlocks neededReconstruction = new LowRedundancyBlocks();
    BlockInfo block = new BlockInfoContiguous(new Block(), (short)1024);
    neededReconstruction.add(block, 2, 0, 1, 3);
    neededReconstruction.add(block, 0, 0, 0, 3);
    neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
    assertFalse("Should not contain the block.",
        neededReconstruction.contains(block));
  }
}