AbfsReadFooterMetrics.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.util.List;
import java.util.Map;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum;
import org.apache.hadoop.fs.azurebfs.enums.FileType;
import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.CHAR_DOLLAR;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.DOUBLE_PRECISION_FORMAT;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FIRST_READ;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SECOND_READ;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE_LENGTH;
import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.READ_LENGTH;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.TOTAL_FILES;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FILE_LENGTH;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SIZE_READ_BY_FIRST_READ;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_READ_LEN_REQUESTED;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FIRST_OFFSET_DIFF;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SECOND_OFFSET_DIFF;
import static org.apache.hadoop.fs.azurebfs.enums.FileType.PARQUET;
import static org.apache.hadoop.fs.azurebfs.enums.FileType.NON_PARQUET;
import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_MEAN;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
import static org.apache.hadoop.util.StringUtils.format;
/**
* This class is responsible for tracking and updating metrics related to reading footers in files.
*/
public class AbfsReadFooterMetrics extends AbstractAbfsStatisticsSource {
private static final Logger LOG = LoggerFactory.getLogger(AbfsReadFooterMetrics.class);
private static final String FOOTER_LENGTH = "20";
private static final List<FileType> FILE_TYPE_LIST =
Arrays.asList(FileType.values());
private final Map<String, FileTypeMetrics> fileTypeMetricsMap =
new ConcurrentHashMap<>();
/**
* Inner class to handle file type checks.
*/
private static final class FileTypeMetrics {
private final AtomicBoolean collectMetrics;
private final AtomicBoolean collectMetricsForNextRead;
private final AtomicBoolean collectLenMetrics;
private final AtomicLong readCount;
private final AtomicLong offsetOfFirstRead;
private FileType fileType = null;
private String sizeReadByFirstRead;
private String offsetDiffBetweenFirstAndSecondRead;
/**
* Constructor to initialize the file type metrics.
*/
private FileTypeMetrics() {
collectMetrics = new AtomicBoolean(false);
collectMetricsForNextRead = new AtomicBoolean(false);
collectLenMetrics = new AtomicBoolean(false);
readCount = new AtomicLong(0);
offsetOfFirstRead = new AtomicLong(0);
}
/**
* Updates the file type based on the metrics collected.
*/
private void updateFileType() {
if (fileType == null) {
fileType = collectMetrics.get() && readCount.get() >= 2
&& haveEqualValues(sizeReadByFirstRead)
&& haveEqualValues(offsetDiffBetweenFirstAndSecondRead) ? PARQUET : NON_PARQUET;
}
}
/**
* Checks if the given value has equal parts.
*
* @param value the value to check
* @return true if the value has equal parts, false otherwise
*/
private boolean haveEqualValues(String value) {
String[] parts = value.split("_");
return parts.length == 2
&& parts[0].equals(parts[1]);
}
/**
* Increments the read count.
*/
private void incrementReadCount() {
readCount.incrementAndGet();
}
/**
* Returns the read count.
*
* @return the read count
*/
private long getReadCount() {
return readCount.get();
}
/**
* Sets the collect metrics flag.
*
* @param collect the value to set
*/
private void setCollectMetrics(boolean collect) {
collectMetrics.set(collect);
}
/**
* Returns the collect metrics flag.
*
* @return the collect metrics flag
*/
private boolean getCollectMetrics() {
return collectMetrics.get();
}
/**
* Sets the collect metrics for the next read flag.
*
* @param collect the value to set
*/
private void setCollectMetricsForNextRead(boolean collect) {
collectMetricsForNextRead.set(collect);
}
/**
* Returns the collect metrics for the next read flag.
*
* @return the collect metrics for the next read flag
*/
private boolean getCollectMetricsForNextRead() {
return collectMetricsForNextRead.get();
}
/**
* Returns the collect length metrics flag.
*
* @return the collect length metrics flag
*/
private boolean getCollectLenMetrics() {
return collectLenMetrics.get();
}
/**
* Sets the collect length metrics flag.
*
* @param collect the value to set
*/
private void setCollectLenMetrics(boolean collect) {
collectLenMetrics.set(collect);
}
/**
* Sets the offset of the first read.
*
* @param offset the value to set
*/
private void setOffsetOfFirstRead(long offset) {
offsetOfFirstRead.set(offset);
}
/**
* Returns the offset of the first read.
*
* @return the offset of the first read
*/
private long getOffsetOfFirstRead() {
return offsetOfFirstRead.get();
}
/**
* Sets the size read by the first read.
*
* @param size the value to set
*/
private void setSizeReadByFirstRead(String size) {
sizeReadByFirstRead = size;
}
/**
* Returns the size read by the first read.
*
* @return the size read by the first read
*/
private String getSizeReadByFirstRead() {
return sizeReadByFirstRead;
}
/**
* Sets the offset difference between the first and second read.
*
* @param offsetDiff the value to set
*/
private void setOffsetDiffBetweenFirstAndSecondRead(String offsetDiff) {
offsetDiffBetweenFirstAndSecondRead = offsetDiff;
}
/**
* Returns the offset difference between the first and second read.
*
* @return the offset difference between the first and second read
*/
private String getOffsetDiffBetweenFirstAndSecondRead() {
return offsetDiffBetweenFirstAndSecondRead;
}
/**
* Returns the file type.
*
* @return the file type
*/
private FileType getFileType() {
return fileType;
}
}
/**
* Constructor to initialize the IOStatisticsStore with counters and mean statistics.
*/
public AbfsReadFooterMetrics() {
IOStatisticsStore ioStatisticsStore = iostatisticsStore()
.withCounters(getMetricNames(TYPE_COUNTER))
.withMeanStatistics(getMetricNames(TYPE_MEAN))
.build();
setIOStatistics(ioStatisticsStore);
}
/**
* Returns the metric names for a specific statistic type.
*
* @param type the statistic type
* @return the metric names
*/
private String[] getMetricNames(StatisticTypeEnum type) {
return Arrays.stream(AbfsReadFooterMetricsEnum.values())
.filter(readFooterMetricsEnum -> readFooterMetricsEnum.getStatisticType().equals(type))
.flatMap(readFooterMetricsEnum ->
FILE.equals(readFooterMetricsEnum.getType())
? FILE_TYPE_LIST.stream().map(fileType ->
getMetricName(fileType, readFooterMetricsEnum))
: Stream.of(readFooterMetricsEnum.getName()))
.toArray(String[]::new);
}
/**
* Returns the metric name for a specific file type and metric.
*
* @param fileType the type of the file
* @param readFooterMetricsEnum the metric to get the name for
* @return the metric name
*/
private String getMetricName(FileType fileType,
AbfsReadFooterMetricsEnum readFooterMetricsEnum) {
if (fileType == null || readFooterMetricsEnum == null) {
LOG.error("File type or ABFS read footer metrics should not be null");
return EMPTY_STRING;
}
return fileType + COLON + readFooterMetricsEnum.getName();
}
/**
* Looks up the counter value for a specific metric.
*
* @param fileType the type of the file
* @param abfsReadFooterMetricsEnum the metric to look up
* @return the counter value
*/
private long getCounterMetricValue(FileType fileType,
AbfsReadFooterMetricsEnum abfsReadFooterMetricsEnum) {
return lookupCounterValue(getMetricName(fileType, abfsReadFooterMetricsEnum));
}
/**
* Looks up the mean statistic value for a specific metric.
*
* @param fileType the type of the file
* @param abfsReadFooterMetricsEnum the metric to look up
* @return the mean statistic value
*/
private String getMeanMetricValue(FileType fileType,
AbfsReadFooterMetricsEnum abfsReadFooterMetricsEnum) {
return format(DOUBLE_PRECISION_FORMAT,
lookupMeanStatistic(getMetricName(fileType, abfsReadFooterMetricsEnum)));
}
/**
* Increments the value of a specific metric.
*
* @param fileType the type of the file
* @param abfsReadFooterMetricsEnum the metric to increment
*/
public void incrementMetricValue(FileType fileType,
AbfsReadFooterMetricsEnum abfsReadFooterMetricsEnum) {
incCounterValue(getMetricName(fileType, abfsReadFooterMetricsEnum));
}
/**
* Adds a mean statistic value for a specific metric.
*
* @param fileType the type of the file
* @param abfsReadFooterMetricsEnum the metric to update
* @param value the new value of the metric
*/
public void addMeanMetricValue(FileType fileType,
AbfsReadFooterMetricsEnum abfsReadFooterMetricsEnum,
long value) {
addMeanStatistic(getMetricName(fileType, abfsReadFooterMetricsEnum), value);
}
/**
* Returns the total number of files.
*
* @return the total number of files
*/
public Long getTotalFiles() {
return getCounterMetricValue(PARQUET, TOTAL_FILES) + getCounterMetricValue(NON_PARQUET, TOTAL_FILES);
}
/**
* Updates the map with a new file path identifier.
*
* @param filePathIdentifier the file path identifier
*/
public void updateMap(String filePathIdentifier) {
fileTypeMetricsMap.computeIfAbsent(filePathIdentifier, key -> new FileTypeMetrics());
}
/**
* Checks and updates the metrics for a given file read.
*
* @param filePathIdentifier the file path identifier
* @param len the length of the read
* @param contentLength the total content length of the file
* @param nextReadPos the position of the next read
*/
public void updateReadMetrics(final String filePathIdentifier,
final int len,
final long contentLength,
final long nextReadPos) {
FileTypeMetrics fileTypeMetrics = fileTypeMetricsMap.computeIfAbsent(filePathIdentifier, key -> new FileTypeMetrics());
if (fileTypeMetrics.getReadCount() == 0 || (fileTypeMetrics.getReadCount() >= 1 && fileTypeMetrics.getCollectMetrics())) {
updateMetrics(fileTypeMetrics, len, contentLength, nextReadPos);
}
}
/**
* Updates metrics for a specific file identified by filePathIdentifier.
*
* @param fileTypeMetrics File metadata to know file type.
* @param len The length of the read operation.
* @param contentLength The total content length of the file.
* @param nextReadPos The position of the next read operation.
*/
private void updateMetrics(FileTypeMetrics fileTypeMetrics,
int len,
long contentLength,
long nextReadPos) {
fileTypeMetrics.incrementReadCount();
long readCount = fileTypeMetrics.getReadCount();
if (readCount == 1) {
handleFirstRead(fileTypeMetrics, nextReadPos, len, contentLength);
} else if (readCount == 2) {
handleSecondRead(fileTypeMetrics, nextReadPos, len, contentLength);
} else {
handleFurtherRead(fileTypeMetrics, len);
}
}
/**
* Handles the first read operation by checking if the current read position is near the end of the file.
* If it is, updates the {@link FileTypeMetrics} object to enable metrics collection and records the first read's
* offset and size.
*
* @param fileTypeMetrics The {@link FileTypeMetrics} object to update with metrics and read details.
* @param nextReadPos The position where the next read will start.
* @param len The length of the current read operation.
* @param contentLength The total length of the file content.
*/
private void handleFirstRead(FileTypeMetrics fileTypeMetrics,
long nextReadPos,
int len,
long contentLength) {
if (nextReadPos >= contentLength - (long) Integer.parseInt(FOOTER_LENGTH) * ONE_KB) {
fileTypeMetrics.setCollectMetrics(true);
fileTypeMetrics.setCollectMetricsForNextRead(true);
fileTypeMetrics.setOffsetOfFirstRead(nextReadPos);
fileTypeMetrics.setSizeReadByFirstRead(len + "_" + Math.abs(contentLength - nextReadPos));
}
}
/**
* Handles the second read operation by checking if metrics collection is enabled for the next read.
* If it is, calculates the offset difference between the first and second reads, updates the {@link FileTypeMetrics}
* object with this information, and sets the file type. Then, updates the metrics data.
*
* @param fileTypeMetrics The {@link FileTypeMetrics} object to update with metrics and read details.
* @param nextReadPos The position where the next read will start.
* @param len The length of the current read operation.
* @param contentLength The total length of the file content.
*/
private void handleSecondRead(FileTypeMetrics fileTypeMetrics,
long nextReadPos,
int len,
long contentLength) {
if (fileTypeMetrics.getCollectMetricsForNextRead()) {
long offsetDiff = Math.abs(nextReadPos - fileTypeMetrics.getOffsetOfFirstRead());
fileTypeMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" + offsetDiff);
fileTypeMetrics.setCollectLenMetrics(true);
fileTypeMetrics.updateFileType();
updateMetricsData(fileTypeMetrics, len, contentLength);
}
}
/**
* Handles further read operations beyond the second read. If metrics collection is enabled and the file type is set,
* updates the read length requested and increments the read count for the specific file type.
*
* @param fileTypeMetrics The {@link FileTypeMetrics} object containing metrics and read details.
* @param len The length of the current read operation.
*/
private void handleFurtherRead(FileTypeMetrics fileTypeMetrics, int len) {
if (fileTypeMetrics.getCollectLenMetrics() && fileTypeMetrics.getFileType() != null) {
FileType fileType = fileTypeMetrics.getFileType();
addMeanMetricValue(fileType, AVG_READ_LEN_REQUESTED, len);
}
}
/**
* Updates the metrics data for a specific file identified by the {@link FileTypeMetrics} object.
* This method calculates and updates various metrics such as read length requested, file length,
* size read by the first read, and offset differences between reads.
*
* @param fileTypeMetrics The {@link FileTypeMetrics} object containing metrics and read details.
* @param len The length of the current read operation.
* @param contentLength The total length of the file content.
*/
private void updateMetricsData(FileTypeMetrics fileTypeMetrics,
int len,
long contentLength) {
long sizeReadByFirstRead = Long.parseLong(fileTypeMetrics.getSizeReadByFirstRead().split("_")[0]);
long firstOffsetDiff = Long.parseLong(fileTypeMetrics.getSizeReadByFirstRead().split("_")[1]);
long secondOffsetDiff = Long.parseLong(fileTypeMetrics.getOffsetDiffBetweenFirstAndSecondRead().split("_")[1]);
FileType fileType = fileTypeMetrics.getFileType();
addMeanMetricValue(fileType, AVG_READ_LEN_REQUESTED, len);
addMeanMetricValue(fileType, AVG_READ_LEN_REQUESTED, sizeReadByFirstRead);
addMeanMetricValue(fileType, AVG_FILE_LENGTH, contentLength);
addMeanMetricValue(fileType, AVG_SIZE_READ_BY_FIRST_READ, sizeReadByFirstRead);
addMeanMetricValue(fileType, AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ, len);
addMeanMetricValue(fileType, AVG_FIRST_OFFSET_DIFF, firstOffsetDiff);
addMeanMetricValue(fileType, AVG_SECOND_OFFSET_DIFF, secondOffsetDiff);
incrementMetricValue(fileType, TOTAL_FILES);
}
/**
* Appends the metrics for a specific file type to the given metric builder.
*
* @param metricBuilder the metric builder to append the metrics to
* @param fileType the file type to append the metrics for
*/
private void appendMetrics(StringBuilder metricBuilder, FileType fileType) {
long totalFiles = getCounterMetricValue(fileType, TOTAL_FILES);
if (totalFiles <= 0) {
return;
}
String sizeReadByFirstRead = getMeanMetricValue(fileType, AVG_SIZE_READ_BY_FIRST_READ);
String offsetDiffBetweenFirstAndSecondRead = getMeanMetricValue(fileType, AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ);
if (NON_PARQUET.equals(fileType)) {
sizeReadByFirstRead += CHAR_UNDERSCORE + getMeanMetricValue(fileType, AVG_FIRST_OFFSET_DIFF);
offsetDiffBetweenFirstAndSecondRead += CHAR_UNDERSCORE + getMeanMetricValue(fileType, AVG_SECOND_OFFSET_DIFF);
}
metricBuilder.append(CHAR_DOLLAR)
.append(fileType)
.append(FIRST_READ)
.append(sizeReadByFirstRead)
.append(SECOND_READ)
.append(offsetDiffBetweenFirstAndSecondRead)
.append(FILE_LENGTH)
.append(getMeanMetricValue(fileType, AVG_FILE_LENGTH))
.append(READ_LENGTH)
.append(getMeanMetricValue(fileType, AVG_READ_LEN_REQUESTED));
}
/**
* Returns the read footer metrics for all file types.
*
* @return the read footer metrics as a string
*/
@Override
public String toString() {
StringBuilder readFooterMetric = new StringBuilder();
appendMetrics(readFooterMetric, PARQUET);
appendMetrics(readFooterMetric, NON_PARQUET);
return readFooterMetric.toString();
}
}