/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.performance;
import java.io.EOFException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.statistics.IOStatistics;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_FOOTER_CACHE;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST;
import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
/**
* Cost of openFile().
*/
public class ITestS3AOpenCost extends AbstractS3ACostTest {

  private static final Logger LOG =
      LoggerFactory.getLogger(ITestS3AOpenCost.class);

  /** Content of the test file; its length fixes the real file size. */
  public static final String TEXT = "0123456789ABCDEF";

  /** Path of the small test file created in {@link #setup()}. */
  private Path testFile;

  /** Status of {@link #testFile}, saved so tests can derive variants of it. */
  private FileStatus testFileStatus;

  /** Actual length of {@link #testFile} in bytes. */
  private int fileLength;

  /**
   * Is prefetching enabled?
   * Several tests skip themselves or probe different statistics when it is.
   */
  private boolean prefetching;

  /**
   * Create the test configuration: checksum validation is disabled
   * (so the wrapped stream is not checksummed) and filesystem caching
   * is off so this configuration is actually used for the test FS.
   * @return the configuration to create the test FS with.
   */
  @Override
  public Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    removeBaseAndBucketOverrides(conf,
        CHECKSUM_VALIDATION);
    conf.setBoolean(CHECKSUM_VALIDATION, false);
    disableFilesystemCaching(conf);
    return conf;
  }

  /**
   * Setup creates a test file, saves its status and length
   * to fields.
   */
  @Override
  public void setup() throws Exception {
    super.setup();
    S3AFileSystem fs = getFileSystem();
    testFile = methodPath();

    writeTextFile(fs, testFile, TEXT, true);
    testFileStatus = fs.getFileStatus(testFile);
    fileLength = (int)testFileStatus.getLen();
    prefetching = prefetching();
  }

  /**
   * Test when openFile() performs GET requests when file status
   * and length options are passed down.
   * Note that the input streams only update the FS statistics
   * in close(), so metrics cannot be verified until all operations
   * on a stream are complete.
   * This is slightly less than ideal.
   */
  @Test
  public void testOpenFileWithStatusOfOtherFS() throws Throwable {
    describe("Test cost of openFile with/without status; raw only");
    S3AFileSystem fs = getFileSystem();

    // now read that file back in using the openFile call.
    // with a new FileStatus and a different path.
    // this verifies that any FileStatus class/subclass is used
    // as a source of the file length.
    FileStatus st2 = new FileStatus(
        fileLength, false,
        testFileStatus.getReplication(),
        testFileStatus.getBlockSize(),
        testFileStatus.getModificationTime(),
        testFileStatus.getAccessTime(),
        testFileStatus.getPermission(),
        testFileStatus.getOwner(),
        testFileStatus.getGroup(),
        new Path("gopher:///localhost/" + testFile.getName()));

    // no IO in open: with the status supplied, neither a HEAD nor a LIST
    // is issued and the object stream itself is not opened.
    FSDataInputStream in = verifyMetrics(() ->
            fs.openFile(testFile)
                .withFileStatus(st2)
                .build()
                .get(),
        always(NO_HEAD_OR_LIST),
        with(STREAM_READ_OPENED, 0));

    // the stream gets opened during read
    long readLen = verifyMetrics(() ->
            readStream(in),
        always(NO_HEAD_OR_LIST),
        with(STREAM_READ_OPENED, 1));
    assertEquals("bytes read from file", fileLength, readLen);
  }

  @Test
  public void testStreamIsNotChecksummed() throws Throwable {
    describe("Verify that an opened stream is not checksummed");

    // if prefetching is enabled, skip this test
    assumeNoPrefetching();
    S3AFileSystem fs = getFileSystem();

    // open the file; passing down the length means no HEAD/LIST is needed
    // and the GET is postponed until the first read.
    try (FSDataInputStream in = verifyMetrics(() ->
            fs.openFile(testFile)
                .must(FS_OPTION_OPENFILE_READ_POLICY,
                    FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
                .must(FS_OPTION_OPENFILE_FOOTER_CACHE, false)
                .mustLong(FS_OPTION_OPENFILE_LENGTH, fileLength)
                .build()
                .get(),
        always(NO_HEAD_OR_LIST),
        with(STREAM_READ_OPENED, 0))) {

      // open the stream.
      in.read();
      // now examine the innermost stream and make sure it doesn't have a checksum
      assertStreamIsNotChecksummed(getS3AInputStream(in));
    }
  }

  /**
   * Open the file with a declared length shorter than the real one:
   * the read stops at the declared EOF and no bytes are discarded
   * when the stream is closed.
   */
  @Test
  public void testOpenFileShorterLength() throws Throwable {
    // do a second read with the length declared as short.
    // we now expect the bytes read to be shorter.
    S3AFileSystem fs = getFileSystem();

    // track bytes discarded in close() across the whole test
    S3ATestUtils.MetricDiff bytesDiscarded =
        new S3ATestUtils.MetricDiff(fs, STREAM_READ_BYTES_READ_CLOSE);
    int offset = 2;
    long shortLen = fileLength - offset;

    // open the file with a length declared shorter than the real file length
    FSDataInputStream in2 = openFile(shortLen,
        FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);

    // verify that the statistics are in range
    IOStatistics ioStatistics = extractStatistics(in2);
    // lazily-evaluated stringification; safe to log again after the read
    Object statsString = demandStringifyIOStatistics(ioStatistics);
    LOG.info("Statistics of open stream {}", statsString);
    verifyStatisticCounterValue(ioStatistics, ACTION_FILE_OPENED, 1);

    // no network IO happened, duration is 0. There's a very small
    // risk of some delay making it positive just from scheduling delays
    assertDurationRange(ioStatistics, ACTION_FILE_OPENED, 0, 0);

    // now read it
    long r2 = verifyMetrics(() ->
            readStream(in2),
        always(NO_HEAD_OR_LIST),
        with(STREAM_READ_OPENED, 1),
        with(STREAM_READ_BYTES_READ_CLOSE, 0),
        with(STREAM_READ_SEEK_BYTES_SKIPPED, 0));
    LOG.info("Statistics of read stream {}", statsString);

    assertEquals("bytes read from file", shortLen, r2);
    // no bytes were discarded.
    bytesDiscarded.assertDiffEquals(0);
  }

  /**
   * Open the file with a declared length longer than it really is:
   * readFully() past the real EOF raises an EOFException; a plain
   * read() past the real EOF returns -1.
   */
  @Test
  public void testOpenFileLongerLengthReadFully() throws Throwable {
    // do a read with the length declared as longer
    // than it is.
    // An EOF will be read on readFully(), -1 on a read()

    final int extra = 10;
    long longLen = fileLength + extra;

    // assert behaviors of seeking/reading past the file length.
    // there is no attempt at recovery.
    verifyMetrics(() -> {
      try (FSDataInputStream in = openFile(longLen,
          FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
        byte[] out = new byte[(int) (longLen)];
        intercept(EOFException.class, () -> {
          in.readFully(0, out);
          return in;
        });
        in.seek(longLen - 1);
        assertEquals("read past real EOF on " + in, -1, in.read());
        return in.toString();
      }
    },
        always(),
        // two GET calls were made, one for readFully,
        // the second on the read() past the EOF
        // the operation has got as far as S3
        probe(!prefetching(), STREAM_READ_OPENED, 1 + 1));

    // now on a new stream, try a full read from after the EOF
    verifyMetrics(() -> {
      try (FSDataInputStream in =
               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
        byte[] out = new byte[extra];
        intercept(EOFException.class, () -> in.readFully(fileLength, out));
        return in.toString();
      }
    },
        // a single GET call is expected: the readFully() attempt
        // opens the stream once, fails, and there is no reopen.
        with(STREAM_READ_OPENED, 1));
  }

  /**
   * Open the test file, declaring a length which may be more or
   * less than the actual file length; no HEAD/LIST requests are
   * expected as the length option short-circuits the probe.
   * @param longLen length to declare; may differ from the real length
   * @param policy read policy
   * @return file handle
   */
  private FSDataInputStream openFile(final long longLen, String policy)
      throws Exception {
    S3AFileSystem fs = getFileSystem();
    // the declared length is trusted: no HEAD or LIST request is issued
    return verifyMetrics(() ->
            fs.openFile(testFile)
                .must(FS_OPTION_OPENFILE_READ_POLICY, policy)
                .mustLong(FS_OPTION_OPENFILE_LENGTH, longLen)
                .build()
                .get(),
        always(NO_HEAD_OR_LIST));
  }

  /**
   * Open a file with a length declared as longer than the actual file length.
   * Validate input stream.read() semantics.
   */
  @Test
  public void testReadPastEOF() throws Throwable {

    // set a length past the actual file length
    describe("read() up to the end of the real file");
    assumeNoPrefetching();
    final int extra = 10;
    int longLen = fileLength + extra;
    try (FSDataInputStream in = openFile(longLen,
        FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
      // reads up to the real EOF return the expected bytes
      for (int i = 0; i < fileLength; i++) {
        Assertions.assertThat(in.read())
            .describedAs("read() at %d from stream %s", i, in)
            .isEqualTo(TEXT.charAt(i));
      }
      LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
    }

    // now open and read after the EOF; this is
    // expected to return -1 on each read; there's a GET per call.
    // as the counters are updated on close(), the stream must be closed
    // within the verification clause.
    // note how there's no attempt to alter file expected length...
    // instead the call always goes to S3.
    // there's no information in the exception from the SDK
    describe("reading past the end of the file");

    verifyMetrics(() -> {
      try (FSDataInputStream in =
               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
        for (int i = 0; i < extra; i++) {
          final int p = fileLength + i;
          in.seek(p);
          Assertions.assertThat(in.read())
              .describedAs("read() at %d", p)
              .isEqualTo(-1);
        }
        LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
        return in.toString();
      }
    },
        always(),
        // one GET attempt per read() past the EOF
        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra));
  }

  /**
   * Test {@code PositionedReadable.readFully()} past EOF in a file.
   */
  @Test
  public void testPositionedReadableReadFullyPastEOF() throws Throwable {
    // now, next corner case. Do a readFully() of more bytes than the file length.
    // we expect failure.
    // this codepath does a GET to the end of the (expected) file length, and when
    // that GET returns -1 from the read because the bytes returned is less than
    // expected then the readFully call fails.
    describe("PositionedReadable.readFully() past the end of the file");

    // set a length past the actual file length
    final int extra = 10;
    int longLen = fileLength + extra;
    verifyMetrics(() -> {
      try (FSDataInputStream in =
               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
        byte[] buf = new byte[(int) (longLen + 1)];

        // readFully will fail
        intercept(EOFException.class, () -> {
          in.readFully(0, buf);
          return in;
        });
        // the failure closes the inner object stream
        assertS3StreamClosed(in);
        return "readFully past EOF with statistics"
            + ioStatisticsToPrettyString(in.getIOStatistics());
      }
    },
        always(),
        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
  }

  /**
   * Test {@code PositionedReadable.read()} past EOF in a file.
   */
  @Test
  public void testPositionedReadableReadPastEOF() throws Throwable {

    // set a length past the actual file length
    final int extra = 10;
    int longLen = fileLength + extra;

    describe("PositionedReadable.read() past the end of the file");
    assumeNoPrefetching();

    verifyMetrics(() -> {
      try (FSDataInputStream in =
               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
        byte[] buf = new byte[(int) (longLen + 1)];

        // a read() of the oversized buffer returns only the bytes
        // up to the real end of the file
        Assertions.assertThat(in.read(0, buf, 0, buf.length))
            .isEqualTo(fileLength);
        assertS3StreamOpen(in);

        // now attempt to read after EOF
        Assertions.assertThat(in.read(fileLength, buf, 0, buf.length))
            .describedAs("PositionedReadable.read() past EOF")
            .isEqualTo(-1);
        // stream is closed as part of this failure
        assertS3StreamClosed(in);

        return "PositionedReadable.read()) past EOF with " + in;
      }
    },
        always(),
        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
  }

  /**
   * Test Vector Read past EOF in a file.
   * See related tests in {@code ITestS3AContractVectoredRead}
   */
  @Test
  public void testVectorReadPastEOF() throws Throwable {

    // set a length past the actual file length
    final int extra = 10;
    int longLen = fileLength + extra;

    describe("Vector read past the end of the file, expecting an EOFException");
    verifyMetrics(() -> {
      try (FSDataInputStream in =
               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
        // no GET has been issued yet, so the inner stream is still closed
        assertS3StreamClosed(in);
        byte[] buf = new byte[longLen];
        ByteBuffer bb = ByteBuffer.wrap(buf);
        final FileRange range = FileRange.createFileRange(0, longLen);
        in.readVectored(Arrays.asList(range), (i) -> bb);
        // the async range read is expected to fail with an EOFException
        interceptFuture(EOFException.class,
            "",
            ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
            TimeUnit.SECONDS,
            range.getData());
        assertS3StreamClosed(in);
        return "vector read past EOF with " + in;
      }
    },
        always(),
        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1));
  }

  /**
   * Probe the FS for supporting prefetching.
   * @return true if the fs has prefetching enabled.
   */
  private boolean prefetching() {
    return getFileSystem().getConf().getBoolean(
        PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
  }

  /**
   * Skip the test if prefetching is enabled.
   */
  private void assumeNoPrefetching() {
    if (prefetching) {
      skip("Prefetching is enabled");
    }
  }

  /**
   * Assert that the inner S3 Stream is closed.
   * No-op when the wrapped stream is not an S3AInputStream
   * (e.g. when prefetching is enabled).
   * @param in input stream
   */
  private static void assertS3StreamClosed(final FSDataInputStream in) {
    final InputStream wrapped = in.getWrappedStream();
    if (wrapped instanceof S3AInputStream) {
      S3AInputStream s3ain = (S3AInputStream) wrapped;
      Assertions.assertThat(s3ain.isObjectStreamOpen())
          .describedAs("stream is open: %s", s3ain)
          .isFalse();
    }
  }

  /**
   * Assert that the inner S3 Stream is open.
   * No-op when the wrapped stream is not an S3AInputStream
   * (e.g. when prefetching is enabled).
   * @param in input stream
   */
  private static void assertS3StreamOpen(final FSDataInputStream in) {
    final InputStream wrapped = in.getWrappedStream();
    if (wrapped instanceof S3AInputStream) {
      S3AInputStream s3ain = (S3AInputStream) wrapped;
      Assertions.assertThat(s3ain.isObjectStreamOpen())
          .describedAs("stream is closed: %s", s3ain)
          .isTrue();
    }
  }
}