AbfsBackoffMetrics.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum;
import org.apache.hadoop.fs.azurebfs.enums.RetryValue;
import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THOUSAND;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.DOUBLE_PRECISION_FORMAT;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.RETRY;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.REQUESTS;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.REQUEST_COUNT;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SECONDS;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.MIN_MAX_AVERAGE;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.BANDWIDTH_THROTTLED_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.IOPS_THROTTLED_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.OTHER_THROTTLED_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.PERCENTAGE_THROTTLED_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.NETWORK_ERROR_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SUCCESS_REQUESTS_WITHOUT_RETRY;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FAILED_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.TOTAL_REQUESTS_COUNT;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.MAX_RETRY;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.MAX_BACK_OFF;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.MIN_BACK_OFF;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.MAX_RETRY_COUNT;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_BANDWIDTH_THROTTLED_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_IOPS_THROTTLED_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_NETWORK_FAILED_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_OTHER_THROTTLED_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_REQUESTS_FAILED;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_REQUESTS_SUCCEEDED;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.NUMBER_OF_REQUESTS_SUCCEEDED_WITHOUT_RETRYING;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_BACK_OFF;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsBackoffMetricsEnum.TOTAL_NUMBER_OF_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_GAUGE;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
import static org.apache.hadoop.util.StringUtils.format;
import static org.apache.hadoop.util.StringUtils.formatPercent;

/**
 * This class is responsible for tracking and
 * updating metrics related to backoff and
 * retry operations in Azure Blob File System (ABFS).
 */
public class AbfsBackoffMetrics extends AbstractAbfsStatisticsSource {

  private static final Logger LOG = LoggerFactory.getLogger(
      AbfsBackoffMetrics.class);

  private static final List<RetryValue> RETRY_LIST = Arrays.asList(
      RetryValue.values());

  private final boolean isRetryMetricEnabled;

  /**
   * Constructor to initialize the IOStatisticsStore with counters and gauges.
   */
  public AbfsBackoffMetrics(final boolean isRetryMetricEnabled) {
    this.isRetryMetricEnabled = isRetryMetricEnabled;
    IOStatisticsStore ioStatisticsStore = iostatisticsStore()
        .withCounters(getMetricNames(TYPE_COUNTER))
        .withGauges(getMetricNames(TYPE_GAUGE))
        .build();
    setIOStatistics(ioStatisticsStore);
  }

  /**
   * Retrieves the metric names based on the statistic type.
   *
   * @param type the type of the statistic (counter or gauge)
   * @return an array of metric names
   */
  private String[] getMetricNames(StatisticTypeEnum type) {
    return Arrays.stream(AbfsBackoffMetricsEnum.values())
        .filter(backoffMetricsEnum -> backoffMetricsEnum
            .getStatisticType()
            .equals(type))
        .flatMap(backoffMetricsEnum ->
            RETRY.equals(backoffMetricsEnum.getType())
                ? RETRY_LIST.stream().map(retryCount ->
                getMetricName(backoffMetricsEnum, retryCount))
                : Stream.of(backoffMetricsEnum.getName())
        ).toArray(String[]::new);
  }

  /**
   * Constructs the metric name based on the metric and retry value.
   *
   * @param metric the metric enum
   * @param retryValue the retry value
   * @return the constructed metric name
   */
  private String getMetricName(AbfsBackoffMetricsEnum metric,
      RetryValue retryValue) {
    if (metric == null) {
      LOG.error("ABFS Backoff Metric should not be null");
      return EMPTY_STRING;
    }
    if (RETRY.equals(metric.getType()) && retryValue != null) {
      return retryValue.getValue() + COLON + metric.getName();
    }
    return metric.getName();
  }

  /**
   * Retrieves the value of a specific metric.
   *
   * @param metric the metric enum
   * @param retryValue the retry value
   * @return the value of the metric
   */
  public long getMetricValue(AbfsBackoffMetricsEnum metric,
      RetryValue retryValue) {
    String metricName = getMetricName(metric, retryValue);
    switch (metric.getStatisticType()) {
    case TYPE_COUNTER:
      return lookupCounterValue(metricName);
    case TYPE_GAUGE:
      return lookupGaugeValue(metricName);
    default:
      return 0;
    }
  }

  /**
   * Retrieves the value of a specific metric.
   *
   * @param metric the metric enum
   * @return the value of the metric
   */
  public long getMetricValue(AbfsBackoffMetricsEnum metric) {
    return getMetricValue(metric, null);
  }

  /**
   * Increments the value of a specific metric.
   *
   * @param metric the metric enum
   * @param retryValue the retry value
   */
  public void incrementMetricValue(AbfsBackoffMetricsEnum metric,
      RetryValue retryValue) {
    String metricName = getMetricName(metric, retryValue);
    switch (metric.getStatisticType()) {
    case TYPE_COUNTER:
      incCounterValue(metricName);
      break;
    case TYPE_GAUGE:
      incGaugeValue(metricName);
      break;
    default:
      // Do nothing
      break;
    }
  }

  /**
   * Increments the value of a specific metric.
   *
   * @param metric the metric enum
   */
  public void incrementMetricValue(AbfsBackoffMetricsEnum metric) {
    incrementMetricValue(metric, null);
  }

  /**
   * Sets the value of a specific metric.
   *
   * @param metric the metric enum
   * @param value the new value of the metric
   * @param retryValue the retry value
   */
  public void setMetricValue(AbfsBackoffMetricsEnum metric,
      long value,
      RetryValue retryValue) {
    String metricName = getMetricName(metric, retryValue);
    switch (metric.getStatisticType()) {
    case TYPE_COUNTER:
      setCounterValue(metricName, value);
      break;
    case TYPE_GAUGE:
      setGaugeValue(metricName, value);
      break;
    default:
      // Do nothing
      break;
    }
  }

  /**
   * Sets the value of a specific metric.
   *
   * @param metric the metric enum
   * @param value the new value of the metric
   */
  public void setMetricValue(AbfsBackoffMetricsEnum metric, long value) {
    setMetricValue(metric, value, null);
  }

  /**
   * Get the precision metrics.
   *
   * @param metricName the metric name
   * @param retryCount the retry count
   * @param denominator the denominator
   * @return String metrics value with precision
   */
  private String getPrecisionMetrics(AbfsBackoffMetricsEnum metricName,
      RetryValue retryCount,
      long denominator) {
    return format(DOUBLE_PRECISION_FORMAT,
        (double) getMetricValue(metricName, retryCount) / denominator);
  }

  /**
   * Retrieves the retry metrics.
   *
   * @param metricBuilder the string builder to append the metrics
   */
  private void getRetryMetrics(StringBuilder metricBuilder) {
    for (RetryValue retryCount : RETRY_LIST) {
      long totalRequests = getMetricValue(TOTAL_REQUESTS, retryCount);
      if (getMetricValue(NUMBER_OF_REQUESTS_SUCCEEDED, retryCount) > 0) {
        metricBuilder.append(REQUEST_COUNT)
            .append(retryCount.getValue())
            .append(REQUESTS)
            .append(getMetricValue(NUMBER_OF_REQUESTS_SUCCEEDED, retryCount));
      }

      if (totalRequests > 0) {
        metricBuilder.append(MIN_MAX_AVERAGE)
            .append(retryCount.getValue())
            .append(REQUESTS)
            .append(getPrecisionMetrics(MIN_BACK_OFF, retryCount, THOUSAND))
            .append(SECONDS)
            .append(getPrecisionMetrics(MAX_BACK_OFF, retryCount, THOUSAND))
            .append(SECONDS)
            .append(getPrecisionMetrics(TOTAL_BACK_OFF, retryCount,
                totalRequests * THOUSAND))
            .append(SECONDS);
      }
    }
  }

  /**
   * Retrieves the base metrics.
   *
   * @param metricBuilder the string builder to append the metrics
   */
  private void getBaseMetrics(StringBuilder metricBuilder) {
    long totalRequestsThrottled =
        getMetricValue(NUMBER_OF_NETWORK_FAILED_REQUESTS)
            + getMetricValue(NUMBER_OF_IOPS_THROTTLED_REQUESTS)
            + getMetricValue(NUMBER_OF_OTHER_THROTTLED_REQUESTS)
            + getMetricValue(NUMBER_OF_BANDWIDTH_THROTTLED_REQUESTS);

    appendIfPositive(metricBuilder, BANDWIDTH_THROTTLED_REQUESTS,
        getMetricValue(NUMBER_OF_BANDWIDTH_THROTTLED_REQUESTS));

    appendIfPositive(metricBuilder, IOPS_THROTTLED_REQUESTS,
        getMetricValue(NUMBER_OF_IOPS_THROTTLED_REQUESTS));

    appendIfPositive(metricBuilder, OTHER_THROTTLED_REQUESTS,
        getMetricValue(NUMBER_OF_OTHER_THROTTLED_REQUESTS));

    // For percentage, we always want it (even if 0%)
    if (totalRequestsThrottled > 0) {
      appendAlways(metricBuilder, PERCENTAGE_THROTTLED_REQUESTS,
          formatPercent(totalRequestsThrottled / (double) getMetricValue(
              TOTAL_NUMBER_OF_REQUESTS), 3));
    }

    appendIfPositive(metricBuilder, NETWORK_ERROR_REQUESTS,
        getMetricValue(NUMBER_OF_NETWORK_FAILED_REQUESTS));

    appendIfPositive(metricBuilder, SUCCESS_REQUESTS_WITHOUT_RETRY,
        getMetricValue(NUMBER_OF_REQUESTS_SUCCEEDED_WITHOUT_RETRYING));

    appendIfPositive(metricBuilder, FAILED_REQUESTS,
        getMetricValue(NUMBER_OF_REQUESTS_FAILED));

    appendIfPositive(metricBuilder, TOTAL_REQUESTS_COUNT,
        getMetricValue(TOTAL_NUMBER_OF_REQUESTS));

    appendIfPositive(metricBuilder, MAX_RETRY,
        getMetricValue(MAX_RETRY_COUNT));
  }

  /**
   * Retrieves the string representation of the metrics.
   *
   * @return the string representation of the metrics
   */
  @Override
  public String toString() {
    if (getMetricValue(TOTAL_NUMBER_OF_REQUESTS) == 0) {
      return EMPTY_STRING;
    }
    StringBuilder metricBuilder = new StringBuilder();
    if (isRetryMetricEnabled) {
      getRetryMetrics(metricBuilder);
    }
    getBaseMetrics(metricBuilder);
    return metricBuilder.toString();
  }

  /**
   * Retrieves the metric names based on the statistic type.
   *
   * @param type the type of the statistic (counter or gauge)
   * @return an array of metric names
   */
  @VisibleForTesting
  String[] getMetricNamesByType(StatisticTypeEnum type) {
    return getMetricNames(type);
  }
}