TestDataNodeMXBean.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.jupiter.api.Test;
import org.eclipse.jetty.util.ajax.JSON;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Class for testing {@link DataNodeMXBean} implementation
 */
public class TestDataNodeMXBean extends SaslDataTransferTestCase {

  public static final Logger LOG =
      LoggerFactory.getLogger(TestDataNodeMXBean.class);

  @Test
  public void testDataNodeMXBean() throws Exception {
    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();

    try {
      List<DataNode> datanodes = cluster.getDataNodes();
      assertEquals(datanodes.size(), 1);
      DataNode datanode = datanodes.get(0);

      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); 
      ObjectName mxbeanName = new ObjectName(
          "Hadoop:service=DataNode,name=DataNodeInfo");
      // get attribute "ClusterId"
      String clusterId = (String) mbs.getAttribute(mxbeanName, "ClusterId");
      assertEquals(datanode.getClusterId(), clusterId);
      // get attribute "Version"
      String version = (String)mbs.getAttribute(mxbeanName, "Version");
      assertEquals(datanode.getVersion(), version);
      // get attribute "DNStartedTimeInMillis"
      long startTime = (long) mbs.getAttribute(mxbeanName, "DNStartedTimeInMillis");
      assertTrue(startTime > 0, "Datanode start time should not be 0");
      assertEquals(datanode.getDNStartedTimeInMillis(), startTime);
      // get attribute "SotfwareVersion"
      String softwareVersion =
          (String)mbs.getAttribute(mxbeanName, "SoftwareVersion");
      assertEquals(datanode.getSoftwareVersion(), softwareVersion);
      assertEquals(version, softwareVersion
          + ", r" + datanode.getRevision());
      // get attribute "RpcPort"
      String rpcPort = (String)mbs.getAttribute(mxbeanName, "RpcPort");
      assertEquals(datanode.getRpcPort(), rpcPort);
      // get attribute "HttpPort"
      String httpPort = (String)mbs.getAttribute(mxbeanName, "HttpPort");
      assertNotNull(httpPort);
      assertEquals(datanode.getHttpPort(), httpPort);
      // get attribute "NamenodeAddresses"
      String namenodeAddresses = (String)mbs.getAttribute(mxbeanName, 
          "NamenodeAddresses");
      assertEquals(datanode.getNamenodeAddresses(), namenodeAddresses);
      // get attribute "getDatanodeHostname"
      String datanodeHostname = (String)mbs.getAttribute(mxbeanName,
          "DatanodeHostname");
      assertEquals(datanode.getDatanodeHostname(), datanodeHostname);
      // get attribute "getVolumeInfo"
      String volumeInfo = (String)mbs.getAttribute(mxbeanName, "VolumeInfo");
      assertEquals(replaceDigits(datanode.getVolumeInfo()),
          replaceDigits(volumeInfo));
      // Ensure mxbean's XceiverCount is same as the DataNode's
      // live value.
      int xceiverCount = (Integer)mbs.getAttribute(mxbeanName,
          "XceiverCount");
      assertEquals(datanode.getXceiverCount(), xceiverCount);
      // Ensure mxbean's XmitsInProgress is same as the DataNode's
      // live value.
      int xmitsInProgress =
          (Integer) mbs.getAttribute(mxbeanName, "XmitsInProgress");
      assertEquals(datanode.getXmitsInProgress(), xmitsInProgress);
      String bpActorInfo = (String)mbs.getAttribute(mxbeanName,
          "BPServiceActorInfo");
      assertEquals(datanode.getBPServiceActorInfo(), bpActorInfo);
      String slowDisks = (String)mbs.getAttribute(mxbeanName, "SlowDisks");
      assertEquals(datanode.getSlowDisks(), slowDisks);
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  @Test
  public void testDataNodeMXBeanSecurityEnabled() throws Exception {
    Configuration simpleConf = new Configuration();
    Configuration secureConf = createSecureConfig("authentication");

    // get attribute "SecurityEnabled" with simple configuration
    try (MiniDFSCluster cluster =
                 new MiniDFSCluster.Builder(simpleConf).build()) {
      List<DataNode> datanodes = cluster.getDataNodes();
      assertEquals(datanodes.size(), 1);
      DataNode datanode = datanodes.get(0);

      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      ObjectName mxbeanName = new ObjectName(
              "Hadoop:service=DataNode,name=DataNodeInfo");

      boolean securityEnabled = (boolean) mbs.getAttribute(mxbeanName,
              "SecurityEnabled");
      assertFalse(securityEnabled);
      assertEquals(datanode.isSecurityEnabled(), securityEnabled);
    }

    // get attribute "SecurityEnabled" with secure configuration
    try (MiniDFSCluster cluster =
                 new MiniDFSCluster.Builder(secureConf).build()) {
      List<DataNode> datanodes = cluster.getDataNodes();
      assertEquals(datanodes.size(), 1);
      DataNode datanode = datanodes.get(0);

      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      ObjectName mxbeanName = new ObjectName(
              "Hadoop:service=DataNode,name=DataNodeInfo");

      boolean securityEnabled = (boolean) mbs.getAttribute(mxbeanName,
              "SecurityEnabled");
      assertTrue(securityEnabled);
      assertEquals(datanode.isSecurityEnabled(), securityEnabled);
    }

    // setting back the authentication method
    UserGroupInformation.setConfiguration(simpleConf);
  }
  
  private static String replaceDigits(final String s) {
    return s.replaceAll("[0-9]+", "_DIGITS_");
  }

  @Test
  public void testDataNodeMXBeanBlockSize() throws Exception {
    Configuration conf = new Configuration();

    try(MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).build()) {
      DataNode dn = cluster.getDataNodes().get(0);
      for (int i = 0; i < 100; i++) {
        DFSTestUtil.writeFile(
            cluster.getFileSystem(),
            new Path("/foo" + String.valueOf(i) + ".txt"), "test content");
      }
      DataNodeTestUtils.triggerBlockReport(dn);
      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      ObjectName mxbeanName = new ObjectName(
          "Hadoop:service=DataNode,name=DataNodeInfo");
      String bpActorInfo = (String)mbs.getAttribute(mxbeanName,
          "BPServiceActorInfo");
      assertEquals(dn.getBPServiceActorInfo(), bpActorInfo);
      LOG.info("bpActorInfo is " + bpActorInfo);
      TypeReference<ArrayList<Map<String, String>>> typeRef
          = new TypeReference<ArrayList<Map<String, String>>>() {};
      ArrayList<Map<String, String>> bpActorInfoList =
          new ObjectMapper().readValue(bpActorInfo, typeRef);
      int maxDataLength =
          Integer.valueOf(bpActorInfoList.get(0).get("maxDataLength"));
      int confMaxDataLength = dn.getConf().getInt(
          CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
          CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
      int maxBlockReportSize =
          Integer.valueOf(bpActorInfoList.get(0).get("maxBlockReportSize"));
      LOG.info("maxDataLength is " + maxDataLength);
      LOG.info("maxBlockReportSize is " + maxBlockReportSize);
      assertTrue(maxBlockReportSize > 0,
          "maxBlockReportSize should be greater than zero");
      assertEquals(confMaxDataLength, maxDataLength, "maxDataLength should be exactly "
          + "the same value of ipc.maximum.data.length");
    }
  }

  @Test
  public void testDataNodeMXBeanBlockCount() throws Exception {
    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();

    try {
      List<DataNode> datanodes = cluster.getDataNodes();
      assertEquals(datanodes.size(), 1);

      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      ObjectName mxbeanName =
              new ObjectName("Hadoop:service=DataNode,name=DataNodeInfo");
      FileSystem fs = cluster.getFileSystem();
      for (int i = 0; i < 5; i++) {
        DFSTestUtil.createFile(fs, new Path("/tmp.txt" + i), 1024, (short) 1,
                1L);
      }
      assertEquals(5, getTotalNumBlocks(mbs, mxbeanName), "Before restart DN");
      cluster.restartDataNode(0);
      cluster.waitActive();
      assertEquals(5, getTotalNumBlocks(mbs, mxbeanName), "After restart DN");
      fs.delete(new Path("/tmp.txt1"), true);
      // The total numBlocks should be updated after one file is deleted
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override
        public Boolean get() {
          try {
            return getTotalNumBlocks(mbs, mxbeanName) == 4;
          } catch (Exception e) {
            e.printStackTrace();
            return false;
          }
        }
      }, 100, 30000);
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  @SuppressWarnings("unchecked")
  private int getTotalNumBlocks(MBeanServer mbs, ObjectName mxbeanName)
          throws Exception {
    int totalBlocks = 0;
    String volumeInfo = (String) mbs.getAttribute(mxbeanName, "VolumeInfo");
    Map<?, ?> m = (Map<?, ?>) JSON.parse(volumeInfo);
    Collection<Map<String, Long>> values =
            (Collection<Map<String, Long>>) m.values();
    for (Map<String, Long> volumeInfoMap : values) {
      totalBlocks += volumeInfoMap.get("numBlocks");
    }
    return totalBlocks;
  }

  @Test
  public void testDataNodeMXBeanSlowDisksEnabled() throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys
        .DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, 100);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();

    try {
      List<DataNode> datanodes = cluster.getDataNodes();
      assertEquals(datanodes.size(), 1);
      DataNode datanode = datanodes.get(0);
      String slowDiskPath = "test/data1/slowVolume";
      datanode.getDiskMetrics().addSlowDiskForTesting(slowDiskPath, null);

      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      ObjectName mxbeanName = new ObjectName(
          "Hadoop:service=DataNode,name=DataNodeInfo");

      String slowDisks = (String)mbs.getAttribute(mxbeanName, "SlowDisks");
      assertEquals(datanode.getSlowDisks(), slowDisks);
      assertTrue(slowDisks.contains(slowDiskPath));
    } finally {
      if (cluster != null) {cluster.shutdown();}
    }
  }

  @Test
  public void testDataNodeMXBeanLastHeartbeats() throws Exception {
    Configuration conf = new Configuration();
    try (MiniDFSCluster cluster = new MiniDFSCluster
        .Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology(2))
        .build()) {
      cluster.waitActive();
      cluster.transitionToActive(0);
      cluster.transitionToStandby(1);

      DataNode datanode = cluster.getDataNodes().get(0);

      // Verify and wait until one of the BP service actor identifies active namenode as active
      // and another as standby.
      cluster.waitDatanodeConnectedToActive(datanode, 5000);

      // Verify that last heartbeat sent to both namenodes in last 5 sec.
      assertLastHeartbeatSentTime(datanode, "LastHeartbeat");
      // Verify that last heartbeat response from both namenodes have been received within
      // last 5 sec.
      assertLastHeartbeatSentTime(datanode, "LastHeartbeatResponseTime");


      NameNode sbNameNode = cluster.getNameNode(1);

      // Stopping standby namenode
      sbNameNode.stop();

      // Verify that last heartbeat response time from one of the namenodes would stay much higher
      // after stopping one namenode.
      GenericTestUtils.waitFor(() -> {
        List<Map<String, String>> bpServiceActorInfo = datanode.getBPServiceActorInfoMap();
        Map<String, String> bpServiceActorInfo1 = bpServiceActorInfo.get(0);
        Map<String, String> bpServiceActorInfo2 = bpServiceActorInfo.get(1);

        long lastHeartbeatResponseTime1 =
            Long.parseLong(bpServiceActorInfo1.get("LastHeartbeatResponseTime"));
        long lastHeartbeatResponseTime2 =
            Long.parseLong(bpServiceActorInfo2.get("LastHeartbeatResponseTime"));

        LOG.info("Last heartbeat response from namenode 1: {}", lastHeartbeatResponseTime1);
        LOG.info("Last heartbeat response from namenode 2: {}", lastHeartbeatResponseTime2);

        return (lastHeartbeatResponseTime1 < 5L && lastHeartbeatResponseTime2 > 5L) || (
            lastHeartbeatResponseTime1 > 5L && lastHeartbeatResponseTime2 < 5L);

      }, 200, 15000,
          "Last heartbeat response should be higher than 5s for at least one namenode");

      // Verify that last heartbeat sent to both namenodes in last 5 sec even though
      // the last heartbeat received from one of the namenodes is greater than 5 sec ago.
      assertLastHeartbeatSentTime(datanode, "LastHeartbeat");
    }
  }

  private static void assertLastHeartbeatSentTime(DataNode datanode, String lastHeartbeat) {
    List<Map<String, String>> bpServiceActorInfo = datanode.getBPServiceActorInfoMap();
    Map<String, String> bpServiceActorInfo1 = bpServiceActorInfo.get(0);
    Map<String, String> bpServiceActorInfo2 = bpServiceActorInfo.get(1);

    long lastHeartbeatSent1 =
        Long.parseLong(bpServiceActorInfo1.get(lastHeartbeat));
    long lastHeartbeatSent2 =
        Long.parseLong(bpServiceActorInfo2.get(lastHeartbeat));

    assertTrue(lastHeartbeatSent1 < 5L, lastHeartbeat
        + " for first bp service actor is higher than 5s");
    assertTrue(lastHeartbeatSent2 < 5L, lastHeartbeat
        + " for second bp service actor is higher than 5s");
  }

}