TestRBFMetrics.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.metrics;

import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getBean;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.math.BigInteger;
import java.util.Iterator;
import java.util.List;

import javax.management.MalformedObjectNameException;

import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.jupiter.api.Test;

/**
 * Test the JMX interface for the {@link Router}.
 */
public class TestRBFMetrics extends TestMetricsBase {

  public static final String FEDERATION_BEAN =
      "Hadoop:service=Router,name=FederationState";
  public static final String ROUTER_BEAN =
      "Hadoop:service=Router,name=Router";

  @Test
  public void testClusterStatsJMX()
      throws MalformedObjectNameException, IOException {

    FederationMBean federationBean = getBean(FEDERATION_BEAN,
        FederationMBean.class);
    validateClusterStatsFederationBean(federationBean);
    testCapacity(federationBean);
    RouterMBean routerBean = getBean(ROUTER_BEAN, RouterMBean.class);
    validateClusterStatsRouterBean(routerBean);
  }

  @Test
  public void testClusterStatsDataSource() throws IOException {
    RBFMetrics metrics = getRouter().getMetrics();
    validateClusterStatsFederationBean(metrics);
    validateClusterStatsRouterBean(metrics);
  }

  @Test
  public void testMountTableStatsDataSource()
      throws IOException, JSONException {

    RBFMetrics metrics = getRouter().getMetrics();
    String jsonString = metrics.getMountTable();
    JSONArray jsonArray = new JSONArray(jsonString);
    assertEquals(jsonArray.length(), getMockMountTable().size());

    int match = 0;
    for (int i = 0; i < jsonArray.length(); i++) {
      JSONObject json = jsonArray.getJSONObject(i);
      String src = json.getString("sourcePath");

      for (MountTable entry : getMockMountTable()) {
        if (entry.getSourcePath().equals(src)) {
          assertEquals(entry.getDefaultLocation().getNameserviceId(),
              json.getString("nameserviceId"));
          assertEquals(entry.getDefaultLocation().getDest(),
              json.getString("path"));
          assertEquals(entry.getOwnerName(), json.getString("ownerName"));
          assertEquals(entry.getGroupName(), json.getString("groupName"));
          assertEquals(entry.getMode().toString(), json.getString("mode"));
          assertEquals(entry.getQuota().toString(), json.getString("quota"));
          assertNotNullAndNotEmpty(json.getString("dateCreated"));
          assertNotNullAndNotEmpty(json.getString("dateModified"));
          match++;
        }
      }
    }
    assertEquals(match, getMockMountTable().size());
  }

  private MembershipState findMockNamenode(String nsId, String nnId) {

    @SuppressWarnings("unchecked")
    List<MembershipState> namenodes =
        ListUtils.union(getActiveMemberships(), getStandbyMemberships());
    for (MembershipState nn : namenodes) {
      if (nn.getNamenodeId().equals(nnId)
          && nn.getNameserviceId().equals(nsId)) {
        return nn;
      }
    }
    return null;
  }

  @Test
  public void testNamenodeStatsDataSource() throws IOException, JSONException {

    RBFMetrics metrics = getRouter().getMetrics();
    String jsonString = metrics.getNamenodes();
    JSONObject jsonObject = new JSONObject(jsonString);
    Iterator<?> keys = jsonObject.keys();
    int nnsFound = 0;
    while (keys.hasNext()) {
      // Validate each entry against our mocks
      JSONObject json = jsonObject.getJSONObject((String) keys.next());
      String nameserviceId = json.getString("nameserviceId");
      String namenodeId = json.getString("namenodeId");

      MembershipState mockEntry =
          this.findMockNamenode(nameserviceId, namenodeId);
      assertNotNull(mockEntry);

      assertEquals(json.getString("state"), mockEntry.getState().toString());
      MembershipStats stats = mockEntry.getStats();
      assertEquals(json.getLong("numOfActiveDatanodes"),
          stats.getNumOfActiveDatanodes());
      assertEquals(json.getLong("numOfDeadDatanodes"),
          stats.getNumOfDeadDatanodes());
      assertEquals(json.getLong("numOfStaleDatanodes"),
          stats.getNumOfStaleDatanodes());
      assertEquals(json.getLong("numOfDecommissioningDatanodes"),
          stats.getNumOfDecommissioningDatanodes());
      assertEquals(json.getLong("numOfDecomActiveDatanodes"),
          stats.getNumOfDecomActiveDatanodes());
      assertEquals(json.getLong("numOfDecomDeadDatanodes"),
          stats.getNumOfDecomDeadDatanodes());
      assertEquals(json.getLong("numOfInMaintenanceLiveDataNodes"),
          stats.getNumOfInMaintenanceLiveDataNodes());
      assertEquals(json.getLong("numOfInMaintenanceDeadDataNodes"),
          stats.getNumOfInMaintenanceDeadDataNodes());
      assertEquals(json.getLong("numOfEnteringMaintenanceDataNodes"),
          stats.getNumOfEnteringMaintenanceDataNodes());
      assertEquals(json.getLong("numOfBlocks"), stats.getNumOfBlocks());
      assertEquals(json.getString("rpcAddress"), mockEntry.getRpcAddress());
      assertEquals(json.getString("webScheme"), mockEntry.getWebScheme());
      assertEquals(json.getString("webAddress"), mockEntry.getWebAddress());
      nnsFound++;
    }
    // Validate all memberships are present
    assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(),
        nnsFound);
  }

  @Test
  public void testNameserviceStatsDataSource()
      throws IOException, JSONException {

    RBFMetrics metrics = getRouter().getMetrics();
    String jsonString = metrics.getNameservices();
    JSONObject jsonObject = new JSONObject(jsonString);
    Iterator<?> keys = jsonObject.keys();
    int nameservicesFound = 0;
    while (keys.hasNext()) {
      JSONObject json = jsonObject.getJSONObject((String) keys.next());
      String nameserviceId = json.getString("nameserviceId");
      String namenodeId = json.getString("namenodeId");

      MembershipState mockEntry =
          this.findMockNamenode(nameserviceId, namenodeId);
      assertNotNull(mockEntry);

      // NS should report the active NN
      assertEquals(mockEntry.getState().toString(), json.getString("state"));
      assertEquals("ACTIVE", json.getString("state"));

      // Stats in the NS should reflect the stats for the most active NN
      MembershipStats stats = mockEntry.getStats();
      assertEquals(stats.getNumOfFiles(), json.getLong("numOfFiles"));
      assertEquals(stats.getTotalSpace(), json.getLong("totalSpace"));
      assertEquals(stats.getAvailableSpace(),
          json.getLong("availableSpace"));
      assertEquals(stats.getNumOfBlocksMissing(),
          json.getLong("numOfBlocksMissing"));
      assertEquals(stats.getNumOfActiveDatanodes(),
          json.getLong("numOfActiveDatanodes"));
      assertEquals(stats.getNumOfDeadDatanodes(),
          json.getLong("numOfDeadDatanodes"));
      assertEquals(stats.getNumOfStaleDatanodes(),
          json.getLong("numOfStaleDatanodes"));
      assertEquals(stats.getNumOfDecommissioningDatanodes(),
          json.getLong("numOfDecommissioningDatanodes"));
      assertEquals(stats.getNumOfDecomActiveDatanodes(),
          json.getLong("numOfDecomActiveDatanodes"));
      assertEquals(stats.getNumOfDecomDeadDatanodes(),
          json.getLong("numOfDecomDeadDatanodes"));
      assertEquals(stats.getNumOfInMaintenanceLiveDataNodes(),
          json.getLong("numOfInMaintenanceLiveDataNodes"));
      assertEquals(stats.getNumOfInMaintenanceDeadDataNodes(),
          json.getLong("numOfInMaintenanceDeadDataNodes"));
      assertEquals(stats.getNumOfStaleDatanodes(),
          json.getLong("numOfEnteringMaintenanceDataNodes"));
      assertEquals(stats.getProvidedSpace(),
          json.getLong("providedSpace"));
      assertEquals(stats.getPendingSPSPaths(),
          json.getInt("pendingSPSPaths"));
      nameservicesFound++;
    }
    assertEquals(getNameservices().size(), nameservicesFound);
  }

  @Test
  public void testRouterStatsDataSource() throws IOException, JSONException {

    RBFMetrics metrics = getRouter().getMetrics();
    String jsonString = metrics.getRouters();
    JSONObject jsonObject = new JSONObject(jsonString);
    Iterator<?> keys = jsonObject.keys();
    int routersFound = 0;
    while (keys.hasNext()) {
      JSONObject json = jsonObject.getJSONObject((String) keys.next());
      String address = json.getString("address");
      assertNotNullAndNotEmpty(address);
      RouterState router = findMockRouter(address);
      assertNotNull(router);

      assertEquals(router.getStatus().toString(), json.getString("status"));
      assertEquals(router.getCompileInfo(), json.getString("compileInfo"));
      assertEquals(router.getVersion(), json.getString("version"));
      assertEquals(router.getDateStarted(), json.getLong("dateStarted"));
      assertEquals(router.getDateCreated(), json.getLong("dateCreated"));
      assertEquals(router.getDateModified(), json.getLong("dateModified"));

      StateStoreVersion version = router.getStateStoreVersion();
      assertEquals(
          RBFMetrics.getDateString(version.getMembershipVersion()),
          json.get("lastMembershipUpdate"));
      assertEquals(
          RBFMetrics.getDateString(version.getMountTableVersion()),
          json.get("lastMountTableUpdate"));
      assertEquals(version.getMembershipVersion(),
          json.get("membershipVersion"));
      assertEquals(version.getMountTableVersion(),
          json.get("mountTableVersion"));
      routersFound++;
    }

    assertEquals(getMockRouters().size(), routersFound);
  }

  private void assertNotNullAndNotEmpty(String field) {
    assertNotNull(field);
    assertTrue(field.length() > 0);
  }

  private RouterState findMockRouter(String routerId) {
    for (RouterState router : getMockRouters()) {
      if (router.getAddress().equals(routerId)) {
        return router;
      }
    }
    return null;
  }

  private void validateClusterStatsFederationBean(FederationMBean bean) {

    // Determine aggregates
    long numBlocks = 0;
    long numLive = 0;
    long numDead = 0;
    long numStale = 0;
    long numDecom = 0;
    long numDecomLive = 0;
    long numDecomDead = 0;
    long numInMaintenanceLive = 0;
    long numInMaintenanceDead = 0;
    long numEnteringMaintenance = 0;
    int numCorruptsFilesCount = 0;
    long scheduledReplicationBlocks = 0;
    long numberOfMissingBlocksWithReplicationFactorOne = 0;
    long numberOfBadlyDistributedBlocks = 0;
    long highestPriorityLowRedundancyReplicatedBlocks = 0;
    long highestPriorityLowRedundancyECBlocks = 0;
    long numFiles = 0;
    int pendingSPSPaths = 0;
    for (MembershipState mock : getActiveMemberships()) {
      MembershipStats stats = mock.getStats();
      numBlocks += stats.getNumOfBlocks();
      numLive += stats.getNumOfActiveDatanodes();
      numDead += stats.getNumOfDeadDatanodes();
      numStale += stats.getNumOfStaleDatanodes();
      numDecom += stats.getNumOfDecommissioningDatanodes();
      numDecomLive += stats.getNumOfDecomActiveDatanodes();
      numDecomDead += stats.getNumOfDecomDeadDatanodes();
      numInMaintenanceLive += stats.getNumOfInMaintenanceLiveDataNodes();
      numInMaintenanceDead += stats.getNumOfInMaintenanceLiveDataNodes();
      numEnteringMaintenance += stats.getNumOfEnteringMaintenanceDataNodes();
      numCorruptsFilesCount += stats.getCorruptFilesCount();
      scheduledReplicationBlocks += stats.getScheduledReplicationBlocks();
      numberOfMissingBlocksWithReplicationFactorOne +=
          stats.getNumberOfMissingBlocksWithReplicationFactorOne();
      numberOfBadlyDistributedBlocks += stats.getNumberOfBadlyDistributedBlocks();
      highestPriorityLowRedundancyReplicatedBlocks +=
          stats.getHighestPriorityLowRedundancyReplicatedBlocks();
      highestPriorityLowRedundancyECBlocks +=
          stats.getHighestPriorityLowRedundancyECBlocks();
      pendingSPSPaths += stats.getPendingSPSPaths();
    }

    assertEquals(numBlocks, bean.getNumBlocks());
    assertEquals(numLive, bean.getNumLiveNodes());
    assertEquals(numDead, bean.getNumDeadNodes());
    assertEquals(numStale, bean.getNumStaleNodes());
    assertEquals(numDecom, bean.getNumDecommissioningNodes());
    assertEquals(numDecomLive, bean.getNumDecomLiveNodes());
    assertEquals(numDecomDead, bean.getNumDecomDeadNodes());
    assertEquals(numInMaintenanceLive, bean.getNumInMaintenanceLiveDataNodes());
    assertEquals(numInMaintenanceDead, bean.getNumInMaintenanceDeadDataNodes());
    assertEquals(numEnteringMaintenance,
        bean.getNumEnteringMaintenanceDataNodes());
    assertEquals(numFiles, bean.getNumFiles());
    assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(),
        bean.getNumNamenodes());
    assertEquals(getNameservices().size(), bean.getNumNameservices());
    assertEquals(numCorruptsFilesCount, bean.getCorruptFilesCount());
    assertEquals(scheduledReplicationBlocks,
        bean.getScheduledReplicationBlocks());
    assertEquals(numberOfMissingBlocksWithReplicationFactorOne,
        bean.getNumberOfMissingBlocksWithReplicationFactorOne());
    assertEquals(numberOfBadlyDistributedBlocks,
        bean.getNumberOfBadlyDistributedBlocks());
    assertEquals(highestPriorityLowRedundancyReplicatedBlocks,
        bean.getHighestPriorityLowRedundancyReplicatedBlocks());
    assertEquals(highestPriorityLowRedundancyECBlocks,
        bean.getHighestPriorityLowRedundancyECBlocks());
    assertEquals(pendingSPSPaths, bean.getPendingSPSPaths());
  }

  private void validateClusterStatsRouterBean(RouterMBean bean) {
    assertTrue(bean.getVersion().length() > 0);
    assertTrue(bean.getCompiledDate().length() > 0);
    assertTrue(bean.getCompileInfo().length() > 0);
    assertTrue(bean.getRouterStarted().length() > 0);
    assertTrue(bean.getHostAndPort().length() > 0);
    assertFalse(bean.isSecurityEnabled());
  }

  private void testCapacity(FederationMBean bean) throws IOException {
    List<MembershipState> memberships = getActiveMemberships();
    assertTrue(memberships.size() > 1);

    BigInteger availableCapacity = BigInteger.valueOf(0);
    BigInteger totalCapacity = BigInteger.valueOf(0);
    BigInteger unitCapacity = BigInteger.valueOf(Long.MAX_VALUE);
    for (MembershipState mock : memberships) {
      MembershipStats stats = mock.getStats();
      stats.setTotalSpace(Long.MAX_VALUE);
      stats.setAvailableSpace(Long.MAX_VALUE);
      // reset stats to make the new value persistent
      mock.setStats(stats);
      // write back the new namenode information to state store
      assertTrue(refreshNamenodeRegistration(
          NamenodeHeartbeatRequest.newInstance(mock)));
      totalCapacity = totalCapacity.add(unitCapacity);
      availableCapacity = availableCapacity.add(unitCapacity);
    }

    // for local cache update
    assertEquals(totalCapacity, bean.getTotalCapacityBigInt());
    // not equal since overflow happened.
    assertNotEquals(totalCapacity, BigInteger.valueOf(bean.getTotalCapacity()));
    assertEquals(availableCapacity, bean.getRemainingCapacityBigInt());
    // not equal since overflow happened.
    assertNotEquals(availableCapacity,
        BigInteger.valueOf(bean.getRemainingCapacity()));
  }
}