// AbfsReadFooterMetrics.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.azurebfs.services;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;

/**
 * Records, per file, how the first reads of that file were issued so that
 * footer-style access patterns can be summarised.
 *
 * <p>One instance acts as the registry: its {@link #getMetricsMap() metrics map}
 * holds one child {@code AbfsReadFooterMetrics} per file identifier. A file is
 * only tracked beyond its first read when that first read starts within the
 * last {@code FOOTER_LENGTH * ONE_KB} bytes of the file. {@link #toString()}
 * classifies the tracked files as Parquet / non-Parquet and renders the
 * averaged values of each group.
 *
 * <p>Per-file state is kept in atomics and volatile fields so that values
 * written by reading threads are visible to the thread that renders the report.
 */
public class AbfsReadFooterMetrics {
  /** Size of the trailing region of a file that is treated as its footer, in KB. */
  private static final String FOOTER_LENGTH = "20";
  /** {@link #FOOTER_LENGTH} converted to bytes once instead of on every first read. */
  private static final long FOOTER_LENGTH_IN_BYTES =
      (long) Integer.parseInt(FOOTER_LENGTH) * ONE_KB;

  /** True once the entry was classified as Parquet (or, for an aggregate, it describes the Parquet group). */
  private final AtomicBoolean isParquetFile;
  /** True once {@link #markAsParquet} has rewritten the recorded pairs. */
  private final AtomicBoolean isParquetEvaluated;
  /** True once the average requested read length has been computed. */
  private final AtomicBoolean isLenUpdated;
  /**
   * "len_distanceFromEndOfFile" of the first read; reduced to "len" once the
   * entry is marked as Parquet. Volatile: written by readers, read by the reporter.
   */
  private volatile String sizeReadByFirstRead;
  /**
   * "len_offsetDistanceFromFirstRead" of the second read; reduced to "len"
   * once the entry is marked as Parquet.
   */
  private volatile String offsetDiffBetweenFirstAndSecondRead;
  /** Content length reported with the first read. */
  private final AtomicLong fileLength;
  private volatile double avgFileLength;
  private volatile double avgReadLenRequested;
  /** True once the first read qualified as a footer read; gates all later collection. */
  private final AtomicBoolean collectMetrics;
  private final AtomicBoolean collectMetricsForNextRead;
  /** Switched on by the second read; later reads add their length to {@link #dataLenRequested}. */
  private final AtomicBoolean collectLenMetrics;
  private final AtomicLong dataLenRequested;
  private final AtomicLong offsetOfFirstRead;
  private final AtomicInteger readCount;
  /** Per-file metrics keyed by file path identifier. */
  private final ConcurrentSkipListMap<String, AbfsReadFooterMetrics> metricsMap;

  public AbfsReadFooterMetrics() {
    this.isParquetFile = new AtomicBoolean(false);
    this.isParquetEvaluated = new AtomicBoolean(false);
    this.isLenUpdated = new AtomicBoolean(false);
    this.fileLength = new AtomicLong();
    this.readCount = new AtomicInteger(0);
    this.offsetOfFirstRead = new AtomicLong();
    this.collectMetrics = new AtomicBoolean(false);
    this.collectMetricsForNextRead = new AtomicBoolean(false);
    this.collectLenMetrics = new AtomicBoolean(false);
    this.dataLenRequested = new AtomicLong(0);
    this.metricsMap = new ConcurrentSkipListMap<>();
  }

  /**
   * @return the live (not copied) map of per-file metrics.
   */
  public Map<String, AbfsReadFooterMetrics> getMetricsMap() {
    return metricsMap;
  }

  private boolean getIsParquetFile() {
    return isParquetFile.get();
  }

  public void setIsParquetFile(boolean isParquetFile) {
    this.isParquetFile.set(isParquetFile);
  }

  private String getSizeReadByFirstRead() {
    return sizeReadByFirstRead;
  }

  public void setSizeReadByFirstRead(final String sizeReadByFirstRead) {
    this.sizeReadByFirstRead = sizeReadByFirstRead;
  }

  private String getOffsetDiffBetweenFirstAndSecondRead() {
    return offsetDiffBetweenFirstAndSecondRead;
  }

  public void setOffsetDiffBetweenFirstAndSecondRead(final String offsetDiffBetweenFirstAndSecondRead) {
    this.offsetDiffBetweenFirstAndSecondRead
        = offsetDiffBetweenFirstAndSecondRead;
  }

  private long getFileLength() {
    return fileLength.get();
  }

  private void setFileLength(long fileLength) {
    this.fileLength.set(fileLength);
  }

  private double getAvgFileLength() {
    return avgFileLength;
  }

  public void setAvgFileLength(final double avgFileLength) {
    this.avgFileLength = avgFileLength;
  }

  private double getAvgReadLenRequested() {
    return avgReadLenRequested;
  }

  public void setAvgReadLenRequested(final double avgReadLenRequested) {
    this.avgReadLenRequested = avgReadLenRequested;
  }

  private boolean getCollectMetricsForNextRead() {
    return collectMetricsForNextRead.get();
  }

  private void setCollectMetricsForNextRead(boolean collectMetricsForNextRead) {
    this.collectMetricsForNextRead.set(collectMetricsForNextRead);
  }

  private long getOffsetOfFirstRead() {
    return offsetOfFirstRead.get();
  }

  private void setOffsetOfFirstRead(long offsetOfFirstRead) {
    this.offsetOfFirstRead.set(offsetOfFirstRead);
  }

  private int getReadCount() {
    return readCount.get();
  }

  private void setReadCount(int readCount) {
    this.readCount.set(readCount);
  }

  /**
   * Atomically bumps the read counter.
   *
   * @return the counter value produced by this very increment, so concurrent
   *         callers each observe a distinct ordinal.
   */
  private int incrementReadCount() {
    return readCount.incrementAndGet();
  }

  private boolean getCollectLenMetrics() {
    return collectLenMetrics.get();
  }

  private void setCollectLenMetrics(boolean collectLenMetrics) {
    this.collectLenMetrics.set(collectLenMetrics);
  }

  private long getDataLenRequested() {
    return dataLenRequested.get();
  }

  private void setDataLenRequested(long dataLenRequested) {
    this.dataLenRequested.set(dataLenRequested);
  }

  private void updateDataLenRequested(long dataLenRequested) {
    this.dataLenRequested.addAndGet(dataLenRequested);
  }

  private boolean getCollectMetrics() {
    return collectMetrics.get();
  }

  private void setCollectMetrics(boolean collectMetrics) {
    this.collectMetrics.set(collectMetrics);
  }

  private boolean getIsParquetEvaluated() {
    return isParquetEvaluated.get();
  }

  private void setIsParquetEvaluated(boolean isParquetEvaluated) {
    this.isParquetEvaluated.set(isParquetEvaluated);
  }

  private boolean getIsLenUpdated() {
    return isLenUpdated.get();
  }

  private void setIsLenUpdated(boolean isLenUpdated) {
    this.isLenUpdated.set(isLenUpdated);
  }

  /**
   * Updates the metrics map with an entry for the specified file if it doesn't already exist.
   *
   * @param filePathIdentifier The unique identifier for the file.
   */
  public void updateMap(String filePathIdentifier) {
    metricsMap.computeIfAbsent(filePathIdentifier, key -> new AbfsReadFooterMetrics());
  }

  /**
   * Records a read against the file identified by filePathIdentifier, creating
   * the file's entry when it is missing. The read is recorded when it is the
   * first one seen for the file, or when the first read already qualified the
   * file for collection; otherwise it is ignored.
   *
   * @param filePathIdentifier The unique identifier for the file.
   * @param len               The length of the read operation.
   * @param contentLength     The total content length of the file.
   * @param nextReadPos       The position of the next read operation.
   */
  public void checkMetricUpdate(final String filePathIdentifier, final int len, final long contentLength,
      final long nextReadPos) {
    AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
        filePathIdentifier, key -> new AbfsReadFooterMetrics());
    // The counter never goes below zero, so "not the first read" already means >= 1.
    if (readFooterMetrics.getReadCount() == 0
        || readFooterMetrics.getCollectMetrics()) {
      updateMetrics(filePathIdentifier, len, contentLength, nextReadPos);
    }
  }

  /**
   * Counts the read and feeds it to the first-read / second-read / length
   * accumulation logic of the file's entry.
   *
   * @param filePathIdentifier  The unique identifier for the file.
   * @param len                The length of the read operation.
   * @param contentLength      The total content length of the file.
   * @param nextReadPos        The position of the next read operation.
   */
  private void updateMetrics(final String filePathIdentifier, final int len, final long contentLength,
                             final long nextReadPos) {
    // Atomic get-or-create; a separate get/put pair could drop an entry created concurrently.
    AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
        filePathIdentifier, key -> new AbfsReadFooterMetrics());

    // incrementReadCount() is atomic on its own, so no shared lock is needed to
    // obtain this read's ordinal.
    final int readOrdinal = readFooterMetrics.incrementReadCount();

    if (readOrdinal == 1) {
      updateMetricsOnFirstRead(readFooterMetrics, nextReadPos, len, contentLength);
    }

    // The flag is switched on by the second read (below), so lengths are
    // accumulated from the third read onwards.
    if (readFooterMetrics.getCollectLenMetrics()) {
      readFooterMetrics.updateDataLenRequested(len);
    }

    if (readOrdinal == 2) {
      updateMetricsOnSecondRead(readFooterMetrics, nextReadPos, len);
    }
  }

  /**
   * Updates metrics for the first read operation. Nothing is recorded unless
   * the read starts inside the trailing footer region of the file.
   *
   * @param readFooterMetrics The metrics object to update.
   * @param nextReadPos       The position of the next read operation.
   * @param len               The length of the read operation.
   * @param contentLength     The total content length of the file.
   */
  private void updateMetricsOnFirstRead(AbfsReadFooterMetrics readFooterMetrics, long nextReadPos, int len, long contentLength) {
    if (nextReadPos >= contentLength - FOOTER_LENGTH_IN_BYTES) {
      readFooterMetrics.setOffsetOfFirstRead(nextReadPos);
      readFooterMetrics.setSizeReadByFirstRead(len + "_" + Math.abs(contentLength - nextReadPos));
      readFooterMetrics.setFileLength(contentLength);
      readFooterMetrics.setCollectMetricsForNextRead(true);
      // Set last: anything that observes collectMetrics == true is then
      // guaranteed to also observe the values written above.
      readFooterMetrics.setCollectMetrics(true);
    }
  }

  /**
   * Updates metrics for the second read operation.
   *
   * @param readFooterMetrics The metrics object to update.
   * @param nextReadPos       The position of the next read operation.
   * @param len               The length of the read operation.
   */
  private void updateMetricsOnSecondRead(AbfsReadFooterMetrics readFooterMetrics, long nextReadPos, int len) {
    if (readFooterMetrics.getCollectMetricsForNextRead()) {
      long offsetDiff = Math.abs(nextReadPos - readFooterMetrics.getOffsetOfFirstRead());
      readFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" + offsetDiff);
      readFooterMetrics.setCollectLenMetrics(true);
    }
  }

  /**
   * Check if the given file should be marked as a Parquet file.
   *
   * @param metrics The metrics to evaluate.
   * @return True if the file meets the criteria for being marked as a Parquet file, false otherwise.
   */
  private boolean shouldMarkAsParquet(AbfsReadFooterMetrics metrics) {
    return metrics.getCollectMetrics()
        && metrics.getReadCount() >= 2
        && !metrics.getIsParquetEvaluated()
        && haveEqualValues(metrics.getSizeReadByFirstRead())
        && haveEqualValues(metrics.getOffsetDiffBetweenFirstAndSecondRead());
  }

  /**
   * Check if two values are equal, considering they are in the format "value1_value2".
   *
   * @param value The value to check; may be null when the corresponding read
   *              has not been recorded (yet).
   * @return True if the two parts of the value are equal, false otherwise
   *         (including for a null value).
   */
  private boolean haveEqualValues(String value) {
    if (value == null) {
      return false;
    }
    String[] parts = value.split("_");
    return parts.length == 2 && parts[0].equals(parts[1]);
  }

  /**
   * Mark the given metrics as a Parquet file and reduce both recorded pairs to
   * their first component.
   *
   * @param metrics The metrics to mark as Parquet.
   */
  private void markAsParquet(AbfsReadFooterMetrics metrics) {
    metrics.setIsParquetFile(true);
    String[] parts = metrics.getSizeReadByFirstRead().split("_");
    metrics.setSizeReadByFirstRead(parts[0]);
    parts = metrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
    metrics.setOffsetDiffBetweenFirstAndSecondRead(parts[0]);
    metrics.setIsParquetEvaluated(true);
  }

  /**
   * Check each metric in the provided map and mark them as Parquet files if they meet the criteria.
   * Entries are updated in place; the map itself is not modified.
   *
   * @param metricsMap The map containing metrics to evaluate.
   */
  public void checkIsParquet(Map<String, AbfsReadFooterMetrics> metricsMap) {
    for (AbfsReadFooterMetrics readFooterMetrics : metricsMap.values()) {
      if (shouldMarkAsParquet(readFooterMetrics)) {
        markAsParquet(readFooterMetrics);
      }
    }
  }

  /**
   * Computes, once per entry, the average read length requested by the reads
   * that followed the second read.
   *
   * @param metricsMap A map containing metrics for different files with unique identifiers.
   */
  private void updateLenRequested(Map<String, AbfsReadFooterMetrics> metricsMap) {
    for (AbfsReadFooterMetrics readFooterMetrics : metricsMap.values()) {
      if (shouldUpdateLenRequested(readFooterMetrics)) {
        // The first two reads do not contribute to dataLenRequested.
        int readReqCount = readFooterMetrics.getReadCount() - 2;
        readFooterMetrics.setAvgReadLenRequested(
            (double) readFooterMetrics.getDataLenRequested() / readReqCount);
        readFooterMetrics.setIsLenUpdated(true);
      }
    }
  }

  /**
   * Checks whether the average read length requested should be updated for the given metrics.
   *
   * The method returns true if the following conditions are met:
   * - Metrics collection is enabled.
   * - The number of read counts is greater than 2.
   * - The average read length has not been updated previously.
   *
   * @param readFooterMetrics The metrics object to evaluate.
   * @return True if the average read length should be updated, false otherwise.
   */
  private boolean shouldUpdateLenRequested(AbfsReadFooterMetrics readFooterMetrics) {
    return readFooterMetrics.getCollectMetrics()
        && readFooterMetrics.getReadCount() > 2
        && !readFooterMetrics.getIsLenUpdated();
  }

  /**
   * Averages the values of the Parquet entries into 'avgParquetReadFooterMetrics':
   * size read by the first read, size read by the second read, file length and
   * requested read length. The two size averages are stored as strings with
   * three decimals, formatted independently of the default locale.
   *
   * @param isParquetList The list of AbfsReadFooterMetrics to compute the averages from.
   * @param avgParquetReadFooterMetrics The target AbfsReadFooterMetrics object to store the computed average values.
   */
  private void getParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> isParquetList,
      AbfsReadFooterMetrics avgParquetReadFooterMetrics) {
    double avgFirstReadSize = isParquetList.stream()
        .map(AbfsReadFooterMetrics::getSizeReadByFirstRead)
        .mapToDouble(Double::parseDouble).average().orElse(0.0);
    double avgSecondReadSize = isParquetList.stream()
        .map(AbfsReadFooterMetrics::getOffsetDiffBetweenFirstAndSecondRead)
        .mapToDouble(Double::parseDouble).average().orElse(0.0);

    avgParquetReadFooterMetrics.setSizeReadByFirstRead(
        String.format(Locale.ROOT, "%.3f", avgFirstReadSize));
    avgParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(
        String.format(Locale.ROOT, "%.3f", avgSecondReadSize));
    avgParquetReadFooterMetrics.setAvgFileLength(isParquetList.stream()
        .mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
    avgParquetReadFooterMetrics.setAvgReadLenRequested(isParquetList.stream()
        .mapToDouble(AbfsReadFooterMetrics::getAvgReadLenRequested).average().orElse(0.0));
  }

  /**
   * Averages the values of the non-Parquet entries into 'avgNonParquetReadFooterMetrics'.
   * Each component of the "_"-separated first-read and second-read strings is
   * averaged separately and the results are joined with "_" again.
   *
   * @param isNonParquetList A non-empty list of entries whose first-read and
   *                         second-read strings are both set. The first element
   *                         determines how many components are averaged.
   * @param avgNonParquetReadFooterMetrics The instance that receives the averages.
   **/
  private void getNonParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> isNonParquetList,
                                                     AbfsReadFooterMetrics avgNonParquetReadFooterMetrics) {
    int size = isNonParquetList.get(0).getSizeReadByFirstRead().split("_").length;
    // First half: sums of the first-read components; second half: second-read components.
    double[] store = new double[2 * size];
    for (AbfsReadFooterMetrics abfsReadFooterMetrics : isNonParquetList) {
      String[] firstReadParts = abfsReadFooterMetrics.getSizeReadByFirstRead().split("_");
      String[] secondReadParts = abfsReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
      // Never index past what either string, or the sum array, actually holds.
      int usableParts = Math.min(size, Math.min(firstReadParts.length, secondReadParts.length));
      for (int i = 0; i < usableParts; i++) {
        store[i] += Long.parseLong(firstReadParts[i]);
        store[i + size] += Long.parseLong(secondReadParts[i]);
      }
    }

    StringJoiner firstReadSize = new StringJoiner("_");
    StringJoiner offDiffFirstSecondRead = new StringJoiner("_");
    int entryCount = isNonParquetList.size();
    for (int j = 0; j < size; j++) {
      firstReadSize.add(String.format(Locale.ROOT, "%.3f", store[j] / entryCount));
      offDiffFirstSecondRead.add(String.format(Locale.ROOT, "%.3f", store[j + size] / entryCount));
    }

    avgNonParquetReadFooterMetrics.setSizeReadByFirstRead(firstReadSize.toString());
    avgNonParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(offDiffFirstSecondRead.toString());
    avgNonParquetReadFooterMetrics.setAvgFileLength(isNonParquetList.stream()
        .mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
    avgNonParquetReadFooterMetrics.setAvgReadLenRequested(isNonParquetList.stream()
        .mapToDouble(AbfsReadFooterMetrics::getAvgReadLenRequested).average().orElse(0.0));
  }

  /*
  Acronyms:
  1.FR :- First Read (In case of parquet we only maintain the size requested by application for
  the first read, in case of non parquet we maintain a string separated by "_" delimiter where the first
  substring represents the len requested for first read and the second substring represents the seek pointer difference from the
  end of the file.)
  2.SR :- Second Read (In case of parquet we only maintain the size requested by application for
  the second read, in case of non parquet we maintain a string separated by "_" delimiter where the first
  substring represents the len requested for second read and the second substring represents the seek pointer difference from the
  offset of the first read.)
  3.FL :- Total length of the file requested for read
  4.RL :- Average length requested by the reads issued after the second read
   */
  /**
   * Renders one aggregate as "$Parquet:" or "$NonParquet:" followed by the
   * "$FR=", "$SR=", "$FL=" and "$RL=" fields. The two averages are printed with
   * three decimals, independent of the default locale.
   *
   * @param avgReadFooterMetrics the aggregate to render.
   * @return the rendered metrics string.
   */
  public String getReadFooterMetrics(AbfsReadFooterMetrics avgReadFooterMetrics) {
    StringBuilder readFooterMetric = new StringBuilder(
        avgReadFooterMetrics.getIsParquetFile() ? "$Parquet:" : "$NonParquet:");
    readFooterMetric.append("$FR=")
        .append(avgReadFooterMetrics.getSizeReadByFirstRead())
        .append("$SR=")
        .append(avgReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead())
        .append("$FL=")
        .append(String.format(Locale.ROOT, "%.3f", avgReadFooterMetrics.getAvgFileLength()))
        .append("$RL=")
        .append(String.format(Locale.ROOT, "%.3f", avgReadFooterMetrics.getAvgReadLenRequested()));
    return readFooterMetric.toString();
  }

  /**
   * Splits the given entries into Parquet and non-Parquet groups, averages each
   * group separately and concatenates the rendered result of every non-empty
   * group (Parquet first).
   *
   * <p>A non-Parquet entry is only included when it has seen at least two reads
   * and both its first-read and second-read values are recorded; an entry
   * still missing one of them cannot be averaged and is skipped.
   *
   * @param readFooterMetricsList A list of AbfsReadFooterMetrics instances containing read footer metrics
   *                              for both Parquet and non-Parquet files.
   *
   * @return A formatted string containing the aggregated read footer metrics; empty when no entry qualifies.
   **/
  private String getFooterMetrics(List<AbfsReadFooterMetrics> readFooterMetricsList) {
    List<AbfsReadFooterMetrics> isParquetList = new ArrayList<>();
    List<AbfsReadFooterMetrics> isNonParquetList = new ArrayList<>();
    for (AbfsReadFooterMetrics abfsReadFooterMetrics : readFooterMetricsList) {
      if (abfsReadFooterMetrics.getIsParquetFile()) {
        isParquetList.add(abfsReadFooterMetrics);
      } else if (abfsReadFooterMetrics.getReadCount() >= 2
          && abfsReadFooterMetrics.getSizeReadByFirstRead() != null
          && abfsReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead() != null) {
        isNonParquetList.add(abfsReadFooterMetrics);
      }
    }

    StringBuilder readFooterMetric = new StringBuilder();
    if (!isParquetList.isEmpty()) {
      AbfsReadFooterMetrics avgParquetReadFooterMetrics = new AbfsReadFooterMetrics();
      avgParquetReadFooterMetrics.setIsParquetFile(true);
      getParquetReadFooterMetricsAverage(isParquetList, avgParquetReadFooterMetrics);
      readFooterMetric.append(getReadFooterMetrics(avgParquetReadFooterMetrics));
    }
    if (!isNonParquetList.isEmpty()) {
      AbfsReadFooterMetrics avgNonparquetReadFooterMetrics = new AbfsReadFooterMetrics();
      avgNonparquetReadFooterMetrics.setIsParquetFile(false);
      getNonParquetReadFooterMetricsAverage(isNonParquetList, avgNonparquetReadFooterMetrics);
      readFooterMetric.append(getReadFooterMetrics(avgNonparquetReadFooterMetrics));
    }
    return readFooterMetric.toString();
  }

  /**
   * Classifies the tracked files, finalises their average read lengths and
   * renders the aggregated metrics of all files whose first read qualified for
   * collection.
   *
   * @return the aggregated metrics string, or an empty string when there is nothing to report.
   */
  @Override
  public String toString() {
    Map<String, AbfsReadFooterMetrics> metricsMap = getMetricsMap();
    List<AbfsReadFooterMetrics> readFooterMetricsList = new ArrayList<>();
    if (metricsMap != null && !(metricsMap.isEmpty())) {
      checkIsParquet(metricsMap);
      updateLenRequested(metricsMap);
      for (AbfsReadFooterMetrics abfsReadFooterMetrics : metricsMap.values()) {
        if (abfsReadFooterMetrics.getCollectMetrics()) {
          readFooterMetricsList.add(abfsReadFooterMetrics);
        }
      }
    }
    if (readFooterMetricsList.isEmpty()) {
      return "";
    }
    return getFooterMetrics(readFooterMetricsList);
  }
}