ILoadTestSessionCredentials.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth.delegation;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.scale.NanoTimerStats;
import org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeSessionTestsEnabled;
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.DELEGATION_TOKEN_BINDING;
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.DELEGATION_TOKEN_SESSION_BINDING;
/**
* This test has a unique name as it is designed to do something special:
* generate enough load on the AWS STS service to get some
* statistics on its throttling.
* This isn't documented anywhere, and for DT support it's
* important to know how much effort it takes to overload the service.
*
* <b>Important</b>
*
* If this test does trigger STS throttling, then all users in the same
* AWS account will experience throttling. This may be observable
* as delays and, if the applications in use are not resilient to
* throttling events in STS, as application failures.
*
* Use with caution.
* <ol>
* <li>Don't run it on an AWS endpoint which other users in a
* shared AWS account are actively using. </li>
* <li>Don't run it on the same AWS account which is being used for
* any production service.</li>
* <li>And choose a time (weekend, etc) where the account is under-used.</li>
* <li>Warn your fellow users.</li>
* </ol>
*
* In experiments, the throttling recovers fast and appears restricted
* to the single STS service which the test overloads.
*
* @see <a href="https://github.com/steveloughran/datasets/releases/tag/tag_2018-09-17-aws">
* AWS STS login throttling statistics</a>
*/
public class ILoadTestSessionCredentials extends S3AScaleTestBase {

  private static final Logger LOG =
      LoggerFactory.getLogger(ILoadTestSessionCredentials.class);

  /** Number of worker threads issuing token requests concurrently. */
  protected static final int THREADS = 100;

  /** Directory the CSV timing files are written to; set in {@link #setup()}. */
  private File dataDir;

  /**
   * Build the test configuration: selects the delegation token binding,
   * sizes the connection pool to at least {@link #THREADS} and sets the
   * maximum error retry count to zero.
   * @return the configuration to use for the test filesystem.
   */
  @Override
  protected Configuration createScaleConfiguration() {
    Configuration conf = super.createScaleConfiguration();
    conf.set(DELEGATION_TOKEN_BINDING,
        getDelegationBinding());
    // every worker thread may hold a connection at the same time
    conf.setInt(Constants.MAXIMUM_CONNECTIONS,
        Math.max(THREADS, Constants.DEFAULT_MAXIMUM_CONNECTIONS));
    // no retries: presumably so that a throttled request surfaces as a
    // failure to be counted rather than being retried transparently
    conf.setInt(Constants.MAX_ERROR_RETRIES, 0);
    return conf;
  }

  /**
   * Which DT binding class to use.
   * @return the binding config option.
   */
  protected String getDelegationBinding() {
    return DELEGATION_TOKEN_SESSION_BINDING;
  }

  /**
   * Set up the test: requires session tests to be enabled and the
   * filesystem to have a canonical service name, then creates the
   * directory for the CSV output.
   * @throws Exception setup failure, including failure to create the
   * data directory.
   */
  @BeforeEach
  @Override
  public void setup() throws Exception {
    super.setup();
    assumeSessionTestsEnabled(getConfiguration());
    S3AFileSystem fileSystem = getFileSystem();
    assertNotNull(fileSystem.getCanonicalServiceName(),
        "No delegation tokens in FS");
    dataDir = GenericTestUtils.getTestDir("kerberos");
    // mkdirs() returns false when the directory already exists, so only
    // fail when there is still no directory afterwards
    if (!dataDir.mkdirs() && !dataDir.isDirectory()) {
      throw new IOException("Failed to create data directory " + dataDir);
    }
  }

  /**
   * Prefix of the generated CSV filenames.
   * @return the filename prefix.
   */
  protected String getFilePrefix() {
    return "session";
  }

  @Test
  public void testCreate10Tokens() throws Throwable {
    File file = fetchTokens(10);
    String csv = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
    LOG.info("CSV data\n{}", csv);
  }

  @Test
  public void testCreateManyTokens() throws Throwable {
    fetchTokens(50000);
  }

  /**
   * Fetch tokens, saving the timings to a CSV file in the data directory
   * whose name is built from the file prefix and the token count.
   * @param tokens number of tokens.
   * @return file the timings were saved to.
   * @throws Exception failure
   */
  private File fetchTokens(final int tokens)
      throws Exception {
    File filename = new File(dataDir, getFilePrefix() + "-" + tokens + ".csv");
    fetchTokens(tokens, filename);
    return filename;
  }

  /**
   * Fetch tokens: submit one request per token to a fixed pool of
   * {@link #THREADS} threads, record every outcome as a row of the CSV
   * file, then log summary statistics.
   * The thread pool and the CSV file are always released, whether or not
   * the run succeeds.
   * @param tokens number of tokens.
   * @param csvFile file to save this to.
   * @throws Exception failure
   */
  private void fetchTokens(final int tokens, final File csvFile)
      throws Exception {
    describe("Fetching %d tokens, saving log to %s", tokens, csvFile);
    final S3AFileSystem fileSystem = getFileSystem();
    // the pool is scoped to this operation so that its threads can be
    // released in the finally clause instead of outliving the test
    final ExecutorService executor =
        HadoopExecutors.newFixedThreadPool(
            THREADS,
            new ThreadFactoryBuilder()
                .setNameFormat("DelegationTokenFetcher #%d")
                .build());
    try (FileWriter out = new FileWriter(csvFile)) {
      final CompletionService<Outcome> completionService =
          new ExecutorCompletionService<>(executor);
      final Csvout csvout = new Csvout(out, "\t", "\n");
      Outcome.writeSchema(csvout);
      final ContractTestUtils.NanoTimer jobTimer =
          new ContractTestUtils.NanoTimer();

      // queue up all the requests
      for (int i = 0; i < tokens; i++) {
        final int id = i;
        completionService.submit(() -> requestToken(fileSystem, id));
      }

      // collect the outcomes in completion order
      NanoTimerStats stats = new NanoTimerStats("Overall");
      NanoTimerStats success = new NanoTimerStats("Successful");
      NanoTimerStats throttled = new NanoTimerStats("Throttled");
      List<Outcome> throttledEvents = new ArrayList<>();
      for (int i = 0; i < tokens; i++) {
        Outcome outcome = completionService.take().get();
        ContractTestUtils.NanoTimer timer = outcome.timer;
        Exception ex = outcome.exception;
        outcome.writeln(csvout);
        stats.add(timer);
        if (ex != null) {
          // every failed request is counted as a throttling event
          LOG.info("Throttled at event {}", i, ex);
          throttled.add(timer);
          throttledEvents.add(outcome);
        } else {
          success.add(timer);
        }
      }
      csvout.close();
      jobTimer.end("Execution of fetch calls");

      // now print the stats
      LOG.info("Summary file is {}", csvFile);
      LOG.info("Fetched {} tokens with {} throttle events\n: {}\n{}\n{}",
          tokens,
          throttled.getCount(),
          stats,
          throttled,
          success);
      double duration = jobTimer.duration();
      // scale the request count by 1e9 to convert the timer duration
      // into an operations/second figure
      double iops = tokens * 1.0e9 / duration;
      LOG.info(
          String.format("Effective IO rate is %.3f operations/second", iops));
      // log at debug
      if (LOG.isDebugEnabled()) {
        for (Outcome outcome : throttledEvents) {
          LOG.debug("{}: duration: {}",
              outcome.id, outcome.timer.elapsedTimeMs());
        }
      }
    } finally {
      // stop the workers; on a failure this also discards queued requests
      executor.shutdownNow();
    }
  }

  /**
   * Issue a single delegation token request and time it.
   * An {@code IOException} is captured in the outcome rather than thrown,
   * so that the failure is recorded as a row in the results.
   * @param fileSystem filesystem to request the token from.
   * @param id ID of this request.
   * @return the outcome of the request.
   */
  private static Outcome requestToken(final S3AFileSystem fileSystem,
      final int id) {
    final long startTime = System.currentTimeMillis();
    final ContractTestUtils.NanoTimer timer =
        new ContractTestUtils.NanoTimer();
    Exception ex = null;
    try {
      fileSystem.getDelegationToken("Count ");
    } catch (IOException e) {
      ex = e;
    }
    timer.end("Request");
    return new Outcome(id, startTime, timer, ex);
  }

  /**
   * Outcome of one of the load operations.
   */
  private static class Outcome {

    /** ID of the request. */
    private final int id;

    /** Wall clock time, in milliseconds, at which the request started. */
    private final long startTime;

    /** Timer of the request. */
    private final ContractTestUtils.NanoTimer timer;

    /** Exception raised by the request; null if it succeeded. */
    private final Exception exception;

    Outcome(final int id,
        final long startTime,
        final ContractTestUtils.NanoTimer timer,
        final Exception exception) {
      this.id = id;
      this.startTime = startTime;
      this.timer = timer;
      this.exception = exception;
    }

    /**
     * Write this record.
     * The success column is 1 for a successful request and 0 for a failure.
     * @param out the csvout to write through.
     * @return the csvout instance
     * @throws IOException IO failure.
     */
    public Csvout writeln(Csvout out) throws IOException {
      return out.write(
          id,
          startTime,
          exception == null ? 1 : 0,
          timer.getStartTime(),
          timer.getEndTime(),
          timer.duration(),
          "\"" + errorText() + "\"")
          .newline();
    }

    /**
     * Build the text of the error column so that it cannot break the row:
     * embedded double quotes are doubled, and tabs and line breaks are
     * collapsed to a single space.
     * @return the sanitized error text; empty if the request succeeded.
     */
    private String errorText() {
      if (exception == null) {
        return "";
      }
      String message = exception.getMessage();
      if (message == null) {
        // no message: fall back to the exception's string form
        message = exception.toString();
      }
      return message
          .replace("\"", "\"\"")
          .replaceAll("[\\t\\r\\n]+", " ");
    }

    /**
     * Write the schema of the outcome records as a header row.
     * @param out CSV destination
     * @throws IOException IO failure.
     */
    public static void writeSchema(Csvout out) throws IOException {
      // terminate the header row explicitly, as writeln() does for the
      // data rows, so the first record starts on its own line
      out.write("id", "starttime", "success", "started", "ended",
          "duration", "error")
          .newline();
    }
  }
}