/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.scale;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.MethodOrderer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.auth.delegation.Csvout;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
import static org.apache.hadoop.fs.s3a.Constants.BULK_DELETE_PAGE_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.BULK_DELETE_PAGE_SIZE_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.MAX_ENTRIES_TO_DELETE;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

/**
 * Load test of bulk delete throttling: overload a single S3 shard with
 * large multi-object delete requests and see how the store reacts.
 * Much of the setup code is lifted from ILoadTestSessionCredentials;
 * whereas that was designed to overload an STS endpoint, this one
 * targets the S3 store itself.
 * Note: the User-Agent field includes the configuration under test,
 * for the benefit of anyone looking through the server logs.
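 * <p>
 * As a load test it is only expected to be run on demand against a test
 * bucket, never as part of the regular test suite. An invocation along the
 * lines of the following (the exact options depend on the local build
 * setup) selects these tests with the scale profile enabled:
 * <pre>
 *   mvn verify -Dscale -Dtest=skip -Dit.test='ILoadTest*'
 * </pre>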
*/
@TestMethodOrder(MethodOrderer.MethodName.class)
public class ILoadTestS3ABulkDeleteThrottling extends S3AScaleTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(ILoadTestS3ABulkDeleteThrottling.class);
  /** Number of worker threads issuing delete requests in parallel. */
  protected static final int THREADS = 20;
  /** Total number of keys whose deletion is requested in each test run. */
  public static final int TOTAL_KEYS = 25000;
  /** Small page size: the default bulk delete page size. */
  public static final int SMALL = BULK_DELETE_PAGE_SIZE_DEFAULT;
  public static final int SMALL_REQS = TOTAL_KEYS / SMALL;
  /** Large page size: the S3 limit on entries in one DeleteObjects call. */
  public static final int MAXIMUM = MAX_ENTRIES_TO_DELETE;
  public static final int MAXIMUM_REQS = TOTAL_KEYS / MAXIMUM;
// shared across test cases.
@SuppressWarnings("StaticNonFinalField")
private static boolean testWasThrottled;
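  /** Thread pool on which the parallel delete requests are executed. */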
private final ExecutorService executor =
HadoopExecutors.newFixedThreadPool(
THREADS,
new ThreadFactoryBuilder()
.setNameFormat("#%d")
.build());
private final CompletionService<Outcome>
completionService = new ExecutorCompletionService<>(executor);
private File dataDir;
private boolean throttle;
private int pageSize;
private int requests;
/**
* Test array for parameterized test runs.
* <ul>
* <li>AWS client throttle on/off</li>
 * <li>Page size</li>
 * <li>Request count</li>
 * </ul>
*
* @return a list of parameter tuples.
*/
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{false, SMALL, SMALL_REQS},
{false, MAXIMUM, MAXIMUM_REQS},
{true, SMALL, SMALL_REQS},
{true, MAXIMUM, MAXIMUM_REQS},
});
}
/**
 * Initialize the test case from one parameter tuple; invoked at the
 * start of each parameterized test method in place of a constructor.
 * @param pThrottle AWS client throttle on/off
 * @param pPageSize page size
 * @param pRequests request count
 * @throws Exception failure in setup
*/
public void initILoadTestS3ABulkDeleteThrottling(
final boolean pThrottle,
final int pPageSize,
final int pRequests) throws Exception {
this.throttle = pThrottle;
    Preconditions.checkArgument(pPageSize > 0,
        "page size too low %s", pPageSize);
this.pageSize = pPageSize;
this.requests = pRequests;
setup();
}
@Override
protected Configuration createScaleConfiguration() {
Configuration conf = super.createScaleConfiguration();
S3ATestUtils.removeBaseAndBucketOverrides(conf,
EXPERIMENTAL_AWS_INTERNAL_THROTTLING,
BULK_DELETE_PAGE_SIZE,
USER_AGENT_PREFIX,
ENABLE_MULTI_DELETE);
conf.setBoolean(EXPERIMENTAL_AWS_INTERNAL_THROTTLING, throttle);
conf.setInt(BULK_DELETE_PAGE_SIZE, pageSize);
conf.set(USER_AGENT_PREFIX,
String.format("ILoadTestS3ABulkDeleteThrottling-%s-%04d",
throttle, pageSize));
S3ATestUtils.disableFilesystemCaching(conf);
return conf;
}
@Override
public void setup() throws Exception {
final Configuration conf = getConf();
super.setup();
assumeTrue(conf.getBoolean(ENABLE_MULTI_DELETE, true),
"multipart delete disabled");
dataDir = GenericTestUtils.getTestDir("throttling");
dataDir.mkdirs();
final String size = getFileSystem().getConf().get(BULK_DELETE_PAGE_SIZE);
Assertions.assertThat(size)
.describedAs("page size")
.isNotEmpty();
Assertions.assertThat(getFileSystem().getConf()
.getInt(BULK_DELETE_PAGE_SIZE, -1))
.isEqualTo(pageSize);
}
@MethodSource("params")
@ParameterizedTest(name = "bulk-delete-aws-retry={0}-requests={2}-size={1}")
public void test_010_Reset(final boolean pThrottle,
final int pPageSize, final int pRequests) throws Throwable {
initILoadTestS3ABulkDeleteThrottling(pThrottle, pPageSize, pRequests);
testWasThrottled = false;
}
@MethodSource("params")
@ParameterizedTest(name = "bulk-delete-aws-retry={0}-requests={2}-size={1}")
public void test_020_DeleteThrottling(final boolean pThrottle,
final int pPageSize, final int pRequests) throws Throwable {
initILoadTestS3ABulkDeleteThrottling(pThrottle, pPageSize, pRequests);
describe("test how S3 reacts to massive multipart deletion requests");
final File results = deleteFiles(requests, pageSize);
LOG.info("Test run completed against {}:\n see {}", getFileSystem(),
results);
if (testWasThrottled) {
LOG.warn("Test was throttled");
} else {
LOG.info("No throttling recorded in filesystem");
}
}
@MethodSource("params")
@ParameterizedTest(name = "bulk-delete-aws-retry={0}-requests={2}-size={1}")
public void test_030_Sleep(final boolean pThrottle,
final int pPageSize, final int pRequests) throws Throwable {
initILoadTestS3ABulkDeleteThrottling(pThrottle, pPageSize, pRequests);
maybeSleep();
}
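  /**
   * If the previous test case was throttled, sleep for a while and then
   * issue a simple delete call to probe whether the store has recovered,
   * before clearing the throttle flag.
   * @throws InterruptedException interrupted while sleeping
   * @throws IOException failure of the probe delete
   */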
private void maybeSleep() throws InterruptedException, IOException {
if (testWasThrottled) {
LOG.info("Sleeping briefly to let store recover");
Thread.sleep(30_000);
getFileSystem().delete(path("recovery"), true);
testWasThrottled = false;
}
}
/**
   * Issue the delete requests, logging every outcome as a tab-separated
   * record in a CSV file under the test data directory.
   * @param requestCount number of requests to issue.
   * @param entries number of keys in each request.
   * @return the CSV file containing the results.
   * @throws Exception failure
*/
private File deleteFiles(final int requestCount,
final int entries)
throws Exception {
File csvFile = new File(dataDir,
String.format("delete-%03d-%04d-%s.csv",
requestCount, entries, throttle));
describe("Issuing %d requests of size %d, saving log to %s",
requestCount, entries, csvFile);
Path basePath = path("testDeleteObjectThrottling");
final S3AFileSystem fs = getFileSystem();
final String base = fs.pathToKey(basePath);
final List<ObjectIdentifier> fileList
= buildDeleteRequest(base, entries);
final FileWriter out = new FileWriter(csvFile);
Csvout csvout = new Csvout(out, "\t", "\n");
Outcome.writeSchema(csvout);
final ContractTestUtils.NanoTimer jobTimer =
new ContractTestUtils.NanoTimer();
for (int i = 0; i < requestCount; i++) {
final int id = i;
completionService.submit(() -> {
final long startTime = System.currentTimeMillis();
Thread.currentThread().setName("#" + id);
LOG.info("Issuing request {}", id);
final ContractTestUtils.NanoTimer timer =
new ContractTestUtils.NanoTimer();
Exception ex = null;
try (AuditSpan span = span()) {
fs.removeKeys(fileList, false);
} catch (IOException e) {
ex = e;
}
timer.end("Request " + id);
return new Outcome(id, startTime, timer,
ex);
});
}
NanoTimerStats stats = new NanoTimerStats("Overall");
NanoTimerStats success = new NanoTimerStats("Successful");
NanoTimerStats throttled = new NanoTimerStats("Throttled");
List<Outcome> throttledEvents = new ArrayList<>();
for (int i = 0; i < requestCount; i++) {
Outcome outcome = completionService.take().get();
ContractTestUtils.NanoTimer timer = outcome.timer;
Exception ex = outcome.exception;
outcome.writeln(csvout);
stats.add(timer);
      if (ex != null) {
        // treat any failure as a throttling event; record it so the
        // follow-up test cases know to sleep and reset.
        LOG.info("Throttled at event {}", i, ex);
        testWasThrottled = true;
        throttled.add(timer);
        throttledEvents.add(outcome);
} else {
success.add(timer);
}
}
csvout.close();
jobTimer.end("Execution of operations");
// now print the stats
LOG.info("Summary file is " + csvFile);
LOG.info("Made {} requests with {} throttle events\n: {}\n{}\n{}",
requestCount,
throttled.getCount(),
stats,
throttled,
success);
    double duration = jobTimer.duration();
    // effective per-key delete rate across the run (duration is in nanoseconds)
    double deleteRate = requestCount * entries * 1.0e9 / duration;
    LOG.info(String.format("Delete rate %.3f keys/second", deleteRate));
// log at debug
if (LOG.isDebugEnabled()) {
throttledEvents.forEach((outcome -> {
LOG.debug("{}: duration: {}",
outcome.id, outcome.timer.elapsedTimeMs());
}));
}
return csvFile;
}
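  /**
   * Build the list of object identifiers for one bulk delete request:
   * {@code count} synthetic keys under the base path. The objects do not
   * need to exist; bulk deletes of absent keys are normally reported as
   * successful, so the same list can be reused for every request.
   * @param base base key for the generated entries
   * @param count number of entries to generate
   * @return the list of object identifiers
   */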
private List<ObjectIdentifier> buildDeleteRequest(
String base, int count) {
List<ObjectIdentifier> request = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
request.add(ObjectIdentifier.builder().key(
String.format("%s/file-%04d", base, i)).build());
}
return request;
}
/**
* Outcome of one of the load operations.
*/
private static class Outcome {
private final int id;
private final long startTime;
private final ContractTestUtils.NanoTimer timer;
private final Exception exception;
Outcome(final int id,
final long startTime,
final ContractTestUtils.NanoTimer timer,
final Exception exception) {
this.id = id;
this.startTime = startTime;
this.timer = timer;
this.exception = exception;
}
/**
* Write this record.
* @param out the csvout to write through.
* @return the csvout instance
* @throws IOException IO failure.
*/
public Csvout writeln(Csvout out) throws IOException {
return out.write(
id,
startTime,
exception == null ? 1 : 0,
timer.getStartTime(),
timer.getEndTime(),
timer.duration(),
'"' + (exception == null ? "" : exception.getMessage()) + '"')
.newline();
}
/**
* Write the schema of the outcome records.
   * @param out CSV destination
* @throws IOException IO failure.
*/
public static void writeSchema(Csvout out) throws IOException {
out.write("id", "starttime", "success", "started", "ended",
"duration", "error").newline();
}
}
}