AbfsReadFooterMetrics.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.util.List;
import java.util.Map;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum;
import org.apache.hadoop.fs.azurebfs.enums.FileType;
import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.UNDERSCORE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.CHAR_DOLLAR;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.DOUBLE_PRECISION_FORMAT;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FIRST_READ;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SECOND_READ;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE_LENGTH;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.READ_LENGTH;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.TOTAL_FILES;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FILE_LENGTH;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SIZE_READ_BY_FIRST_READ;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_READ_LEN_REQUESTED;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FIRST_OFFSET_DIFF;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SECOND_OFFSET_DIFF;
import static org.apache.hadoop.fs.azurebfs.enums.FileType.PARQUET;
import static org.apache.hadoop.fs.azurebfs.enums.FileType.NON_PARQUET;
import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_MEAN;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
import static org.apache.hadoop.util.StringUtils.format;

/**
 * Tracks and aggregates metrics describing how files are read near their
 * footer (the trailing bytes of a file).
 *
 * <p>For each file, keyed by a caller-supplied path identifier, the first two
 * reads are inspected:</p>
 * <ul>
 *   <li>If the first read starts within the last {@code FOOTER_LENGTH} KB of
 *   the file, metrics collection is enabled for that file.</li>
 *   <li>On the second read the file is classified as {@link FileType#PARQUET}
 *   or {@link FileType#NON_PARQUET}, and per-type mean statistics are
 *   recorded.</li>
 *   <li>Later reads only contribute to the average requested read length.</li>
 * </ul>
 *
 * <p>Per-file state is kept in a {@link ConcurrentHashMap}. The atomic flags
 * that gate each phase are set only after the data they guard has been
 * written, so a reader that observes a flag also observes that data.</p>
 */
public class AbfsReadFooterMetrics extends AbstractAbfsStatisticsSource {

  private static final Logger LOG = LoggerFactory.getLogger(
      AbfsReadFooterMetrics.class);

  /**
   * Size, in KB, of the trailing region of a file treated as its footer.
   * A first read starting inside this region enables metrics collection.
   */
  private static final int FOOTER_LENGTH = 20;

  private static final List<FileType> FILE_TYPE_LIST =
      Arrays.asList(FileType.values());

  private final Map<String, FileTypeMetrics> fileTypeMetricsMap;

  /**
   * Per-file read state used to classify a file and decide which reads
   * contribute to the aggregated metrics.
   */
  private static final class FileTypeMetrics {

    /** Set once the first read has been found to start in the footer. */
    private final AtomicBoolean collectMetrics;

    /** Set once the first-read details are recorded and the second read may use them. */
    private final AtomicBoolean collectMetricsForNextRead;

    /** Set once the second read has been processed; enables read-length metrics. */
    private final AtomicBoolean collectLenMetrics;

    private final AtomicLong readCount;

    private final AtomicLong offsetOfFirstRead;

    /** Classification, computed once on the second read; null until then. */
    private FileType fileType;

    /** Encoded as {@code "<firstReadLen>_<bytesFromFirstOffsetToEof>"}. */
    private String sizeReadByFirstRead;

    /** Encoded as {@code "<secondReadLen>_<abs(secondOffset - firstOffset)>"}. */
    private String offsetDiffBetweenFirstAndSecondRead;

    /**
     * Creates an empty state: all flags false, counters and offsets zero.
     */
    private FileTypeMetrics() {
      collectMetrics = new AtomicBoolean(false);
      collectMetricsForNextRead = new AtomicBoolean(false);
      collectLenMetrics = new AtomicBoolean(false);
      readCount = new AtomicLong(0);
      offsetOfFirstRead = new AtomicLong(0);
    }

    /**
     * Computes the file type once, if it has not been computed yet.
     *
     * <p>The file is PARQUET when metrics collection is enabled, at least two
     * reads were seen, and both encoded read records have equal halves. That
     * means the first read's length equals the distance from its offset to
     * the end of the file, and the second read's length equals the distance
     * between the two read offsets. Otherwise it is NON_PARQUET.</p>
     */
    private void updateFileType() {
      if (fileType == null) {
        fileType = collectMetrics.get() && readCount.get() >= 2
            && haveEqualValues(sizeReadByFirstRead)
            && haveEqualValues(offsetDiffBetweenFirstAndSecondRead)
            ? PARQUET
            : NON_PARQUET;
      }
    }

    /**
     * Checks whether an {@code "a_b"} encoded value has two equal halves.
     *
     * @param value the encoded value; may be null
     * @return true if the value splits into exactly two equal parts,
     *         false otherwise (including when {@code value} is null)
     */
    private boolean haveEqualValues(String value) {
      if (value == null) {
        return false;
      }
      String[] parts = value.split(UNDERSCORE);
      return parts.length == 2
          && parts[0].equals(parts[1]);
    }

    /**
     * Increments the read count.
     */
    private void incrementReadCount() {
      readCount.incrementAndGet();
    }

    /**
     * Returns the read count.
     *
     * @return the read count
     */
    private long getReadCount() {
      return readCount.get();
    }

    /**
     * Sets the collect metrics flag.
     *
     * @param collect the value to set
     */
    private void setCollectMetrics(boolean collect) {
      collectMetrics.set(collect);
    }

    /**
     * Returns the collect metrics flag.
     *
     * @return the collect metrics flag
     */
    private boolean getCollectMetrics() {
      return collectMetrics.get();
    }

    /**
     * Sets the collect metrics for the next read flag.
     *
     * @param collect the value to set
     */
    private void setCollectMetricsForNextRead(boolean collect) {
      collectMetricsForNextRead.set(collect);
    }

    /**
     * Returns the collect metrics for the next read flag.
     *
     * @return the collect metrics for the next read flag
     */
    private boolean getCollectMetricsForNextRead() {
      return collectMetricsForNextRead.get();
    }

    /**
     * Returns the collect length metrics flag.
     *
     * @return the collect length metrics flag
     */
    private boolean getCollectLenMetrics() {
      return collectLenMetrics.get();
    }

    /**
     * Sets the collect length metrics flag.
     *
     * @param collect the value to set
     */
    private void setCollectLenMetrics(boolean collect) {
      collectLenMetrics.set(collect);
    }

    /**
     * Sets the offset of the first read.
     *
     * @param offset the value to set
     */
    private void setOffsetOfFirstRead(long offset) {
      offsetOfFirstRead.set(offset);
    }

    /**
     * Returns the offset of the first read.
     *
     * @return the offset of the first read
     */
    private long getOffsetOfFirstRead() {
      return offsetOfFirstRead.get();
    }

    /**
     * Sets the encoded first-read record.
     *
     * @param size the value to set, formatted as {@code "len_distanceToEof"}
     */
    private void setSizeReadByFirstRead(String size) {
      sizeReadByFirstRead = size;
    }

    /**
     * Returns the encoded first-read record.
     *
     * @return the first-read record, or null if not yet recorded
     */
    private String getSizeReadByFirstRead() {
      return sizeReadByFirstRead;
    }

    /**
     * Sets the encoded second-read record.
     *
     * @param offsetDiff the value to set, formatted as {@code "len_offsetDiff"}
     */
    private void setOffsetDiffBetweenFirstAndSecondRead(String offsetDiff) {
      offsetDiffBetweenFirstAndSecondRead = offsetDiff;
    }

    /**
     * Returns the encoded second-read record.
     *
     * @return the second-read record, or null if not yet recorded
     */
    private String getOffsetDiffBetweenFirstAndSecondRead() {
      return offsetDiffBetweenFirstAndSecondRead;
    }

    /**
     * Returns the file type.
     *
     * @return the file type, or null if it has not been computed yet
     */
    private FileType getFileType() {
      return fileType;
    }
  }

  /**
   * Creates the metrics source with an empty file-type metrics map.
   * The IOStatisticsStore is initialised with counters and mean statistics.
   */
  public AbfsReadFooterMetrics() {
    this(null);
  }

  /**
   * Creates the metrics source, initialising the IOStatisticsStore with
   * counters and mean statistics, and using the given map to track
   * file type metrics.
   *
   * @param fileTypeMetricsMap the map to track file type metrics; a new
   *        {@link ConcurrentHashMap} is used when null
   */
  public AbfsReadFooterMetrics(Map<String, FileTypeMetrics> fileTypeMetricsMap) {
    IOStatisticsStore ioStatisticsStore = iostatisticsStore()
        .withCounters(getMetricNames(TYPE_COUNTER))
        .withMeanStatistics(getMetricNames(TYPE_MEAN))
        .build();
    setIOStatistics(ioStatisticsStore);
    this.fileTypeMetricsMap = fileTypeMetricsMap == null
        ? new ConcurrentHashMap<>()
        : fileTypeMetricsMap;
  }

  /**
   * Returns the map of file type metrics.
   *
   * @return the live (not copied) map of file type metrics
   */
  public Map<String, FileTypeMetrics> getFileTypeMetricsMap() {
    return fileTypeMetricsMap;
  }

  /**
   * Returns the metric names registered for a specific statistic type.
   * Metrics whose type is {@code FILE} are expanded into one name per
   * {@link FileType}; all other metrics use their plain name.
   *
   * @param type the statistic type
   * @return the metric names
   */
  private String[] getMetricNames(StatisticTypeEnum type) {
    return Arrays.stream(AbfsReadFooterMetricsEnum.values())
        .filter(
            readFooterMetricsEnum -> readFooterMetricsEnum.getStatisticType()
                .equals(type))
        .flatMap(readFooterMetricsEnum ->
            FILE.equals(readFooterMetricsEnum.getType())
                ? FILE_TYPE_LIST.stream().map(fileType ->
                getMetricName(fileType, readFooterMetricsEnum))
                : Stream.of(readFooterMetricsEnum.getName()))
        .toArray(String[]::new);
  }

  /**
   * Returns the metric name for a specific file type and metric, formatted
   * as {@code fileType + COLON + metricName}.
   *
   * @param fileType the type of the file
   * @param readFooterMetricsEnum the metric to get the name for
   * @return the metric name, or an empty string (after logging an error)
   *         if either argument is null
   */
  private String getMetricName(FileType fileType,
      AbfsReadFooterMetricsEnum readFooterMetricsEnum) {
    if (fileType == null || readFooterMetricsEnum == null) {
      LOG.error("File type or ABFS read footer metrics should not be null");
      return EMPTY_STRING;
    }
    return fileType + COLON + readFooterMetricsEnum.getName();
  }

  /**
   * Looks up the counter value for a specific metric.
   *
   * @param fileType the type of the file
   * @param abfsReadFooterMetricsEnum the metric to look up
   * @return the counter value
   */
  private long getCounterMetricValue(FileType fileType,
      AbfsReadFooterMetricsEnum abfsReadFooterMetricsEnum) {
    return lookupCounterValue(
        getMetricName(fileType, abfsReadFooterMetricsEnum));
  }

  /**
   * Looks up the mean statistic value for a specific metric and formats it
   * with {@code DOUBLE_PRECISION_FORMAT}.
   *
   * @param fileType the type of the file
   * @param abfsReadFooterMetricsEnum the metric to look up
   * @return the formatted mean statistic value
   */
  private String getMeanMetricValue(FileType fileType,
      AbfsReadFooterMetricsEnum abfsReadFooterMetricsEnum) {
    return format(DOUBLE_PRECISION_FORMAT,
        lookupMeanStatistic(
            getMetricName(fileType, abfsReadFooterMetricsEnum)));
  }

  /**
   * Increments the value of a specific metric.
   *
   * @param fileType the type of the file
   * @param abfsReadFooterMetricsEnum the metric to increment
   */
  public void incrementMetricValue(FileType fileType,
      AbfsReadFooterMetricsEnum abfsReadFooterMetricsEnum) {
    incCounterValue(getMetricName(fileType, abfsReadFooterMetricsEnum));
  }

  /**
   * Adds a sample to the mean statistic of a specific metric.
   *
   * @param fileType the type of the file
   * @param abfsReadFooterMetricsEnum the metric to update
   * @param value the sample to add
   */
  public void addMeanMetricValue(FileType fileType,
      AbfsReadFooterMetricsEnum abfsReadFooterMetricsEnum,
      long value) {
    addMeanStatistic(getMetricName(fileType, abfsReadFooterMetricsEnum), value);
  }

  /**
   * Returns the total number of classified files across all file types.
   *
   * @return the sum of the PARQUET and NON_PARQUET {@code TOTAL_FILES} counters
   */
  public Long getTotalFiles() {
    return getCounterMetricValue(PARQUET, TOTAL_FILES) + getCounterMetricValue(
        NON_PARQUET, TOTAL_FILES);
  }

  /**
   * Registers a file path identifier, creating empty state for it if absent.
   *
   * @param filePathIdentifier the file path identifier
   */
  public void updateMap(String filePathIdentifier) {
    fileTypeMetricsMap.computeIfAbsent(filePathIdentifier,
        key -> new FileTypeMetrics());
  }

  /**
   * Records a read against the given file.
   *
   * <p>The first read of a file is always processed. Subsequent reads are
   * processed only if the first read enabled metrics collection.</p>
   *
   * @param filePathIdentifier the file path identifier
   * @param len the length of the read
   * @param contentLength the total content length of the file
   * @param nextReadPos the position of the next read
   */
  public void updateReadMetrics(final String filePathIdentifier,
      final int len,
      final long contentLength,
      final long nextReadPos) {
    FileTypeMetrics fileTypeMetrics = fileTypeMetricsMap.computeIfAbsent(
        filePathIdentifier, key -> new FileTypeMetrics());
    // The read count only ever increases from 0, so "not the first read"
    // already implies readCount >= 1.
    if (fileTypeMetrics.getReadCount() == 0
        || fileTypeMetrics.getCollectMetrics()) {
      updateMetrics(fileTypeMetrics, len, contentLength, nextReadPos);
    }
  }

  /**
   * Counts a read and dispatches it to the first-, second- or
   * further-read handler based on the updated read count.
   *
   * @param fileTypeMetrics    per-file state to update.
   * @param len                The length of the read operation.
   * @param contentLength      The total content length of the file.
   * @param nextReadPos        The position of the next read operation.
   */
  private void updateMetrics(FileTypeMetrics fileTypeMetrics,
      int len,
      long contentLength,
      long nextReadPos) {
    fileTypeMetrics.incrementReadCount();

    long readCount = fileTypeMetrics.getReadCount();

    if (readCount == 1) {
      handleFirstRead(fileTypeMetrics, nextReadPos, len, contentLength);
    } else if (readCount == 2) {
      handleSecondRead(fileTypeMetrics, nextReadPos, len, contentLength);
    } else {
      handleFurtherRead(fileTypeMetrics, len);
    }
  }

  /**
   * Handles the first read of a file.
   *
   * <p>If the read starts within the last {@code FOOTER_LENGTH} KB of the
   * file, records the read's offset and its {@code "len_distanceToEof"}
   * record, then enables metrics collection for the file.</p>
   *
   * @param fileTypeMetrics The {@link FileTypeMetrics} object to update with metrics and read details.
   * @param nextReadPos The position where the next read will start.
   * @param len The length of the current read operation.
   * @param contentLength The total length of the file content.
   */
  private void handleFirstRead(FileTypeMetrics fileTypeMetrics,
      long nextReadPos,
      int len,
      long contentLength) {
    long footerStart = contentLength - (long) FOOTER_LENGTH * ONE_KB;
    if (nextReadPos >= footerStart) {
      fileTypeMetrics.setOffsetOfFirstRead(nextReadPos);
      fileTypeMetrics.setSizeReadByFirstRead(
          len + UNDERSCORE + Math.abs(contentLength - nextReadPos));
      // Publish the gating flags only after the data they guard is written.
      // The atomic set/get pair then guarantees that a concurrent second read
      // observing these flags also sees the first-read record (no NPE).
      fileTypeMetrics.setCollectMetricsForNextRead(true);
      fileTypeMetrics.setCollectMetrics(true);
    }
  }

  /**
   * Handles the second read of a file.
   *
   * <p>If the first read enabled collection, records the second read's
   * {@code "len_offsetDiff"} record, classifies the file, enables
   * read-length metrics for later reads, and updates the aggregated
   * metrics.</p>
   *
   * @param fileTypeMetrics The {@link FileTypeMetrics} object to update with metrics and read details.
   * @param nextReadPos The position where the next read will start.
   * @param len The length of the current read operation.
   * @param contentLength The total length of the file content.
   */
  private void handleSecondRead(FileTypeMetrics fileTypeMetrics,
      long nextReadPos,
      int len,
      long contentLength) {
    if (!fileTypeMetrics.getCollectMetricsForNextRead()) {
      return;
    }
    long offsetDiff = Math.abs(
        nextReadPos - fileTypeMetrics.getOffsetOfFirstRead());
    fileTypeMetrics.setOffsetDiffBetweenFirstAndSecondRead(
        len + UNDERSCORE + offsetDiff);
    // Compute the file type before publishing collectLenMetrics, so that
    // further reads which observe the flag also observe the type.
    fileTypeMetrics.updateFileType();
    fileTypeMetrics.setCollectLenMetrics(true);
    updateMetricsData(fileTypeMetrics, len, contentLength);
  }

  /**
   * Handles reads after the second one.
   *
   * <p>If read-length metrics are enabled and the file type is known, adds
   * this read's length to the file type's average requested read length.</p>
   *
   * @param fileTypeMetrics The {@link FileTypeMetrics} object containing metrics and read details.
   * @param len The length of the current read operation.
   */
  private void handleFurtherRead(FileTypeMetrics fileTypeMetrics, int len) {
    if (!fileTypeMetrics.getCollectLenMetrics()) {
      return;
    }
    // Read the non-volatile field once, after the gating flag.
    FileType fileType = fileTypeMetrics.getFileType();
    if (fileType != null) {
      addMeanMetricValue(fileType, AVG_READ_LEN_REQUESTED, len);
    }
  }

  /**
   * Records the aggregated metrics for a file once its first two reads are
   * known and its type has been computed.
   *
   * <p>Updates the average requested read length (with both reads), the file
   * length, the size read by the first read, the second read's length, and
   * the first and second offset differences. It also increments the
   * per-type file count.</p>
   *
   * @param fileTypeMetrics The {@link FileTypeMetrics} object containing metrics and read details.
   * @param len The length of the current (second) read operation.
   * @param contentLength The total length of the file content.
   */
  private void updateMetricsData(FileTypeMetrics fileTypeMetrics,
      int len,
      long contentLength) {
    String[] firstRead =
        fileTypeMetrics.getSizeReadByFirstRead().split(UNDERSCORE);
    String[] secondRead =
        fileTypeMetrics.getOffsetDiffBetweenFirstAndSecondRead()
            .split(UNDERSCORE);
    long sizeReadByFirstRead = Long.parseLong(firstRead[0]);
    long firstOffsetDiff = Long.parseLong(firstRead[1]);
    long secondOffsetDiff = Long.parseLong(secondRead[1]);
    FileType fileType = fileTypeMetrics.getFileType();

    addMeanMetricValue(fileType, AVG_READ_LEN_REQUESTED, len);
    addMeanMetricValue(fileType, AVG_READ_LEN_REQUESTED, sizeReadByFirstRead);
    addMeanMetricValue(fileType, AVG_FILE_LENGTH, contentLength);
    addMeanMetricValue(fileType, AVG_SIZE_READ_BY_FIRST_READ,
        sizeReadByFirstRead);
    // Despite its name, this statistic records the second read's length.
    // appendMetrics pairs it with AVG_SECOND_OFFSET_DIFF, mirroring the
    // "length_offset" layout used for the first read.
    addMeanMetricValue(fileType, AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ,
        len);
    addMeanMetricValue(fileType, AVG_FIRST_OFFSET_DIFF, firstOffsetDiff);
    addMeanMetricValue(fileType, AVG_SECOND_OFFSET_DIFF, secondOffsetDiff);
    incrementMetricValue(fileType, TOTAL_FILES);
  }

  /**
   * Appends the metrics for a specific file type to the given builder.
   * Nothing is appended if no file of that type has been recorded.
   *
   * @param metricBuilder the metric builder to append the metrics to
   * @param fileType the file type to append the metrics for
   */
  private void appendMetrics(StringBuilder metricBuilder, FileType fileType) {
    long totalFiles = getCounterMetricValue(fileType, TOTAL_FILES);
    if (totalFiles <= 0) {
      return;
    }

    String sizeReadByFirstRead = getMeanMetricValue(fileType,
        AVG_SIZE_READ_BY_FIRST_READ);
    String offsetDiffBetweenFirstAndSecondRead = getMeanMetricValue(fileType,
        AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ);

    // For NON_PARQUET files the offset differences are appended as well.
    if (NON_PARQUET == fileType) {
      sizeReadByFirstRead += CHAR_UNDERSCORE + getMeanMetricValue(fileType,
          AVG_FIRST_OFFSET_DIFF);
      offsetDiffBetweenFirstAndSecondRead += CHAR_UNDERSCORE
          + getMeanMetricValue(fileType, AVG_SECOND_OFFSET_DIFF);
    }

    appendAlways(metricBuilder, CHAR_DOLLAR, fileType.toString());
    appendAlways(metricBuilder, FIRST_READ, sizeReadByFirstRead);
    appendAlways(metricBuilder, SECOND_READ,
        offsetDiffBetweenFirstAndSecondRead);
    appendAlways(metricBuilder, FILE_LENGTH,
        getMeanMetricValue(fileType, AVG_FILE_LENGTH));
    appendAlways(metricBuilder, READ_LENGTH,
        getMeanMetricValue(fileType, AVG_READ_LEN_REQUESTED));
  }

  /**
   * Returns the read footer metrics for all file types, PARQUET first.
   *
   * @return the read footer metrics as a string; empty if no file has been
   *         classified yet
   */
  @Override
  public String toString() {
    StringBuilder readFooterMetric = new StringBuilder();
    appendMetrics(readFooterMetric, PARQUET);
    appendMetrics(readFooterMetric, NON_PARQUET);
    return readFooterMetric.toString();
  }
}