ITestAbfsInputStreamStatistics.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs;

import java.io.IOException;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.io.IOUtils;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;

public class ITestAbfsInputStreamStatistics
    extends AbstractAbfsIntegrationTest {
  private static final int OPERATIONS = 10;
  private static final Logger LOG =
      LoggerFactory.getLogger(ITestAbfsInputStreamStatistics.class);
  private static final int ONE_MB = 1024 * 1024;
  private static final int ONE_KB = 1024;
  private static final int CUSTOM_BLOCK_BUFFER_SIZE = 4 * 1024;
  private byte[] defBuffer = new byte[ONE_MB];

  public ITestAbfsInputStreamStatistics() throws Exception {
  }

  /**
   * Test to check the initial values of the AbfsInputStream statistics.
   */
  @Test
  public void testInitValues() throws IOException {
    describe("Testing the initial values of AbfsInputStream Statistics");

    AzureBlobFileSystem fs = getFileSystem();
    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
    Path initValuesPath = path(getMethodName());
    AbfsOutputStream outputStream = null;
    AbfsInputStream inputStream = null;

    try {

      outputStream = createAbfsOutputStreamWithFlushEnabled(fs, initValuesPath);
      inputStream = abfss.openFileForRead(initValuesPath, fs.getFsStatistics(),
          getTestTracingContext(fs, false));

      AbfsInputStreamStatisticsImpl stats =
          (AbfsInputStreamStatisticsImpl) inputStream.getStreamStatistics();

      checkInitValue(stats.getSeekOperations(), "seekOps");
      checkInitValue(stats.getForwardSeekOperations(), "forwardSeekOps");
      checkInitValue(stats.getBackwardSeekOperations(), "backwardSeekOps");
      checkInitValue(stats.getBytesRead(), "bytesRead");
      checkInitValue(stats.getBytesSkippedOnSeek(), "bytesSkippedOnSeek");
      checkInitValue(stats.getBytesBackwardsOnSeek(), "bytesBackwardsOnSeek");
      checkInitValue(stats.getSeekInBuffer(), "seekInBuffer");
      checkInitValue(stats.getReadOperations(), "readOps");
      checkInitValue(stats.getBytesReadFromBuffer(), "bytesReadFromBuffer");
      checkInitValue(stats.getRemoteReadOperations(), "remoteReadOps");
      checkInitValue(stats.getReadAheadBytesRead(), "readAheadBytesRead");
      checkInitValue(stats.getRemoteBytesRead(), "readAheadRemoteBytesRead");

    } finally {
      IOUtils.cleanupWithLogger(LOG, outputStream, inputStream);
    }
  }

  /**
   * Test to check statistics from seek operation in AbfsInputStream.
   */
  @Test
  public void testSeekStatistics() throws IOException {
    describe("Testing the values of statistics from seek operations in "
        + "AbfsInputStream");

    AzureBlobFileSystem fs = getFileSystem();
    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
    Path seekStatPath = path(getMethodName());

    AbfsOutputStream out = null;
    AbfsInputStream in = null;

    try {
      out = createAbfsOutputStreamWithFlushEnabled(fs, seekStatPath);

      //Writing a default buffer in a file.
      out.write(defBuffer);
      out.hflush();
      in = abfss.openFileForRead(seekStatPath, fs.getFsStatistics(),
          getTestTracingContext(fs, false));

      /*
       * Writing 1MB buffer to the file, this would make the fCursor(Current
       * position of cursor) to the end of file.
       */
      int result = in.read(defBuffer, 0, ONE_MB);
      LOG.info("Result of read : {}", result);

      /*
       * Seeking to start of file and then back to end would result in a
       * backward and a forward seek respectively 10 times.
       */
      for (int i = 0; i < OPERATIONS; i++) {
        in.seek(0);
        in.read();
        in.seek(ONE_MB);
      }

      AbfsInputStreamStatisticsImpl stats =
          (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();

      LOG.info("STATISTICS: {}", stats.toString());

      /*
       * seekOps - Since we are doing backward and forward seek OPERATIONS
       * times, total seeks would be 2 * OPERATIONS.
       *
       * backwardSeekOps - Since we are doing a backward seek inside a loop
       * for OPERATION times, total backward seeks would be OPERATIONS.
       *
       * forwardSeekOps - Since we are doing a forward seek inside a loop
       * for OPERATION times, total forward seeks would be OPERATIONS.
       *
       * negativeBytesBackwardsOnSeek - Since we are doing backward seeks from
       * end of file in a ONE_MB file each time, this would mean the bytes from
       * backward seek would be OPERATIONS * ONE_MB.
       *
       * bytesSkippedOnSeek - Since, we move from start to end in seek, but
       * our fCursor(position of cursor) always remain at end of file, this
       * would mean no bytes were skipped on seek. Since, all forward seeks
       * are in buffer.
       *
       * seekInBuffer - Since all seeks were in buffer, the seekInBuffer
       * would be equal to OPERATIONS.
       *
       */
      assertEquals("Mismatch in seekOps value", 2 * OPERATIONS,
          stats.getSeekOperations());
      assertEquals("Mismatch in backwardSeekOps value", OPERATIONS,
          stats.getBackwardSeekOperations());
      assertEquals("Mismatch in forwardSeekOps value", OPERATIONS,
          stats.getForwardSeekOperations());
      assertEquals("Mismatch in bytesBackwardsOnSeek value",
          OPERATIONS * ONE_MB, stats.getBytesBackwardsOnSeek());
      assertEquals("Mismatch in bytesSkippedOnSeek value",
          0, stats.getBytesSkippedOnSeek());
      assertEquals("Mismatch in seekInBuffer value", OPERATIONS,
          stats.getSeekInBuffer());

      in.close();
      // Verifying whether stats are readable after stream is closed.
      LOG.info("STATISTICS after closing: {}", stats.toString());
    } finally {
      IOUtils.cleanupWithLogger(LOG, out, in);
    }
  }

  /**
   * Test to check statistics value from read operation in AbfsInputStream.
   */
  @Test
  public void testReadStatistics() throws IOException {
    describe("Testing the values of statistics from read operation in "
        + "AbfsInputStream");

    AzureBlobFileSystem fs = getFileSystem();
    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
    Path readStatPath = path(getMethodName());

    AbfsOutputStream out = null;
    AbfsInputStream in = null;

    try {
      out = createAbfsOutputStreamWithFlushEnabled(fs, readStatPath);

      /*
       * Writing 1MB buffer to the file.
       */
      out.write(defBuffer);
      out.hflush();
      in = abfss.openFileForRead(readStatPath, fs.getFsStatistics(),
          getTestTracingContext(fs, false));

      /*
       * Doing file read 10 times.
       */
      for (int i = 0; i < OPERATIONS; i++) {
        in.read();
      }

      AbfsInputStreamStatisticsImpl stats =
          (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();

      LOG.info("STATISTICS: {}", stats.toString());

      /*
       * bytesRead - Since each time a single byte is read, total
       * bytes read would be equal to OPERATIONS.
       *
       * readOps - Since each time read operation is performed OPERATIONS
       * times, total number of read operations would be equal to OPERATIONS.
       *
       * remoteReadOps - Only a single remote read operation is done. Hence,
       * total remote read ops is 1.
       *
       */
      assertEquals("Mismatch in bytesRead value", OPERATIONS,
          stats.getBytesRead());
      assertEquals("Mismatch in readOps value", OPERATIONS,
          stats.getReadOperations());
      assertEquals("Mismatch in remoteReadOps value", 1,
          stats.getRemoteReadOperations());

      in.close();
      // Verifying if stats are still readable after stream is closed.
      LOG.info("STATISTICS after closing: {}", stats.toString());
    } finally {
      IOUtils.cleanupWithLogger(LOG, out, in);
    }
  }

  /**
   * Testing AbfsInputStream works with null Statistics.
   */
  @Test
  public void testWithNullStreamStatistics() throws IOException {
    describe("Testing AbfsInputStream operations with statistics as null");

    AzureBlobFileSystem fs = getFileSystem();
    Path nullStatFilePath = path(getMethodName());
    byte[] oneKbBuff = new byte[ONE_KB];

    // Creating an AbfsInputStreamContext instance with null StreamStatistics.
    AbfsInputStreamContext abfsInputStreamContext =
        new AbfsInputStreamContext(
            getConfiguration().getSasTokenRenewPeriodForStreamsInSeconds())
            .withReadBufferSize(getConfiguration().getReadBufferSize())
            .withReadAheadQueueDepth(getConfiguration().getReadAheadQueueDepth())
            .withStreamStatistics(null)
            .withReadAheadRange(getConfiguration().getReadAheadRange())
            .build();

    AbfsOutputStream out = null;
    AbfsInputStream in = null;

    try {
      out = createAbfsOutputStreamWithFlushEnabled(fs, nullStatFilePath);

      // Writing a 1KB buffer in the file.
      out.write(oneKbBuff);
      out.hflush();

      // AbfsRestOperation Instance required for eTag.
      AbfsRestOperation abfsRestOperation = fs.getAbfsClient()
          .getPathStatus(nullStatFilePath.toUri().getPath(), false,
              getTestTracingContext(fs, false), null);

      // AbfsInputStream with no StreamStatistics.
      in = new AbfsInputStream(fs.getAbfsClient(), null,
          nullStatFilePath.toUri().getPath(), ONE_KB, abfsInputStreamContext,
          abfsRestOperation.getResult().getResponseHeader("ETag"),
          getTestTracingContext(fs, false));

      // Verifying that AbfsInputStream Operations works with null statistics.
      assertNotEquals("AbfsInputStream read() with null statistics should "
          + "work", -1, in.read());
      in.seek(ONE_KB);

      // Verifying toString() with no StreamStatistics.
      LOG.info("AbfsInputStream: {}", in.toString());
    } finally {
      IOUtils.cleanupWithLogger(LOG, out, in);
    }
  }

  /**
   * Testing readAhead counters in AbfsInputStream with 30 seconds timeout.
   */
  @Test
  public void testReadAheadCounters() throws IOException {
    describe("Test to check correct values for readAhead counters in "
        + "AbfsInputStream");

    AzureBlobFileSystem fs = getFileSystem();
    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
    Path readAheadCountersPath = path(getMethodName());

    /*
     * Setting the block size for readAhead as 4KB.
     */
    abfss.getAbfsConfiguration().setReadBufferSize(CUSTOM_BLOCK_BUFFER_SIZE);

    AbfsOutputStream out = null;
    AbfsInputStream in = null;

    try {

      /*
       * Creating a file of 1MB size.
       */
      out = createAbfsOutputStreamWithFlushEnabled(fs, readAheadCountersPath);
      out.write(defBuffer);
      out.close();

      in = abfss.openFileForRead(readAheadCountersPath, fs.getFsStatistics(),
          getTestTracingContext(fs, false));

      /*
       * Reading 1KB after each i * KB positions. Hence the reads are from 0
       * to 1KB, 1KB to 2KB, and so on.. for 5 operations.
       */
      for (int i = 0; i < 5; i++) {
        in.seek(ONE_KB * i);
        in.read(defBuffer, ONE_KB * i, ONE_KB);
      }
      AbfsInputStreamStatisticsImpl stats =
          (AbfsInputStreamStatisticsImpl) in.getStreamStatistics();

      /*
       * Verifying the counter values of readAheadBytesRead and remoteBytesRead.
       *
       * readAheadBytesRead : Since, we read 1KBs 5 times, that means we go
       * from 0 to 5KB in the file. The bufferSize is set to 4KB, and since
       * we have 8 blocks of readAhead buffer. We would have 8 blocks of 4KB
       * buffer. Our read is till 5KB, hence readAhead would ideally read 2
       * blocks of 4KB which is equal to 8KB. But, sometimes to get blocks
       * from readAhead buffer we might have to wait for background
       * threads to fill the buffer and hence we might do remote read which
       * would be faster. Therefore, readAheadBytesRead would be greater than
       * or equal to the value of bytesFromReadAhead at the point we measure it.
       *
       * remoteBytesRead : Since, the bufferSize is set to 4KB and the number
       * of blocks or readAheadQueueDepth is equal to 8. We would read 8 * 4
       * KB buffer on the first read, which is equal to 32KB. But, if we are not
       * able to read some bytes that were in the buffer after doing
       * readAhead, we might use remote read again. Thus, the bytes read
       * remotely would be greater than or equal to the bytesFromRemoteRead
       * value that we measure at some point of the operation.
       *
       */
      Assertions.assertThat(stats.getReadAheadBytesRead()).describedAs(
          "Mismatch in readAheadBytesRead counter value")
          .isGreaterThanOrEqualTo(in.getBytesFromReadAhead());

      Assertions.assertThat(stats.getRemoteBytesRead()).describedAs(
          "Mismatch in remoteBytesRead counter value")
          .isGreaterThanOrEqualTo(in.getBytesFromRemoteRead());

    } finally {
      IOUtils.cleanupWithLogger(LOG, out, in);
    }
  }

  /**
   * Testing time taken by AbfsInputStream to complete a GET request.
   */
  @Test
  public void testActionHttpGetRequest() throws IOException {
    describe("Test to check the correct value of Time taken by http get "
        + "request in AbfsInputStream");
    AzureBlobFileSystem fs = getFileSystem();
    AzureBlobFileSystemStore abfss = fs.getAbfsStore();
    Path actionHttpGetRequestPath = path(getMethodName());
    AbfsInputStream abfsInputStream = null;
    AbfsOutputStream abfsOutputStream = null;
    try {
      abfsOutputStream = createAbfsOutputStreamWithFlushEnabled(fs,
          actionHttpGetRequestPath);
      abfsOutputStream.write('a');
      abfsOutputStream.hflush();

      abfsInputStream =
          abfss.openFileForRead(actionHttpGetRequestPath,
              fs.getFsStatistics(), getTestTracingContext(fs, false));
      abfsInputStream.read();
      IOStatistics ioStatistics = extractStatistics(fs);
      LOG.info("AbfsInputStreamStats info: {}",
          ioStatisticsToPrettyString(ioStatistics));
      Assertions.assertThat(
          lookupMeanStatistic(ioStatistics,
              AbfsStatistic.HTTP_GET_REQUEST.getStatName()
                  + StoreStatisticNames.SUFFIX_MEAN).mean())
          .describedAs("Mismatch in time taken by a GET request")
          .isGreaterThan(0.0);
    } finally {
      IOUtils.cleanupWithLogger(LOG, abfsInputStream, abfsOutputStream);
    }
  }

  /**
   * Method to assert the initial values of the statistics.
   *
   * @param actualValue the actual value of the statistics.
   * @param statistic   the name of operation or statistic being asserted.
   */
  private void checkInitValue(long actualValue, String statistic) {
    assertEquals("Mismatch in " + statistic + " value", 0, actualValue);
  }
}