NamenodeHeartbeatService.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS_DEFAULT;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.Time;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;

/**
 * The {@link Router} periodically checks the state of a Namenode (usually on
 * the same server) and reports its high availability (HA) state and
 * load/space status to the
 * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService}.
 * Note that this is an optional role as a Router can be independent of any
 * subcluster.
 * <p>
 * For performance with Namenode HA, the Router uses the high availability state
 * information in the State Store to forward the request to the Namenode that is
 * most likely to be active.
 * <p>
 * Note that this service can be embedded into the Namenode itself to simplify
 * the operation.
 */
public class NamenodeHeartbeatService extends PeriodicService {

  private static final Logger LOG =
      LoggerFactory.getLogger(NamenodeHeartbeatService.class);


  /** Configuration for the heartbeat. */
  private Configuration conf;

  /** Resolver used to register the Namenode status reports. */
  private final ActiveNamenodeResolver resolver;

  /** Interface to the tracked NN. */
  private final String nameserviceId;
  private final String namenodeId;

  /** Namenode HA target. */
  private NNHAServiceTarget localTarget;
  /** Cache HA protocol. */
  private HAServiceProtocol localTargetHAProtocol;
  /** Cache NN protocol. */
  private NamenodeProtocol namenodeProtocol;
  /** Cache Client protocol. */
  private ClientProtocol clientProtocol;
  /** RPC address for the namenode. */
  private String rpcAddress;
  /** Service RPC address for the namenode. */
  private String serviceAddress;
  /** Lifeline RPC address for the namenode. */
  private String lifelineAddress;
  /** HTTP address for the namenode. */
  private String webAddress;
  /** Connection factory for JMX calls. */
  private URLConnectionFactory connectionFactory;
  /** URL scheme to use for JMX calls. */
  private String scheme;

  /** Frequency of updates to JMX report. */
  private long updateJmxIntervalMs;
  /** Timestamp of last attempt to update JMX report. */
  private long lastJmxUpdateAttempt;
  /** Result of the last successful FsNamesystemMetrics report. */
  private JSONArray fsNamesystemMetrics;
  /** Result of the last successful NamenodeInfoMetrics report. */
  private JSONArray namenodeInfoMetrics;

  private String resolvedHost;
  private String originalNnId;

  private int healthMonitorTimeoutMs = (int) DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;

  /**
   * Build a heartbeat service for the Namenode identified by the given
   * nameservice and namenode identifiers.
   *
   * @param resolver Namenode resolver service to handle NN registration.
   * @param nsId Identifier of the nameservice; may be null.
   * @param nnId Identifier of the namenode in HA; may be null.
   */
  public NamenodeHeartbeatService(
      ActiveNamenodeResolver resolver, String nsId, String nnId) {
    // The service name is the class name followed by the non-null identifiers.
    super(getNnHeartBeatServiceName(nsId, nnId));

    this.nameserviceId = nsId;
    this.namenodeId = nnId;
    this.resolver = resolver;
  }

  /**
   * Build a heartbeat service for a Namenode that is reached through an
   * explicitly resolved host name.
   *
   * @param resolver      Namenode resolver service to handle NN registration.
   * @param nsId          Identifier of the nameservice.
   * @param nnId          Identifier of the namenode in HA; kept as the
   *                      original id used to look up the ports in the
   *                      configuration.
   * @param resolvedHost  Resolved host name for this specific namenode.
   */
  public NamenodeHeartbeatService(
      ActiveNamenodeResolver resolver, String nsId, String nnId, String resolvedHost) {
    super(getNnHeartBeatServiceName(nsId, nnId));

    this.resolver = resolver;
    this.nameserviceId = nsId;

    // Keep the original nnId so the ports can be read from the configuration.
    this.originalNnId = nnId;
    this.resolvedHost = resolvedHost;
    // Build a unique id out of the original nnId and the resolved host.
    this.namenodeId = nnId + "-" + resolvedHost;
  }

  /**
   * Initialize the heartbeat service from the configuration.
   * <p>
   * Resolves the RPC, service RPC, lifeline RPC and web addresses of the
   * monitored Namenode. The service address falls back to the RPC address and
   * the lifeline address falls back to the service address when they are not
   * configured. If a resolved host was supplied at construction time, the host
   * part of every address is replaced by it while the configured ports are
   * kept. It also prepares the HA target (only when a namenode id is set), the
   * JMX connection factory and URL scheme, and reads the heartbeat interval,
   * the health monitor timeout and the JMX refresh interval.
   *
   * @param configuration Configuration used to initialize the service.
   * @throws Exception If the initialization fails.
   */
  @Override
  protected void serviceInit(Configuration configuration) throws Exception {

    this.conf = DFSHAAdmin.addSecurityConfiguration(configuration);

    String nnDesc = nameserviceId;
    if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
      nnDesc += "-" + namenodeId;
    } else {
      this.localTarget = null;
    }

    // Without a resolved host, the namenode id is also the configuration id
    if (originalNnId == null) {
      originalNnId = namenodeId;
    }
    // Get the RPC address for the clients to connect
    this.rpcAddress = getRpcAddress(conf, nameserviceId, originalNnId);

    // Get the Service RPC address for monitoring
    this.serviceAddress =
        DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, originalNnId);
    if (this.serviceAddress == null) {
      LOG.error("Cannot locate RPC service address for NN {}, " +
          "using RPC address {}", nnDesc, this.rpcAddress);
      this.serviceAddress = this.rpcAddress;
    }

    // Get the Lifeline RPC address for faster monitoring
    this.lifelineAddress =
        DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, originalNnId);
    if (this.lifelineAddress == null) {
      this.lifelineAddress = this.serviceAddress;
    }

    // Get the Web address for UI
    this.webAddress =
        DFSUtil.getNamenodeWebAddr(conf, nameserviceId, originalNnId);

    if (resolvedHost != null) {
      // Get the addresses from resolvedHost plus the configured ports.
      rpcAddress = resolvedHost + ":"
          + NetUtils.getPortFromHostPortString(rpcAddress);
      serviceAddress = resolvedHost + ":"
          + NetUtils.getPortFromHostPortString(serviceAddress);
      lifelineAddress = resolvedHost + ":"
          + NetUtils.getPortFromHostPortString(lifelineAddress);
      webAddress = resolvedHost + ":"
          + NetUtils.getPortFromHostPortString(webAddress);
    }

    LOG.info("{} RPC address: {}", nnDesc, rpcAddress);
    LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress);
    LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress);
    LOG.info("{} Web address: {}", nnDesc, webAddress);

    // The HA target is only meaningful when a namenode id is available
    if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
      this.localTarget = new NNHAServiceTarget(
          conf, nameserviceId, namenodeId, serviceAddress, lifelineAddress);
    }

    this.connectionFactory =
        URLConnectionFactory.newDefaultURLConnectionFactory(conf);

    this.scheme =
        DFSUtil.getHttpPolicy(conf).isHttpEnabled() ? "http" : "https";

    this.setIntervalMs(conf.getLong(
        DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
        DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));

    long timeoutMs = conf.getTimeDuration(DFS_ROUTER_HEALTH_MONITOR_TIMEOUT,
        DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
    if (timeoutMs < 0) {
      LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
          "Using value of : 0ms instead.", timeoutMs, DFS_ROUTER_HEALTH_MONITOR_TIMEOUT);
      this.healthMonitorTimeoutMs = 0;
    } else if (timeoutMs > Integer.MAX_VALUE) {
      // The timeout is stored as an int: clamp instead of letting the
      // narrowing cast silently wrap around to an unrelated value.
      LOG.warn("Value {} configured for {} is too large. Using value of : {}ms instead.",
          timeoutMs, DFS_ROUTER_HEALTH_MONITOR_TIMEOUT, Integer.MAX_VALUE);
      this.healthMonitorTimeoutMs = Integer.MAX_VALUE;
    } else {
      this.healthMonitorTimeoutMs = (int) timeoutMs;
    }

    this.updateJmxIntervalMs = conf.getTimeDuration(DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS,
        DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS_DEFAULT, TimeUnit.MILLISECONDS);

    super.serviceInit(configuration);
  }

  /**
   * Run one heartbeat: refresh the Namenode state using the login user
   * credentials. An {@link IOException} is logged and not propagated.
   */
  @Override
  public void periodicInvoke() {
    final PrivilegedExceptionAction<Void> heartbeat = () -> {
      updateState();
      return null;
    };
    try {
      // Run using the login user credentials
      SecurityUtil.doAsLoginUser(heartbeat);
    } catch (IOException ioe) {
      LOG.error("Cannot update namenode state", ioe);
    }
  }

  /**
   * Get the RPC address for a Namenode.
   * <p>
   * When neither identifier is given, the plain RPC address setting is used.
   * Otherwise the setting suffixed with the identifiers is read and, if it is
   * not set, the address is looked up in the RPC addresses of the nameservice.
   *
   * @param conf Configuration.
   * @param nsId Name service identifier.
   * @param nnId Name node identifier.
   * @return RPC address in format hostname:1234; null if none was found.
   */
  private static String getRpcAddress(
      Configuration conf, String nsId, String nnId) {

    // Get it from the regular RPC setting
    String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
    String ret = conf.get(confKey);

    if (nsId != null || nnId != null) {
      // Get it for the proper nameservice and namenode
      confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId);
      ret = conf.get(confKey);

      // If not available, get it from the map
      if (ret == null) {
        Map<String, InetSocketAddress> rpcAddresses =
            DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
        InetSocketAddress sockAddr = null;
        if (nnId != null) {
          sockAddr = rpcAddresses.get(nnId);
        } else if (rpcAddresses.size() == 1) {
          // Get the only namenode in the namespace
          sockAddr = rpcAddresses.values().iterator().next();
        }
        if (sockAddr != null) {
          // getAddress() is null for an unresolved socket address; fall back
          // to the host string it was created with instead of failing.
          InetAddress addr = sockAddr.getAddress();
          String host =
              addr != null ? addr.getHostName() : sockAddr.getHostString();
          ret = host + ":" + sockAddr.getPort();
        }
      }
    }
    return ret;
  }

  /**
   * Fetch the current status of the Namenode and register it with the
   * resolver. Nothing is registered when HA status is expected (an HA target
   * exists) but could not be fetched, so the current state stands.
   */
  private void updateState() {
    final NamenodeStatusReport status = getNamenodeStatusReport();

    if (!status.registrationValid()) {
      // Not operational
      LOG.error("Namenode is not operational: {}", getNamenodeDesc());
    } else {
      final boolean haKnown = status.haStateValid();
      if (!haKnown && localTarget != null) {
        // Block info available, HA status should be available but was not
        // fetched: do nothing and let the current state stand
        return;
      }
      if (LOG.isDebugEnabled()) {
        if (haKnown) {
          // Block and HA status available
          LOG.debug("Received service state: {} from HA namenode: {}",
              status.getState(), getNamenodeDesc());
        } else {
          // Block info available, HA status not expected
          LOG.debug(
              "Reporting non-HA namenode as operational: {}", getNamenodeDesc());
        }
      }
    }

    try {
      final boolean registered = resolver.registerNamenode(status);
      if (!registered) {
        LOG.warn("Cannot register namenode {}", status);
      }
    } catch (Exception ex) {
      LOG.error("Cannot register namenode {} in the State Store", getNamenodeDesc(), ex);
    }
  }

  /**
   * Build the status report for the Namenode monitored by this heartbeater.
   * <p>
   * The namespace information is fetched first; safemode, JMX and HA data are
   * only collected when the registration is valid afterwards. Failures are
   * logged and the report gathered so far is returned.
   *
   * @return Namenode status report.
   */
  protected NamenodeStatusReport getNamenodeStatusReport() {
    final NamenodeStatusReport statusReport = new NamenodeStatusReport(
        nameserviceId, namenodeId, rpcAddress, serviceAddress,
        lifelineAddress, scheme, webAddress);

    try {
      LOG.debug("Probing NN at service address: {}", serviceAddress);

      final URI nnUri = new URI("hdfs://" + serviceAddress);

      // Read the filesystem info from RPC (required)
      updateNameSpaceInfoParameters(nnUri, statusReport);

      if (statusReport.registrationValid()) {
        // Check for safemode from the client protocol. Currently optional,
        // but should be required at some point for QoS
        updateSafeModeParameters(nnUri, statusReport);

        // Read the stats from JMX (optional)
        updateJMXParameters(webAddress, statusReport);

        // Try to get the HA status
        updateHAStatusParameters(statusReport);
      }
    } catch (IOException ioe) {
      LOG.error("Cannot communicate with {}: {}",
          getNamenodeDesc(), ioe.getMessage());
    } catch (Throwable t) {
      // Generic error that we don't know about
      LOG.error("Unexpected exception while communicating with {}: {}",
          getNamenodeDesc(), t.getMessage(), t);
    }
    return statusReport;
  }

  /**
   * Get the HA target of the monitored Namenode.
   * @return HA service target; null when no HA target is set.
   */
  @VisibleForTesting
  NNHAServiceTarget getLocalTarget() {
    return localTarget;
  }

  /**
   * Describe the monitored Namenode as
   * {@code nameservice[-namenode]:serviceAddress}; the namenode part is only
   * present when a non-empty namenode id is set.
   * @return Description of the Namenode to monitor.
   */
  public String getNamenodeDesc() {
    final boolean hasNamenodeId = namenodeId != null && !namenodeId.isEmpty();
    final String prefix =
        hasNamenodeId ? nameserviceId + "-" + namenodeId : nameserviceId;
    return prefix + ":" + serviceAddress;
  }

  /**
   * Build the name of the service: the simple class name followed by the
   * nameservice and namenode identifiers (space separated) when not null.
   * @param nsId Identifier of the nameservice; may be null.
   * @param nnId Identifier of the namenode; may be null.
   * @return Name of the heartbeat service.
   */
  private static String getNnHeartBeatServiceName(String nsId, String nnId) {
    final StringBuilder serviceName =
        new StringBuilder(NamenodeHeartbeatService.class.getSimpleName());
    if (nsId != null) {
      serviceName.append(" ").append(nsId);
    }
    if (nnId != null) {
      serviceName.append(" ").append(nnId);
    }
    return serviceName.toString();
  }

  /**
   * Fetch the namespace information of a Namenode via RPC and store it in the
   * report. The Namenode protocol proxy is created on first use and cached;
   * it is dropped when an IOException happens so it is created again on the
   * next attempt.
   * @param serviceURI Server address of the Namenode to monitor.
   * @param report Namenode status report updating with namespace information data.
   * @throws IOException Propagated to the caller because RBF needs the
   *                     Namespace Info to identify this NS; without it there
   *                     is no point in collecting the other information from
   *                     the NameNode.
   */
  private void updateNameSpaceInfoParameters(URI serviceURI,
      NamenodeStatusReport report) throws IOException {
    try {
      NamenodeProtocol proxy = this.namenodeProtocol;
      if (proxy == null) {
        proxy = NameNodeProxies
            .createProxy(this.conf, serviceURI, NamenodeProtocol.class)
            .getProxy();
        this.namenodeProtocol = proxy;
      }
      if (proxy == null) {
        return;
      }
      final NamespaceInfo nsInfo = proxy.versionRequest();
      if (nsInfo != null) {
        report.setNamespaceInfo(nsInfo);
      }
    } catch (IOException ioe) {
      // Forget the cached proxy
      this.namenodeProtocol = null;
      throw ioe;
    }
  }

  /**
   * Fetch the safemode state of a Namenode via RPC and store it in the report.
   * Safemode is only one status of the NameNode and is not needed by RBF to
   * identify it, so any failure is logged and ignored to let the caller keep
   * collecting the remaining information. The client protocol proxy is created
   * on first use and cached; it is dropped after a failure.
   * @param serviceURI Server address of the Namenode to monitor.
   * @param report Namenode status report updating with safemode information data.
   */
  private void updateSafeModeParameters(URI serviceURI, NamenodeStatusReport report) {
    try {
      ClientProtocol client = this.clientProtocol;
      if (client == null) {
        client = NameNodeProxies
            .createProxy(this.conf, serviceURI, ClientProtocol.class)
            .getProxy();
        this.clientProtocol = client;
      }
      if (client == null) {
        return;
      }
      final boolean inSafeMode =
          client.setSafeMode(SafeModeAction.SAFEMODE_GET, false);
      report.setSafeMode(inSafeMode);
    } catch (Exception ex) {
      LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), ex);
      this.clientProtocol = null;
    }
  }

  /**
   * Add the JMX parameters of a Namenode to the report. The metrics are only
   * fetched again from the Namenode when a refresh is due; the report is
   * populated from the stored metrics on every call. Failures are logged and
   * not propagated.
   * @param address Web interface of the Namenode to monitor.
   * @param report Namenode status report to update with JMX data.
   */
  private void updateJMXParameters(
      String address, NamenodeStatusReport report) {
    try {
      // TODO part of this should be moved to its own utility
      final boolean refreshDue = shouldUpdateJmx();
      if (refreshDue) {
        // Record the attempt before fetching
        this.lastJmxUpdateAttempt = Time.monotonicNow();
        getFsNamesystemMetrics(address);
        getNamenodeInfoMetrics(address);
      }
      populateFsNamesystemMetrics(this.fsNamesystemMetrics, report);
      populateNamenodeInfoMetrics(this.namenodeInfoMetrics, report);
    } catch (Exception ex) {
      LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), ex);
    }
  }

  /**
   * Get the HA status for a Namenode via RPC and add it to the report.
   * <p>
   * Nothing is done when there is no HA target. The health monitor proxy is
   * created on first use and cached; it is dropped after any failure so it is
   * created again on the next attempt. When the failure message reports that
   * HA is not enabled, the HA target is cleared as well.
   * @param report Namenode status report updating with HA status information data.
   */
  private void updateHAStatusParameters(NamenodeStatusReport report) {
    if (localTarget != null) {
      try {
        // Determine if NN is active
        if (localTargetHAProtocol == null) {
          localTargetHAProtocol = localTarget.getHealthMonitorProxy(
              conf, this.healthMonitorTimeoutMs);
          LOG.debug("Get HA status with address {}", lifelineAddress);
        }
        HAServiceStatus status = localTargetHAProtocol.getServiceStatus();
        report.setHAServiceState(status.getState());
      } catch (Throwable e) {
        // The message may be null (e.g. a NullPointerException); without this
        // check the handler itself would fail and skip the proxy reset below.
        String message = e.getMessage();
        if (message != null
            && message.startsWith("HA for namenode is not enabled")) {
          LOG.error("HA for {} is not enabled", getNamenodeDesc());
          localTarget = null;
        } else {
          // Failed to fetch HA status, ignoring failure
          LOG.error("Cannot fetch HA status for {}", getNamenodeDesc(), e);
        }
        localTargetHAProtocol = null;
      }
    }
  }

  /**
   * Decide whether the JMX report must be fetched again from the Namenode.
   * A refresh is due when both conditions hold:
   * 1. JMX updates are enabled (the configured interval is not negative).
   * 2. More than the configured interval has passed since the last attempt
   *    to update JMX.
   * @return true if the JMX report should be refreshed.
   */
  private boolean shouldUpdateJmx() {
    final long intervalMs = this.updateJmxIntervalMs;
    return intervalMs >= 0
        && Time.monotonicNow() - this.lastJmxUpdateAttempt > intervalMs;
  }

  /**
   * Query the NameNodeInfo JMX bean of the namenode and store the result in
   * {@code namenodeInfoMetrics}.
   * @param address Web interface of the Namenode to monitor.
   */
  private void getNamenodeInfoMetrics(String address) {
    this.namenodeInfoMetrics = FederationUtil.getJmx(
        "Hadoop:service=NameNode,name=NameNodeInfo",
        address, connectionFactory, scheme);
  }

  /**
   * Copy the NamenodeInfo metrics into the report. Only the first entry of
   * the array is inspected, and only when it is the NameNodeInfo bean; every
   * attribute is optional.
   * @param aux NamenodeInfo metrics from namenode; may be null or empty.
   * @param report Namenode status report to update with JMX data.
   * @throws JSONException When an invalid JSONObject is found
   */
  private void populateNamenodeInfoMetrics(JSONArray aux, NamenodeStatusReport report)
      throws JSONException {
    if (aux == null || aux.length() <= 0) {
      return;
    }
    final JSONObject bean = aux.getJSONObject(0);
    final String beanName = bean.getString("name");
    if (!beanName.equals("Hadoop:service=NameNode,name=NameNodeInfo")) {
      return;
    }
    final int corruptFiles = bean.optInt("CorruptFilesCount");
    final long missingBlocksRf1 =
        bean.optLong("NumberOfMissingBlocksWithReplicationFactorOne");
    final long lowRedundancyReplicated =
        bean.optLong("HighestPriorityLowRedundancyReplicatedBlocks");
    final long lowRedundancyEc =
        bean.optLong("HighestPriorityLowRedundancyECBlocks");
    final long badlyDistributed = bean.optLong("BadlyDistributedBlocks");
    report.setNamenodeInfo(corruptFiles, missingBlocksRf1,
        lowRedundancyReplicated, lowRedundancyEc, badlyDistributed);
  }

  /**
   * Query the FSNamesystem* JMX beans of the namenode and store the result in
   * {@code fsNamesystemMetrics}.
   * @param address Web interface of the Namenode to monitor.
   */
  private void getFsNamesystemMetrics(String address) {
    this.fsNamesystemMetrics = FederationUtil.getJmx(
        "Hadoop:service=NameNode,name=FSNamesystem*",
        address, connectionFactory, scheme);
  }

  /**
   * Populates FSNamesystem* metrics into report.
   * <p>
   * The FSNamesystemState bean provides the Datanode counters and the
   * FSNamesystem bean provides the namesystem counters; any other bean is
   * skipped. Attributes read with {@code opt*} are optional and default to 0
   * when the Namenode does not export them, while the ones read with
   * {@code get*} are required.
   * @param aux FSNamesystem* metrics from namenode; may be null.
   * @param report Namenode status report to update with JMX data.
   * @throws JSONException When invalid JSONObject is found or a required
   *                       attribute is missing.
   */
  private void populateFsNamesystemMetrics(JSONArray aux, NamenodeStatusReport report)
      throws JSONException {
    if (aux != null) {
      for (int i = 0; i < aux.length(); i++) {
        JSONObject jsonObject = aux.getJSONObject(i);
        String name = jsonObject.getString("name");
        if (name.equals("Hadoop:service=NameNode,name=FSNamesystemState")) {
          report.setDatanodeInfo(
              jsonObject.getInt("NumLiveDataNodes"),
              jsonObject.getInt("NumDeadDataNodes"),
              jsonObject.getInt("NumStaleDataNodes"),
              jsonObject.getInt("NumDecommissioningDataNodes"),
              jsonObject.getInt("NumDecomLiveDataNodes"),
              jsonObject.getInt("NumDecomDeadDataNodes"),
              jsonObject.optInt("NumInMaintenanceLiveDataNodes"),
              jsonObject.optInt("NumInMaintenanceDeadDataNodes"),
              jsonObject.optInt("NumEnteringMaintenanceDataNodes"),
              jsonObject.optLong("ScheduledReplicationBlocks"));
        } else if (name.equals(
            "Hadoop:service=NameNode,name=FSNamesystem")) {
          report.setNamesystemInfo(
              jsonObject.getLong("CapacityRemaining"),
              jsonObject.getLong("CapacityTotal"),
              jsonObject.getLong("FilesTotal"),
              jsonObject.getLong("BlocksTotal"),
              jsonObject.getLong("MissingBlocks"),
              jsonObject.getLong("PendingReplicationBlocks"),
              jsonObject.getLong("UnderReplicatedBlocks"),
              jsonObject.getLong("PendingDeletionBlocks"),
              jsonObject.optLong("ProvidedCapacityTotal"),
              // Read as optional, like ProvidedCapacityTotal above: a missing
              // attribute must not discard the rest of the JMX report.
              jsonObject.optInt("PendingSPSPaths"));
        }
      }
    }
  }

  /**
   * Stop the heartbeat service, destroying the JMX connection factory if one
   * was created.
   * @throws Exception If the service cannot be stopped.
   */
  @Override
  protected void serviceStop() throws Exception {
    LOG.info("Stopping NamenodeHeartbeat service for, NS {} NN {} ",
        this.nameserviceId, this.namenodeId);
    final URLConnectionFactory factory = this.connectionFactory;
    if (factory != null) {
      factory.destroy();
    }
    super.serviceStop();
  }
}