/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.performance;

import java.io.IOException;
import java.time.Duration;

import org.assertj.core.api.Assertions;
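import org.junit.jupiter.api.AfterAll;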
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.io.IOUtils;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsSeconds;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ABORTED;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_POLICY_CHANGED;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_UNBUFFERED;

/**
 * Test stream unbuffer performance/behavior with stream draining
 * and aborting.
 */
public class ITestUnbufferDraining extends AbstractS3ACostTest {

  private static final Logger LOG =
      LoggerFactory.getLogger(ITestUnbufferDraining.class);

  /**
   * Readahead range to use, sets drain threshold too.
   */
  public static final int READAHEAD = 1000;

  /**
   * How big a file to create?
   */
  public static final int FILE_SIZE = 50_000;

  /**
   * Number of attempts to unbuffer on each stream.
   */
  public static final int ATTEMPTS = 10;

  /**
   * Should checksums be enabled?
   */
  public static final boolean CHECKSUMS = false;

  /** IOStatistics aggregated from the brittle FS instances in teardown(). */
  private static final IOStatisticsSnapshot FILESYSTEM_IOSTATS =
      snapshotIOStatistics();

  /**
   * Test FS with a tiny connection pool and
   * no recovery.
   */
  private FileSystem brittleFS;

  @Override
  public Configuration createConfiguration() {
    Configuration conf = disablePrefetching(super.createConfiguration());
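    // strip any base/per-bucket overrides of the options tuned below and
    // in setup(), so the test values take effect on the brittle FS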
    removeBaseAndBucketOverrides(conf,
        ASYNC_DRAIN_THRESHOLD,
        CHECKSUM_VALIDATION,
        ESTABLISH_TIMEOUT,
        INPUT_FADVISE,
        MAX_ERROR_RETRIES,
        MAXIMUM_CONNECTIONS,
        READAHEAD_RANGE,
        REQUEST_TIMEOUT,
        RETRY_LIMIT,
        SOCKET_TIMEOUT);
    conf.setBoolean(CHECKSUM_VALIDATION, CHECKSUMS);
    return conf;
  }

  @BeforeEach
  @Override
  public void setup() throws Exception {
    super.setup();
    // now create a new FS with minimal http capacity and minimal recovery;
    // a separate instance is used so that test teardown does not suffer
    // from the lack of http connections and the short timeouts.
    try {
      // allow small durations.
      AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
      Configuration conf = getConfiguration();
      // kick off async drain for any data
      conf.setInt(ASYNC_DRAIN_THRESHOLD, 1);
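      // a tiny connection pool with minimal retries, so that any
      // connection which is not drained/aborted and returned to the
      // pool quickly surfaces as a failure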
      conf.setInt(MAXIMUM_CONNECTIONS, 2);
      conf.setInt(MAX_ERROR_RETRIES, 1);
      conf.setInt(READAHEAD_RANGE, READAHEAD);
      conf.setInt(RETRY_LIMIT, 1);
      conf.setBoolean(CHECKSUM_VALIDATION, CHECKSUMS);
      setDurationAsSeconds(conf, ESTABLISH_TIMEOUT,
          Duration.ofSeconds(1));

      brittleFS = FileSystem.newInstance(getFileSystem().getUri(), conf);
    } finally {
      AWSClientConfig.resetMinimumOperationDuration();
    }
  }

  @AfterEach
  @Override
  public void teardown() throws Exception {
    super.teardown();
    FileSystem bfs = getBrittleFS();
    FILESYSTEM_IOSTATS.aggregate(retrieveIOStatistics(bfs));
    IOUtils.cleanupWithLogger(LOG, bfs);
  }

  /** @return the brittle filesystem created in setup(). */
  public FileSystem getBrittleFS() {
    return brittleFS;
  }
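
  /**
   * Log the statistics aggregated from the brittle FS instances;
   * a simple way to surface the drain/abort counters in the test output.
   */
  @AfterAll
  public static void dumpFileSystemIOStatistics() {
    LOG.info("Aggregate FileSystem Statistics {}", FILESYSTEM_IOSTATS);
  }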

  /**
   * Test stream unbuffer performance/behavior when the stream
   * is drained rather than aborted.
   */
  @Test
  public void testUnbufferDraining() throws Throwable {

    describe("unbuffer draining");
    FileStatus st = createTestFile();

    IOStatistics brittleStats = retrieveIOStatistics(getBrittleFS());
    long originalUnbuffered = lookupCounter(brittleStats,
        STREAM_READ_UNBUFFERED);

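    // seek near the end of the file: after the single-byte read fewer
    // bytes than the readahead range remain, so unbuffer() is expected
    // to drain the stream rather than abort it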
    int offset = FILE_SIZE - READAHEAD + 1;
    try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
        .withFileStatus(st)
        .mustLong(ASYNC_DRAIN_THRESHOLD, 1)
        .build().get()) {
      describe("Initiating unbuffer with async drain\n");
      for (int i = 0; i < ATTEMPTS; i++) {
        describe("Starting read/unbuffer #%d", i);
        in.seek(offset);
        in.read();
        in.unbuffer();
      }
      // verify the policy switched.
      assertReadPolicy(in, S3AInputPolicy.Random);
      // assert that the statistics are as expected
      IOStatistics stats = in.getIOStatistics();
      verifyStatisticCounterValue(stats,
          STREAM_READ_UNBUFFERED,
          ATTEMPTS);
      verifyStatisticCounterValue(stats,
          STREAM_READ_ABORTED,
          0);
      // the initial policy selection always counts as one change,
      // and the switch to random on unbuffer adds another, so
      // the value must be 1 + 1
      verifyStatisticCounterValue(stats,
          STREAM_READ_SEEK_POLICY_CHANGED,
          2);
    }
    // filesystem statistic propagation
    verifyStatisticCounterValue(brittleStats,
        STREAM_READ_UNBUFFERED,
        ATTEMPTS + originalUnbuffered);
  }

  /**
   * Lookup a counter, returning 0 if it is not defined.
   * @param statistics stats to probe
   * @param key counter key
   * @return the value or 0
   */
  private static long lookupCounter(
      final IOStatistics statistics,
      final String key) {
    Long counter = statistics.counters().get(key);
    return counter == null ? 0 : counter;
  }

  /**
   * Assert that the read policy is as expected.
   * @param in input stream
   * @param policy read policy.
   */
  private static void assertReadPolicy(final FSDataInputStream in,
      final S3AInputPolicy policy) {
    S3AInputStream inner = getS3AInputStream(in);
    Assertions.assertThat(inner.getInputPolicy())
        .describedAs("input policy of %s", inner)
        .isEqualTo(policy);
  }

  /**
   * Extract the inner stream from an FSDataInputStream.
   * Because prefetching is disabled, this is always an S3AInputStream.
   * @param in input stream
   * @return the inner stream cast to an S3AInputStream.
   */
  private static S3AInputStream getS3AInputStream(final FSDataInputStream in) {
    return (S3AInputStream) in.getWrappedStream();
  }

  /**
   * Test stream unbuffer performance/behavior when the stream
   * is aborted rather than drained.
   */
  @Test
  public void testUnbufferAborting() throws Throwable {

    describe("unbuffer aborting");
    FileStatus st = createTestFile();
    IOStatistics brittleStats = retrieveIOStatistics(getBrittleFS());
    long originalUnbuffered =
        lookupCounter(brittleStats, STREAM_READ_UNBUFFERED);
    long originalAborted =
        lookupCounter(brittleStats, STREAM_READ_ABORTED);

    // open the file at the beginning with a whole file read policy,
    // so even with s3a switching to random on unbuffer,
    // this always does a full GET
    // also provide a floating point string for the threshold, to
    // verify it is safely parsed
    try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
        .withFileStatus(st)
        .must(ASYNC_DRAIN_THRESHOLD, "1.0")
        .must(FS_OPTION_OPENFILE_READ_POLICY,
            FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
        .build().get()) {
      assertReadPolicy(in, S3AInputPolicy.Sequential);

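      // the whole-file read policy means each read leaves far more than
      // the readahead range unread, so unbuffer() is expected to abort
      // the connection rather than drain it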
      describe("Initiating unbuffer with async drain\n");
      for (int i = 0; i < ATTEMPTS; i++) {
        describe("Starting read/unbuffer #%d", i);
        in.read();
        in.unbuffer();
        // because the read policy is sequential, it doesn't change
        assertReadPolicy(in, S3AInputPolicy.Sequential);
      }

      // assert that the statistics are as expected
      IOStatistics stats = in.getIOStatistics();
      verifyStatisticCounterValue(stats,
          STREAM_READ_UNBUFFERED,
          ATTEMPTS);
      verifyStatisticCounterValue(stats,
          STREAM_READ_ABORTED,
          ATTEMPTS);
      // only the initial policy selection is counted; the policy
      // never changes, so the value stays at 1.
      verifyStatisticCounterValue(stats,
          STREAM_READ_SEEK_POLICY_CHANGED,
          1);
    }
    // look at FS statistics
    verifyStatisticCounterValue(brittleStats,
        STREAM_READ_UNBUFFERED,
        ATTEMPTS + originalUnbuffered);
    verifyStatisticCounterValue(brittleStats,
        STREAM_READ_ABORTED,
        ATTEMPTS + originalAborted);
  }

  private FileStatus createTestFile() throws IOException {
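    // the file is written through the normal FS; the brittle FS is
    // only used to read it back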
    byte[] data = dataset(FILE_SIZE, '0', 10);
    S3AFileSystem fs = getFileSystem();

    Path path = methodPath();
    ContractTestUtils.createFile(fs, path, true, data);
    return fs.getFileStatus(path);
  }

}