ITestS3APrefetchingLruEviction.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.test.LambdaTestUtils;

import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetching;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValues;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_FILE_CACHE_EVICTION;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;

/**
 * Test the prefetching input stream with LRU cache eviction on S3ACachingInputStream.
 */
public class ITestS3APrefetchingLruEviction extends AbstractS3ACostTest {

  /** Maximum number of cached prefetch blocks, as a decimal string; set per test run. */
  private String maxBlocks;

  /**
   * Parameters for the test: the values of {@code PREFETCH_MAX_BLOCKS_COUNT} to run with.
   *
   * @return one single-element parameter array per test run.
   */
  public static Collection<Object[]> params() {
    return Arrays.asList(new Object[][]{
        {"1"},
        {"2"}
    });
  }

  /**
   * Record the parameter for the current test run.
   *
   * @param pMaxBlocks maximum number of prefetch blocks to cache, as a decimal string.
   */
  public void initITestS3APrefetchingLruEviction(final String pMaxBlocks) {
    this.maxBlocks = pMaxBlocks;
  }

  private static final Logger LOG =
      LoggerFactory.getLogger(ITestS3APrefetchingLruEviction.class);

  private static final int S_1K = 1024;
  /** Despite the name, this is 512 bytes. */
  private static final int S_500 = 512;
  /** Size of the test file: 56 KB, which spans several {@link #BLOCK_SIZE} blocks. */
  private static final int SMALL_FILE_SIZE = S_1K * 56;

  private static final int TIMEOUT_MILLIS = 3000;
  private static final int INTERVAL_MILLIS = 500;
  /** Prefetch block size: 10 KB. */
  private static final int BLOCK_SIZE = S_1K * 10;

  /** Number of read tasks submitted by the test; the completion latch is sized to match. */
  private static final int READ_TASK_COUNT = 7;

  /** Upper bound, in seconds, on waiting for the submitted read tasks to finish. */
  private static final int READ_TIMEOUT_SECONDS = 60;

  /**
   * Create the test configuration with prefetching enabled.
   * <p>
   * Existing settings of {@code PREFETCH_MAX_BLOCKS_COUNT} and {@code PREFETCH_BLOCK_SIZE_KEY}
   * are cleared via {@code removeBaseAndBucketOverrides} (presumably including per-bucket
   * overrides). The cache is then limited to {@code maxBlocks} blocks of {@link #BLOCK_SIZE}
   * bytes.
   * <p>
   * NOTE(review): {@code maxBlocks} is only assigned by
   * {@link #initITestS3APrefetchingLruEviction(String)}, which is called from inside the test
   * method. The base class may build the filesystem (and so call this method) in a per-test
   * setup hook that runs before the test body. If so, {@code maxBlocks} is still null here and
   * {@code Integer.parseInt} throws. Confirm against the base-class lifecycle.
   *
   * @return the configuration for the test filesystem.
   */
  @Override
  public Configuration createConfiguration() {
    Configuration conf = enablePrefetching(super.createConfiguration());
    S3ATestUtils.removeBaseAndBucketOverrides(conf,
        PREFETCH_MAX_BLOCKS_COUNT,
        PREFETCH_BLOCK_SIZE_KEY);
    conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks));
    conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
    return conf;
  }

  /**
   * Read from several blocks of a multi-block file concurrently through one stream.
   * <p>
   * Verifies that:
   * <ul>
   *   <li>the number of blocks in the file cache settles at {@code maxBlocks} while the
   *   stream is open;</li>
   *   <li>it drops to zero once the stream is closed;</li>
   *   <li>at least four block evictions were counted.</li>
   * </ul>
   *
   * @param pMaxBlocks maximum number of prefetch blocks to cache, as a decimal string.
   * @throws Throwable on any failure, including a read task failing or not finishing in time.
   */
  @MethodSource("params")
  @ParameterizedTest(name = "max-blocks-{0}")
  public void testSeeksWithLruEviction(String pMaxBlocks) throws Throwable {
    initITestS3APrefetchingLruEviction(pMaxBlocks);
    IOStatistics ioStats;
    byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'x', 26);
    // Path for file which should have length > block size so S3ACachingInputStream is used
    Path smallFile = methodPath();
    ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);

    ExecutorService executorService = Executors.newFixedThreadPool(5,
        new ThreadFactoryBuilder()
            .setDaemon(true)
            .setNameFormat("testSeeksWithLruEviction-%d")
            .build());
    CountDownLatch countDownLatch = new CountDownLatch(READ_TASK_COUNT);
    // keep the futures so that a failed read is reported instead of being silently dropped
    List<Future<Boolean>> reads = new ArrayList<>(READ_TASK_COUNT);

    try (FSDataInputStream in = getFileSystem().open(smallFile)) {
      ioStats = in.getIOStatistics();
      // tests to add multiple blocks in the prefetch cache
      // and let LRU eviction take place as more cache entries
      // are added with multiple block reads.

      // Don't read block 0 completely
      reads.add(executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
          in,
          0,
          BLOCK_SIZE - S_500 * 10)));

      // Seek to block 1 and don't read completely
      reads.add(executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
          in,
          BLOCK_SIZE,
          2 * S_500)));

      // Seek to block 2 and don't read completely
      reads.add(executorService.submit(() -> readFullyWithSeek(countDownLatch,
          in,
          BLOCK_SIZE * 2L,
          2 * S_500)));

      // Seek to block 3 and don't read completely
      reads.add(executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
          in,
          BLOCK_SIZE * 3L,
          2 * S_500)));

      // Seek to block 4 and don't read completely
      reads.add(executorService.submit(() -> readFullyWithSeek(countDownLatch,
          in,
          BLOCK_SIZE * 4L,
          2 * S_500)));

      // Seek to block 5 and don't read completely
      reads.add(executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
          in,
          BLOCK_SIZE * 5L,
          2 * S_500)));

      // backward seek, can't use block 0 as it is evicted
      reads.add(executorService.submit(() -> readFullyWithSeek(countDownLatch,
          in,
          S_500 * 5,
          2 * S_500)));

      // bounded wait: an unbounded await() would hang the test if a task never completed
      if (!countDownLatch.await(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
        throw new AssertionError("Timed out after " + READ_TIMEOUT_SECONDS
            + "s with " + countDownLatch.getCount() + " of " + READ_TASK_COUNT
            + " read tasks still running");
      }
      // rethrow, wrapped in an ExecutionException, the failure of any read task
      for (Future<Boolean> read : reads) {
        read.get(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
      }

      // expect maxBlocks blocks in the cache, as the rest are to be evicted by LRU
      LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
        LOG.info("IO stats: {}", ioStats);
        verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
            Integer.parseInt(maxBlocks));
      });
      // let LRU evictions settle down, if any
      Thread.sleep(TIMEOUT_MILLIS);
    } finally {
      executorService.shutdownNow();
      executorService.awaitTermination(5, TimeUnit.SECONDS);
    }
    LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
      LOG.info("IO stats: {}", ioStats);
      verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
      // stream_evict_blocks_from_cache is expected to be higher than 4, however we might face
      // transient failures due to async prefetch get cancel issues. While TIMEOUT_MILLIS is
      // sufficient wait time, consider re-running the test if stream_evict_blocks_from_cache
      // value stays lower than 4.
      assertThatStatisticCounter(ioStats,
          STREAM_EVICT_BLOCKS_FROM_FILE_CACHE).isGreaterThanOrEqualTo(4);
      verifyStatisticCounterValues(ioStats, STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
          STREAM_FILE_CACHE_EVICTION);
    });
  }

  /**
   * Read bytes from the given position in the stream into a new buffer using the positioned
   * readable API.
   * <p>
   * The latch is counted down whether or not the read succeeds. A failing read therefore
   * cannot leave the waiting test thread blocked.
   *
   * @param countDownLatch count down latch to mark the operation completed.
   * @param in input stream, shared with the other read tasks.
   * @param position position in the given input stream to read from.
   * @param len the number of bytes to read; at most {@link #BLOCK_SIZE}.
   * @return always {@code true}; failures are thrown instead.
   * @throws UncheckedIOException if the read fails.
   */
  private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDataInputStream in,
      long position, int len) {
    byte[] buffer = new byte[BLOCK_SIZE];
    try {
      in.readFully(position, buffer, 0, len);
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      // count down on success and failure alike so await() in the test returns
      countDownLatch.countDown();
    }
  }

  /**
   * Read bytes from the given position in the stream into a new buffer, using a seek followed
   * by a sequential read.
   * <p>
   * Nothing in this class serializes the seek and the read against other tasks using the same
   * stream. Another task's seek may therefore interleave between them. The test checks only
   * cache statistics, not the bytes read.
   * <p>
   * The latch is counted down whether or not the read succeeds.
   *
   * @param countDownLatch count down latch to mark the operation completed.
   * @param in input stream, shared with the other read tasks.
   * @param position position in the given input stream to seek to.
   * @param len the number of bytes to read; at most {@link #BLOCK_SIZE}.
   * @return always {@code true}; failures are thrown instead.
   * @throws UncheckedIOException if the seek or read fails.
   */
  private boolean readFullyWithSeek(CountDownLatch countDownLatch, FSDataInputStream in,
      long position, int len) {
    byte[] buffer = new byte[BLOCK_SIZE];
    try {
      in.seek(position);
      in.readFully(buffer, 0, len);
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      // count down on success and failure alike so await() in the test returns
      countDownLatch.countDown();
    }
  }

}