ITestAbfsReadFooterMetrics.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs.azurebfs;

import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_URI;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.enums.FileType.NON_PARQUET;
import static org.apache.hadoop.fs.azurebfs.enums.FileType.PARQUET;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FILE_LENGTH;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_READ_LEN_REQUESTED;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SIZE_READ_BY_FIRST_READ;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.TOTAL_FILES;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FIRST_OFFSET_DIFF;
import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SECOND_OFFSET_DIFF;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;

import org.junit.Assume;
import org.junit.Test;

import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;

public class ITestAbfsReadFooterMetrics extends AbstractAbfsScaleTest {

  public ITestAbfsReadFooterMetrics() throws Exception {
    checkPrerequisites();
  }

  private void checkPrerequisites(){
    checkIfConfigIsSet(FS_AZURE_METRIC_ACCOUNT_NAME);
    checkIfConfigIsSet(FS_AZURE_METRIC_ACCOUNT_KEY);
    checkIfConfigIsSet(FS_AZURE_METRIC_URI);
  }

  private void checkIfConfigIsSet(String configKey){
    AbfsConfiguration conf = getConfiguration();
    String value = conf.get(configKey);
    Assume.assumeTrue(configKey + " config is mandatory for the test to run",
        value != null && value.trim().length() > 1);
  }

  private static final String TEST_PATH = "/testfile";
  private static final String SLEEP_PERIOD = "90000";

  /**
   * Integration test for reading footer metrics with both Parquet and non-Parquet reads.
   */
  @Test
  public void testReadFooterMetricsWithParquetAndNonParquet() throws Exception {
    testReadWriteAndSeek(8 * ONE_MB, DEFAULT_READ_BUFFER_SIZE, ONE_KB, 4 * ONE_KB);
  }

  /**
   * Configures the AzureBlobFileSystem with the given buffer size.
   *
   * @param bufferSize Buffer size to set for write and read operations.
   * @return AbfsConfiguration used for configuration.
   */
  private Configuration getConfiguration(int bufferSize) {
    final Configuration configuration = getRawConfiguration();
    configuration.set(FS_AZURE_METRIC_FORMAT, String.valueOf(MetricFormat.INTERNAL_FOOTER_METRIC_FORMAT));
    configuration.setInt(AZURE_READ_BUFFER_SIZE, bufferSize);
    configuration.setInt(AZURE_WRITE_BUFFER_SIZE, bufferSize);
    return configuration;
  }

  /**
   * Writes data to the specified file path in the AzureBlobFileSystem.
   *
   * @param fs      AzureBlobFileSystem instance.
   * @param testPath Path to the file.
   * @param data    Data to write to the file.
   */
  private void writeDataToFile(AzureBlobFileSystem fs, Path testPath, byte[] data) throws IOException {
    FSDataOutputStream stream = fs.create(testPath);
    try {
      stream.write(data);
    } finally {
      stream.close();
    }
    IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
  }

  /**
   * Asserts that the actual metrics obtained from the AzureBlobFileSystem match the expected metrics string.
   *
   * @param fs               AzureBlobFileSystem instance.
   * @param expectedMetrics  Expected metrics string.
   */
  private void assertMetricsEquality(AzureBlobFileSystem fs, String expectedMetrics) {
    AbfsReadFooterMetrics actualMetrics = fs.getAbfsClient().getAbfsCounters().getAbfsReadFooterMetrics();
    assertNotNull("AbfsReadFooterMetrics is null", actualMetrics);
    assertEquals("The computed metrics differs from the actual metrics", expectedMetrics, actualMetrics.toString());
  }

  /**
   * Test for reading footer metrics with a non-Parquet file.
   */
  @Test
  public void testReadFooterMetrics() throws Exception {
    // Initialize AzureBlobFileSystem and set buffer size for configuration.
    int bufferSize = MIN_BUFFER_SIZE;
    Configuration configuration = getConfiguration(bufferSize);
    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
    AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();

    // Generate random data to write to the test file.
    final byte[] b = new byte[2 * bufferSize];
    new Random().nextBytes(b);

    // Set up the test file path.
    Path testPath = path(TEST_PATH);

    // Write random data to the test file.
    writeDataToFile(fs, testPath, b);

    // Initialize a buffer for reading data.
    final byte[] readBuffer = new byte[2 * bufferSize];
    int result;

    // Initialize statistics source for logging.
    IOStatisticsSource statisticsSource = null;

    try (FSDataInputStream inputStream = fs.open(testPath)) {
      // Register a listener for tracing header validation.
      statisticsSource = inputStream;
      ((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
              new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
                      fs.getFileSystemId(), FSOperationType.READ, true, 0,
                      ((AbfsInputStream) inputStream.getWrappedStream())
                              .getStreamID()));

      // Perform the first read operation with seek.
      inputStream.seek(bufferSize);
      result = inputStream.read(readBuffer, bufferSize, bufferSize);
      assertNotEquals(-1, result);

      // To test tracingHeader for case with bypassReadAhead == true
      inputStream.seek(0);
      byte[] temp = new byte[5];
      int t = inputStream.read(temp, 0, 1);

      // Seek back to the beginning and perform another read operation.
      inputStream.seek(0);
      result = inputStream.read(readBuffer, 0, bufferSize);
    }

    // Log IO statistics at the INFO level.
    IOStatisticsLogging.logIOStatisticsAtLevel(LOG,
            IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);

    // Ensure data is read successfully and matches the written data.
    assertNotEquals("data read in final read()", -1, result);
    assertArrayEquals(readBuffer, b);

    // Get non-Parquet metrics and assert metrics equality.
    AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
    String metrics = nonParquetMetrics.toString();
    assertMetricsEquality(fs, metrics);

    // Close the AzureBlobFileSystem.
    fs.close();
  }

  /**
   * Generates and returns an instance of AbfsReadFooterMetrics for non-Parquet files.
   */
  private AbfsReadFooterMetrics getNonParquetMetrics() {
    AbfsReadFooterMetrics nonParquetMetrics = new AbfsReadFooterMetrics();
    nonParquetMetrics.addMeanMetricValue(NON_PARQUET, AVG_FILE_LENGTH, Long.parseLong("32768"));
    nonParquetMetrics.addMeanMetricValue(NON_PARQUET, AVG_READ_LEN_REQUESTED, Long.parseLong("10923"));
    nonParquetMetrics.addMeanMetricValue(NON_PARQUET, AVG_SIZE_READ_BY_FIRST_READ, Long.parseLong("16384"));
    nonParquetMetrics.addMeanMetricValue(NON_PARQUET, AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ, 1);
    nonParquetMetrics.addMeanMetricValue(NON_PARQUET, AVG_FIRST_OFFSET_DIFF, Long.parseLong("16384"));
    nonParquetMetrics.addMeanMetricValue(NON_PARQUET, AVG_SECOND_OFFSET_DIFF, Long.parseLong("16384"));
    nonParquetMetrics.incrementMetricValue(NON_PARQUET, TOTAL_FILES);
    return nonParquetMetrics;
  }

  /**
   * Generates and returns an instance of AbfsReadFooterMetrics for parquet files.
   */
  private AbfsReadFooterMetrics getParquetMetrics() {
    AbfsReadFooterMetrics parquetMetrics = new AbfsReadFooterMetrics();
    parquetMetrics.addMeanMetricValue(PARQUET, AVG_FILE_LENGTH, Long.parseLong("8388608"));
    parquetMetrics.addMeanMetricValue(PARQUET, AVG_READ_LEN_REQUESTED, Long.parseLong("2560"));
    parquetMetrics.addMeanMetricValue(PARQUET, AVG_SIZE_READ_BY_FIRST_READ, Long.parseLong("1024"));
    parquetMetrics.addMeanMetricValue(PARQUET, AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ, Long.parseLong("4096"));
    parquetMetrics.incrementMetricValue(PARQUET, TOTAL_FILES);
    return parquetMetrics;
  }

  /**
   * Test for reading, writing, and seeking with footer metrics.
   *
   * This method performs the integration test for reading, writing, and seeking operations
   * with footer metrics. It creates an AzureBlobFileSystem, configures it, writes random data
   * to a test file, performs read and seek operations, and checks the footer metrics for both
   * Parquet and non-Parquet scenarios.
   *
   * @param fileSize Size of the test file.
   * @param bufferSize Size of the buffer used for read and write operations.
   * @param seek1 The position to seek to in the test file.
   * @param seek2 Additional position to seek to in the test file (if not 0).
   */
  private void testReadWriteAndSeek(int fileSize, int bufferSize, Integer seek1, Integer seek2) throws Exception {
    // Create an AzureBlobFileSystem instance.
    Configuration configuration = getConfiguration(bufferSize);
    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
    AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();

    // Generate random data to write to the test file.
    final byte[] b = new byte[fileSize];
    new Random().nextBytes(b);

    // Define the path for the test file.
    Path testPath = path("/testfile");

    // Write the random data to the test file.
    writeDataToFile(fs, testPath, b);

    // Initialize a buffer for reading.
    final byte[] readBuffer = new byte[fileSize];

    // Initialize a source for IO statistics.
    IOStatisticsSource statisticsSource = null;

    // Open an input stream for the test file.
    FSDataInputStream inputStream = fs.open(testPath);
    statisticsSource = inputStream;

    // Register a listener for tracing headers.
    ((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
            new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
                    fs.getFileSystemId(), FSOperationType.READ, true, 0,
                    ((AbfsInputStream) inputStream.getWrappedStream())
                            .getStreamID()));

    // Seek to the specified position in the test file and read data.
    inputStream.seek(fileSize - seek1);
    inputStream.read(readBuffer, 0, seek1);

    // If seek2 is non-zero, perform an additional seek and read.
    if (seek2 != 0) {
      inputStream.seek(fileSize - seek1 - seek2);
      inputStream.read(readBuffer, 0, seek2);
    }

    // Close the input stream.
    inputStream.close();

    // Set a new buffer size for read and write operations.
    int bufferSize1 = MIN_BUFFER_SIZE;
    abfsConfiguration.setWriteBufferSize(bufferSize1);
    abfsConfiguration.setReadBufferSize(bufferSize1);

    // Generate new random data for a second test file.
    final byte[] b1 = new byte[2 * bufferSize1];
    new Random().nextBytes(b1);

    // Define the path for the second test file.
    Path testPath1 = path("/testfile1");

    // Write the new random data to the second test file.
    writeDataToFile(fs, testPath1, b1);

    // Initialize a buffer for reading from the second test file.
    final byte[] readBuffer1 = new byte[2 * bufferSize1];

    // Open an input stream for the second test file.
    FSDataInputStream inputStream1 = fs.open(testPath1);
    statisticsSource = inputStream1;

    // Register a listener for tracing headers.
    ((AbfsInputStream) inputStream1.getWrappedStream()).registerListener(
            new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
                    fs.getFileSystemId(), FSOperationType.READ, true, 0,
                    ((AbfsInputStream) inputStream1.getWrappedStream())
                            .getStreamID()));

    // Seek to a position in the second test file and read data.
    inputStream1.seek(bufferSize1);
    inputStream1.read(readBuffer1, bufferSize1, bufferSize1);

    // To test tracingHeader for case with bypassReadAhead == true.
    inputStream1.seek(0);
    byte[] temp = new byte[5];
    int t = inputStream1.read(temp, 0, 1);

    // Seek to the beginning of the second test file and read data.
    inputStream1.seek(0);
    inputStream1.read(readBuffer1, 0, bufferSize1);

    // Close the input stream for the second test file.
    inputStream1.close();

    // Get footer metrics for both Parquet and non-Parquet scenarios.
    AbfsReadFooterMetrics parquetMetrics = getParquetMetrics();
    AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();

    // Concatenate and assert the metrics equality.
    String metrics = parquetMetrics.toString();
    metrics += nonParquetMetrics.toString();
    assertMetricsEquality(fs, metrics);

    // Close the AzureBlobFileSystem instance.
    fs.close();
  }

  /**
   * Test for reading footer metrics with an idle period.
   *
   * This method tests reading footer metrics with an idle period. It creates an AzureBlobFileSystem,
   * configures it, writes random data to a test file, performs read operations, introduces an idle
   * period, and checks the footer metrics for non-Parquet scenarios.
   *
   */
  @Test
  public void testMetricWithIdlePeriod() throws Exception {
    // Set the buffer size for the test.
    int bufferSize = MIN_BUFFER_SIZE;
    Configuration configuration = getConfiguration(bufferSize);
    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
    AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();

    // Generate random data to write to the test file.
    final byte[] b = new byte[2 * bufferSize];
    new Random().nextBytes(b);

    // Define the path for the test file.
    Path testPath = path(TEST_PATH);

    // Write the random data to the test file.
    writeDataToFile(fs, testPath, b);

    // Initialize a buffer for reading.
    final byte[] readBuffer = new byte[2 * bufferSize];

    // Initialize a source for IO statistics.
    IOStatisticsSource statisticsSource = null;

    // Open an input stream for the test file.
    try (FSDataInputStream inputStream = fs.open(testPath)) {
      // Register a listener for tracing headers.
      ((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
              new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
                      fs.getFileSystemId(), FSOperationType.READ, true, 0,
                      ((AbfsInputStream) inputStream.getWrappedStream())
                              .getStreamID()));

      // Seek to the specified position in the test file and read data.
      inputStream.seek(bufferSize);
      inputStream.read(readBuffer, bufferSize, bufferSize);

      // Introduce an idle period by sleeping.
      int sleepPeriod = Integer.parseInt(SLEEP_PERIOD);
      Thread.sleep(sleepPeriod);

      // To test tracingHeader for case with bypassReadAhead == true.
      inputStream.seek(0);
      byte[] temp = new byte[5];
      int t = inputStream.read(temp, 0, 1);

      // Seek to the beginning of the test file and read data.
      inputStream.seek(0);
      inputStream.read(readBuffer, 0, bufferSize);

      // Get and assert the footer metrics for non-Parquet scenarios.
      AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
      String metrics = nonParquetMetrics.toString();
      assertMetricsEquality(fs, metrics);

      // Introduce an additional idle period by sleeping.
      Thread.sleep(sleepPeriod);
    }
  }
}