/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.BLOCK_SIZE;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.CELL_SIZE;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_DATA_UNITS;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_PARITY_UNITS;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findDataNodeAtIndex;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findFirstDataNode;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;

@Timeout(300)
public class TestReadStripedFileWithDecoding {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestReadStripedFileWithDecoding.class);

  private MiniDFSCluster cluster;
  private DistributedFileSystem dfs;

  @BeforeEach
  public void setup() throws IOException {
    cluster = initializeCluster();
    dfs = cluster.getFileSystem();
  }

  @AfterEach
  public void tearDown() throws IOException {
    tearDownCluster(cluster);
  }

  /**
   * After reading a corrupted block, make sure the client can correctly report
   * the corruption to the NameNode.
   */
  @Test
  public void testReportBadBlock() throws IOException {
    // create file
    final Path file = new Path("/corrupted");
    final int length = 10; // length of "corruption"
    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
    DFSTestUtil.writeFile(dfs, file, bytes);

    // corrupt the first data block
    int dnIndex = findFirstDataNode(cluster, dfs, file,
        CELL_SIZE * NUM_DATA_UNITS);
    Assertions.assertNotEquals(-1, dnIndex);
    LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
        .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
        .get(0);
    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
        CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
    // Find the first block file.
    File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
    File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
    Assertions.assertTrue(blkFile.exists(), "Block file does not exist");
    // corrupt the block file
    LOG.info("Deliberately corrupting file " + blkFile.getName());
    try (FileOutputStream out = new FileOutputStream(blkFile)) {
      out.write("corruption".getBytes());
    }

    // disable heartbeats from the DataNodes so that the corrupted block record
    // is kept in the NameNode
    for (DataNode dn : cluster.getDataNodes()) {
      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
    }

    try {
      // do stateful read
      StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes,
          ByteBuffer.allocate(1024));

      // check whether the corruption has been reported to the NameNode
      final FSNamesystem ns = cluster.getNamesystem();
      final BlockManager bm = ns.getBlockManager();
      BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
          .asFile().getBlocks())[0];
      Assertions.assertEquals(1, bm.getCorruptReplicas(blockInfo).size());
    } finally {
      for (DataNode dn : cluster.getDataNodes()) {
        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
      }
    }
  }

  @Test
  public void testInvalidateBlock() throws IOException, InterruptedException {
    final Path file = new Path("/invalidate");
    final int length = 10;
    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
    DFSTestUtil.writeFile(dfs, file, bytes);

    int dnIndex = findFirstDataNode(cluster, dfs, file,
        CELL_SIZE * NUM_DATA_UNITS);
    Assertions.assertNotEquals(-1, dnIndex);
    LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
        .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
        .get(0);
    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
        CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
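    // Local block of the first internal block in the group; used below to
    // check that it has been added to invalidateBlocks after the delete.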
    final Block b = blks[0].getBlock().getLocalBlock();

    DataNode dn = cluster.getDataNodes().get(dnIndex);
    // disable the heartbeat from the DN so that the invalidated block record is
    // kept in the NameNode until the heartbeat expires and the NN marks the DN
    // as dead
    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);

    try {
      // delete the file
      dfs.delete(file, true);
      BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(
          cluster.getNamesystem().getBlockManager());
      // check that the block has been added to invalidateBlocks
      final FSNamesystem fsn = cluster.getNamesystem();
      final BlockManager bm = fsn.getBlockManager();
      DatanodeDescriptor dnd =
          NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
      Assertions.assertTrue(bm.containsInvalidateBlock(
          blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));
    } finally {
      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
    }
  }

  /**
   * This unit test tries to cover the following situation:
   * Suppose we have an EC file with an RS(d,p) policy whose block group id
   * is blk_-9223372036845119810_1920002.
   * The first and second data blocks in this EC block group are corrupted,
   * and meanwhile we read the EC file.
   * The read triggers a reportBadBlock RPC which adds
   * blk_-9223372036845119810_1920002
   * and blk_-9223372036845119809_1920002 to corruptReplicas.
   * It also reconstructs the two blocks and sends IBRs to the NameNode,
   * which then executes BlockManager#addStoredBlock and the
   * invalidateCorruptReplicas method. Suppose we first receive the IBR of
   * blk_-9223372036845119810_1920002; then, in invalidateCorruptReplicas,
   * only 9223372036845119809_1920002 is invalidated on the two datanodes
   * that contain the two corrupt blocks.
   *
   * @throws Exception
   */
  @Test
  public void testCorruptionECBlockInvalidate() throws Exception {

    final Path file = new Path("/invalidate_corrupted");
    final int length = BLOCK_SIZE * NUM_DATA_UNITS;
    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
    DFSTestUtil.writeFile(dfs, file, bytes);

    int dnIndex = findFirstDataNode(cluster, dfs, file,
        CELL_SIZE * NUM_DATA_UNITS);
    int dnIndex2 = findDataNodeAtIndex(cluster, dfs, file,
        CELL_SIZE * NUM_DATA_UNITS, 2);
    Assertions.assertNotEquals(-1, dnIndex);
    Assertions.assertNotEquals(-1, dnIndex2);

    LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
        .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
        .get(0);
    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
        CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);

    final Block b = blks[0].getBlock().getLocalBlock();
    final Block b2 = blks[1].getBlock().getLocalBlock();

    // Find the first block file.
    File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
    File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
    Assertions.assertTrue(blkFile.exists(), "Block file does not exist");
    // Corrupt the block file.
    LOG.info("Deliberately corrupting file " + blkFile.getName());
    try (FileOutputStream out = new FileOutputStream(blkFile)) {
      out.write("corruption".getBytes());
      out.flush();
    }

    // Find the second block file.
    File storageDir2 = cluster.getInstanceStorageDir(dnIndex2, 0);
    File blkFile2 = MiniDFSCluster.getBlockFile(storageDir2, blks[1].getBlock());
    Assertions.assertTrue(blkFile2.exists(), "Block file does not exist");
    // Corrupt the second block file.
    LOG.info("Deliberately corrupting file " + blkFile2.getName());
    try (FileOutputStream out = new FileOutputStream(blkFile2)) {
      out.write("corruption".getBytes());
      out.flush();
    }

    // Disable heartbeats from the DataNodes so that the corrupted block record
    // is kept in the NameNode.
    for (DataNode dataNode : cluster.getDataNodes()) {
      DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
    }
    try {
      // Do stateful read.
      StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes,
          ByteBuffer.allocate(1024));

      // Check whether the corruption has been reported to the NameNode.
      final FSNamesystem ns = cluster.getNamesystem();
      final BlockManager bm = ns.getBlockManager();
      BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
          .asFile().getBlocks())[0];
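      // Wait (polling every 250ms, up to 60s) until the NameNode has recorded
      // corrupt replicas of this block group on both datanodes.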
      GenericTestUtils.waitFor(() -> {
        if (bm.getCorruptReplicas(blockInfo) == null) {
          return false;
        }
        return bm.getCorruptReplicas(blockInfo).size() == 2;
      }, 250, 60000);
      // Double check.
      Assertions.assertEquals(2, bm.getCorruptReplicas(blockInfo).size());

      DatanodeDescriptor dnd =
          NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex).getDatanodeId());

      DatanodeDescriptor dnd2 =
          NameNodeAdapter.getDatanode(ns, cluster.getDataNodes().get(dnIndex2).getDatanodeId());

      for (DataNode datanode : cluster.getDataNodes()) {
        if (!datanode.getDatanodeUuid().equals(dnd.getDatanodeUuid()) &&
            !datanode.getDatanodeUuid().equals(dnd2.getDatanodeUuid())) {
          DataNodeTestUtils.setHeartbeatsDisabledForTests(datanode, false);
        }
      }

      GenericTestUtils.waitFor(() -> {
        return bm.containsInvalidateBlock(
            blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b);
      }, 250, 60000);
      Assertions.assertTrue(bm.containsInvalidateBlock(
          blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));

      GenericTestUtils.waitFor(() -> {
        return bm.containsInvalidateBlock(
            blks[1].getLocations()[0], b2) || dnd2.containsInvalidateBlock(b2);
      }, 250, 60000);

      Assertions.assertTrue(bm.containsInvalidateBlock(
          blks[1].getLocations()[0], b2) || dnd2.containsInvalidateBlock(b2));

    } finally {
      for (DataNode datanode : cluster.getDataNodes()) {
        DataNodeTestUtils.setHeartbeatsDisabledForTests(datanode, false);
      }
    }
  }

  @Test
  public void testMoreThanOneCorruptedBlock() throws IOException {
    final Path file = new Path("/corrupted");
    final int length = BLOCK_SIZE * NUM_DATA_UNITS;
    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
    DFSTestUtil.writeFile(dfs, file, bytes);

    // read the file with more than one corrupted data block
    byte[] buffer = new byte[length + 100];
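    // Reusable read buffer, sized a bit larger than the file being verified.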
    for (int count = 2; count < NUM_PARITY_UNITS; ++count) {
      ReadStripedFileWithDecodingHelper.corruptBlocks(cluster, dfs, file, count, 0,
          false);
      StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes,
          buffer);
    }
  }

  @Test
  public void testReadWithCorruptedDataBlockAndParityBlock() throws IOException {
    final Path file = new Path("/corruptedDataBlockAndParityBlock");
    final int length = BLOCK_SIZE * NUM_DATA_UNITS;
    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
    DFSTestUtil.writeFile(dfs, file, bytes);

    // choose one data block and the first parity block to corrupt
    int dataBlkDelNum = 1;
    int parityBlkDelNum = 1;
    int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
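    // Internal block index 0 is the first data block; index 6, the first index
    // after the data blocks, is the first parity block.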
    int[] dataBlkIndices = {0};
    int[] parityBlkIndices = {6};

    LocatedBlocks locatedBlocks = ReadStripedFileWithDecodingHelper.getLocatedBlocks(dfs, file);
    LocatedStripedBlock lastBlock =
        (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();

    int[] delBlkIndices = new int[recoverBlkNum];
    System.arraycopy(dataBlkIndices, 0,
        delBlkIndices, 0, dataBlkIndices.length);
    System.arraycopy(parityBlkIndices, 0,
        delBlkIndices, dataBlkIndices.length, parityBlkIndices.length);
    ExtendedBlock[] delBlocks = new ExtendedBlock[recoverBlkNum];
    for (int i = 0; i < recoverBlkNum; i++) {
      delBlocks[i] = StripedBlockUtil
          .constructInternalBlock(lastBlock.getBlock(),
              CELL_SIZE, NUM_DATA_UNITS, delBlkIndices[i]);
      cluster.corruptBlockOnDataNodes(delBlocks[i]);
    }

    byte[] buffer = new byte[length + 100];
    StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes,
        buffer);
  }
}