SlidingWindowHdrHistogram.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.classification.VisibleForTesting;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;

/**
 * Sliding Window HdrHistogram for tracking latencies over a time window.
 * Uses a ring buffer of histograms to represent time segments within the window.
 * Thread-safe for concurrent recording and querying.
 */
public final class SlidingWindowHdrHistogram {

  private static final Logger LOG = LoggerFactory.getLogger(
      SlidingWindowHdrHistogram.class);
  private static final int PERCENTILE_50 = 50;
  private static final int PERCENTILE_90 = 90;
  private static final int PERCENTILE_99 = 99;

  // Configuration
  private final long windowSizeMillis;          // Total analysis window
  private final long timeSegmentDurationMillis; // Subdivision on analysis window
  private final int numSegments;
  private final long highestTrackableValue;
  private final int significantFigures;

  // Ring buffer of immutable snapshots for completed time segments
  private final Histogram[] completedSegments;
  private final AtomicInteger currentIndex = new AtomicInteger(0);

  // Active Time Segment
  private volatile Recorder activeSegmentRecorder;
  private Histogram currentSegmentAccumulation;
  private volatile long currentSegmentStartMillis;
  private final AtomicLong currentTotalCount = new AtomicLong(0L);

  // Synchronization
  // Writers never take locks. Readers (queries) and rotation use this lock
  // to mutate currentAccumulation and ring-buffer pointers safely.
  private final ReentrantLock rotateLock = new ReentrantLock();

  // Reusable temp histograms to minimize allocations
  private Histogram tmpForDelta;
  private Histogram tmpForMerge;

  private final AbfsRestOperationType operationType;

  private boolean isAnalysisWindowFilled = false;
  private int minSampleSize;
  private double tailLatencyPercentile;
  private int tailLatencyMinDeviation;

  private double p50 = ZERO_D;
  private double p90 = ZERO_D;
  private double p99 = ZERO_D;
  private double tailLatency = ZERO_D;
  private int deviation = ZERO;

  public SlidingWindowHdrHistogram(long windowSizeMillis,
      int numberOfSegments,
      int minSampleSize,
      int tailLatencyPercentile,
      int tailLatencyMinDeviation,
      long highestTrackableValue,
      int significantFigures,
      final AbfsRestOperationType operationType) {
    if (windowSizeMillis <= ZERO) {
      throw new IllegalArgumentException("windowSizeMillis > 0");
    }
    if (numberOfSegments <= ZERO) {
      throw new IllegalArgumentException("numberOfSegments > 0");
    }
    if (highestTrackableValue <= ZERO) {
      throw new IllegalArgumentException("highestTrackableValue > 0");
    }
    if (significantFigures < 1 || significantFigures > 5) {
      throw new IllegalArgumentException("significantFigures in [1,5]");
    }

    this.windowSizeMillis = windowSizeMillis;
    this.numSegments = numberOfSegments;
    this.timeSegmentDurationMillis = windowSizeMillis / numberOfSegments;
    this.highestTrackableValue = highestTrackableValue;
    this.significantFigures = significantFigures;
    this.operationType = operationType;
    this.minSampleSize = minSampleSize;
    this.tailLatencyPercentile = adjustPercentile(tailLatencyPercentile);
    this.tailLatencyMinDeviation = tailLatencyMinDeviation; // 5ms

    this.completedSegments = new Histogram[numSegments];
    long now = System.currentTimeMillis();
    this.currentSegmentStartMillis = alignToSegmentDuration(now);
    currentIndex.set(0);
    this.activeSegmentRecorder = new Recorder(highestTrackableValue,
        significantFigures);
    this.currentSegmentAccumulation = new Histogram(highestTrackableValue,
        significantFigures);
    this.tmpForDelta = new Histogram(highestTrackableValue, significantFigures);
    this.tmpForMerge = new Histogram(highestTrackableValue, significantFigures);

    LOG.debug(
        "[{}] Initialized SlidingWindowHdrHistogram with WindowSize {}, TimeSegmentDur: {}, "
            + "NumOfSegments: {}", operationType, windowSizeMillis, timeSegmentDurationMillis,
        numSegments);
  }

  /**
   * Record a single latency value (in your chosen time unit). Thread-safe and lock-free.
   * @param value latency value to record
   */
  public void recordValue(long value) {
    if (value < 0 || value > highestTrackableValue) {
      LOG.warn("[{}] Value {} outside of range [0, {}]. Ignoring",
          operationType, value, highestTrackableValue);
      return;
    }
    activeSegmentRecorder.recordValue(value);
    currentTotalCount.incrementAndGet();
    LOG.debug("[{}] Recorded latency value: {}. Current total count: {}",
        operationType, value, currentTotalCount.get());
  }

  /**
   * Get any percentile over the current sliding window.
   */
  public void computeLatency() {
    if (getCurrentTotalCount() < minSampleSize) {
      LOG.debug(
          "[{}] Not enough data to report percentiles. Current total count: {}",
          operationType, getCurrentTotalCount());
      return;
    } else {
      rotateLock.lock();
      try {
        tmpForMerge.reset();
        for (int i = 0; i < numSegments; i++) {
          Histogram h = completedSegments[i];
          if (h != null && h.getTotalCount() > 0) {
            tmpForMerge.add(h);
          }
        }

        if (tmpForMerge.getTotalCount() == 0) {
          return;
        }

        tailLatency = tmpForMerge.getValueAtPercentile(tailLatencyPercentile);
        p50 = tmpForMerge.getValueAtPercentile(PERCENTILE_50);
        p90 = tmpForMerge.getValueAtPercentile(PERCENTILE_90);
        p99 = tmpForMerge.getValueAtPercentile(PERCENTILE_99);
        if (p50 == ZERO || tailLatency < p50) {
          deviation = ZERO;
        } else {
          deviation = (int) ((tailLatency - p50) / p50 * HUNDRED);
        }
      } finally {
        rotateLock.unlock();
      }
    }
    LOG.debug(
        "[{}] Computed Latencies. p50: {}, p90: {}, p99: {}, tailLatency: {}, "
            + "deviation with p50: {} Current total count: {}",
        operationType, p50, p90, p99, tailLatency, deviation,
        getCurrentTotalCount());
  }

  private long alignToSegmentDuration(long timeMs) {
    return timeMs - (timeMs % timeSegmentDurationMillis);
  }

  /**
   * Ensure active bucket is aligned to current time; rotate if we've crossed a boundary.
   */
  public void rotateIfNeeded() {
    LOG.debug("[{}] Triggering Histogram Rotation", operationType);
    long expectedStart = alignToSegmentDuration(System.currentTimeMillis());
    if (expectedStart == currentSegmentStartMillis) {
      LOG.debug(
          "[{}] Current Time Segment Still Active at {}. Skipping Rotation",
          operationType, expectedStart);
      return; // still current
    }

    rotateLock.lock();
    try {
      // Re-check inside lock
      expectedStart = alignToSegmentDuration(System.currentTimeMillis());
      if (expectedStart == currentSegmentStartMillis) {
        return;
      }

      // Finalize the current bucket:
      // Pull any remaining deltas from active recorder and add to currentAccumulation
      tmpForDelta.reset();
      activeSegmentRecorder.getIntervalHistogramInto(tmpForDelta);
      currentSegmentAccumulation.add(tmpForDelta);

      if (currentSegmentAccumulation.getTotalCount() <= ZERO) {
        currentSegmentStartMillis = alignToSegmentDuration(
            System.currentTimeMillis());
        LOG.debug(
            "[{}] No data recorded in current time segment at {}. Skipping Rotation. Current Index is {}.",
            operationType, currentSegmentStartMillis, currentIndex.get());
        return;
      }

      LOG.debug(
          "[{}] Rotating current segment with total count {} into slot {}",
          operationType, currentSegmentAccumulation.getTotalCount(),
          currentIndex.get());

      // Place the finished currentAccumulation into the ring buffer slot ahead.
      int currentIdx = (currentIndex.getAndIncrement()) % numSegments;
      // Next slot is now going to be eradicated. Remove its count from total.
      currentTotalCount.set(
          currentTotalCount.get() - (completedSegments[currentIdx] == null
              ? ZERO
              : completedSegments[currentIdx].getTotalCount()));
      // Store an immutable snapshot (make sure we don't mutate the instance after storing)
      completedSegments[currentIdx] = currentSegmentAccumulation;
      currentSegmentStartMillis = alignToSegmentDuration(
          System.currentTimeMillis());

      // Start a fresh current bucket
      currentSegmentAccumulation = new Histogram(highestTrackableValue,
          significantFigures);
      activeSegmentRecorder = new Recorder(highestTrackableValue,
          significantFigures);

      if (currentIndex.get() >= numSegments) {
        LOG.debug("[{}] Analysis window is now filled", operationType);
        isAnalysisWindowFilled = true;
        // Prevent overflow of currentIndex
        currentIndex.set(currentIndex.get() % numSegments);
      }
      LOG.debug(
          "[{}] Completed rotation. New current index {}, New segment start time {}, New total count {}",
          operationType, currentIndex.get(), currentSegmentStartMillis,
          currentTotalCount.get());
    } finally {
      rotateLock.unlock();
    }
  }

  /**
   * If percentile is configured to more than 100, adjust it to a decimal value.
   * @param number configured percentile
   * @return adjusted percentile
   */
  public static double adjustPercentile(int number) {
    if (number <= HUNDRED) {
      return number; // No change for numbers ��� 100
    }

    String numStr = String.valueOf(number);
    String withDecimal = numStr.substring(0, 2) + "." + numStr.substring(2);
    return Double.parseDouble(withDecimal);
  }

  @VisibleForTesting
  public double getTailLatency() {
    LOG.debug(
        "[{}] Getting Tail Latency. Current total count: {}, Deviation: {}%, "
            + "p50: {}, Tail Latency: {}, isAnalysisWindowFilled: {}",
        operationType, getCurrentTotalCount(), deviation, p50, tailLatency,
        isAnalysisWindowFilled);
    if (!isAnalysisWindowFilled()) {
      LOG.debug(
          "[{}] Analysis window not yet filled. Not reporting tail latency",
          operationType);
      return ZERO_D;
    }
    if (deviation < tailLatencyMinDeviation) {
      LOG.debug(
          "[{}] Tail latency deviation {}% is less than minimum required {}%. Not reporting tail latency",
          operationType, deviation, tailLatencyMinDeviation);
      return ZERO_D;
    }
    return tailLatency;
  }

  @VisibleForTesting
  public long getCurrentTotalCount() {
    return currentTotalCount.get();
  }

  @VisibleForTesting
  public int getCurrentIndex() {
    return currentIndex.get();
  }

  @VisibleForTesting
  public double getP50() {
    return p50;
  }

  @VisibleForTesting
  public boolean isAnalysisWindowFilled() {
    return isAnalysisWindowFilled;
  }
}