
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.model.S3Exception;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.test.HadoopTestBase;

import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Invoker.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.verifyExceptionClass;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_400_BAD_REQUEST;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_500_INTERNAL_SERVER_ERROR;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_501_NOT_IMPLEMENTED;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_503_SERVICE_UNAVAILABLE;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_504_GATEWAY_TIMEOUT;
import static org.apache.hadoop.test.LambdaTestUtils.*;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Test the {@link Invoker} code and the associated {@link S3ARetryPolicy}.
 * <p>
 * Some of the tests look at how Connection Timeout Exceptions are processed.
 * Because of how the AWS libraries shade the classes, there have been some
 * regressions here during development. These tests are intended to verify that
 * the current match process based on classname works.
 * <p>
 * 500 errors may or may not be considered retriable; these tests validate
 * both configurations with different retry policies for each.
 */
public class TestInvoker extends HadoopTestBase {

  /** Configuration to use for short retry intervals. */
  private static final Configuration FAST_RETRY_CONF;
  private static final ConnectTimeoutException
      HADOOP_CONNECTION_TIMEOUT_EX = new ConnectTimeoutException("hadoop");
  private static final Local.ConnectTimeoutException
      LOCAL_CONNECTION_TIMEOUT_EX
      = new Local.ConnectTimeoutException("local");
  private static final org.apache.http.conn.ConnectTimeoutException
      HTTP_CONNECTION_TIMEOUT_EX
      = new org.apache.http.conn.ConnectTimeoutException("apache");
  private static final SocketTimeoutException SOCKET_TIMEOUT_EX
      = new SocketTimeoutException("socket");

  /**
   * What retry limit to use.
   */
  private static final int ACTIVE_RETRY_LIMIT = RETRY_LIMIT_DEFAULT;

  /**
   * A retry count guaranteed to be out of range.
   */
  private static final int RETRIES_TOO_MANY = ACTIVE_RETRY_LIMIT + 10;

  /**
   * A count of retry attempts guaranteed to be within the permitted range.
   */
  public static final int SAFE_RETRY_COUNT = 5;

  /** Message text used when simulating AWS internal errors. */
  public static final String INTERNAL_ERROR_PLEASE_TRY_AGAIN =
      "We encountered an internal error. Please try again";

  /**
   * Retry configuration derived from {@link #FAST_RETRY_CONF} with 500 errors
   * never retried.
   */
  public static final Configuration RETRY_EXCEPT_500_ERRORS;

  static {
    FAST_RETRY_CONF = new Configuration();
    String interval = "10ms";
    FAST_RETRY_CONF.set(RETRY_INTERVAL, interval);
    FAST_RETRY_CONF.set(RETRY_THROTTLE_INTERVAL, interval);
    FAST_RETRY_CONF.setInt(RETRY_LIMIT, ACTIVE_RETRY_LIMIT);
    FAST_RETRY_CONF.setInt(RETRY_THROTTLE_LIMIT, ACTIVE_RETRY_LIMIT);
    FAST_RETRY_CONF.setBoolean(RETRY_HTTP_5XX_ERRORS, DEFAULT_RETRY_HTTP_5XX_ERRORS);
    RETRY_EXCEPT_500_ERRORS = new Configuration(FAST_RETRY_CONF);
    RETRY_EXCEPT_500_ERRORS.setBoolean(RETRY_HTTP_5XX_ERRORS, false);
  }

  /**
   * Retry policy with 500 error retry the default.
   */
  private static final S3ARetryPolicy RETRY_POLICY =
      new S3ARetryPolicy(FAST_RETRY_CONF);

  /**
   * Retry policy with 500 errors never retried.
   */
  private static final S3ARetryPolicy RETRY_POLICY_NO_500_ERRORS =
      new S3ARetryPolicy(RETRY_EXCEPT_500_ERRORS);


  /**
   * Count of retries performed when invoking an operation which
   * failed.
   */
  private int retryCount;

  /**
   * Retry handler which increments {@link #retryCount}.
   */
  private final Retried retryHandler = (text, e, retries, idempotent) -> retryCount++;

  /** Invoker under test, using {@link #RETRY_POLICY}. */
  private final Invoker invoker = new Invoker(RETRY_POLICY, retryHandler);

  /**
   * AWS SDK exception wrapping a ConnectTimeoutException.
   */
  private static final SdkException CLIENT_TIMEOUT_EXCEPTION =
      SdkException.builder()
          .cause(new Local.ConnectTimeoutException("timeout"))
          .build();

  /**
   * AWS SDK 400 Bad Request exception.
   */
  private static final AwsServiceException BAD_REQUEST = serviceException(
      SC_400_BAD_REQUEST,
      "bad request");

  @BeforeEach
  public void setup() {
    resetCounters();
  }

  /**
   * Construct an AWS service exception with the given status code and message.
   * @param code HTTP status code
   * @param text exception message
   * @return the exception
   */
  private static AwsServiceException serviceException(int code,
      String text) {
    return AwsServiceException.builder()
        .message(text)
        .statusCode(code)
        .build();
  }

  /**
   * Construct an S3 exception with the given status code and no message/cause.
   * @param code HTTP status code
   * @return the exception
   */
  private static S3Exception createS3Exception(int code) {
    return createS3Exception(code, "", null);
  }

  /**
   * Construct an S3 exception.
   * @param code HTTP status code
   * @param message exception message
   * @param inner nested cause; may be null
   * @return the exception
   */
  private static S3Exception createS3Exception(int code,
      String message,
      Throwable inner) {
    return (S3Exception) S3Exception.builder()
        .message(message)
        .statusCode(code)
        .cause(inner)
        .build();
  }

  /**
   * Verify that an S3 exception with the given status code translates to
   * the expected IOException class.
   * @param status HTTP status code
   * @param expected expected translated exception class
   * @param <E> type of expected exception
   */
  protected <E extends Throwable> void verifyTranslated(
      int status,
      Class<E> expected) throws Exception {
    verifyTranslated(expected, createS3Exception(status));
  }

  /**
   * Verify that an SDK exception translates to the expected IOException class.
   * @param clazz expected translated exception class
   * @param exception SDK exception to translate
   * @param <E> type of expected exception
   * @return the translated exception, for further assertions
   */
  private static <E extends Throwable> E verifyTranslated(Class<E> clazz,
      SdkException exception) throws Exception {
    return verifyExceptionClass(clazz,
        translateException("test", "/", exception));
  }

  /**
   * Reset the retry count.
   */
  private void resetCounters() {
    retryCount = 0;
  }

  @Test
  public void test503isThrottled() throws Exception {
    verifyTranslated(SC_503_SERVICE_UNAVAILABLE, AWSServiceThrottledException.class);
  }

  @Test
  public void testS3500isStatus500Exception() throws Exception {
    verifyTranslated(SC_500_INTERNAL_SERVER_ERROR, AWSStatus500Exception.class);
  }

  /**
   * 500 error handling with the default options: the responses
   * trigger retry.
   */
  @Test
  public void test500ResponseHandling() throws Exception {

    // create a 500 SDK Exception;
    AwsServiceException ex = awsException(SC_500_INTERNAL_SERVER_ERROR,
        INTERNAL_ERROR_PLEASE_TRY_AGAIN);

    // translate this to a Hadoop IOE.
    AWSStatus500Exception ex500 =
        verifyTranslated(AWSStatus500Exception.class, ex);

    // the status code is preserved
    assertThat(ex500.statusCode())
        .describedAs("status code of %s", ex)
        .isEqualTo(SC_500_INTERNAL_SERVER_ERROR);

    // the default retry policy schedules this for retry
    assertRetryAction("Expected retry on 500 error",
        RETRY_POLICY, RetryPolicy.RetryAction.RETRY,
        ex, 0, true);

    assertThat(invoker.getRetryPolicy()
        .shouldRetry(ex500, 1, 0, false).action)
        .describedAs("should retry %s", ex500)
        .isEqualTo(RetryPolicy.RetryAction.RETRY.action);
  }

  /**
   * Validate behavior on 500 errors when retry is disabled.
   */
  @Test
  public void test500ResponseHandlingRetryDisabled() throws Exception {
    // create a 500 SDK Exception;
    AwsServiceException ex = awsException(SC_500_INTERNAL_SERVER_ERROR,
        INTERNAL_ERROR_PLEASE_TRY_AGAIN);

    // translate this to a Hadoop IOE.
    AWSStatus500Exception ex500 =
        verifyTranslated(AWSStatus500Exception.class, ex);

    // the no 500 retry policies reject this and fail
    final Invoker failingInvoker = new Invoker(RETRY_POLICY_NO_500_ERRORS, retryHandler);
    assertRetryAction("Expected failure on 500 error",
        RETRY_POLICY_NO_500_ERRORS, RetryPolicy.RetryAction.FAIL,
        ex, 0, true);
    assertThat(failingInvoker.getRetryPolicy()
        .shouldRetry(ex500, 1, 0, false).action)
        .describedAs("should retry %s", ex500)
        .isEqualTo(RetryPolicy.RetryAction.FAIL.action);
  }

  /**
   * A 501 error is never retried.
   */
  @Test
  public void test501UnsupportedFeatureNoRetry() throws Throwable {

    AwsServiceException ex = awsException(SC_501_NOT_IMPLEMENTED,
        "501 We encountered an internal error. Please try again");
    final AWSUnsupportedFeatureException ex501 =
        intercept(AWSUnsupportedFeatureException.class, "501", () ->
            invoker.retry("ex", null, true, () -> {
              throw ex;
            }));
    assertThat(ex501.statusCode())
        .describedAs("status code of %s", ex)
        .isEqualTo(SC_501_NOT_IMPLEMENTED);
    assertThat(retryCount)
        .describedAs("retry count")
        .isEqualTo(0);
  }

  /**
   * Construct an S3Exception.
   * @param statusCode status code
   * @param message message
   * @return the exception
   */
  private static AwsServiceException awsException(final int statusCode, final String message) {
    return S3Exception.builder()
        .statusCode(statusCode)
        .message(message)
        .requestId("reqID")
        .extendedRequestId("extreqID")
        .build();
  }

  /**
   * Assert expected retry actions on 5xx responses when 5xx errors are disabled.
   */
  @Test
  public void test5xxRetriesDisabled() throws Throwable {
    final S3ARetryPolicy policy = RETRY_POLICY_NO_500_ERRORS;
    assertRetryAction("500", policy, RetryPolicy.RetryAction.FAIL,
        awsException(SC_500_INTERNAL_SERVER_ERROR, INTERNAL_ERROR_PLEASE_TRY_AGAIN), 1, true);
    assertRetryAction("501", policy, RetryPolicy.RetryAction.FAIL,
        awsException(SC_501_NOT_IMPLEMENTED, INTERNAL_ERROR_PLEASE_TRY_AGAIN), 1, true);
    assertRetryAction("510", policy, RetryPolicy.RetryAction.FAIL,
        awsException(510, INTERNAL_ERROR_PLEASE_TRY_AGAIN), 1, true);
    // gateway timeouts are retried even when 5xx retry is disabled
    assertRetryAction("gateway", policy, RetryPolicy.RetryAction.RETRY,
        awsException(SC_504_GATEWAY_TIMEOUT, "gateway"), 1, true);
  }

  /**
   * Various 5xx exceptions when 5xx errors are enabled.
   */
  @Test
  public void test5xxRetriesEnabled() throws Throwable {
    final Configuration conf = new Configuration(FAST_RETRY_CONF);
    conf.setBoolean(RETRY_HTTP_5XX_ERRORS, true);
    final S3ARetryPolicy policy = new S3ARetryPolicy(conf);
    assertRetryAction("500", policy, RetryPolicy.RetryAction.RETRY,
        awsException(SC_500_INTERNAL_SERVER_ERROR, INTERNAL_ERROR_PLEASE_TRY_AGAIN), 1, true);
    // 501 is still never retried
    assertRetryAction("501", policy, RetryPolicy.RetryAction.FAIL,
        awsException(SC_501_NOT_IMPLEMENTED, INTERNAL_ERROR_PLEASE_TRY_AGAIN), 1, true);
    assertRetryAction("510", policy, RetryPolicy.RetryAction.RETRY,
        awsException(510, INTERNAL_ERROR_PLEASE_TRY_AGAIN), 1, true);
    assertRetryAction("gateway", policy, RetryPolicy.RetryAction.RETRY,
        awsException(SC_504_GATEWAY_TIMEOUT, "gateway"), 1, true);
  }

  /**
   * SDK exceptions whose message text is recognized translate to EOFException.
   */
  @Test
  public void testExceptionsWithTranslatableMessage() throws Exception {
    SdkException xmlParsing = SdkException.builder()
        .message(EOF_MESSAGE_IN_XML_PARSER)
        .build();
    SdkException differentLength = SdkException.builder()
        .message(EOF_READ_DIFFERENT_LENGTH)
        .build();

    verifyTranslated(EOFException.class, xmlParsing);
    verifyTranslated(EOFException.class, differentLength);
  }


  @Test
  public void testSdkDifferentLengthExceptionIsTranslatable() throws Throwable {
    final AtomicInteger counter = new AtomicInteger(0);
    invoker.retry("test", null, false, () -> {
      if (counter.incrementAndGet() < ACTIVE_RETRY_LIMIT) {
        throw SdkClientException.builder()
            .message(EOF_READ_DIFFERENT_LENGTH)
            .build();
      }
    });

    assertEquals(ACTIVE_RETRY_LIMIT, counter.get());
  }

  @Test
  public void testSdkXmlParsingExceptionIsTranslatable() throws Throwable {
    final AtomicInteger counter = new AtomicInteger(0);
    invoker.retry("test", null, false, () -> {
      if (counter.incrementAndGet() < ACTIVE_RETRY_LIMIT) {
        throw SdkClientException.builder()
            .message(EOF_MESSAGE_IN_XML_PARSER)
            .build();
      }
    });

    assertEquals(ACTIVE_RETRY_LIMIT, counter.get());
  }

  @Test
  public void testExtractConnectTimeoutException() throws Throwable {
    assertThrows(org.apache.hadoop.net.ConnectTimeoutException.class, () -> {
      throw extractException("", "", new ExecutionException(
        SdkException.builder().cause(LOCAL_CONNECTION_TIMEOUT_EX).build()));
    });
  }

  @Test
  public void testExtractSocketTimeoutException() throws Throwable {
    assertThrows(SocketTimeoutException.class, () -> {
      throw extractException("", "",
        new ExecutionException(
            SdkException.builder()
            .cause(SOCKET_TIMEOUT_EX)
            .build()));
    });
  }

  @Test
  public void testExtractConnectTimeoutExceptionFromCompletionException() throws Throwable {
    assertThrows(org.apache.hadoop.net.ConnectTimeoutException.class, () -> {
      throw extractException("", "",
        new CompletionException(
          SdkException.builder()
          .cause(LOCAL_CONNECTION_TIMEOUT_EX)
          .build()));
    });
  }

  @Test
  public void testExtractSocketTimeoutExceptionFromCompletionException() throws Throwable {
    assertThrows(SocketTimeoutException.class, () -> {
      throw extractException("", "",
        new CompletionException(
            SdkException.builder()
            .cause(SOCKET_TIMEOUT_EX)
            .build()));
    });
  }

  /**
   * Make an assertion about a retry policy.
   * @param text text for error message
   * @param policy policy to check against
   * @param expected expected action
   * @param ex exception to analyze
   * @param retries number of times there has already been a retry
   * @param idempotent whether or not the operation is idempotent
   * @throws Exception if the retry policy raises it
   * @throws AssertionError if the returned action was not that expected.
   */
  private void assertRetryAction(String text,
      RetryPolicy policy,
      RetryPolicy.RetryAction expected,
      Exception ex,
      int retries,
      boolean idempotent) throws Exception {
    RetryPolicy.RetryAction outcome = policy.shouldRetry(ex, retries, 0,
        idempotent);
    assertThat(outcome.action)
        .describedAs("%s Expected action %s from shouldRetry(%s, %s, %s)",
                      text, expected, ex.toString(), retries, idempotent)
        .isEqualTo(expected.action);
  }

  @Test
  public void testRetryThrottled() throws Throwable {
    S3ARetryPolicy policy = RETRY_POLICY;
    IOException ex = translateException("GET", "/", newThrottledException());

    assertRetryAction("Expected retry on first throttle",
        policy, RetryPolicy.RetryAction.RETRY,
        ex, 0, true);
    int retries = SAFE_RETRY_COUNT;
    assertRetryAction("Expected retry on repeated throttle",
        policy, RetryPolicy.RetryAction.RETRY,
        ex, retries, true);
    assertRetryAction("Expected retry on non-idempotent throttle",
        policy, RetryPolicy.RetryAction.RETRY,
        ex, retries, false);
  }

  /**
   * Create a throttling (503) service exception.
   * @return the exception
   */
  protected AwsServiceException newThrottledException() {
    return serviceException(
        AWSServiceThrottledException.STATUS_CODE, "throttled");
  }

  /**
   * Repeatedly retry until a throttle eventually stops being raised.
   */
  @Test
  public void testRetryOnThrottle() throws Throwable {

    final AtomicInteger counter = new AtomicInteger(0);
    invoker.retry("test", null, false,
        () -> {
          if (counter.incrementAndGet() < 5) {
            throw newThrottledException();
          }
        });
  }

  /**
   * Non-idempotent operations fail on anything which isn't a throttle
   * or connectivity problem.
   */
  @Test
  public void testNoRetryOfBadRequestNonIdempotent() throws Throwable {
    assertThrows(AWSBadRequestException.class, () -> {
      invoker.retry("test", null, false,
          () -> {
          throw serviceException(SC_400_BAD_REQUEST, "bad request");
        });
    });
  }

  /**
   * AWS nested socket problems.
   */
  @Test
  public void testRetryAWSConnectivity() throws Throwable {
    final AtomicInteger counter = new AtomicInteger(0);
    invoker.retry("test", null, false,
        () -> {
          if (counter.incrementAndGet() < ACTIVE_RETRY_LIMIT) {
            throw CLIENT_TIMEOUT_EXCEPTION;
          }
        });
    assertEquals(ACTIVE_RETRY_LIMIT, counter.get());
  }

  /**
   * A bad request fails immediately without retries, even on a
   * non-idempotent call.
   */
  @Test
  public void testRetryBadRequestNotIdempotent() throws Throwable {
    assertThrows(AWSBadRequestException.class, () -> {
      invoker.retry("test", null, false,
          () -> {
          throw BAD_REQUEST;
        });
    });
  }

  @Test
  public void testConnectionRetryPolicyIdempotent() throws Throwable {
    assertRetryAction("Expected retry on connection timeout",
        RETRY_POLICY, RetryPolicy.RetryAction.RETRY,
        HADOOP_CONNECTION_TIMEOUT_EX, 1, true);
    assertRetryAction("Expected connection timeout failure",
        RETRY_POLICY, RetryPolicy.RetryAction.FAIL,
        HADOOP_CONNECTION_TIMEOUT_EX, RETRIES_TOO_MANY, true);
  }

  /**
   * Even on a non-idempotent call, connection failures are considered
   * retryable.
   */
  @Test
  public void testConnectionRetryPolicyNonIdempotent() throws Throwable {
    assertRetryAction("Expected retry on connection timeout",
        RETRY_POLICY, RetryPolicy.RetryAction.RETRY,
        HADOOP_CONNECTION_TIMEOUT_EX, 1, false);
  }

  /**
   * Interrupted IOEs are not retryable.
   */
  @Test
  public void testInterruptedIOExceptionRetry() throws Throwable {
    assertRetryAction("Expected retry on connection timeout",
        RETRY_POLICY, RetryPolicy.RetryAction.FAIL,
        new InterruptedIOException("interrupted"), 1, false);
  }

  @Test
  public void testUnshadedConnectionTimeoutExceptionMatching()
      throws Throwable {
    // connection timeout exceptions are special, but as AWS shades
    // theirs, we need to string match them
    verifyTranslated(ConnectTimeoutException.class,
        SdkException.builder()
            .cause(HTTP_CONNECTION_TIMEOUT_EX)
            .build());
  }

  @Test
  public void testShadedConnectionTimeoutExceptionMatching() throws Throwable {
    // connection timeout exceptions are special, but as AWS shades
    // theirs, we need to string match them
    verifyTranslated(ConnectTimeoutException.class,
        SdkException.builder()
            .cause(LOCAL_CONNECTION_TIMEOUT_EX)
            .build());
  }

  @Test
  public void testShadedConnectionTimeoutExceptionNotMatching()
      throws Throwable {
    InterruptedIOException ex = verifyTranslated(InterruptedIOException.class,
        SdkException.builder()
            .cause(new Local.NotAConnectTimeoutException())
            .build());
    if (ex instanceof ConnectTimeoutException) {
      throw ex;
    }
  }

  /**
   * Test that NPEs aren't retried. Also verify that the
   * catch counts are incremented, while the retry count isn't.
   */
  @Test
  public void testNPEsNotRetried() throws Throwable {
    assertRetryAction("Expected NPE trigger failure",
        RETRY_POLICY, RetryPolicy.RetryAction.FAIL,
        new NullPointerException("oops"), 1, true);
    // catch notification didn't see it
    assertEquals(0, retryCount, "retry count ");
  }

  /**
   * Container for the local exceptions, to help keep visible which
   * specific class of exception.
   */
  private static class Local {

    /**
     * A local exception with a name to match the expected one.
     */
    private static class ConnectTimeoutException
        extends InterruptedIOException {
      ConnectTimeoutException(String s) {
        super(s);
      }
    }

    /**
     * A local exception whose name should not match.
     */
    private static class NotAConnectTimeoutException
        extends InterruptedIOException {

    }

  }

  @Test
  public void testQuietlyVoid() {
    quietlyEval("", "",
        () -> {
        throw HADOOP_CONNECTION_TIMEOUT_EX;
      });
  }

  @Test
  public void testQuietlyEvalReturnValueSuccess() {
    assertOptionalEquals("quietly", 3,
        quietlyEval("", "", () -> 3));
  }

  @Test
  public void testQuietlyEvalReturnValueFail() {
    // use a variable so IDEs don't warn of numeric overflows
    int d = 0;
    assertOptionalUnset("quietly",
        quietlyEval("", "", () -> 3 / d));
  }

  /**
   * Catch the exception and preserve it for later queries.
   */
  private static final class CatchCallback implements Retried {
    // last exception passed to onFailure; null until a failure occurs
    private IOException lastException;
    @Override
    public void onFailure(String text,
        IOException exception,
        int retries,
        boolean idempotent) {
      lastException = exception;
    }
  }

}