/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.AvailableSpaceBlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.junit.jupiter.api.Test;

import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;

import java.io.IOException;
import java.lang.management.ManagementFactory;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * The test makes sure that the NameNode detects the presence of blocks that
 * do not have any valid replicas. In addition, it verifies that the HDFS
 * front page displays a warning in such a case.
 */
public class TestMissingBlocksAlert {
  
  private static final Logger LOG =
      LoggerFactory.getLogger(TestMissingBlocksAlert.class);
  
  @Test
  public void testMissingBlocksAlert()
          throws IOException, InterruptedException,
                 MalformedObjectNameException, AttributeNotFoundException,
                 MBeanException, ReflectionException,
                 InstanceNotFoundException {
    
    MiniDFSCluster cluster = null;
    
    try {
      Configuration conf = new HdfsConfiguration();
      // Minimize test delay: re-scan for low-redundancy blocks without pause
      // and shorten the client retry window.
      conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
          0);
      conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
      // Use a block size of half the file length so each file has two blocks.
      int fileLen = 10 * 1024;
      conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, fileLen / 2);

      // Start a cluster with a single datanode.
      cluster = new MiniDFSCluster.Builder(conf).build();
      cluster.waitActive();

      final BlockManager bm = cluster.getNamesystem().getBlockManager();
      DistributedFileSystem dfs =
          cluster.getFileSystem();

      // create a normal file
      DFSTestUtil.createFile(dfs, new Path("/testMissingBlocksAlert/file1"), 
                             fileLen, (short)3, 0);

      Path corruptFile = new Path("/testMissingBlocks/corruptFile");
      DFSTestUtil.createFile(dfs, corruptFile, fileLen, (short)3, 0);

      // Corrupt the block's only replica (the cluster has one datanode).
      ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, corruptFile);
      cluster.corruptReplica(0, block);

      // read the file so that the corrupt block is reported to NN
      FSDataInputStream in = dfs.open(corruptFile); 
      try {
        in.readFully(new byte[fileLen]);
      } catch (ChecksumException ignored) { // checksum error is expected.      
      }
      in.close();

      LOG.info("Waiting for missing blocks count to increase...");

      while (dfs.getMissingBlocksCount() <= 0) {
        Thread.sleep(100);
      }
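      // Each file has two blocks at replication factor 3 but only one
      // datanode, so all four blocks are low-redundancy; the corrupted block
      // has no valid replica left and is the only one counted as missing.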
      assertEquals(1, dfs.getMissingBlocksCount());
      assertEquals(4, dfs.getLowRedundancyBlocksCount());
      assertEquals(3, bm.getUnderReplicatedNotMissingBlocks());

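      // The same count should be exposed through the NameNodeInfo MXBean.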
      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      ObjectName mxbeanName = new ObjectName(
              "Hadoop:service=NameNode,name=NameNodeInfo");
      assertEquals(1, (long) (Long) mbs.getAttribute(mxbeanName,
          "NumberOfMissingBlocks"));

      // Now do the reverse: remove the file and expect the number of missing
      // blocks to drop back to zero.

      dfs.delete(corruptFile, true);

      LOG.info("Waiting for missing blocks count to be zero...");
      while (dfs.getMissingBlocksCount() > 0) {
        Thread.sleep(100);
      }

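      // Only the first file's two under-replicated blocks remain.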
      assertEquals(2, dfs.getLowRedundancyBlocksCount());
      assertEquals(2, bm.getUnderReplicatedNotMissingBlocks());

      assertEquals(0, (long) (Long) mbs.getAttribute(mxbeanName,
          "NumberOfMissingBlocks"));

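      // Repeat with a replication-factor-1 file: corrupting its only replica
      // should show up in the dedicated repl-one missing-block metrics.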
      Path replOneFile = new Path("/testMissingBlocks/replOneFile");
      DFSTestUtil.createFile(dfs, replOneFile, fileLen, (short)1, 0);
      ExtendedBlock replOneBlock = DFSTestUtil.getFirstBlock(
          dfs, replOneFile);
      cluster.corruptReplica(0, replOneBlock);

      // read the file so that the corrupt block is reported to NN
      in = dfs.open(replOneFile);
      try {
        in.readFully(new byte[fileLen]);
      } catch (ChecksumException ignored) { // checksum error is expected.
      }
      in.close();
      assertEquals(1, dfs.getMissingReplOneBlocksCount());
      assertEquals(1, (long) (Long) mbs.getAttribute(mxbeanName,
          "NumberOfMissingBlocksWithReplicationFactorOne"));
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

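  /**
   * Verifies that a block is re-replicated to satisfy an increased
   * replication factor after datanodes on a second rack join the cluster,
   * with AvailableSpaceBlockPlacementPolicy as the placement policy.
   */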
  @Test
  public void testMissReplicatedBlockwithTwoRack() throws Exception {
    Configuration conf = new Configuration();
    // Start a cluster with all four datanodes on rack /default/rack1.
    String[] hosts = new String[] {"host0", "host1", "host2", "host3"};
    String[] racks = new String[] {"/default/rack1", "/default/rack1",
        "/default/rack1", "/default/rack1"};
    conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
        AvailableSpaceBlockPlacementPolicy.class.getName());
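    // Run the redundancy monitor every second so re-replication happens
    // quickly in the test.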
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4)
        .hosts(hosts).racks(racks).build();
    Path file = new Path("/file2");
    try {
      DistributedFileSystem dfs = cluster.getFileSystem();
      DFSTestUtil.createFile(dfs, file, 1024, (short) 2, 0);
      dfs.getFileStatus(file);
      // Add two more datanodes on a second rack, /default/rack2.
      cluster.startDataNodes(conf, 2, true, null,
          new String[] {"/default/rack2", "/default/rack2"},
          new String[] {"host4", "host5"}, null);
      dfs.setReplication(file, (short) 3);
      // Wait for the block to reach the new replication factor.
      DFSTestUtil.waitForReplication(dfs, file, (short) 3, 60000);
    } finally {
      cluster.shutdown();
    }
  }
}