SlowPeerTracker.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.blockmanagement;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.primitives.Ints;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;


/**
 * This class aggregates information from {@link SlowPeerReports} received via
 * heartbeats.
 *
 * Reports are kept in nested concurrent maps keyed by the slow node and then
 * by the reporting node. Staleness is evaluated lazily at read time against
 * {@link #reportValidityMs}.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SlowPeerTracker {
  public static final Logger LOG =
      LoggerFactory.getLogger(SlowPeerTracker.class);

  /**
   * Time duration after which a report is considered stale. This is
   * set to DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY * 3 i.e.
   * maintained for at least two successive reports.
   */
  private final long reportValidityMs;

  /**
   * Timer object for querying the current time. Separated out for
   * unit testing.
   */
  private final Timer timer;

  /**
   * ObjectWriter to convert JSON reports to String.
   */
  private static final ObjectWriter WRITER = new ObjectMapper().writer();

  /**
   * Number of nodes to include in JSON report. We will return nodes with
   * the highest number of votes from peers.
   */
  private volatile int maxNodesToReport;

  /**
   * Information about peers that have reported a node as being slow.
   * The outer key is the slow node. Each outer map value is a map of
   * (reporting DatanodeId) {@literal ->} (latency metrics and timestamp),
   * holding the most recent report received from that reporting node.
   *
   * DatanodeId could be the DataNodeId or its address. We
   * don't care as long as the caller uses it consistently.
   *
   * Stale reports are not evicted proactively and can potentially
   * hang around forever.
   */
  private final ConcurrentMap<String, ConcurrentMap<String, LatencyWithLastReportTime>>
      allReports;

  /**
   * Create a tracker.
   *
   * @param conf configuration supplying the outlier report interval (used to
   *     derive the report validity period) and the maximum number of nodes
   *     to include in the JSON report.
   * @param timer source of monotonic time used to timestamp and expire reports.
   */
  public SlowPeerTracker(Configuration conf, Timer timer) {
    this.timer = timer;
    this.allReports = new ConcurrentHashMap<>();
    this.reportValidityMs = conf.getTimeDuration(
        DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
        DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT,
        TimeUnit.MILLISECONDS) * 3;
    this.setMaxSlowPeersToReport(conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY,
        DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT));
  }

  /**
   * If SlowPeerTracker is enabled, return true, else returns false.
   * This implementation always returns true.
   *
   * @return true if slow peer tracking is enabled, else false.
   */
  public boolean isSlowPeerTrackerEnabled() {
    return true;
  }

  /**
   * Add a new report. DatanodeIds can be the DataNodeIds or addresses
   * We don't care as long as the caller is consistent.
   *
   * A later report from the same reporting node about the same slow node
   * replaces the earlier one.
   *
   * @param slowNode DataNodeId of the peer suspected to be slow.
   * @param reportingNode DataNodeId of the node reporting on its peer.
   * @param slowNodeMetrics Aggregate latency metrics of slownode as reported by the
   *     reporting node.
   */
  public void addReport(String slowNode, String reportingNode, OutlierMetrics slowNodeMetrics) {
    ConcurrentMap<String, LatencyWithLastReportTime> nodeEntries = allReports.get(slowNode);

    if (nodeEntries == null) {
      // computeIfAbsent installs the inner map atomically, so concurrent
      // writers for the same slowNode all end up sharing one map instance.
      nodeEntries = allReports.computeIfAbsent(slowNode, key -> new ConcurrentHashMap<>());
    }

    // Replace the existing entry from this node, if any.
    nodeEntries.put(reportingNode,
        new LatencyWithLastReportTime(timer.monotonicNow(), slowNodeMetrics));
  }

  /**
   * Retrieve the non-expired reports that mark a given DataNode
   * as slow. Stale reports are excluded.
   *
   * @param slowNode target node Id.
   * @return set of reports which implicate the target node as being slow;
   *     an empty set if there are no reports for the node.
   */
  public Set<SlowPeerLatencyWithReportingNode> getReportsForNode(String slowNode) {
    final ConcurrentMap<String, LatencyWithLastReportTime> nodeEntries =
        allReports.get(slowNode);

    if (nodeEntries == null || nodeEntries.isEmpty()) {
      return Collections.emptySet();
    }

    return filterNodeReports(nodeEntries, timer.monotonicNow());
  }

  /**
   * Retrieve all reports for all nodes. Stale reports are excluded, and
   * nodes left with no valid report are omitted from the result.
   *
   * @return map from SlowNodeId {@literal ->} (set of nodes reporting peers).
   */
  public Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> getReportsForAllDataNodes() {
    if (allReports.isEmpty()) {
      return ImmutableMap.of();
    }

    final Map<String, SortedSet<SlowPeerLatencyWithReportingNode>> allNodesValidReports =
        new HashMap<>();
    // Use a single timestamp so every node is judged against the same instant.
    final long now = timer.monotonicNow();

    for (Map.Entry<String, ConcurrentMap<String, LatencyWithLastReportTime>> entry
        : allReports.entrySet()) {
      SortedSet<SlowPeerLatencyWithReportingNode> validReports =
          filterNodeReports(entry.getValue(), now);
      if (!validReports.isEmpty()) {
        allNodesValidReports.put(entry.getKey(), validReports);
      }
    }
    return allNodesValidReports;
  }

  /**
   * Filter the given reports to return just the valid ones.
   *
   * @param reports Current set of reports.
   * @param now Current time.
   * @return Set of valid reports that were created within last reportValidityMs millis.
   */
  private SortedSet<SlowPeerLatencyWithReportingNode> filterNodeReports(
      ConcurrentMap<String, LatencyWithLastReportTime> reports, long now) {
    final SortedSet<SlowPeerLatencyWithReportingNode> validReports = new TreeSet<>();

    for (Map.Entry<String, LatencyWithLastReportTime> entry : reports.entrySet()) {
      // Read the value once so the time and the metrics come from the same report.
      final LatencyWithLastReportTime report = entry.getValue();
      if (now - report.getTime() < reportValidityMs) {
        final OutlierMetrics outlierMetrics = report.getLatency();
        validReports.add(
            new SlowPeerLatencyWithReportingNode(entry.getKey(), outlierMetrics.getActualLatency(),
                outlierMetrics.getMedian(), outlierMetrics.getMad(),
                outlierMetrics.getUpperLimitLatency()));
      }
    }
    return validReports;
  }

  /**
   * Retrieve all valid reports as a JSON string.
   * @return serialized representation of valid reports. null if
   *         serialization failed.
   */
  public String getJson() {
    Collection<SlowPeerJsonReport> validReports = getJsonReports(
        maxNodesToReport);
    try {
      return WRITER.writeValueAsString(validReports);
    } catch (JsonProcessingException e) {
      // Failed to serialize. Don't log the exception call stack: pass the
      // exception as a String so the logger does not treat it as a Throwable.
      LOG.debug("Failed to serialize statistics: {}", e.toString());
      return null;
    }
  }

  /**
   * Returns the ids of the slow nodes currently being tracked, limited to
   * the numNodes nodes that have the most valid reports against them.
   * The returned list is not sorted by report count. A warning listing the
   * nodes is logged whenever the result is non-empty.
   *
   * @param numNodes maximum number of slow nodes to return.
   * @return ids of the slow nodes; an empty list if there are none.
   */
  public List<String> getSlowNodes(int numNodes) {
    Collection<SlowPeerJsonReport> jsonReports = getJsonReports(numNodes);
    List<String> slowNodes = new ArrayList<>(jsonReports.size());
    for (SlowPeerJsonReport jsonReport : jsonReports) {
      slowNodes.add(jsonReport.getSlowNode());
    }
    if (!slowNodes.isEmpty()) {
      LOG.warn("Slow nodes list: {}", slowNodes);
    }
    return slowNodes;
  }

  /**
   * Retrieve reports in a structure for generating JSON, limiting the
   * output to the top numNodes nodes i.e nodes with the most reports.
   * @param numNodes number of nodes to return. This is to limit the
   *                 size of the generated JSON.
   * @return at most numNodes reports, one per slow node that has at least
   *         one valid report. Iteration order is that of the backing
   *         priority queue and is not sorted.
   */
  private Collection<SlowPeerJsonReport> getJsonReports(int numNodes) {
    if (allReports.isEmpty()) {
      return Collections.emptyList();
    }

    // The queue never holds more than numNodes elements, so it does not need
    // room for every tracked node. PriorityQueue rejects a capacity below 1.
    final int initialCapacity = Math.max(1, Math.min(numNodes, allReports.size()));

    // Min-heap on the number of valid reports: the head is always the
    // weakest candidate among the nodes retained so far.
    final PriorityQueue<SlowPeerJsonReport> topNReports = new PriorityQueue<>(initialCapacity,
        (o1, o2) -> Ints.compare(o1.getSlowPeerLatencyWithReportingNodes().size(),
            o2.getSlowPeerLatencyWithReportingNodes().size()));

    final long now = timer.monotonicNow();

    for (Map.Entry<String, ConcurrentMap<String, LatencyWithLastReportTime>> entry
        : allReports.entrySet()) {
      SortedSet<SlowPeerLatencyWithReportingNode> validReports =
          filterNodeReports(entry.getValue(), now);
      if (validReports.isEmpty()) {
        continue;
      }
      if (topNReports.size() < numNodes) {
        topNReports.add(new SlowPeerJsonReport(entry.getKey(), validReports));
        continue;
      }
      final SlowPeerJsonReport weakest = topNReports.peek();
      if (weakest != null
          && weakest.getSlowPeerLatencyWithReportingNodes().size() < validReports.size()) {
        // Remove the lowest element
        topNReports.poll();
        topNReports.add(new SlowPeerJsonReport(entry.getKey(), validReports));
      }
    }
    return topNReports;
  }

  @VisibleForTesting
  long getReportValidityMs() {
    return reportValidityMs;
  }

  /**
   * Update the maximum number of slow nodes included in the JSON report
   * produced by {@link #getJson()}.
   *
   * @param maxSlowPeersToReport new maximum number of nodes to report.
   */
  public synchronized void setMaxSlowPeersToReport(int maxSlowPeersToReport) {
    this.maxNodesToReport = maxSlowPeersToReport;
  }

  /**
   * Immutable pairing of the latency metrics from a single report with the
   * time at which that report was recorded.
   */
  private static class LatencyWithLastReportTime {
    // Primitive long: avoids boxing on every report and unboxing on every
    // validity check.
    private final long time;
    private final OutlierMetrics latency;

    LatencyWithLastReportTime(long time, OutlierMetrics latency) {
      this.time = time;
      this.latency = latency;
    }

    public long getTime() {
      return time;
    }

    public OutlierMetrics getLatency() {
      return latency;
    }
  }

}