/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounterWithoutCheck;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;

/**
 * Tests the erasure coding reconstruction metrics reported by the DataNode.
 * Each test writes a striped file, shuts down one DataNode in the block
 * group, and verifies the aggregated metrics once reconstruction finishes.
 */
public class TestDataNodeErasureCodingMetrics {
  public static final Logger LOG =
      LoggerFactory.getLogger(TestDataNodeErasureCodingMetrics.class);
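
  // Geometry under test, derived from the default EC policy (assuming the
  // usual RS-6-3-1024k default: 6 data + 3 parity units, 1 MB cells):
  // blockSize = 2 cells, groupSize = 9, blockGroupSize = 12 cells, and
  // numDNs = groupSize + 1 so one spare DataNode can host rebuilt blocks.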
  private final ErasureCodingPolicy ecPolicy =
      StripedFileTestUtil.getDefaultECPolicy();
  private final int dataBlocks = ecPolicy.getNumDataUnits();
  private final int parityBlocks = ecPolicy.getNumParityUnits();
  private final int cellSize = ecPolicy.getCellSize();
  private final int blockSize = cellSize * 2;
  private final int groupSize = dataBlocks + parityBlocks;
  private final int blockGroupSize = blockSize * dataBlocks;
  private final int numDNs = groupSize + 1;

  private MiniDFSCluster cluster;
  private Configuration conf;
  private DistributedFileSystem fs;

  @Before
  public void setup() throws IOException {
    conf = new Configuration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
    cluster.waitActive();
    fs = cluster.getFileSystem();
    // The policy must be enabled before it can be set on a directory.
    fs.enableErasureCodingPolicy(
        StripedFileTestUtil.getDefaultECPolicy().getName());
    fs.getClient().setErasureCodingPolicy("/",
        StripedFileTestUtil.getDefaultECPolicy().getName());
  }

  @After
  public void tearDown() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  @Test(timeout = 120000)
  public void testFullBlock() throws Exception {
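    // No reconstruction has happened yet, so the timing counters are zero.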
    Assert.assertEquals(0, getLongMetric("EcReconstructionReadTimeMillis"));
    Assert.assertEquals(0, getLongMetric("EcReconstructionDecodingTimeMillis"));
    Assert.assertEquals(0, getLongMetric("EcReconstructionWriteTimeMillis"));

    doTest("/testEcMetrics", blockGroupSize, 0);

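    // Reconstructing the one lost block reads dataBlocks source blocks
    // (blockGroupSize bytes in total) and writes one replacement block
    // (blockSize bytes).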
    Assert.assertEquals("EcReconstructionTasks should be 1",
        1, getLongMetric("EcReconstructionTasks"));
    Assert.assertEquals("EcFailedReconstructionTasks should be 0",
        0, getLongMetric("EcFailedReconstructionTasks"));
    Assert.assertTrue(getLongMetric("EcDecodingTimeNanos") > 0);
    Assert.assertEquals("EcReconstructionBytesRead should be blockGroupSize",
        blockGroupSize, getLongMetric("EcReconstructionBytesRead"));
    Assert.assertEquals("EcReconstructionBytesWritten should be blockSize",
        blockSize, getLongMetric("EcReconstructionBytesWritten"));
    Assert.assertEquals("EcReconstructionRemoteBytesRead should be 0",
        0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
    Assert.assertTrue(getLongMetric("EcReconstructionReadTimeMillis") > 0);
    Assert.assertTrue(getLongMetric("EcReconstructionDecodingTimeMillis") > 0);
    Assert.assertTrue(getLongMetric("EcReconstructionWriteTimeMillis") > 0);
  }

  // The file is a single partial block; reconstruct that partial block.
  @Test(timeout = 120000)
  public void testReconstructionBytesPartialGroup1() throws Exception {
    final int fileLen = blockSize / 10;
    doTest("/testEcBytes", fileLen, 0);

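    // The group holds only fileLen bytes, so reconstruction reads fileLen
    // bytes from the surviving blocks and writes a fileLen-byte replacement.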
    Assert.assertEquals("EcReconstructionBytesRead should be fileLen",
        fileLen, getLongMetric("EcReconstructionBytesRead"));
    Assert.assertEquals("EcReconstructionBytesWritten should be fileLen",
        fileLen, getLongMetric("EcReconstructionBytesWritten"));
    Assert.assertEquals("EcReconstructionRemoteBytesRead should be 0",
        0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
  }

  // 1 full block + 5 partial blocks; reconstruct the full block.
  @Test(timeout = 120000)
  public void testReconstructionBytesPartialGroup2() throws Exception {
    final int fileLen = cellSize * dataBlocks + cellSize + cellSize / 10;
    doTest("/testEcBytes", fileLen, 0);

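    // Rebuilding the lost full block reads a full first stripe
    // (cellSize * dataBlocks) plus the short second stripe
    // (cellSize + cellSize / 10), i.e. fileLen bytes in total, and writes
    // back one full block (blockSize bytes).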
    Assert.assertEquals("EcReconstructionBytesRead should be fileLen",
        fileLen, getLongMetric("EcReconstructionBytesRead"));
    Assert.assertEquals("EcReconstructionBytesWritten should be blockSize",
        blockSize, getLongMetric("EcReconstructionBytesWritten"));
    Assert.assertEquals("EcReconstructionRemoteBytesRead should be 0",
        0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
  }

  // 1 full block + 5 partial blocks; reconstruct a partial block.
  @Test(timeout = 120000)
  public void testReconstructionBytesPartialGroup3() throws Exception {
    final int fileLen = cellSize * dataBlocks + cellSize + cellSize / 10;
    doTest("/testEcBytes", fileLen, 1);

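    // Rebuilding the lost partial block reads a full first stripe
    // (cellSize * dataBlocks) plus two cellSize / 10 fragments from the
    // short second stripe, and writes back cellSize + cellSize / 10 bytes.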
    Assert.assertEquals("EcReconstructionBytesRead mismatch",
        cellSize * dataBlocks + (cellSize / 10) * 2,
        getLongMetric("EcReconstructionBytesRead"));
    Assert.assertEquals("EcReconstructionBytesWritten mismatch",
        cellSize + cellSize / 10,
        getLongMetric("EcReconstructionBytesWritten"));
    Assert.assertEquals("EcReconstructionRemoteBytesRead should be 0",
        0, getLongMetricWithoutCheck("EcReconstructionRemoteBytesRead"));
  }

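  /**
   * Sums {@code metricName} over the metrics records of every DataNode in
   * the cluster; fails if a DataNode does not publish the counter.
   */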
  private long getLongMetric(String metricName) {
    long metricValue = 0;
    // Sum the reconstruction metric across all datanodes in the cluster.
    for (DataNode dn : cluster.getDataNodes()) {
      MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
      metricValue += getLongCounter(metricName, rb);
    }
    return metricValue;
  }

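  /**
   * Like {@link #getLongMetric}, but does not fail when a DataNode has not
   * published the counter yet.
   */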
  private long getLongMetricWithoutCheck(String metricName) {
    long metricValue = 0;
    // Sum the reconstruction metric across all datanodes in the cluster.
    for (DataNode dn : cluster.getDataNodes()) {
      MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
      metricValue += getLongCounterWithoutCheck(metricName, rb);
    }
    return metricValue;
  }

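  /**
   * Writes a striped file of {@code fileLen} bytes, shuts down the DataNode
   * holding the internal block at {@code deadNodeIndex} of the last block
   * group, and waits for the resulting reconstruction to finish.
   */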
  private void doTest(String fileName, int fileLen,
      int deadNodeIndex) throws Exception {
    assertTrue(fileLen > 0);
    assertTrue(deadNodeIndex >= 0 && deadNodeIndex < numDNs);
    Path file = new Path(fileName);
    final byte[] data = StripedFileTestUtil.generateBytes(fileLen);
    DFSTestUtil.writeFile(fs, file, data);
    StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);

    final LocatedBlocks locatedBlocks =
        StripedFileTestUtil.getLocatedBlocks(file, fs);
    final LocatedStripedBlock lastBlock =
        (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
    assertTrue(lastBlock.getLocations().length > deadNodeIndex);

    final DataNode toCorruptDn = cluster.getDataNode(
        lastBlock.getLocations()[deadNodeIndex].getIpcPort());
    assertNotNull("Failed to find a datanode to be corrupted", toCorruptDn);
    LOG.info("Datanode to be corrupted: " + toCorruptDn);
    toCorruptDn.shutdown();
    setDataNodeDead(toCorruptDn.getDatanodeId());
    DFSTestUtil.waitForDatanodeState(cluster, toCorruptDn.getDatanodeUuid(),
        false, 10000);

    final int workCount = getComputedDatanodeWork();
    assertTrue("Wrongly computed block reconstruction work", workCount > 0);
    cluster.triggerHeartbeats();
    // Expected internal blocks in the file: groupSize per full block group,
    // plus ceil(remainder / blockSize) data blocks and parityBlocks parity
    // blocks for a trailing partial group.
    int totalBlocks = (fileLen / blockGroupSize) * groupSize;
    final int remainder = fileLen % blockGroupSize;
    totalBlocks += (remainder == 0) ? 0 :
        (remainder % blockSize == 0) ? remainder / blockSize + parityBlocks :
            remainder / blockSize + 1 + parityBlocks;
    StripedFileTestUtil.waitForAllReconstructionFinished(file, fs, totalBlocks);
  }

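  /**
   * Polls the BlockManager until it has computed reconstruction work for
   * the dead DataNode, retrying up to 20 times at 500 ms intervals.
   */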
  private int getComputedDatanodeWork()
      throws IOException, InterruptedException {
    final BlockManager bm = cluster.getNamesystem().getBlockManager();
    // Give the NameNode a grace period to compute the reconstruction work.
    int workCount = 0;
    int retries = 20;
    while (retries > 0) {
      workCount = BlockManagerTestUtil.getComputedDatanodeWork(bm);
      if (workCount > 0) {
        break;
      }
      retries--;
      Thread.sleep(500);
    }
    LOG.info("Computed datanode work: " + workCount + ", retries: " + retries);
    return workCount;
  }

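  /**
   * Marks the given DataNode as dead on the NameNode and forces a heartbeat
   * check so the BlockManager notices immediately.
   */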
  private void setDataNodeDead(DatanodeID dnID) throws IOException {
    DatanodeDescriptor dnd =
        NameNodeAdapter.getDatanode(cluster.getNamesystem(), dnID);
    DFSTestUtil.setDatanodeDead(dnd);
    BlockManagerTestUtil.checkHeartbeat(
        cluster.getNamesystem().getBlockManager());
  }
}