/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.RangeNotSatisfiableEOFException;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
import static org.apache.hadoop.fs.contract.ContractTestUtils.range;
import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.io.Sizes.S_1M;
import static org.apache.hadoop.io.Sizes.S_4K;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
import static org.apache.hadoop.test.MoreAsserts.assertEqual;
/**
* S3A contract tests for vectored reads.
* This is a complex suite as it is really testing the store, so measurements of
* what IO took place are also performed if the input stream is suitable for this.
*/
public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
private static final Logger LOG = LoggerFactory.getLogger(ITestS3AContractVectoredRead.class);
public ITestS3AContractVectoredRead() {
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
/**
* Skip this suite when the Analytics Accelerator Library for Amazon S3
* is enabled, as it does not support vectored reads.
* @throws Exception failure in setup.
*/
@Override
public void setup() throws Exception {
super.setup();
skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
"Analytics Accelerator does not support vectored reads");
}
/**
* Verify response to a vector read request which is beyond the
* real length of the file.
* Unlike the {@link #testEOFRanges()} test, the input stream in
* this test thinks the file is longer than it is, so the call
* fails in the GET request.
*/
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testEOFRanges416Handling(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
FileSystem fs = getFileSystem();
final int extendedLen = DATASET_LEN + 1024;
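// declare a file length beyond the real EOF, so that the length
// mismatch surfaces in the GET request rather than at open time.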
CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.mustLong(FS_OPTION_OPENFILE_LENGTH, extendedLen)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
.build();
List<FileRange> fileRanges = range(DATASET_LEN, 100);
// read starting past EOF generates a 416 response, mapped to
// RangeNotSatisfiableEOFException
describe("Read starting from past EOF");
try (FSDataInputStream in = builder.get()) {
in.readVectored(fileRanges, getAllocate());
FileRange res = fileRanges.get(0);
CompletableFuture<ByteBuffer> data = res.getData();
interceptFuture(RangeNotSatisfiableEOFException.class,
"",
ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
data);
}
// a read starting before the EOF and continuing past it does generate
// an EOF exception, but not a 416.
describe("Read starting 0 continuing past EOF");
try (FSDataInputStream in = fs.openFile(path(VECTORED_READ_FILE_NAME))
.mustLong(FS_OPTION_OPENFILE_LENGTH, extendedLen)
.build().get()) {
final FileRange range = FileRange.createFileRange(0, extendedLen);
in.readVectored(Arrays.asList(range), getAllocate());
CompletableFuture<ByteBuffer> data = range.getData();
interceptFuture(EOFException.class, "",
ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
TimeUnit.SECONDS,
data);
}
}
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testMinSeekAndMaxSizeConfigsPropagation(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
Configuration conf = getFileSystem().getConf();
S3ATestUtils.removeBaseAndBucketOverrides(conf,
AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
S3ATestUtils.disableFilesystemCaching(conf);
final int configuredMinSeek = 2 * 1024;
final int configuredMaxSize = 10 * 1024 * 1024;
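// these values must stay in sync with the "2K" and "10M" strings below.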
conf.set(AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, "2K");
conf.set(AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, "10M");
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
try (FSDataInputStream fis = openVectorFile(fs)) {
int newMinSeek = fis.minSeekForVectorReads();
int newMaxSize = fis.maxReadSizeForVectorReads();
assertEqual(newMinSeek, configuredMinSeek,
"configured s3a min seek for vectored reads");
assertEqual(newMaxSize, configuredMaxSize,
"configured s3a max size for vectored reads");
}
}
}
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testMinSeekAndMaxSizeDefaultValues(String pBufferType)
throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
Configuration conf = getFileSystem().getConf();
S3ATestUtils.removeBaseAndBucketOverrides(conf,
AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE);
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
try (FSDataInputStream fis = openVectorFile(fs)) {
int minSeek = fis.minSeekForVectorReads();
int maxSize = fis.maxReadSizeForVectorReads();
assertEqual(minSeek, Constants.DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
"default s3a min seek for vectored reads");
assertEqual(maxSize, Constants.DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
"default s3a max read size for vectored reads");
}
}
}
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testStopVectoredIoOperationsCloseStream(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
try (FSDataInputStream in = openVectorFile()) {
in.readVectored(fileRanges, getAllocate());
in.close();
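// closing the stream must cancel the active vectored reads, so
// awaiting their results should fail.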
LambdaTestUtils.intercept(InterruptedIOException.class,
() -> validateVectoredReadResult(fileRanges, DATASET, 0));
}
// reopening the stream should succeed.
try (FSDataInputStream in = openVectorFile()) {
in.readVectored(fileRanges, getAllocate());
validateVectoredReadResult(fileRanges, DATASET, 0);
}
}
/**
* Verify that unbuffer() stops vectored IO operations.
* There's a small risk of a race condition where the unbuffer() call
* is made after the vector reads have completed.
*/
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testStopVectoredIoOperationsUnbuffer(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
try (FSDataInputStream in = openVectorFile()) {
in.readVectored(fileRanges, getAllocate());
in.unbuffer();
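// unbuffer() should cancel the in-flight reads; see the note above
// about the small risk that they already completed.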
LambdaTestUtils.intercept(InterruptedIOException.class,
() -> validateVectoredReadResult(fileRanges, DATASET, 0));
// re-initiating the vectored reads after unbuffer should succeed.
in.readVectored(fileRanges, getAllocate());
validateVectoredReadResult(fileRanges, DATASET, 0);
}
}
/**
* As the minimum seek value is 4*1024, the first three ranges will be
* merged into one, and the other two will remain as they are.
*/
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testNormalReadVsVectoredReadStatsCollection(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
List<FileRange> fileRanges = new ArrayList<>();
range(fileRanges, 10 * 1024, 100);
range(fileRanges, 8 * 1024, 100);
range(fileRanges, 14 * 1024, 100);
range(fileRanges, 2 * 1024 - 101, 100);
range(fileRanges, 40 * 1024, 1024);
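// with the 4K min seek of the test fs, the three ranges between 8K and
// 14K coalesce into a single GET; the ~2K and 40K ranges stay separate.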
FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus)
.opt(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_PARQUET
+ ", " + FS_OPTION_OPENFILE_READ_POLICY_VECTOR)
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(fileRanges, getAllocate());
validateVectoredReadResult(fileRanges, DATASET, 0);
returnBuffersToPoolPostRead(fileRanges, getPool());
final InputStream wrappedStream = in.getWrappedStream();
// policy will be random.
if (wrappedStream instanceof S3AInputStream) {
S3AInputStream inner = (S3AInputStream) wrappedStream;
Assertions.assertThat(inner.getInputPolicy())
.describedAs("Input policy of %s", inner)
.isEqualTo(S3AInputPolicy.Random);
Assertions.assertThat(inner.isObjectStreamOpen())
.describedAs("Object stream open in %s", inner)
.isFalse();
}
// audit the io statistics for this stream
IOStatistics st = in.getIOStatistics();
LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
// the vectored io operation must be tracked
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
1);
// the vectored io operation was called with 5 input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
5);
// the 5 input ranges were combined into 3, as some of them are close together.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
3);
// the merged read covers bytes 8192-14435 (6244 bytes), of which
// only 300 were requested, so 5944 bytes are discarded.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
5944);
verifyStatisticCounterValue(st,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
3);
// read bytes should match the sum of the requested lengths of the input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_BYTES,
1424);
}
CompletableFuture<FSDataInputStream> builder1 =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus)
.build();
try (FSDataInputStream in = builder1.get()) {
for (FileRange range : fileRanges) {
byte[] temp = new byte[range.getLength()];
in.readFully(range.getOffset(), temp, 0, range.getLength());
}
// audit the statistics for this stream
IOStatistics st = in.getIOStatistics();
LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
0);
// all other counter values are consistent.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
0);
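// with readahead disabled, each readFully() call issues its own GET:
// five ranges, five requests.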
verifyStatisticCounterValue(st,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
5);
// read bytes should match the sum of the requested lengths of the input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_BYTES,
1424);
}
// validate that stream statistics are merged at the fs instance level.
IOStatistics fsStats = fs.getIOStatistics();
// only 1 vectored io call is made in this fs instance.
verifyStatisticCounterValue(fsStats,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
1);
// 8 get requests were made in this fs instance.
verifyStatisticCounterValue(fsStats,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
8);
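// total bytes read across both streams: 1424 + 1424.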
verifyStatisticCounterValue(fsStats,
StreamStatisticNames.STREAM_READ_BYTES,
2848);
}
}
@MethodSource("params")
@ParameterizedTest(name = "Buffer type : {0}")
public void testMultiVectoredReadStatsCollection(String pBufferType) throws Exception {
initAbstractContractVectoredReadTest(pBufferType);
try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
List<FileRange> ranges1 = getConsecutiveRanges();
List<FileRange> ranges2 = getConsecutiveRanges();
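// the same pair of consecutive ranges is read twice on one stream;
// each readVectored() call should merge its list into a single GET.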
FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus)
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(ranges1, getAllocate());
in.readVectored(ranges2, getAllocate());
validateVectoredReadResult(ranges1, DATASET, 0);
validateVectoredReadResult(ranges2, DATASET, 0);
returnBuffersToPoolPostRead(ranges1, getPool());
returnBuffersToPoolPostRead(ranges2, getPool());
// audit the io statistics for this stream
IOStatistics st = in.getIOStatistics();
// 2 vectored io calls are made above.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
2);
// each of the 2 vectored io operations was called with 2 input ranges, giving 4 in total.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
4);
// the 2 ranges are merged into 1 during each vectored io operation.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
2);
// number of bytes discarded will be 0 as the ranges are consecutive.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
0);
// only 2 http get requests will be made, because the ranges in each list
// are consecutive and so are merged into one.
verifyStatisticCounterValue(st,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
2);
// read bytes should match the sum of the requested lengths of the input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_BYTES,
2000);
}
IOStatistics fsStats = fs.getIOStatistics();
// 2 vectored io calls are made in this fs instance.
verifyStatisticCounterValue(fsStats,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
2);
// 2 get requests were made in this fs instance.
verifyStatisticCounterValue(fsStats,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
2);
}
}
/**
* Create a test fs with no readahead.
* The vector IO ranges are set to the original small values,
* so ranges on small files are not coalesced.
* @return a filesystem
* @throws IOException failure to instantiate.
*/
private S3AFileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
Configuration conf = getFileSystem().getConf();
// also resetting the min seek and max size values is important,
// as this test suite has tests which override these params.
S3ATestUtils.removeBaseAndBucketOverrides(conf,
Constants.READAHEAD_RANGE,
AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
S3ATestUtils.disableFilesystemCaching(conf);
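// a readahead of zero keeps each GET to the requested bytes only,
// so the byte-count statistics asserted above stay deterministic.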
conf.setInt(Constants.READAHEAD_RANGE, 0);
conf.setInt(AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, S_4K);
conf.setInt(AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, S_1M);
return S3ATestUtils.createTestFileSystem(conf);
}
}