AbstractContractVectoredReadTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.contract;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.IntFunction;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
import static org.apache.hadoop.fs.contract.ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.range;
import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

@RunWith(Parameterized.class)
public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {

  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractContractVectoredReadTest.class);

  public static final int DATASET_LEN = 64 * 1024;
  protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
  protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";

  /**
   * Buffer allocator for vector IO.
   */
  private final IntFunction<ByteBuffer> allocate;

  /**
   * Buffer pool for vector IO.
   */
  private final ElasticByteBufferPool pool =
          new WeakReferencedElasticByteBufferPool();

  private final String bufferType;

  /**
   * Path to the vector file.
   */
  private Path vectorPath;

  @Parameterized.Parameters(name = "Buffer type : {0}")
  public static List<String> params() {
    return Arrays.asList("direct", "array");
  }

  public AbstractContractVectoredReadTest(String bufferType) {
    this.bufferType = bufferType;
    final boolean isDirect = !"array".equals(bufferType);
    this.allocate = size -> pool.getBuffer(isDirect, size);
  }

  /**
   * Get the buffer allocator.
   * @return allocator function for vector IO.
   */
  protected IntFunction<ByteBuffer> getAllocate() {
    return allocate;
  }

  /**
   * Get the vector IO buffer pool.
   * @return a pool.
   */

  protected ElasticByteBufferPool getPool() {
    return pool;
  }

  @Override
  public void setup() throws Exception {
    super.setup();
    vectorPath = path(VECTORED_READ_FILE_NAME);
    FileSystem fs = getFileSystem();
    createFile(fs, vectorPath, true, DATASET);
  }

  @Override
  public void teardown() throws Exception {
    pool.release();
    super.teardown();
  }

  /**
   * Open the vector file.
   * @return the input stream.
   * @throws IOException failure.
   */
  protected FSDataInputStream openVectorFile() throws IOException {
    return openVectorFile(getFileSystem());
  }

  /**
   * Open the vector file.
   * @param fs filesystem to use
   * @return the input stream.
   * @throws IOException failure.
   */
  protected FSDataInputStream openVectorFile(final FileSystem fs) throws IOException {
    return awaitFuture(
        fs.openFile(vectorPath)
            .opt(FS_OPTION_OPENFILE_LENGTH, DATASET_LEN)
            .opt(FS_OPTION_OPENFILE_READ_POLICY,
                FS_OPTION_OPENFILE_READ_POLICY_VECTOR)
            .build());
  }

  @Test
  public void testVectoredReadMultipleRanges() throws Exception {
    List<FileRange> fileRanges = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      FileRange fileRange = FileRange.createFileRange(i * 100, 100);
      fileRanges.add(fileRange);
    }
    try (FSDataInputStream in = openVectorFile()) {
      in.readVectored(fileRanges, allocate);
      CompletableFuture<?>[] completableFutures = new CompletableFuture<?>[fileRanges.size()];
      int i = 0;
      for (FileRange res : fileRanges) {
        completableFutures[i++] = res.getData();
      }
      CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(completableFutures);
      combinedFuture.get();

      validateVectoredReadResult(fileRanges, DATASET, 0);
      returnBuffersToPoolPostRead(fileRanges, pool);
    }
  }

  @Test
  public void testVectoredReadAndReadFully()  throws Exception {
    List<FileRange> fileRanges = new ArrayList<>();
    range(fileRanges, 100, 100);
    try (FSDataInputStream in = openVectorFile()) {
      in.readVectored(fileRanges, allocate);
      byte[] readFullRes = new byte[100];
      in.readFully(100, readFullRes);
      ByteBuffer vecRes = FutureIO.awaitFuture(fileRanges.get(0).getData());
      Assertions.assertThat(vecRes)
              .describedAs("Result from vectored read and readFully must match")
              .isEqualByComparingTo(ByteBuffer.wrap(readFullRes));
      returnBuffersToPoolPostRead(fileRanges, pool);
    }
  }

  @Test
  public void testVectoredReadWholeFile()  throws Exception {
    describe("Read the whole file in one single vectored read");
    List<FileRange> fileRanges = new ArrayList<>();
    range(fileRanges, 0, DATASET_LEN);
    try (FSDataInputStream in = openVectorFile()) {
      in.readVectored(fileRanges, allocate);
      ByteBuffer vecRes = FutureIO.awaitFuture(fileRanges.get(0).getData());
      Assertions.assertThat(vecRes)
              .describedAs("Result from vectored read and readFully must match")
              .isEqualByComparingTo(ByteBuffer.wrap(DATASET));
      returnBuffersToPoolPostRead(fileRanges, pool);
    }
  }

  /**
   * As the minimum seek value is 4*1024,none of the below ranges
   * will get merged.
   */
  @Test
  public void testDisjointRanges() throws Exception {
    List<FileRange> fileRanges = new ArrayList<>();
    range(fileRanges, 0, 100);
    range(fileRanges, 4_000 + 101, 100);
    range(fileRanges, 16_000 + 101, 100);
    try (FSDataInputStream in = openVectorFile()) {
      in.readVectored(fileRanges, allocate);
      validateVectoredReadResult(fileRanges, DATASET, 0);
      returnBuffersToPoolPostRead(fileRanges, pool);
    }
  }

  /**
   * As the minimum seek value is 4*1024, all the below ranges
   * will get merged into one.
   */
  @Test
  public void testAllRangesMergedIntoOne() throws Exception {
    List<FileRange> fileRanges = new ArrayList<>();
    final int length = 100;
    range(fileRanges, 0, length);
    range(fileRanges, 4_000 - length - 1, length);
    range(fileRanges, 8_000 - length - 1, length);
    try (FSDataInputStream in = openVectorFile()) {
      in.readVectored(fileRanges, allocate);
      validateVectoredReadResult(fileRanges, DATASET, 0);
      returnBuffersToPoolPostRead(fileRanges, pool);
    }
  }

  /**
   * As the minimum seek value is 4*1024, the first three ranges will be
   * merged into and other two will remain as it is.
   */
  @Test
  public void testSomeRangesMergedSomeUnmerged() throws Exception {
    FileSystem fs = getFileSystem();
    List<FileRange> fileRanges = new ArrayList<>();
    range(fileRanges, 8 * 1024, 100);
    range(fileRanges, 14 * 1024, 100);
    range(fileRanges, 10 * 1024, 100);
    range(fileRanges, 2 * 1024 - 101, 100);
    range(fileRanges, 40 * 1024, 1024);
    FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
    CompletableFuture<FSDataInputStream> builder =
            fs.openFile(path(VECTORED_READ_FILE_NAME))
                    .withFileStatus(fileStatus)
                    .build();
    try (FSDataInputStream in = builder.get()) {
      in.readVectored(fileRanges, allocate);
      validateVectoredReadResult(fileRanges, DATASET, 0);
      returnBuffersToPoolPostRead(fileRanges, pool);
    }
  }

  /**
   * Most file systems won't support overlapping ranges.
   * Currently, only Raw Local supports it.
   */
  @Test
  public void testOverlappingRanges() throws Exception {
    if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
      verifyExceptionalVectoredRead(
              getSampleOverlappingRanges(),
              IllegalArgumentException.class);
    } else {
      try (FSDataInputStream in = openVectorFile()) {
        List<FileRange> fileRanges = getSampleOverlappingRanges();
        in.readVectored(fileRanges, allocate);
        validateVectoredReadResult(fileRanges, DATASET, 0);
        returnBuffersToPoolPostRead(fileRanges, pool);
      }
    }
  }

  /**
   * Same ranges are special case of overlapping.
   */
  @Test
  public void testSameRanges() throws Exception {
    if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
      verifyExceptionalVectoredRead(
              getSampleSameRanges(),
              IllegalArgumentException.class);
    } else {
      try (FSDataInputStream in = openVectorFile()) {
        List<FileRange> fileRanges = getSampleSameRanges();
        in.readVectored(fileRanges, allocate);
        validateVectoredReadResult(fileRanges, DATASET, 0);
        returnBuffersToPoolPostRead(fileRanges, pool);
      }
    }
  }

  /**
   * A null range is not permitted.
   */
  @Test
  public void testNullRange() throws Exception {
    List<FileRange> fileRanges = new ArrayList<>();
    range(fileRanges, 500, 100);
    fileRanges.add(null);
    verifyExceptionalVectoredRead(
        fileRanges,
        NullPointerException.class);
  }
  /**
   * A null range is not permitted.
   */
  @Test
  public void testNullRangeList() throws Exception {
    verifyExceptionalVectoredRead(
        null,
        NullPointerException.class);
  }

  @Test
  public void testSomeRandomNonOverlappingRanges() throws Exception {
    List<FileRange> fileRanges = new ArrayList<>();
    range(fileRanges, 500, 100);
    range(fileRanges, 1000, 200);
    range(fileRanges, 50, 10);
    range(fileRanges, 10, 5);
    try (FSDataInputStream in = openVectorFile()) {
      in.readVectored(fileRanges, allocate);
      validateVectoredReadResult(fileRanges, DATASET, 0);
      returnBuffersToPoolPostRead(fileRanges, pool);
    }
  }

  @Test
  public void testConsecutiveRanges() throws Exception {
    List<FileRange> fileRanges = new ArrayList<>();
    final int offset = 500;
    final int length = 2011;
    range(fileRanges, offset, length);
    range(fileRanges, offset + length, length);
    try (FSDataInputStream in = openVectorFile()) {
      in.readVectored(fileRanges, allocate);
      validateVectoredReadResult(fileRanges, DATASET, 0);
      returnBuffersToPoolPostRead(fileRanges, pool);
    }
  }

  @Test
  public void testEmptyRanges() throws Exception {
    List<FileRange> fileRanges = new ArrayList<>();
    try (FSDataInputStream in = openVectorFile()) {
      in.readVectored(fileRanges, allocate);
      Assertions.assertThat(fileRanges)
          .describedAs("Empty ranges must stay empty")
          .isEmpty();
    }
  }

  /**
   * Test to validate EOF ranges.
   * <p>
   * Default implementation fails with EOFException
   * while reading the ranges. Some implementation like s3, checksum fs fail fast
   * as they already have the file length calculated.
   * The contract option {@link ContractOptions#VECTOR_IO_EARLY_EOF_CHECK} is used
   * to determine which check to perform.
   */
  @Test
  public void testEOFRanges()  throws Exception {
    describe("Testing reading with an offset past the end of the file");
    List<FileRange> fileRanges = range(DATASET_LEN + 1, 100);

    if (isSupported(VECTOR_IO_EARLY_EOF_CHECK)) {
      LOG.info("Expecting early EOF failure");
      verifyExceptionalVectoredRead(fileRanges, EOFException.class);
    } else {
      expectEOFinRead(fileRanges);
    }
  }


  @Test
  public void testVectoredReadWholeFilePlusOne()  throws Exception {
    describe("Try to read whole file plus 1 byte");
    List<FileRange> fileRanges = range(0, DATASET_LEN + 1);

    if (isSupported(VECTOR_IO_EARLY_EOF_CHECK)) {
      LOG.info("Expecting early EOF failure");
      verifyExceptionalVectoredRead(fileRanges, EOFException.class);
    } else {
      expectEOFinRead(fileRanges);
    }
  }

  private void expectEOFinRead(final List<FileRange> fileRanges) throws Exception {
    LOG.info("Expecting late EOF failure");
    try (FSDataInputStream in = openVectorFile()) {
      in.readVectored(fileRanges, allocate);
      for (FileRange res : fileRanges) {
        CompletableFuture<ByteBuffer> data = res.getData();
        interceptFuture(EOFException.class,
            "",
            ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
            TimeUnit.SECONDS,
            data);
      }
    }
  }

  @Test
  public void testNegativeLengthRange()  throws Exception {

    verifyExceptionalVectoredRead(range(0, -50), IllegalArgumentException.class);
  }

  @Test
  public void testNegativeOffsetRange()  throws Exception {
    verifyExceptionalVectoredRead(range(-1, 50), EOFException.class);
  }

  @Test
  public void testNormalReadAfterVectoredRead() throws Exception {
    List<FileRange> fileRanges = createSampleNonOverlappingRanges();
    try (FSDataInputStream in = openVectorFile()) {
      in.readVectored(fileRanges, allocate);
      // read starting 200 bytes
      final int len = 200;
      byte[] res = new byte[len];
      in.readFully(res, 0, len);
      ByteBuffer buffer = ByteBuffer.wrap(res);
      assertDatasetEquals(0, "normal_read", buffer, len, DATASET);
      validateVectoredReadResult(fileRanges, DATASET, 0);
      returnBuffersToPoolPostRead(fileRanges, pool);
    }
  }

  @Test
  public void testVectoredReadAfterNormalRead() throws Exception {
    List<FileRange> fileRanges = createSampleNonOverlappingRanges();
    try (FSDataInputStream in = openVectorFile()) {
      // read starting 200 bytes
      final int len = 200;
      byte[] res = new byte[len];
      in.readFully(res, 0, len);
      ByteBuffer buffer = ByteBuffer.wrap(res);
      assertDatasetEquals(0, "normal_read", buffer, len, DATASET);
      in.readVectored(fileRanges, allocate);
      validateVectoredReadResult(fileRanges, DATASET, 0);
      returnBuffersToPoolPostRead(fileRanges, pool);
    }
  }

  @Test
  public void testMultipleVectoredReads() throws Exception {
    List<FileRange> fileRanges1 = createSampleNonOverlappingRanges();
    List<FileRange> fileRanges2 = createSampleNonOverlappingRanges();
    try (FSDataInputStream in = openVectorFile()) {
      in.readVectored(fileRanges1, allocate);
      in.readVectored(fileRanges2, allocate);
      validateVectoredReadResult(fileRanges2, DATASET, 0);
      validateVectoredReadResult(fileRanges1, DATASET, 0);
      returnBuffersToPoolPostRead(fileRanges1, pool);
      returnBuffersToPoolPostRead(fileRanges2, pool);
    }
  }

  /**
   * This test creates list of ranges and then submit a readVectored
   * operation and then uses a separate thread pool to process the
   * results asynchronously.
   */
  @Test
  public void testVectoredIOEndToEnd() throws Exception {
    List<FileRange> fileRanges = new ArrayList<>();
    range(fileRanges, 8 * 1024, 100);
    range(fileRanges, 14 * 1024, 100);
    range(fileRanges, 10 * 1024, 100);
    range(fileRanges, 2 * 1024 - 101, 100);
    range(fileRanges, 40 * 1024, 1024);

    ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
    CountDownLatch countDown = new CountDownLatch(fileRanges.size());

    try (FSDataInputStream in = openVectorFile()) {
      in.readVectored(fileRanges, this.allocate);
      for (FileRange res : fileRanges) {
        dataProcessor.submit(() -> {
          try {
            readBufferValidateDataAndReturnToPool(res, countDown);
          } catch (Exception e) {
            String error = String.format("Error while processing result for %s", res);
            LOG.error(error, e);
            ContractTestUtils.fail(error, e);
          }
        });
      }
      // user can perform other computations while waiting for IO.
      if (!countDown.await(VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
        ContractTestUtils.fail("Timeout/Error while processing vectored io results");
      }
    } finally {
      HadoopExecutors.shutdown(dataProcessor, LOG,
              VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }
  }

  private void readBufferValidateDataAndReturnToPool(FileRange res,
                                                     CountDownLatch countDownLatch)
          throws IOException, TimeoutException {
    try {
      CompletableFuture<ByteBuffer> data = res.getData();
      // Read the data and perform custom operation. Here we are just
      // validating it with original data.
      FutureIO.awaitFuture(data.thenAccept(buffer -> {
        assertDatasetEquals((int) res.getOffset(),
                "vecRead", buffer, res.getLength(), DATASET);
        // return buffer to the pool once read.
        // If the read failed, this doesn't get invoked.
        pool.putBuffer(buffer);
      }),
          VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    } finally {
      // countdown to notify main thread that processing has been done.
      countDownLatch.countDown();
    }
  }


  protected List<FileRange> createSampleNonOverlappingRanges() {
    List<FileRange> fileRanges = new ArrayList<>();
    range(fileRanges, 0, 100);
    range(fileRanges, 110, 50);
    return fileRanges;
  }

  protected List<FileRange> getSampleSameRanges() {
    List<FileRange> fileRanges = new ArrayList<>();
    range(fileRanges, 8_000, 1000);
    range(fileRanges, 8_000, 1000);
    range(fileRanges, 8_000, 1000);
    return fileRanges;
  }

  protected List<FileRange> getSampleOverlappingRanges() {
    List<FileRange> fileRanges = new ArrayList<>();
    range(fileRanges, 100, 500);
    range(fileRanges, 400, 500);
    return fileRanges;
  }

  protected List<FileRange> getConsecutiveRanges() {
    List<FileRange> fileRanges = new ArrayList<>();
    range(fileRanges, 100, 500);
    range(fileRanges, 600, 500);
    return fileRanges;
  }

  /**
   * Validate that exceptions must be thrown during a vectored
   * read operation with specific input ranges.
   * @param fileRanges input file ranges.
   * @param clazz type of exception expected.
   * @throws Exception any other exception.
   */
  protected <T extends Throwable> void verifyExceptionalVectoredRead(
          List<FileRange> fileRanges,
          Class<T> clazz) throws Exception {

    try (FSDataInputStream in = openVectorFile()) {
      intercept(clazz, () -> {
        in.readVectored(fileRanges, allocate);
        return "triggered read of " + fileRanges.size() + " ranges" + " against " + in;
      });
    }
  }
}