ITestBlockBlobInputStream.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azure;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Random;
import java.util.concurrent.Callable;

import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;

import static org.junit.Assume.assumeNotNull;

import static org.apache.hadoop.test.LambdaTestUtils.*;

/**
 * Test semantics and performance of the original block blob input stream
 * (KEY_INPUT_STREAM_VERSION=1) and the new
 * <code>BlockBlobInputStream</code> (KEY_INPUT_STREAM_VERSION=2).
 */
@FixMethodOrder(MethodSorters.NAME_ASCENDING)

public class ITestBlockBlobInputStream extends AbstractAzureScaleTest {
  private static final Logger LOG = LoggerFactory.getLogger(
      ITestBlockBlobInputStream.class);
  private static final int KILOBYTE = 1024;
  private static final int MEGABYTE = KILOBYTE * KILOBYTE;
  private static final int TEST_FILE_SIZE = 6 * MEGABYTE;
  private static final Path TEST_FILE_PATH = new Path(
      "TestBlockBlobInputStream.txt");

  private AzureBlobStorageTestAccount accountUsingInputStreamV1;
  private AzureBlobStorageTestAccount accountUsingInputStreamV2;
  private long testFileLength;



  private FileStatus testFileStatus;
  private Path hugefile;

  @Override
  public void setUp() throws Exception {
    super.setUp();
    Configuration conf = new Configuration();
    conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1);

    accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create(
        "testblockblobinputstream",
        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
        conf,
        true);

    accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create(
        "testblockblobinputstream",
        EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class),
        null,
        true);

    assumeNotNull(accountUsingInputStreamV1);
    assumeNotNull(accountUsingInputStreamV2);
    hugefile = fs.makeQualified(TEST_FILE_PATH);
    try {
      testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
      testFileLength = testFileStatus.getLen();
    } catch (FileNotFoundException e) {
      // file doesn't exist
      testFileLength = 0;
    }
  }

  @Override
  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
    Configuration conf = new Configuration();
    conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1);

    accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create(
        "testblockblobinputstream",
        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
        conf,
        true);

    accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create(
        "testblockblobinputstream",
        EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class),
        null,
        true);

    assumeNotNull(accountUsingInputStreamV1);
    assumeNotNull(accountUsingInputStreamV2);
    return accountUsingInputStreamV1;
  }

  /**
   * Create a test file by repeating the characters in the alphabet.
   * @throws IOException
   */
  private void createTestFileAndSetLength() throws IOException {
    FileSystem fs = accountUsingInputStreamV1.getFileSystem();

    // To reduce test run time, the test file can be reused.
    if (fs.exists(TEST_FILE_PATH)) {
      testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
      testFileLength = testFileStatus.getLen();
      LOG.info("Reusing test file: {}", testFileStatus);
      return;
    }

    int sizeOfAlphabet = ('z' - 'a' + 1);
    byte[] buffer = new byte[26 * KILOBYTE];
    char character = 'a';
    for (int i = 0; i < buffer.length; i++) {
      buffer[i] = (byte) character;
      character = (character == 'z') ? 'a' : (char) ((int) character + 1);
    }

    LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH,
        TEST_FILE_SIZE);
    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();

    try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
      int bytesWritten = 0;
      while (bytesWritten < TEST_FILE_SIZE) {
        outputStream.write(buffer);
        bytesWritten += buffer.length;
      }
      LOG.info("Closing stream {}", outputStream);
      ContractTestUtils.NanoTimer closeTimer
          = new ContractTestUtils.NanoTimer();
      outputStream.close();
      closeTimer.end("time to close() output stream");
    }
    timer.end("time to write %d KB", TEST_FILE_SIZE / 1024);
    testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen();
  }

  void assumeHugeFileExists() throws IOException {
    ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
    FileStatus status = fs.getFileStatus(hugefile);
    ContractTestUtils.assertIsFile(hugefile, status);
    assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
  }

  /**
   * Calculate megabits per second from the specified values for bytes and
   * milliseconds.
   * @param bytes The number of bytes.
   * @param milliseconds The number of milliseconds.
   * @return The number of megabits per second.
   */
  private static double toMbps(long bytes, long milliseconds) {
    return bytes / 1000.0 * 8 / milliseconds;
  }

  @Test
  public void test_0100_CreateHugeFile() throws IOException {
    createTestFileAndSetLength();
  }

  @Test
  public void test_0200_BasicReadTest() throws Exception {
    assumeHugeFileExists();

    try (
        FSDataInputStream inputStreamV1
            = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);

        FSDataInputStream inputStreamV2
            = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
    ) {
      byte[] bufferV1 = new byte[3 * MEGABYTE];
      byte[] bufferV2 = new byte[bufferV1.length];

      // v1 forward seek and read a kilobyte into first kilobyte of bufferV1
      inputStreamV1.seek(5 * MEGABYTE);
      int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, KILOBYTE);
      assertEquals(KILOBYTE, numBytesReadV1);

      // v2 forward seek and read a kilobyte into first kilobyte of bufferV2
      inputStreamV2.seek(5 * MEGABYTE);
      int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, KILOBYTE);
      assertEquals(KILOBYTE, numBytesReadV2);

      assertArrayEquals(bufferV1, bufferV2);

      int len = MEGABYTE;
      int offset = bufferV1.length - len;

      // v1 reverse seek and read a megabyte into last megabyte of bufferV1
      inputStreamV1.seek(3 * MEGABYTE);
      numBytesReadV1 = inputStreamV1.read(bufferV1, offset, len);
      assertEquals(len, numBytesReadV1);

      // v2 reverse seek and read a megabyte into last megabyte of bufferV2
      inputStreamV2.seek(3 * MEGABYTE);
      numBytesReadV2 = inputStreamV2.read(bufferV2, offset, len);
      assertEquals(len, numBytesReadV2);

      assertArrayEquals(bufferV1, bufferV2);
    }
  }

  @Test
  public void test_0201_RandomReadTest() throws Exception {
    assumeHugeFileExists();

    try (
        FSDataInputStream inputStreamV1
            = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);

        FSDataInputStream inputStreamV2
            = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
    ) {
      final int bufferSize = 4 * KILOBYTE;
      byte[] bufferV1 = new byte[bufferSize];
      byte[] bufferV2 = new byte[bufferV1.length];

      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);

      inputStreamV1.seek(0);
      inputStreamV2.seek(0);

      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);

      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);

      int seekPosition = 2 * KILOBYTE;
      inputStreamV1.seek(seekPosition);
      inputStreamV2.seek(seekPosition);

      inputStreamV1.seek(0);
      inputStreamV2.seek(0);

      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);

      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);

      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);

      seekPosition = 5 * KILOBYTE;
      inputStreamV1.seek(seekPosition);
      inputStreamV2.seek(seekPosition);

      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);

      seekPosition = 10 * KILOBYTE;
      inputStreamV1.seek(seekPosition);
      inputStreamV2.seek(seekPosition);

      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);

      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);

      seekPosition = 4100 * KILOBYTE;
      inputStreamV1.seek(seekPosition);
      inputStreamV2.seek(seekPosition);

      verifyConsistentReads(inputStreamV1, inputStreamV2, bufferV1, bufferV2);
    }
  }

  private void verifyConsistentReads(FSDataInputStream inputStreamV1,
      FSDataInputStream inputStreamV2,
      byte[] bufferV1,
      byte[] bufferV2) throws IOException {
    int size = bufferV1.length;
    final int numBytesReadV1 = inputStreamV1.read(bufferV1, 0, size);
    assertEquals("Bytes read from V1 stream", size, numBytesReadV1);

    final int numBytesReadV2 = inputStreamV2.read(bufferV2, 0, size);
    assertEquals("Bytes read from V2 stream", size, numBytesReadV2);

    assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
  }

  @Test
  public void test_202_PosReadTest() throws Exception {
    assumeHugeFileExists();
    FutureDataInputStreamBuilder builder = accountUsingInputStreamV2
        .getFileSystem().openFile(TEST_FILE_PATH);
    builder.opt(AzureNativeFileSystemStore.FS_AZURE_BLOCK_BLOB_BUFFERED_PREAD_DISABLE, true);
    try (
        FSDataInputStream inputStreamV1
            = accountUsingInputStreamV1.getFileSystem().open(TEST_FILE_PATH);
        FSDataInputStream inputStreamV2
            = accountUsingInputStreamV2.getFileSystem().open(TEST_FILE_PATH);
        FSDataInputStream inputStreamV2NoBuffer = builder.build().get();
    ) {
      final int bufferSize = 4 * KILOBYTE;
      byte[] bufferV1 = new byte[bufferSize];
      byte[] bufferV2 = new byte[bufferSize];
      byte[] bufferV2NoBuffer = new byte[bufferSize];

      verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, 0,
          bufferV1, bufferV2, bufferV2NoBuffer);

      int pos = 2 * KILOBYTE;
      verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
          bufferV1, bufferV2, bufferV2NoBuffer);

      pos = 10 * KILOBYTE;
      verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
          bufferV1, bufferV2, bufferV2NoBuffer);

      pos = 4100 * KILOBYTE;
      verifyConsistentReads(inputStreamV1, inputStreamV2, inputStreamV2NoBuffer, pos,
          bufferV1, bufferV2, bufferV2NoBuffer);
    }
  }

  private void verifyConsistentReads(FSDataInputStream inputStreamV1,
      FSDataInputStream inputStreamV2, FSDataInputStream inputStreamV2NoBuffer,
      int pos, byte[] bufferV1, byte[] bufferV2, byte[] bufferV2NoBuffer)
      throws IOException {
    int size = bufferV1.length;
    int numBytesReadV1 = inputStreamV1.read(pos, bufferV1, 0, size);
    assertEquals("Bytes read from V1 stream", size, numBytesReadV1);

    int numBytesReadV2 = inputStreamV2.read(pos, bufferV2, 0, size);
    assertEquals("Bytes read from V2 stream", size, numBytesReadV2);

    int numBytesReadV2NoBuffer = inputStreamV2NoBuffer.read(pos,
        bufferV2NoBuffer, 0, size);
    assertEquals("Bytes read from V2 stream (buffered pread disabled)", size,
        numBytesReadV2NoBuffer);

    assertArrayEquals("Mismatch in read data", bufferV1, bufferV2);
    assertArrayEquals("Mismatch in read data", bufferV2, bufferV2NoBuffer);
  }

  /**
   * Validates the implementation of InputStream.markSupported.
   * @throws IOException
   */
  @Test
  public void test_0301_MarkSupportedV1() throws IOException {
    validateMarkSupported(accountUsingInputStreamV1.getFileSystem());
  }

  /**
   * Validates the implementation of InputStream.markSupported.
   * @throws IOException
   */
  @Test
  public void test_0302_MarkSupportedV2() throws IOException {
    validateMarkSupported(accountUsingInputStreamV1.getFileSystem());
  }

  private void validateMarkSupported(FileSystem fs) throws IOException {
    assumeHugeFileExists();
    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
      assertTrue("mark is not supported", inputStream.markSupported());
    }
  }

  /**
   * Validates the implementation of InputStream.mark and reset
   * for version 1 of the block blob input stream.
   * @throws Exception
   */
  @Test
  public void test_0303_MarkAndResetV1() throws Exception {
    validateMarkAndReset(accountUsingInputStreamV1.getFileSystem());
  }

  /**
   * Validates the implementation of InputStream.mark and reset
   * for version 2 of the block blob input stream.
   * @throws Exception
   */
  @Test
  public void test_0304_MarkAndResetV2() throws Exception {
    validateMarkAndReset(accountUsingInputStreamV2.getFileSystem());
  }

  private void validateMarkAndReset(FileSystem fs) throws Exception {
    assumeHugeFileExists();
    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
      inputStream.mark(KILOBYTE - 1);

      byte[] buffer = new byte[KILOBYTE];
      int bytesRead = inputStream.read(buffer);
      assertEquals(buffer.length, bytesRead);

      inputStream.reset();
      assertEquals("rest -> pos 0", 0, inputStream.getPos());

      inputStream.mark(8 * KILOBYTE - 1);

      buffer = new byte[8 * KILOBYTE];
      bytesRead = inputStream.read(buffer);
      assertEquals(buffer.length, bytesRead);

      intercept(IOException.class,
          "Resetting to invalid mark",
          new Callable<FSDataInputStream>() {
            @Override
            public FSDataInputStream call() throws Exception {
              inputStream.reset();
              return inputStream;
            }
          }
      );
    }
  }

  /**
   * Validates the implementation of Seekable.seekToNewSource, which should
   * return false for version 1 of the block blob input stream.
   * @throws IOException
   */
  @Test
  public void test_0305_SeekToNewSourceV1() throws IOException {
    validateSeekToNewSource(accountUsingInputStreamV1.getFileSystem());
  }

  /**
   * Validates the implementation of Seekable.seekToNewSource, which should
   * return false for version 2 of the block blob input stream.
   * @throws IOException
   */
  @Test
  public void test_0306_SeekToNewSourceV2() throws IOException {
    validateSeekToNewSource(accountUsingInputStreamV2.getFileSystem());
  }

  private void validateSeekToNewSource(FileSystem fs) throws IOException {
    assumeHugeFileExists();
    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
      assertFalse(inputStream.seekToNewSource(0));
    }
  }

  /**
   * Validates the implementation of InputStream.skip and ensures there is no
   * network I/O for version 1 of the block blob input stream.
   * @throws Exception
   */
  @Test
  public void test_0307_SkipBoundsV1() throws Exception {
    validateSkipBounds(accountUsingInputStreamV1.getFileSystem());
  }

  /**
   * Validates the implementation of InputStream.skip and ensures there is no
   * network I/O for version 2 of the block blob input stream.
   * @throws Exception
   */
  @Test
  public void test_0308_SkipBoundsV2() throws Exception {
    validateSkipBounds(accountUsingInputStreamV2.getFileSystem());
  }

  private void validateSkipBounds(FileSystem fs) throws Exception {
    assumeHugeFileExists();
    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
      NanoTimer timer = new NanoTimer();

      long skipped = inputStream.skip(-1);
      assertEquals(0, skipped);

      skipped = inputStream.skip(0);
      assertEquals(0, skipped);

      assertTrue(testFileLength > 0);

      skipped = inputStream.skip(testFileLength);
      assertEquals(testFileLength, skipped);

      intercept(EOFException.class,
          new Callable<Long>() {
            @Override
            public Long call() throws Exception {
              return inputStream.skip(1);
            }
          }
      );
      long elapsedTimeMs = timer.elapsedTimeMs();
      assertTrue(
          String.format(
              "There should not be any network I/O (elapsedTimeMs=%1$d).",
              elapsedTimeMs),
          elapsedTimeMs < 20);
    }
  }

  /**
   * Validates the implementation of Seekable.seek and ensures there is no
   * network I/O for forward seek.
   * @throws Exception
   */
  @Test
  public void test_0309_SeekBoundsV1() throws Exception {
    validateSeekBounds(accountUsingInputStreamV1.getFileSystem());
  }

  /**
   * Validates the implementation of Seekable.seek and ensures there is no
   * network I/O for forward seek.
   * @throws Exception
   */
  @Test
  public void test_0310_SeekBoundsV2() throws Exception {
    validateSeekBounds(accountUsingInputStreamV2.getFileSystem());
  }

  private void validateSeekBounds(FileSystem fs) throws Exception {
    assumeHugeFileExists();
    try (
        FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
    ) {
      NanoTimer timer = new NanoTimer();

      inputStream.seek(0);
      assertEquals(0, inputStream.getPos());

      intercept(EOFException.class,
          FSExceptionMessages.NEGATIVE_SEEK,
          new Callable<FSDataInputStream>() {
            @Override
            public FSDataInputStream call() throws Exception {
              inputStream.seek(-1);
              return inputStream;
            }
          }
      );

      assertTrue("Test file length only " + testFileLength, testFileLength > 0);
      inputStream.seek(testFileLength);
      assertEquals(testFileLength, inputStream.getPos());

      intercept(EOFException.class,
          FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
          new Callable<FSDataInputStream>() {
            @Override
            public FSDataInputStream call() throws Exception {
              inputStream.seek(testFileLength + 1);
              return inputStream;
            }
          }
      );

      long elapsedTimeMs = timer.elapsedTimeMs();
      assertTrue(
          String.format(
              "There should not be any network I/O (elapsedTimeMs=%1$d).",
              elapsedTimeMs),
          elapsedTimeMs < 20);
    }
  }

  /**
   * Validates the implementation of Seekable.seek, Seekable.getPos,
   * and InputStream.available.
   * @throws Exception
   */
  @Test
  public void test_0311_SeekAndAvailableAndPositionV1() throws Exception {
    validateSeekAndAvailableAndPosition(
        accountUsingInputStreamV1.getFileSystem());
  }

  /**
   * Validates the implementation of Seekable.seek, Seekable.getPos,
   * and InputStream.available.
   * @throws Exception
   */
  @Test
  public void test_0312_SeekAndAvailableAndPositionV2() throws Exception {
    validateSeekAndAvailableAndPosition(
        accountUsingInputStreamV2.getFileSystem());
  }

  private void validateSeekAndAvailableAndPosition(FileSystem fs)
      throws Exception {
    assumeHugeFileExists();
    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
      byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
      byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
      byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
      byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};
      byte[] buffer = new byte[3];

      int bytesRead = inputStream.read(buffer);
      assertEquals(buffer.length, bytesRead);
      assertArrayEquals(expected1, buffer);
      assertEquals(buffer.length, inputStream.getPos());
      assertEquals(testFileLength - inputStream.getPos(),
          inputStream.available());

      bytesRead = inputStream.read(buffer);
      assertEquals(buffer.length, bytesRead);
      assertArrayEquals(expected2, buffer);
      assertEquals(2 * buffer.length, inputStream.getPos());
      assertEquals(testFileLength - inputStream.getPos(),
          inputStream.available());

      // reverse seek
      int seekPos = 0;
      inputStream.seek(seekPos);

      bytesRead = inputStream.read(buffer);
      assertEquals(buffer.length, bytesRead);
      assertArrayEquals(expected1, buffer);
      assertEquals(buffer.length + seekPos, inputStream.getPos());
      assertEquals(testFileLength - inputStream.getPos(),
          inputStream.available());

      // reverse seek
      seekPos = 1;
      inputStream.seek(seekPos);

      bytesRead = inputStream.read(buffer);
      assertEquals(buffer.length, bytesRead);
      assertArrayEquals(expected3, buffer);
      assertEquals(buffer.length + seekPos, inputStream.getPos());
      assertEquals(testFileLength - inputStream.getPos(),
          inputStream.available());

      // forward seek
      seekPos = 6;
      inputStream.seek(seekPos);

      bytesRead = inputStream.read(buffer);
      assertEquals(buffer.length, bytesRead);
      assertArrayEquals(expected4, buffer);
      assertEquals(buffer.length + seekPos, inputStream.getPos());
      assertEquals(testFileLength - inputStream.getPos(),
          inputStream.available());
    }
  }

  /**
   * Validates the implementation of InputStream.skip, Seekable.getPos,
   * and InputStream.available.
   * @throws IOException
   */
  @Test
  public void test_0313_SkipAndAvailableAndPositionV1() throws IOException {
    validateSkipAndAvailableAndPosition(
        accountUsingInputStreamV1.getFileSystem());
  }

  /**
   * Validates the implementation of InputStream.skip, Seekable.getPos,
   * and InputStream.available.
   * @throws IOException
   */
  @Test
  public void test_0314_SkipAndAvailableAndPositionV2() throws IOException {
    validateSkipAndAvailableAndPosition(
        accountUsingInputStreamV1.getFileSystem());
  }

  private void validateSkipAndAvailableAndPosition(FileSystem fs)
      throws IOException {
    assumeHugeFileExists();
    try (
        FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
    ) {
      byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
      byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
      byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
      byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};

      assertEquals(testFileLength, inputStream.available());
      assertEquals(0, inputStream.getPos());

      int n = 3;
      long skipped = inputStream.skip(n);

      assertEquals(skipped, inputStream.getPos());
      assertEquals(testFileLength - inputStream.getPos(),
          inputStream.available());
      assertEquals(skipped, n);

      byte[] buffer = new byte[3];
      int bytesRead = inputStream.read(buffer);
      assertEquals(buffer.length, bytesRead);
      assertArrayEquals(expected2, buffer);
      assertEquals(buffer.length + skipped, inputStream.getPos());
      assertEquals(testFileLength - inputStream.getPos(),
          inputStream.available());

      // does skip still work after seek?
      int seekPos = 1;
      inputStream.seek(seekPos);

      bytesRead = inputStream.read(buffer);
      assertEquals(buffer.length, bytesRead);
      assertArrayEquals(expected3, buffer);
      assertEquals(buffer.length + seekPos, inputStream.getPos());
      assertEquals(testFileLength - inputStream.getPos(),
          inputStream.available());

      long currentPosition = inputStream.getPos();
      n = 2;
      skipped = inputStream.skip(n);

      assertEquals(currentPosition + skipped, inputStream.getPos());
      assertEquals(testFileLength - inputStream.getPos(),
          inputStream.available());
      assertEquals(skipped, n);

      bytesRead = inputStream.read(buffer);
      assertEquals(buffer.length, bytesRead);
      assertArrayEquals(expected4, buffer);
      assertEquals(buffer.length + skipped + currentPosition,
          inputStream.getPos());
      assertEquals(testFileLength - inputStream.getPos(),
          inputStream.available());
    }
  }

  /**
   * Ensures parity in the performance of sequential read for
   * version 1 and version 2 of the block blob input stream.
   * @throws IOException
   */
  @Test
  public void test_0315_SequentialReadPerformance() throws IOException {
    assumeHugeFileExists();
    final int maxAttempts = 10;
    final double maxAcceptableRatio = 1.01;
    double v1ElapsedMs = 0, v2ElapsedMs = 0;
    double ratio = Double.MAX_VALUE;
    for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
      v1ElapsedMs = sequentialRead(1,
          accountUsingInputStreamV1.getFileSystem(), false);
      v2ElapsedMs = sequentialRead(2,
          accountUsingInputStreamV2.getFileSystem(), false);
      ratio = v2ElapsedMs / v1ElapsedMs;
      LOG.info(String.format(
          "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f",
          (long) v1ElapsedMs,
          (long) v2ElapsedMs,
          ratio));
    }
    assertTrue(String.format(
        "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d,"
            + " v2ElapsedMs=%2$d, ratio=%3$.2f",
        (long) v1ElapsedMs,
        (long) v2ElapsedMs,
        ratio),
        ratio < maxAcceptableRatio);
  }

  /**
   * Ensures parity in the performance of sequential read after reverse seek for
   * version 2 of the block blob input stream.
   * @throws IOException
   */
  @Test
  public void test_0316_SequentialReadAfterReverseSeekPerformanceV2()
      throws IOException {
    assumeHugeFileExists();
    final int maxAttempts = 10;
    final double maxAcceptableRatio = 1.01;
    double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0;
    double ratio = Double.MAX_VALUE;
    for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
      beforeSeekElapsedMs = sequentialRead(2,
          accountUsingInputStreamV2.getFileSystem(), false);
      afterSeekElapsedMs = sequentialRead(2,
          accountUsingInputStreamV2.getFileSystem(), true);
      ratio = afterSeekElapsedMs / beforeSeekElapsedMs;
      LOG.info(String.format(
          "beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d, ratio=%3$.2f",
          (long) beforeSeekElapsedMs,
          (long) afterSeekElapsedMs,
          ratio));
    }
    assertTrue(String.format(
        "Performance of version 2 after reverse seek is not acceptable:"
            + " beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d,"
            + " ratio=%3$.2f",
        (long) beforeSeekElapsedMs,
        (long) afterSeekElapsedMs,
        ratio),
        ratio < maxAcceptableRatio);
  }

  private long sequentialRead(int version,
      FileSystem fs,
      boolean afterReverseSeek) throws IOException {
    byte[] buffer = new byte[16 * KILOBYTE];
    long totalBytesRead = 0;
    long bytesRead = 0;

    try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
      if (afterReverseSeek) {
        while (bytesRead > 0 && totalBytesRead < 4 * MEGABYTE) {
          bytesRead = inputStream.read(buffer);
          totalBytesRead += bytesRead;
        }
        totalBytesRead = 0;
        inputStream.seek(0);
      }

      NanoTimer timer = new NanoTimer();
      while ((bytesRead = inputStream.read(buffer)) > 0) {
        totalBytesRead += bytesRead;
      }
      long elapsedTimeMs = timer.elapsedTimeMs();

      LOG.info(String.format(
          "v%1$d: bytesRead=%2$d, elapsedMs=%3$d, Mbps=%4$.2f,"
              + " afterReverseSeek=%5$s",
          version,
          totalBytesRead,
          elapsedTimeMs,
          toMbps(totalBytesRead, elapsedTimeMs),
          afterReverseSeek));

      assertEquals(testFileLength, totalBytesRead);
      inputStream.close();
      return elapsedTimeMs;
    }
  }

  @Test
  public void test_0317_RandomReadPerformance() throws IOException {
    assumeHugeFileExists();
    final int maxAttempts = 10;
    final double maxAcceptableRatio = 0.10;
    double v1ElapsedMs = 0, v2ElapsedMs = 0;
    double ratio = Double.MAX_VALUE;
    for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
      v1ElapsedMs = randomRead(1,
          accountUsingInputStreamV1.getFileSystem());
      v2ElapsedMs = randomRead(2,
          accountUsingInputStreamV2.getFileSystem());
      ratio = v2ElapsedMs / v1ElapsedMs;
      LOG.info(String.format(
          "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f",
          (long) v1ElapsedMs,
          (long) v2ElapsedMs,
          ratio));
    }
    assertTrue(String.format(
        "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d,"
            + " v2ElapsedMs=%2$d, ratio=%3$.2f",
        (long) v1ElapsedMs,
        (long) v2ElapsedMs,
        ratio),
        ratio < maxAcceptableRatio);
  }

  private long randomRead(int version, FileSystem fs) throws IOException {
    assumeHugeFileExists();
    final int minBytesToRead = 2 * MEGABYTE;
    Random random = new Random();
    byte[] buffer = new byte[8 * KILOBYTE];
    long totalBytesRead = 0;
    long bytesRead = 0;
    try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
      NanoTimer timer = new NanoTimer();

      do {
        bytesRead = inputStream.read(buffer);
        totalBytesRead += bytesRead;
        inputStream.seek(random.nextInt(
            (int) (testFileLength - buffer.length)));
      } while (bytesRead > 0 && totalBytesRead < minBytesToRead);

      long elapsedTimeMs = timer.elapsedTimeMs();

      inputStream.close();

      LOG.info(String.format(
          "v%1$d: totalBytesRead=%2$d, elapsedTimeMs=%3$d, Mbps=%4$.2f",
          version,
          totalBytesRead,
          elapsedTimeMs,
          toMbps(totalBytesRead, elapsedTimeMs)));

      assertTrue(minBytesToRead <= totalBytesRead);

      return elapsedTimeMs;
    }
  }

  @Test
  public void test_999_DeleteHugeFiles() throws IOException {
    try {
      NanoTimer timer = new NanoTimer();
      NativeAzureFileSystem fs = getFileSystem();
      fs.delete(TEST_FILE_PATH, false);
      timer.end("time to delete %s", TEST_FILE_PATH);
    } finally {
      // clean up the test account
      AzureTestUtils.cleanupTestAccount(accountUsingInputStreamV1);
    }
  }

}