ITestCreateSessionTimeout.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.performance;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignRequest;
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignedRequest;
import software.amazon.awssdk.http.auth.spi.signer.HttpSigner;
import software.amazon.awssdk.http.auth.spi.signer.SignRequest;
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AWSApiCallTimeoutException;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.auth.CustomHttpSigner;
import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
import static org.apache.hadoop.fs.s3a.Constants.S3EXPRESS_CREATE_SESSION;
import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotS3ExpressBucket;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test timeout of S3 Client CreateSession call, which was originally
* hard coded to 10 seconds.
* Only executed against an S3Express store.
*/
public class ITestCreateSessionTimeout extends AbstractS3ACostTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestCreateSessionTimeout.class);
/**
* What is the duration for the operation after which the test is considered
* to have failed because timeouts didn't get passed down?
*/
private static final long TIMEOUT_EXCEPTION_THRESHOLD = Duration.ofSeconds(5).toMillis();
/**
* How long to sleep in requests?
*/
private static final AtomicLong SLEEP_DURATION = new AtomicLong(
Duration.ofSeconds(20).toMillis());
/**
* Flag set if the sleep was interrupted during signing.
*/
private static final AtomicBoolean SLEEP_INTERRUPTED = new AtomicBoolean(false);
/**
* Create a configuration with a 10 millisecond timeout on API calls
* and a custom signer which sleeps much longer than that.
* @return the configuration.
*/
@Override
public Configuration createConfiguration() {
final Configuration conf = super.createConfiguration();
skipIfNotS3ExpressBucket(conf);
disableFilesystemCaching(conf);
removeBaseAndBucketOverrides(conf,
CUSTOM_SIGNERS,
HTTP_SIGNER_ENABLED,
REQUEST_TIMEOUT,
RETRY_LIMIT,
S3A_BUCKET_PROBE,
S3EXPRESS_CREATE_SESSION,
SIGNING_ALGORITHM_S3
);
conf.setBoolean(HTTP_SIGNER_ENABLED, true);
conf.setClass(HTTP_SIGNER_CLASS_NAME, SlowSigner.class, HttpSigner.class);
Duration duration = Duration.ofMillis(10);
conf.setLong(REQUEST_TIMEOUT, duration.toMillis());
conf.setInt(RETRY_LIMIT, 1);
return conf;
}
@Override
public void setup() throws Exception {
// remove the safety check on minimum durations.
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
try {
super.setup();
} finally {
// restore the safety check on minimum durations.
AWSClientConfig.resetMinimumOperationDuration();
}
}
@Override
protected void deleteTestDirInTeardown() {
// no-op
}
/**
* Make this a no-op to avoid IO.
* @param path path path
*/
@Override
protected void mkdirs(Path path) {
}
@Test
public void testSlowSigningTriggersTimeout() throws Throwable {
final S3AFileSystem fs = getFileSystem();
DurationInfo call = new DurationInfo(LOG, true, "Create session");
final AWSApiCallTimeoutException thrown = intercept(AWSApiCallTimeoutException.class,
() -> fs.getFileStatus(path("testShortTimeout")));
call.finished();
LOG.info("Exception raised after {}", call, thrown);
// if the timeout took too long, fail with details and include the original
// exception
if (call.value() > TIMEOUT_EXCEPTION_THRESHOLD) {
throw new AssertionError("Duration of create session " + call.getDurationString()
+ " exceeds threshold " + TIMEOUT_EXCEPTION_THRESHOLD + " ms: " + thrown, thrown);
}
Assertions.assertThat(SLEEP_INTERRUPTED.get())
.describedAs("Sleep interrupted during signing")
.isTrue();
// now scan the inner exception stack for "createSession"
Arrays.stream(thrown.getCause().getStackTrace())
.filter(e -> e.getMethodName().equals("createSession"))
.findFirst()
.orElseThrow(() ->
new AssertionError("No createSession() in inner stack trace of", thrown));
}
/**
* Sleep for as long as {@link #SLEEP_DURATION} requires.
*/
private static void sleep() {
long sleep = SLEEP_DURATION.get();
if (sleep > 0) {
LOG.info("Sleeping for {} ms", sleep, new Exception());
try (DurationInfo d = new DurationInfo(LOG, true, "Sleep for %d ms", sleep)) {
Thread.sleep(sleep);
} catch (InterruptedException e) {
LOG.info("Interrupted", e);
SLEEP_INTERRUPTED.set(true);
Thread.currentThread().interrupt();
}
}
}
/**
* A signer which calls {@link #sleep()} before signing.
* As this signing takes place within the CreateSession Pipeline,
*/
public static class SlowSigner extends CustomHttpSigner {
@Override
public SignedRequest sign(
final SignRequest<? extends AwsCredentialsIdentity> request) {
final SdkHttpRequest httpRequest = request.request();
LOG.info("Signing request {}", httpRequest);
sleep();
return super.sign(request);
}
@Override
public CompletableFuture<AsyncSignedRequest> signAsync(
final AsyncSignRequest<? extends AwsCredentialsIdentity> request) {
sleep();
return super.signAsync(request);
}
}
}