ITestS3AContractBulkDelete.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.statistics.MeanStatistic;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete_delete;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Contract tests for bulk delete operation for S3A Implementation.
*/
public class ITestS3AContractBulkDelete extends AbstractContractBulkDeleteTest {
private static final Logger LOG = LoggerFactory.getLogger(ITestS3AContractBulkDelete.class);
/**
* Delete Page size: {@value}.
* This is the default page size for bulk delete operation for this contract test.
* All the tests in this class should pass number of paths equal to or less than
* this page size during the bulk delete operation.
*/
private static final int DELETE_PAGE_SIZE = 20;
private boolean enableMultiObjectDelete;
public static Iterable<Object[]> enableMultiObjectDelete() {
return Arrays.asList(new Object[][]{
{true},
{false}
});
}
public void initITestS3AContractBulkDelete(boolean pEnableMultiObjectDelete) {
this.enableMultiObjectDelete = pEnableMultiObjectDelete;
}
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
conf = propagateBucketOptions(conf, getTestBucketName(conf));
if (enableMultiObjectDelete) {
// if multi-object delete is disabled, skip the test.
skipIfNotEnabled(conf, Constants.ENABLE_MULTI_DELETE,
"Bulk delete is explicitly disabled for this bucket");
}
S3ATestUtils.removeBaseAndBucketOverrides(conf,
Constants.BULK_DELETE_PAGE_SIZE);
conf.setInt(Constants.BULK_DELETE_PAGE_SIZE, DELETE_PAGE_SIZE);
conf.setBoolean(Constants.ENABLE_MULTI_DELETE, enableMultiObjectDelete);
return conf;
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(createConfiguration());
}
@Override
protected int getExpectedPageSize() {
if (!enableMultiObjectDelete) {
// if multi-object delete is disabled, page size should be 1.
return 1;
}
return DELETE_PAGE_SIZE;
}
@Override
public void validatePageSize() throws Exception {
Assertions.assertThat(pageSize)
.describedAs("Page size should match the configured page size")
.isEqualTo(getExpectedPageSize());
}
@MethodSource("enableMultiObjectDelete")
@ParameterizedTest(name = "enableMultiObjectDelete = {0}")
public void testBulkDeleteZeroPageSizePrecondition(
boolean pEnableMultiObjectDelete) throws Exception {
initITestS3AContractBulkDelete(pEnableMultiObjectDelete);
if (!enableMultiObjectDelete) {
// if multi-object delete is disabled, skip this test as
// page size is always 1.
skip("Multi-object delete is disabled");
}
Configuration conf = getContract().getConf();
conf.setInt(Constants.BULK_DELETE_PAGE_SIZE, 0);
Path testPath = path(getMethodName());
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
intercept(IllegalArgumentException.class,
() -> fs.createBulkDelete(testPath));
}
}
@MethodSource("enableMultiObjectDelete")
@ParameterizedTest(name = "enableMultiObjectDelete = {0}")
public void testPageSizeWhenMultiObjectsDisabled(
boolean pEnableMultiObjectDelete) throws Exception {
initITestS3AContractBulkDelete(pEnableMultiObjectDelete);
Configuration conf = getContract().getConf();
conf.setBoolean(Constants.ENABLE_MULTI_DELETE, false);
Path testPath = path(getMethodName());
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
BulkDelete bulkDelete = fs.createBulkDelete(testPath);
Assertions.assertThat(bulkDelete.pageSize())
.describedAs("Page size should be 1 when multi-object delete is disabled")
.isEqualTo(1);
}
}
@Override
public void testDeletePathsDirectory() throws Exception {
List<Path> paths = new ArrayList<>();
Path dirPath = new Path(basePath, "dir");
fs.mkdirs(dirPath);
paths.add(dirPath);
Path filePath = new Path(dirPath, "file");
touch(fs, filePath);
if (enableMultiObjectDelete) {
// Adding more paths only if multi-object delete is enabled.
paths.add(filePath);
}
assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
// During the bulk delete operation, the directories are not deleted in S3A.
assertIsDirectory(dirPath);
}
@MethodSource("enableMultiObjectDelete")
@ParameterizedTest(name = "enableMultiObjectDelete = {0}")
public void testBulkDeleteParentDirectoryWithDirectories(
boolean pEnableMultiObjectDelete) throws Exception {
initITestS3AContractBulkDelete(pEnableMultiObjectDelete);
List<Path> paths = new ArrayList<>();
Path dirPath = new Path(basePath, "dir");
fs.mkdirs(dirPath);
Path subDir = new Path(dirPath, "subdir");
fs.mkdirs(subDir);
// adding parent directory to the list of paths.
paths.add(dirPath);
assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
// During the bulk delete operation, the directories are not deleted in S3A.
assertIsDirectory(dirPath);
assertIsDirectory(subDir);
}
public void testBulkDeleteParentDirectoryWithFiles() throws Exception {
List<Path> paths = new ArrayList<>();
Path dirPath = new Path(basePath, "dir");
fs.mkdirs(dirPath);
Path file = new Path(dirPath, "file");
touch(fs, file);
// adding parent directory to the list of paths.
paths.add(dirPath);
assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
// During the bulk delete operation,
// the directories are not deleted in S3A.
assertIsDirectory(dirPath);
}
@MethodSource("enableMultiObjectDelete")
@ParameterizedTest(name = "enableMultiObjectDelete = {0}")
public void testRateLimiting() throws Exception {
if (!enableMultiObjectDelete) {
skip("Multi-object delete is disabled so hard to trigger rate limiting");
}
Configuration conf = getContract().getConf();
conf.setInt(Constants.S3A_IO_RATE_LIMIT, 5);
Path basePath = path(getMethodName());
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
createFiles(fs, basePath, 1, 20, 0);
FileStatus[] fileStatuses = fs.listStatus(basePath);
List<Path> paths = Arrays.stream(fileStatuses)
.map(FileStatus::getPath)
.collect(toList());
pageSizePreconditionForTest(paths.size());
BulkDelete bulkDelete = fs.createBulkDelete(basePath);
bulkDelete.bulkDelete(paths);
MeanStatistic meanStatisticBefore = lookupMeanStatistic(fs.getIOStatistics(),
STORE_IO_RATE_LIMITED_DURATION + SUFFIX_MEAN);
Assertions.assertThat(meanStatisticBefore.mean())
.describedAs("Rate limiting should not have happened during first delete call")
.isEqualTo(0.0);
bulkDelete.bulkDelete(paths);
bulkDelete.bulkDelete(paths);
bulkDelete.bulkDelete(paths);
MeanStatistic meanStatisticAfter = lookupMeanStatistic(fs.getIOStatistics(),
STORE_IO_RATE_LIMITED_DURATION + SUFFIX_MEAN);
Assertions.assertThat(meanStatisticAfter.mean())
.describedAs("Rate limiting should have happened during multiple delete calls")
.isGreaterThan(0.0);
}
}
}