ITestS3AMultipartUploadSizeLimits.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.scale;

import java.io.File;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.auth.ProgressCounter;
import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.io.IOUtils;

import static org.apache.hadoop.fs.StreamCapabilities.ABORTABLE_STREAM;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.Statistic.INVOCATION_ABORT;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertCompleteAbort;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertNoopAbort;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
 * Testing S3A multipart upload size limits: part count limits
 * on uploads and commits, and stream abort behavior.
 */
public class ITestS3AMultipartUploadSizeLimits extends S3AScaleTestBase {
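/**
 * Multipart upload part size for these tests: 5 MB, the
 * minimum part size S3 permits for non-final parts.
 */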
public static final int MPU_SIZE = 5 * _1MB;
@Override
protected Configuration createScaleConfiguration() {
Configuration configuration = super.createScaleConfiguration();
removeBaseAndBucketOverrides(configuration,
MULTIPART_SIZE,
UPLOAD_PART_COUNT_LIMIT);
configuration.setLong(MULTIPART_SIZE, MPU_SIZE);
// Set the part count limit to 2 so that uploads needing more
// than two parts fail. With a 5 MB part size, any upload
// larger than 10 MB must trigger a failure.
configuration.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
return configuration;
}
/**
* Uploads under the limit are valid.
*/
@Test
public void testTwoPartUpload() throws Throwable {
Path file = path(getMethodName());
// Creating a 6 MB file needs two parts (5 MB + 1 MB),
// which is within the limit of two, so the upload succeeds.
createFile(getFileSystem(), file, true,
dataset(6 * _1MB, 'a', 'z' - 'a'));
}
/**
 * Validates that a PathIOException is thrown during a
 * multipart upload when the number of parts exceeds
 * the configured limit.
 */
@Test
public void testUploadOverLimitFailure() throws Throwable {
S3AFileSystem fs = getFileSystem();
Path file = path(getMethodName());
// Creating a 15 MB file needs three parts, one more than the
// configured limit, so the upload must fail with a PathIOException.
intercept(PathIOException.class,
() -> createFile(fs,
file,
false,
dataset(15 * _1MB, 'a', 'z' - 'a')));
// and the path does not exist
assertPathDoesNotExist("upload must not have completed", file);
}
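/**
 * Verifies that commit uploads fail fast when the file to commit
 * needs more parts than the configured limit, and that the
 * committer's abort counter is incremented by the failure.
 */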
@Test
public void testCommitLimitFailure() throws Throwable {
describe("verify commit uploads fail-safe when MPU limits exceeded");
S3AFileSystem fs = getFileSystem();
CommitOperations actions = new CommitOperations(fs);
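// stage 15 MB of data in a local file; uploading it would need
// three parts, which exceeds the configured limit of two.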
File tempFile = File.createTempFile("commit", ".txt");
FileUtils.writeByteArrayToFile(tempFile,
dataset(15 * _1MB, 'a', 'z' - 'a'));
Path dest = methodPath();
final S3AInstrumentation instrumentation = fs.getInstrumentation();
final long initial = instrumentation.getCounterValue(
Statistic.COMMITTER_COMMITS_ABORTED);
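// the upload must fail with a PathIOException and never
// create the destination path.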
intercept(PathIOException.class, () ->
actions.uploadFileToPendingCommit(tempFile,
dest,
null,
MPU_SIZE,
new ProgressCounter()));
assertPathDoesNotExist("upload must not have completed", dest);
final long after = instrumentation.getCounterValue(
Statistic.COMMITTER_COMMITS_ABORTED);
Assertions.assertThat(after)
.describedAs("commit abort count")
.isEqualTo(initial + 1);
}
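/**
 * Verifies that abort() on an open multipart stream cancels the
 * upload, that the destination path is never created, and that
 * a second abort() is a no-op.
 */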
@Test
public void testAbortAfterTwoPartUpload() throws Throwable {
Path file = path(getMethodName());
byte[] data = dataset(6 * _1MB, 'a', 'z' - 'a');
S3AFileSystem fs = getFileSystem();
FSDataOutputStream stream = fs.create(file, true);
try {
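// writing 6 MB with a 5 MB part size should queue at least one
// part upload before the stream is aborted.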
stream.write(data);
// From testTwoPartUpload() we know that closing the stream finalizes
// the upload and materializes the path. Here we call abort() instead,
// which must cancel the upload and leave the path absent.
assertCompleteAbort(stream.abort());
// the path should not exist
assertPathDoesNotExist("upload must not have completed", file);
} finally {
IOUtils.closeStream(stream);
// even after closing the stream, the path must not exist
assertPathDoesNotExist("upload must not have completed", file);
}
verifyStreamWasAborted(fs, stream);
// a second abort is a no-op
assertNoopAbort(stream.abort());
}
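/**
 * When a multipart overwrite of an existing file is aborted,
 * the original file contents must be preserved.
 */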
@Test
public void testAbortWhenOverwritingAFile() throws Throwable {
Path file = path(getMethodName());
S3AFileSystem fs = getFileSystem();
// write the original data
byte[] smallData = writeTextFile(fs, file, "original", true);
// now attempt a multipart upload
byte[] data = dataset(6 * _1MB, 'a', 'z' - 'a');
FSDataOutputStream stream = fs.create(file, true);
try {
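// the stream must declare that it supports abort() before
// the test relies on it.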
ContractTestUtils.assertCapabilities(stream,
new String[]{ABORTABLE_STREAM},
null);
stream.write(data);
assertCompleteAbort(stream.abort());
verifyFileContents(fs, file, smallData);
} finally {
IOUtils.closeStream(stream);
}
verifyFileContents(fs, file, smallData);
verifyStreamWasAborted(fs, stream);
}
/**
 * Check the IOStatistics of the FS and the stream to verify that
 * the stream was aborted: both that abort() was invoked and
 * that the multipart upload itself was aborted.
 * @param fs filesystem
 * @param stream stream
 */
private void verifyStreamWasAborted(final S3AFileSystem fs,
final FSDataOutputStream stream) {
// check the stream
final IOStatistics iostats = stream.getIOStatistics();
final String sstr = ioStatisticsToPrettyString(iostats);
LOG.info("IOStatistics for stream: {}", sstr);
verifyStatisticCounterValue(iostats, INVOCATION_ABORT.getSymbol(), 1);
verifyStatisticCounterValue(iostats,
OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), 1);
// now the FS: other operations may also have aborted uploads,
// so only a lower bound is asserted.
final IOStatistics fsIostats = fs.getIOStatistics();
assertThatStatisticCounter(fsIostats, INVOCATION_ABORT.getSymbol())
.isGreaterThanOrEqualTo(1);
}
}