ITestS3AAnalyticsAcceleratorStreamReading.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hadoop.fs.s3a;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
import org.apache.hadoop.fs.statistics.IOStatistics;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
 * Tests integration of the
 * <a href="https://github.com/awslabs/analytics-accelerator-s3">analytics accelerator library</a>.
 * <p>
 * Certain tests in this class rely on reading local parquet files stored in resources.
 * These files are copied from the local filesystem to S3 and then read through the
 * analytics stream. This verifies that AAL can read the parquet format and handles
 * exceptions raised by malformed parquet files.
 *
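 * <p>
 * A minimal sketch of how the analytics stream is enabled, assuming the
 * {@code enableAnalyticsAccelerator()} test helper sets the stream type key:
 * </p>
 * <pre>
 *   Configuration conf = new Configuration();
 *   // select the analytics input stream factory for S3A
 *   conf.set("fs.s3a.input.stream.type", "analytics");
 * </pre>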
 */
public class ITestS3AAnalyticsAcceleratorStreamReading extends AbstractS3ATestBase {

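  /**
   * Sub-prefix for the library's physical IO settings; combined with
   * {@code ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX} to form full keys.
   */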
  private static final String PHYSICAL_IO_PREFIX = "physicalio";

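  /** External test file resolved via {@code getExternalData()}. */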
  private Path externalTestFile;

  @BeforeEach
  @Override
  public void setup() throws Exception {
    super.setup();
    skipIfClientSideEncryption();
    externalTestFile = getExternalData(getConfiguration());
  }

  @Override
  public Configuration createConfiguration() {
    Configuration configuration = super.createConfiguration();
    enableAnalyticsAccelerator(configuration);
    return configuration;
  }

  @Test
  public void testConnectorFrameWorkIntegration() throws Throwable {
    describe("Verify S3 connector framework integration");

    S3AFileSystem fs =
        (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
    byte[] buffer = new byte[500];
    IOStatistics ioStats;

    try (FSDataInputStream inputStream =
        fs.openFile(externalTestFile)
            .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
            .build().get()) {
      ioStats = inputStream.getIOStatistics();
      inputStream.seek(5);
      inputStream.read(buffer, 0, 500);

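      // unwrap the FSDataInputStream to the underlying S3A object stream so
      // the chosen stream type and input policy can be asserted.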
      final InputStream wrappedStream = inputStream.getWrappedStream();
      ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream;

      Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
      Assertions.assertThat(objectInputStream.getInputPolicy())
          .isEqualTo(S3AInputPolicy.Sequential);
    }

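    // exactly one analytics stream was opened; closing the filesystem must
    // also close the shared analytics stream factory.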
    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
    fs.close();
    verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
  }

  @Test
  public void testMalformedParquetFooter() throws IOException {
    describe("Reading a malformed parquet file should not throw an exception");

    // File with malformed footer taken from
    // https://github.com/apache/parquet-testing/blob/master/bad_data/PARQUET-1481.parquet.
    // This test ensures AAL does not throw exceptions if footer parsing fails.
    // It will only emit a WARN log, "Unable to parse parquet footer for
    // test/malformedFooter.parquet, parquet prefetch optimisations will be disabled for this key."
    Path dest = path("malformed_footer.parquet");

    File file = new File("src/test/resources/malformed_footer.parquet");

    Path sourcePath = new Path(file.toURI().getPath());
    getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);

    byte[] buffer = new byte[500];
    IOStatistics ioStats;

    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
      ioStats = inputStream.getIOStatistics();
      inputStream.seek(5);
      inputStream.read(buffer, 0, 500);
    }

    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
  }

  /**
   * This test reads a multi-row-group parquet file. Each parquet file consists of at least
   * one row group, which contains the column data for a subset of rows. A single parquet
   * file can contain multiple row groups; this allows for further parallelisation, as each
   * row group can be processed independently.
   */
  @Test
  public void testMultiRowGroupParquet() throws Throwable {
    describe("A parquet file is read successfully");

    Path dest = path("multi_row_group.parquet");

    File file = new File("src/test/resources/multi_row_group.parquet");
    Path sourcePath = new Path(file.toURI().getPath());
    getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);

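    // the file length is needed so readFully can cover the whole file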
    FileStatus fileStatus = getFileSystem().getFileStatus(dest);

    byte[] buffer = new byte[3000];
    IOStatistics ioStats;

    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
      ioStats = inputStream.getIOStatistics();
      inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
    }

    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);

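    // reopen with the parquet read policy, which lets AAL apply its
    // parquet-specific optimisations, such as footer prefetching.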
    try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
        .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
        .build().get()) {
      ioStats = inputStream.getIOStatistics();
      inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
    }

    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
  }

  @Test
  public void testInvalidConfigurationThrows() throws Exception {
    describe("Verify S3 connector framework throws with invalid configuration");

    Configuration conf = new Configuration(getConfiguration());
    removeBaseAndBucketOverrides(conf);
    // Set an invalid (negative) cache timeout for the physical IO layer;
    // AAL rejects this when building the stream configuration.
    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
        "." + PHYSICAL_IO_PREFIX + ".cache.timeout", -1);

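    // ConnectorConfiguration exposes all keys under the given prefix to AAL;
    // building the stream configuration validates their values.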
    ConnectorConfiguration connectorConfiguration =
        new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);

    intercept(IllegalArgumentException.class,
        () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
  }

}