ITestS3AFailureHandling.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.S3Error;
import org.assertj.core.api.Assertions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.nio.file.AccessDeniedException;
import java.util.stream.Collectors;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.createFiles;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.isBulkDeleteEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.failIf;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.requireDefaultExternalData;
import static org.apache.hadoop.test.LambdaTestUtils.*;
import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;
import static org.apache.hadoop.util.functional.RemoteIterators.toList;
/**
* ITest for failure handling, primarily multipart deletion.
*/
public class ITestS3AFailureHandling extends AbstractS3ATestBase {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
conf.setBoolean(Constants.ENABLE_MULTI_DELETE, true);
if (isUsingDefaultExternalDataFile(conf)) {
removeBaseAndBucketOverrides(conf, Constants.ENDPOINT);
}
return conf;
}
/**
* Assert that a read operation returned an EOF value.
* @param operation specific operation
* @param readResult result
*/
private void assertIsEOF(String operation, int readResult) {
assertEquals("Expected EOF from "+ operation
+ "; got char " + (char) readResult, -1, readResult);
}
@Test
public void testMultiObjectDeleteNoFile() throws Throwable {
describe("Deleting a missing object");
removeKeys(getFileSystem(), "ITestS3AFailureHandling/missingFile");
}
/**
* See HADOOP-18112.
*/
@Test
public void testMultiObjectDeleteLargeNumKeys() throws Exception {
S3AFileSystem fs = getFileSystem();
Path path = path("largeDir");
mkdirs(path);
final boolean bulkDeleteEnabled = isBulkDeleteEnabled(getFileSystem());
// with single object delete, only create a few files for a faster
// test run.
int filesToCreate;
int pages;
if (bulkDeleteEnabled) {
filesToCreate = 1005;
pages = 5;
} else {
filesToCreate = 250;
pages = 0;
}
createFiles(fs, path, 1, filesToCreate, 0);
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator =
fs.listFiles(path, false);
List<String> keys = toList(mappingRemoteIterator(locatedFileStatusRemoteIterator,
locatedFileStatus -> fs.pathToKey(locatedFileStatus.getPath())));
// After implementation of paging during multi object deletion,
// no exception is encountered.
Long bulkDeleteReqBefore = getNumberOfBulkDeleteRequestsMadeTillNow(fs);
try (AuditSpan span = span()) {
fs.removeKeys(buildDeleteRequest(keys.toArray(new String[0])), false);
}
Long bulkDeleteReqAfter = getNumberOfBulkDeleteRequestsMadeTillNow(fs);
// number of delete requests is 5 as we have default page size of 250.
Assertions.assertThat(bulkDeleteReqAfter - bulkDeleteReqBefore)
.describedAs("Number of batched bulk delete requests")
.isEqualTo(pages);
}
private Long getNumberOfBulkDeleteRequestsMadeTillNow(S3AFileSystem fs) {
return fs.getIOStatistics().counters()
.get(StoreStatisticNames.OBJECT_BULK_DELETE_REQUEST);
}
private void removeKeys(S3AFileSystem fileSystem, String... keys)
throws IOException {
try (AuditSpan span = span()) {
fileSystem.removeKeys(buildDeleteRequest(keys), false);
}
}
private List<ObjectIdentifier> buildDeleteRequest(
final String[] keys) {
List<ObjectIdentifier> request = new ArrayList<>(
keys.length);
for (String key : keys) {
request.add(ObjectIdentifier.builder().key(key).build());
}
return request;
}
@Test
public void testMultiObjectDeleteSomeFiles() throws Throwable {
Path valid = path("ITestS3AFailureHandling/validFile");
touch(getFileSystem(), valid);
NanoTimer timer = new NanoTimer();
removeKeys(getFileSystem(), getFileSystem().pathToKey(valid),
"ITestS3AFailureHandling/missingFile");
timer.end("removeKeys");
}
/**
* Test low-level failure handling with low level delete request.
*/
@Test
public void testMultiObjectDeleteNoPermissions() throws Throwable {
describe("Delete the external file and expect it to fail");
Path path = requireDefaultExternalData(getConfiguration());
S3AFileSystem fs = (S3AFileSystem) path.getFileSystem(
getConfiguration());
// create a span, expect it to be activated.
fs.getAuditSpanSource().createSpan(StoreStatisticNames.OP_DELETE,
path.toString(), null);
List<ObjectIdentifier> keys
= buildDeleteRequest(
new String[]{
fs.pathToKey(path),
"missing-key.csv"
});
MultiObjectDeleteException ex = intercept(
MultiObjectDeleteException.class,
() -> fs.removeKeys(keys, false));
final List<Path> undeleted = ex.errors().stream()
.map(S3Error::key)
.map(fs::keyToQualifiedPath)
.collect(Collectors.toList());
final String undeletedFiles = undeleted.stream()
.map(Path::toString)
.collect(Collectors.joining(", "));
Assertions.assertThat(undeleted)
.describedAs("undeleted files")
.hasSize(2)
.contains(path);
}
/**
* Test low-level failure handling with a single-entry file.
* This is deleted as a single call, so isn't that useful.
*/
@Test
public void testSingleObjectDeleteNoPermissionsTranslated() throws Throwable {
describe("Delete the external file and expect it to fail");
Path path = requireDefaultExternalData(getConfiguration());
S3AFileSystem fs = (S3AFileSystem) path.getFileSystem(
getConfiguration());
AccessDeniedException aex = intercept(AccessDeniedException.class,
() -> fs.delete(path, false));
Throwable cause = aex.getCause();
failIf(cause == null, "no nested exception", aex);
}
}