AbstractSTestS3AHugeFiles.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.scale;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
import org.apache.hadoop.util.DurationInfo;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_BUFFER_SIZE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_COMPLETED;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;

/**
 * Scale test which creates a huge file.
 * <p>
 * <b>Important:</b> the order in which these tests execute is fixed to
 * alphabetical order. Test cases are numbered {@code test_123_} to impose
 * an ordering based on the numbers.
 * <p>
 * Having this ordering allows the tests to assume that the huge file
 * exists. Even so: they should all have a {@link #assumeHugeFileExists()}
 * check at the start, in case an individual test is executed.
 */
@TestMethodOrder(MethodOrderer.Alphanumeric.class)
public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(
      AbstractSTestS3AHugeFiles.class);
  public static final int DEFAULT_UPLOAD_BLOCKSIZE = 128 * _1KB;

  private Path scaleTestDir;
  private Path hugefile;
  private Path hugefileRenamed;

  private int uploadBlockSize = DEFAULT_UPLOAD_BLOCKSIZE;
  private int partitionSize;
  private long filesize;

  @BeforeEach
  @Override
  public void setup() throws Exception {
    super.setup();
    scaleTestDir = new Path(getTestPath(), getTestSuiteName());
    hugefile = new Path(scaleTestDir, "src/hugefile");
    hugefileRenamed = new Path(scaleTestDir, "dest/hugefile");
    uploadBlockSize = uploadBlockSize();
    filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE,
        DEFAULT_HUGE_FILESIZE);
  }

  /**
   * Test dir deletion is removed from test case teardown so the
   * subsequent tests see the output.
   * @throws IOException failure
   */
  @Override
  protected void deleteTestDirInTeardown() throws IOException {
    /* no-op */
  }

  /**
   * Get the name of this test suite, which is used in path generation.
   * Base implementation uses {@link #getBlockOutputBufferName()} for this.
   * @return the name of the suite.
   */
  public String getTestSuiteName() {
    return getBlockOutputBufferName();
  }

  /**
   * Note that this can get called before test setup.
   * @return the configuration to use.
   */
  @Override
  protected Configuration createScaleConfiguration() {
    Configuration conf = super.createScaleConfiguration();
    partitionSize = (int) getTestPropertyBytes(conf,
        KEY_HUGE_PARTITION_SIZE,
        DEFAULT_HUGE_PARTITION_SIZE);
    Assertions.assertThat(partitionSize)
        .describedAs("Partition size set in " + KEY_HUGE_PARTITION_SIZE)
        .isGreaterThanOrEqualTo(MULTIPART_MIN_SIZE);
    removeBaseAndBucketOverrides(conf,
        SOCKET_SEND_BUFFER,
        SOCKET_RECV_BUFFER,
        MIN_MULTIPART_THRESHOLD,
        MULTIPART_SIZE,
        USER_AGENT_PREFIX,
        FAST_UPLOAD_BUFFER);

    conf.setLong(SOCKET_SEND_BUFFER, _1MB);
    conf.setLong(SOCKET_RECV_BUFFER, _1MB);
    conf.setLong(MIN_MULTIPART_THRESHOLD, partitionSize);
    conf.setInt(MULTIPART_SIZE, partitionSize);
    conf.setInt(AWS_S3_VECTOR_ACTIVE_RANGE_READS, 32);
    conf.set(USER_AGENT_PREFIX, "STestS3AHugeFileCreate");
    conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName());
    S3ATestUtils.disableFilesystemCaching(conf);
    return conf;
  }

  /**
   * The name of the buffering mechanism to use.
   * @return a buffering mechanism
   */
  protected abstract String getBlockOutputBufferName();

  @Test
  public void test_010_CreateHugeFile() throws IOException {

    long filesizeMB = filesize / _1MB;

    // clean up from any previous attempts
    deleteHugeFile();

    Path fileToCreate = getPathOfFileToCreate();
    describe("Creating file %s of size %d MB" +
            " with partition size %d buffered by %s",
        fileToCreate, filesizeMB, partitionSize, getBlockOutputBufferName());

    // now do a check of available upload time, with a pessimistic bandwidth
    // (that of remote upload tests). If the test times out then not only is
    // the test outcome lost, as the follow-on tests continue, they will
    // overlap with the ongoing upload test, for much confusion.
    int timeout = getTestTimeoutSeconds();
    // assume 1 MB/s upload bandwidth
    int bandwidth = _1MB;
    long uploadTime = filesize / bandwidth;
    assertTrue(uploadTime < timeout,
        String.format("Timeout set in %s seconds is too low;" +
        " estimating upload time of %d seconds at 1 MB/s." +
        " Rerun tests with -D%s=%d",
        timeout, uploadTime, KEY_TEST_TIMEOUT, uploadTime * 2));
    assertEquals(0, filesize % uploadBlockSize,
        "File size set in " + KEY_HUGE_FILESIZE + " = " + filesize
         + " is not a multiple of " + uploadBlockSize);

    byte[] data = new byte[uploadBlockSize];
    for (int i = 0; i < uploadBlockSize; i++) {
      data[i] = (byte) (i % 256);
    }

    long blocks = filesize / uploadBlockSize;
    long blocksPerMB = _1MB / uploadBlockSize;

    // perform the upload.
    // there's lots of logging here, so that a tail -f on the output log
    // can give a view of what is happening.
    S3AFileSystem fs = getFileSystem();
    IOStatistics iostats = fs.getIOStatistics();

    String putRequests = Statistic.OBJECT_PUT_REQUESTS.getSymbol();
    String multipartBlockUploads = Statistic.MULTIPART_UPLOAD_PART_PUT.getSymbol();
    String putBytes = Statistic.OBJECT_PUT_BYTES.getSymbol();
    Statistic putRequestsActive = Statistic.OBJECT_PUT_REQUESTS_ACTIVE;
    Statistic putBytesPending = Statistic.OBJECT_PUT_BYTES_PENDING;

    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
    BlockOutputStreamStatistics streamStatistics;
    long blocksPer10MB = blocksPerMB * 10;
    CountingProgressListener progress = new CountingProgressListener(timer);
    try (FSDataOutputStream out = fs.create(fileToCreate,
        true,
        uploadBlockSize,
        progress)) {
      streamStatistics = requireNonNull(getOutputStreamStatistics(out),
          () -> "No iostatistics in " + out);

      for (long block = 1; block <= blocks; block++) {
        out.write(data);
        long written = block * uploadBlockSize;
        // every 10 MB and on file upload @ 100%, print some stats
        if (block % blocksPer10MB == 0 || written == filesize) {
          long percentage = written * 100 / filesize;
          double elapsedTime = timer.elapsedTime() / 1.0e9;
          double writtenMB = 1.0 * written / _1MB;
          LOG.info(String.format("[%02d%%] Buffered %.2f MB out of %d MB;" +
                  " PUT %d bytes (%d pending) in %d operations (%d active);" +
                  " elapsedTime=%.2fs; write to buffer bandwidth=%.2f MB/s",
              percentage,
              writtenMB,
              filesizeMB,
              iostats.counters().get(putBytes),
              gaugeValue(putBytesPending),
              iostats.counters().get(putRequests),
              gaugeValue(putRequestsActive),
              elapsedTime,
              writtenMB / elapsedTime));
        }
      }
      if (!expectMultipartUpload()) {
        // it is required that no data has uploaded at this point on a
        // non-multipart upload
        Assertions.assertThat(progress.getUploadEvents())
            .describedAs("upload events in %s", progress)
            .isEqualTo(0);
      }
      // now close the file
      LOG.info("Closing stream {}", out);
      LOG.info("Statistics : {}", streamStatistics);
      ContractTestUtils.NanoTimer closeTimer
          = new ContractTestUtils.NanoTimer();
      out.close();
      closeTimer.end("time to close() output stream");
    }

    timer.end("time to write %d MB in blocks of %d",
        filesizeMB, uploadBlockSize);
    logFSState();
    bandwidth(timer, filesize);

    final IOStatistics streamIOstats = streamStatistics.getIOStatistics();
    LOG.info("Stream IOStatistics after stream closed: {}",
        ioStatisticsToPrettyString(streamIOstats));

    LOG.info("FileSystem IOStatistics after upload: {}",
        ioStatisticsToPrettyString(iostats));
    final String requestKey;
    long putByteCount = lookupCounterStatistic(iostats, putBytes);
    long putRequestCount;

    if (expectMultipartUpload()) {
      requestKey = multipartBlockUploads;
      putRequestCount = lookupCounterStatistic(streamIOstats, requestKey);
      assertThatStatisticCounter(streamIOstats, multipartBlockUploads)
          .isGreaterThanOrEqualTo(1);
      verifyStatisticCounterValue(streamIOstats, STREAM_WRITE_BLOCK_UPLOADS, putRequestCount);
      // non-magic uploads will have completed
      verifyStatisticCounterValue(streamIOstats, MULTIPART_UPLOAD_COMPLETED.getSymbol(),
          expectImmediateFileVisibility() ? 1 : 0);
    } else {
      // single put
      requestKey = putRequests;
      putRequestCount = lookupCounterStatistic(streamIOstats, requestKey);
      verifyStatisticCounterValue(streamIOstats, putRequests, 1);
      verifyStatisticCounterValue(streamIOstats, STREAM_WRITE_BLOCK_UPLOADS, 1);
      verifyStatisticCounterValue(streamIOstats, MULTIPART_UPLOAD_COMPLETED.getSymbol(), 0);
    }
    Assertions.assertThat(putByteCount)
        .describedAs("%s count from stream stats %s",
            putBytes, streamStatistics)
        .isGreaterThan(0);

    LOG.info("PUT {} bytes in {} operations; {} MB/operation",
        putByteCount, putRequestCount,
        putByteCount / (putRequestCount * _1MB));
    LOG.info("Time per PUT {} nS",
        toHuman(timer.nanosPerOperation(putRequestCount)));
    verifyStatisticGaugeValue(iostats, putRequestsActive.getSymbol(), 0);
    verifyStatisticGaugeValue(iostats, STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol(), 0);

    progress.verifyNoFailures(
        "Put file " + fileToCreate + " of size " + filesize);
    assertEquals(0, streamStatistics.getBlocksActivelyAllocated(),
        "actively allocated blocks in " + streamStatistics);
  }

  /**
   * Get the path of the file which is to created. This is normally
   * {@link #hugefile}
   * @return the path to use when creating the file.
   */
  protected Path getPathOfFileToCreate() {
    return this.hugefile;
  }

  protected Path getScaleTestDir() {
    return scaleTestDir;
  }

  protected Path getHugefile() {
    return hugefile;
  }

  public void setHugefile(Path hugefile) {
    this.hugefile = hugefile;
  }

  protected Path getHugefileRenamed() {
    return hugefileRenamed;
  }

  public int getUploadBlockSize() {
    return uploadBlockSize;
  }

  /**
   * Get the desired upload block size for this test run.
   * @return the block size
   */
  protected int uploadBlockSize() {
    return DEFAULT_UPLOAD_BLOCKSIZE;
  }

  /**
   * Get the size of the file.
   * @return file size
   */
  public long getFilesize() {
    return filesize;
  }

  /**
   * Is this expected to be a multipart upload?
   * Assertions will change if not.
   * @return what the filesystem expects.
   */
  protected boolean expectMultipartUpload() {
    return getFileSystem().getS3AInternals().isMultipartCopyEnabled();
  }

  /**
   * Is this expected to be a normal file creation with
   * the output immediately visible?
   * Assertions will change if not.
   * @return true by default.
   */
  protected boolean expectImmediateFileVisibility() {
    return true;
  }

  protected int getPartitionSize() {
    return partitionSize;
  }

  /**
   * Assume that the huge file exists; skip the test if it does not.
   * @throws IOException IO failure
   */
  void assumeHugeFileExists() throws IOException {
    assumeFileExists(this.hugefile);
  }

  /**
   * Assume a specific file exists.
   * @param file file to look for
   * @throws IOException IO problem
   */
  private void assumeFileExists(Path file) throws IOException {
    S3AFileSystem fs = getFileSystem();
    ContractTestUtils.assertPathExists(fs, "huge file not created",
        file);
    FileStatus status = fs.getFileStatus(file);
    ContractTestUtils.assertIsFile(file, status);
    assertTrue(status.getLen() > 0, "File " + file + " is empty");
  }

  private void logFSState() {
    LOG.info("File System state after operation:\n{}", getFileSystem());
  }

  /**
   * This is the set of actions to perform when verifying the file actually
   * was created. With the S3A committer, the file doesn't come into
   * existence; a different set of assertions must be checked.
   */
  @Test
  public void test_030_postCreationAssertions() throws Throwable {
    S3AFileSystem fs = getFileSystem();
    ContractTestUtils.assertPathExists(fs, "Huge file", hugefile);
    FileStatus status = fs.getFileStatus(hugefile);
    ContractTestUtils.assertIsFile(hugefile, status);
    LOG.info("Huge File Status: {}", status);
    assertEquals(filesize, status.getLen(), "File size in " + status);

    // now do some etag status checks asserting they are always the same
    // across listing operations.
    final Path path = hugefile;
    final FileStatus listStatus = listFile(hugefile);
    LOG.info("List File Status: {}", listStatus);

    Assertions.assertThat(listStatus.getLen())
        .describedAs("List file status length %s", listStatus)
        .isEqualTo(filesize);
    Assertions.assertThat(etag(listStatus))
        .describedAs("List file status etag %s", listStatus)
        .isEqualTo(etag(status));
  }

  /**
   * Get a filestatus by listing the parent directory.
   * @param path path
   * @return status
   * @throws IOException failure to read, file not found
   */
  private FileStatus listFile(final Path path)
      throws IOException {
    try {
      return filteringRemoteIterator(
          getFileSystem().listStatusIterator(path.getParent()),
          st -> st.getPath().equals(path))
          .next();
    } catch (NoSuchElementException e) {
      throw (FileNotFoundException)(new FileNotFoundException("Not found: " + path)
          .initCause(e));
    }
  }

  /**
   * Read in the file using Positioned read(offset) calls.
   * @throws Throwable failure
   */
  @Test
  public void test_040_PositionedReadHugeFile() throws Throwable {
    assumeHugeFileExists();
    final String encryption = getConf().getTrimmed(
        Constants.S3_ENCRYPTION_ALGORITHM);
    boolean encrypted = encryption != null;
    if (encrypted) {
      LOG.info("File is encrypted with algorithm {}", encryption);
    }
    String filetype = encrypted ? "encrypted file" : "file";
    describe("Positioned reads of %s %s", filetype, hugefile);
    S3AFileSystem fs = getFileSystem();
    FileStatus status = listFile(hugefile);
    long size = status.getLen();
    int ops = 0;
    final int bufferSize = 8192;
    byte[] buffer = new byte[bufferSize];
    long eof = size - 1;

    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
    ContractTestUtils.NanoTimer readAtByte0, readAtByte0Again, readAtEOF;
    try (FSDataInputStream in = fs.openFile(hugefile)
        .withFileStatus(status)
        .opt(FS_OPTION_OPENFILE_READ_POLICY, "random")
        .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize)
        .build().get()) {
      readAtByte0 = new ContractTestUtils.NanoTimer();
      in.readFully(0, buffer);
      readAtByte0.end("time to read data at start of file");
      ops++;

      readAtEOF = new ContractTestUtils.NanoTimer();
      in.readFully(eof - bufferSize, buffer);
      readAtEOF.end("time to read data at end of file");
      ops++;

      readAtByte0Again = new ContractTestUtils.NanoTimer();
      in.readFully(0, buffer);
      readAtByte0Again.end("time to read data at start of file again");
      ops++;
      LOG.info("Final stream state: {}", in);
    }
    long mb = Math.max(size / _1MB, 1);

    logFSState();
    timer.end("time to perform positioned reads of %s of %d MB ",
        filetype, mb);
    LOG.info("Time per positioned read = {} nS",
        toHuman(timer.nanosPerOperation(ops)));
  }

  /**
   * Should this test suite use direct buffers for
   * the Vector IO operations?
   * @return true if direct buffers are desired.
   */
  protected boolean isDirectVectorBuffer() {
    return false;
  }

  @Test
  public void test_045_vectoredIOHugeFile() throws Throwable {
    assumeHugeFileExists();
    final ElasticByteBufferPool pool =
              new WeakReferencedElasticByteBufferPool();
    boolean direct = isDirectVectorBuffer();
    IntFunction<ByteBuffer> allocate = size -> pool.getBuffer(direct, size);

    // build a list of ranges for both reads.
    final int rangeLength = 116770;
    long base = 1520861;
    long pos = base;
    List<FileRange> rangeList = range(pos, rangeLength);
    pos += rangeLength;
    range(rangeList, pos, rangeLength);
    pos += rangeLength;
    range(rangeList, pos, rangeLength);
    pos += rangeLength;
    range(rangeList, pos, rangeLength);
    pos += rangeLength;
    range(rangeList, pos, rangeLength);
    pos += rangeLength;
    range(rangeList, pos, rangeLength);

    FileSystem fs = getFileSystem();

    final int validateSize = (int) totalReadSize(rangeList);

    // read the same ranges using readFully into a buffer.
    // this is to both validate the range resolution logic,
    // and to compare performance of sequential GET requests
    // with the vector IO.
    byte[] readFullRes = new byte[validateSize];
    IOStatistics readIOStats, vectorIOStats;
    DurationInfo readFullyTime = new DurationInfo(LOG, true, "Sequential read of %,d bytes",
        validateSize);
    try (FSDataInputStream in = fs.openFile(hugefile)
        .optLong(FS_OPTION_OPENFILE_LENGTH, filesize)
        .opt(FS_OPTION_OPENFILE_READ_POLICY, "random")
        .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize)
        .build().get()) {
      for (FileRange range : rangeList) {
        in.readFully(range.getOffset(),
            readFullRes,
            (int)(range.getOffset() - base),
            range.getLength());
      }
      readIOStats = in.getIOStatistics();
    } finally {
      readFullyTime.close();
    }

    // now do a vector IO read
    DurationInfo vectorTime = new DurationInfo(LOG, true, "Vector Read");
    try (FSDataInputStream in = fs.openFile(hugefile)
        .optLong(FS_OPTION_OPENFILE_LENGTH, filesize)
        .opt(FS_OPTION_OPENFILE_READ_POLICY, "vector, random")
        .build().get()) {
      // initiate the read.
      in.readVectored(rangeList, allocate);
      // Wait for the results and compare with read fully.
      validateVectoredReadResult(rangeList, readFullRes, base);
      vectorIOStats = in.getIOStatistics();
    } finally {
      vectorTime.close();
      // release the pool
      pool.release();
    }

    final Duration readFullyDuration = readFullyTime.asDuration();
    final Duration vectorDuration = vectorTime.asDuration();
    final Duration diff = readFullyDuration.minus(vectorDuration);
    double ratio = readFullyDuration.toNanos() / (double) vectorDuration.toNanos();
    String format = String.format("Vector read to %s buffer taking %s was %s faster than"
            + " readFully() (%s); ratio=%,.2fX",
        direct ? "direct" : "heap",
        vectorDuration, diff, readFullyDuration, ratio);
    LOG.info(format);
    LOG.info("Bulk read IOStatistics={}", ioStatisticsToPrettyString(readIOStats));
    LOG.info("Vector IOStatistics={}", ioStatisticsToPrettyString(vectorIOStats));
  }

  /**
   * Read in the entire file using read() calls.
   * @throws Throwable failure
   */
  @Test
  public void test_050_readHugeFile() throws Throwable {
    assumeHugeFileExists();
    describe("Reading %s", hugefile);
    S3AFileSystem fs = getFileSystem();
    FileStatus status = fs.getFileStatus(hugefile);
    long size = status.getLen();
    long blocks = size / uploadBlockSize;
    byte[] data = new byte[uploadBlockSize];

    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
    try (FSDataInputStream in = fs.openFile(hugefile)
        .withFileStatus(status)
        .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize)
        .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
        .build().get();
         DurationInfo ignored = new DurationInfo(LOG, "Vector Read")) {
      for (long block = 0; block < blocks; block++) {
        in.readFully(data);
      }
      LOG.info("Final stream state: {}", in);
    }

    long mb = Math.max(size / _1MB, 1);
    timer.end("time to read file of %d MB ", mb);
    LOG.info("Time per MB to read = {} nS",
        toHuman(timer.nanosPerOperation(mb)));
    bandwidth(timer, size);
    logFSState();
  }

  /**
   * Test to verify source file encryption key.
   * @throws IOException
   */
  @Test
  public void test_090_verifyRenameSourceEncryption() throws IOException {
    if(isEncrypted(getFileSystem())) {
      assertEncrypted(getHugefile());
    }
  }

  protected void assertEncrypted(Path hugeFile) throws IOException {
    //Concrete classes will have implementation.
  }

  /**
   * Checks if the encryption is enabled for the file system.
   * @param fileSystem
   * @return
   */
  protected boolean isEncrypted(S3AFileSystem fileSystem) {
    return false;
  }

  @Test
  public void test_100_renameHugeFile() throws Throwable {
    assumeHugeFileExists();
    describe("renaming %s to %s", hugefile, hugefileRenamed);
    S3AFileSystem fs = getFileSystem();
    FileStatus status = fs.getFileStatus(hugefile);
    long size = status.getLen();
    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
    renameFile(hugefile, hugefileRenamed);
    long mb = Math.max(size / _1MB, 1);
    timer.end("time to rename file of %d MB", mb);
    LOG.info("Time per MB to rename = {} nS",
        toHuman(timer.nanosPerOperation(mb)));
    bandwidth(timer, size);
    assertPathExists("renamed file", hugefileRenamed);
    logFSState();
    FileStatus destFileStatus = fs.getFileStatus(hugefileRenamed);
    assertEquals(size, destFileStatus.getLen());

    // rename back
    ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
    renameFile(hugefileRenamed, hugefile);

    timer2.end("Renaming back");
    LOG.info("Time per MB to rename = {} nS",
        toHuman(timer2.nanosPerOperation(mb)));
    bandwidth(timer2, size);
  }

  /**
   * Rename a file.
   * Subclasses may do this differently.
   * @param src source file
   * @param dest dest file
   * @throws IOException IO failure
   */
  protected void renameFile(final Path src,
      final Path dest) throws IOException {
    final S3AFileSystem fs = getFileSystem();
    fs.delete(dest, false);
    final boolean renamed = fs.rename(src, dest);
    Assertions.assertThat(renamed)
        .describedAs("rename(%s, %s)", src, dest)
        .isTrue();
  }

  /**
   * Test to verify target file encryption key.
   * @throws IOException
   */
  @Test
  public void test_110_verifyRenameDestEncryption() throws IOException {
    if(isEncrypted(getFileSystem())) {
      /**
       * Using hugeFile again as hugeFileRenamed is renamed back
       * to hugeFile.
       */
      assertEncrypted(hugefile);
    }
  }
  /**
   * Cleanup: delete the files.
   */
  @Test
  public void test_800_DeleteHugeFiles() throws IOException {
    try {
      deleteHugeFile();
      delete(hugefileRenamed, false);
    } finally {
      ContractTestUtils.rm(getFileSystem(), getTestPath(), true, false);
    }
  }

  /**
   * After all the work, dump the statistics.
   */
  @Test
  public void test_900_dumpStats() {
    LOG.info("Statistics\n{}", ioStatisticsSourceToString(getFileSystem()));
  }

  protected void deleteHugeFile() throws IOException {
    delete(hugefile, false);
  }

  /**
   * Delete any file, time how long it took.
   * @param path path to delete
   * @param recursive recursive flag
   */
  protected void delete(Path path, boolean recursive) throws IOException {
    describe("Deleting %s", path);
    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
    getFileSystem().delete(path, recursive);
    timer.end("time to delete %s", path);
  }



}