AWSClientConfig.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.impl;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.AwsRequest;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.auth.SignerFactory;
import org.apache.hadoop.util.VersionInfo;

import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3;
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_STS;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_KEEPALIVE;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_IDLE_TIME_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_KEEPALIVE;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_CONNECTION_TTL_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ESTABLISH_TIMEOUT_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAX_ERROR_RETRIES;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_REQUEST_TIMEOUT_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SOCKET_TIMEOUT_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
import static org.apache.hadoop.fs.s3a.Constants.MINIMUM_NETWORK_OPERATION_DURATION;
import static org.apache.hadoop.fs.s3a.Constants.PROXY_DOMAIN;
import static org.apache.hadoop.fs.s3a.Constants.PROXY_HOST;
import static org.apache.hadoop.fs.s3a.Constants.PROXY_PASSWORD;
import static org.apache.hadoop.fs.s3a.Constants.PROXY_PORT;
import static org.apache.hadoop.fs.s3a.Constants.PROXY_SECURED;
import static org.apache.hadoop.fs.s3a.Constants.PROXY_USERNAME;
import static org.apache.hadoop.fs.s3a.Constants.PROXY_WORKSTATION;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3;
import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_STS;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.getDuration;
import static org.apache.hadoop.util.Preconditions.checkArgument;


/**
 * Methods for configuring the S3 client.
 * These methods are used when creating and configuring
 * the HTTP clients which communicate with the S3 service.
 * <p>
 * See {@code software.amazon.awssdk.http.SdkHttpConfigurationOption}
 * for the default values.
 */
public final class AWSClientConfig {

  private static final Logger LOG = LoggerFactory.getLogger(AWSClientConfig.class);

  /**
   * The minimum operation duration.
   */
  private static Duration minimumOperationDuration = MINIMUM_NETWORK_OPERATION_DURATION;


  private AWSClientConfig() {
  }

  /**
   * Create the config for a given service...the service identifier is used
   * to determine signature implementation.
   * @param conf configuration
   * @param awsServiceIdentifier service
   * @return the builder inited with signer, timeouts and UA.
   * @throws IOException failure.
   * @throws RuntimeException some failures creating an http signer
   */
  public static ClientOverrideConfiguration.Builder createClientConfigBuilder(Configuration conf,
      String awsServiceIdentifier) throws IOException {
    ClientOverrideConfiguration.Builder overrideConfigBuilder =
        ClientOverrideConfiguration.builder();

    initRequestTimeout(conf, overrideConfigBuilder);

    initUserAgent(conf, overrideConfigBuilder);

    initRequestHeaders(conf, overrideConfigBuilder, awsServiceIdentifier);

    String signer = conf.getTrimmed(SIGNING_ALGORITHM, "");
    if (!signer.isEmpty()) {
      LOG.debug("Signer override = {}", signer);
      overrideConfigBuilder.putAdvancedOption(SdkAdvancedClientOption.SIGNER,
          SignerFactory.createSigner(signer, SIGNING_ALGORITHM));
    }

    initSigner(conf, overrideConfigBuilder, awsServiceIdentifier);

    return overrideConfigBuilder;
  }

  /**
   * Create and configure the http client-based connector with timeouts for:
   * connection acquisition, max idle, timeout, TTL, socket and keepalive.
   * SSL channel mode is set up via
   * {@link NetworkBinding#bindSSLChannelMode(Configuration, ApacheHttpClient.Builder)}.
   *
   * @param conf The Hadoop configuration
   * @return Http client builder
   * @throws IOException on any problem
   */
  public static ApacheHttpClient.Builder createHttpClientBuilder(Configuration conf)
      throws IOException {
    final ConnectionSettings conn = createConnectionSettings(conf);
    ApacheHttpClient.Builder httpClientBuilder =
        ApacheHttpClient.builder()
            .connectionAcquisitionTimeout(conn.getAcquisitionTimeout())
            .connectionMaxIdleTime(conn.getMaxIdleTime())
            .connectionTimeout(conn.getEstablishTimeout())
            .connectionTimeToLive(conn.getConnectionTTL())
            .expectContinueEnabled(conn.isExpectContinueEnabled())
            .maxConnections(conn.getMaxConnections())
            .socketTimeout(conn.getSocketTimeout())
            .tcpKeepAlive(conn.isKeepAlive())
            .useIdleConnectionReaper(true);  // true by default in the SDK

    NetworkBinding.bindSSLChannelMode(conf, httpClientBuilder);

    return httpClientBuilder;
  }

  /**
   * Create and configure the async http client with timeouts for:
   * connection acquisition, max idle, timeout, TTL, socket and keepalive.
   * This is netty based and does not allow for the SSL channel mode to be set.
   * @param conf The Hadoop configuration
   * @return Async Http client builder
   */
  public static NettyNioAsyncHttpClient.Builder createAsyncHttpClientBuilder(Configuration conf) {
    final ConnectionSettings conn = createConnectionSettings(conf);

    NettyNioAsyncHttpClient.Builder httpClientBuilder =
        NettyNioAsyncHttpClient.builder()
            .connectionAcquisitionTimeout(conn.getAcquisitionTimeout())
            .connectionMaxIdleTime(conn.getMaxIdleTime())
            .connectionTimeout(conn.getEstablishTimeout())
            .connectionTimeToLive(conn.getConnectionTTL())
            .maxConcurrency(conn.getMaxConnections())
            .readTimeout(conn.getSocketTimeout())
            .tcpKeepAlive(conn.isKeepAlive())
            .useIdleConnectionReaper(true)  // true by default in the SDK
            .writeTimeout(conn.getSocketTimeout());

    // TODO: Don't think you can set a socket factory for the netty client.
    //  NetworkBinding.bindSSLChannelMode(conf, awsConf);

    return httpClientBuilder;
  }

  /**
   * Configures the retry policy.
   * Retry policy is {@code RetryMode.ADAPTIVE}, which
   * "dynamically limits the rate of AWS requests to maximize success rate",
   * possibly at the expense of latency.
   * Based on the ABFS experience, it is better to limit the rate requests are
   * made rather than have to resort to exponential backoff after failures come
   * in -especially as that backoff is per http connection.
   *
   * @param conf The Hadoop configuration
   * @return Retry policy builder
   */
  public static RetryPolicy.Builder createRetryPolicyBuilder(Configuration conf) {

    RetryPolicy.Builder retryPolicyBuilder = RetryPolicy.builder(RetryMode.ADAPTIVE);

    retryPolicyBuilder.numRetries(S3AUtils.intOption(conf, MAX_ERROR_RETRIES,
        DEFAULT_MAX_ERROR_RETRIES, 0));

    return retryPolicyBuilder;
  }

  /**
   * Configures the proxy.
   *
   * @param conf The Hadoop configuration
   * @param bucket Optional bucket to use to look up per-bucket proxy secrets
   * @return Proxy configuration
   * @throws IOException on any IO problem
   */
  public static ProxyConfiguration createProxyConfiguration(Configuration conf,
      String bucket) throws IOException {

    ProxyConfiguration.Builder proxyConfigBuilder = ProxyConfiguration.builder();

    String proxyHost = conf.getTrimmed(PROXY_HOST, "");
    int proxyPort = conf.getInt(PROXY_PORT, -1);

    if (!proxyHost.isEmpty()) {
      if (proxyPort >= 0) {
        String scheme = conf.getBoolean(PROXY_SECURED, false) ? "https" : "http";
        proxyConfigBuilder.endpoint(buildURI(scheme, proxyHost, proxyPort));
      } else {
        if (conf.getBoolean(PROXY_SECURED, false)) {
          LOG.warn("Proxy host set without port. Using HTTPS default 443");
          proxyConfigBuilder.endpoint(buildURI("https", proxyHost, 443));
        } else {
          LOG.warn("Proxy host set without port. Using HTTP default 80");
          proxyConfigBuilder.endpoint(buildURI("http", proxyHost, 80));
        }
      }
      final String proxyUsername = S3AUtils.lookupPassword(bucket, conf, PROXY_USERNAME,
          null, null);
      final String proxyPassword = S3AUtils.lookupPassword(bucket, conf, PROXY_PASSWORD,
          null, null);
      if ((proxyUsername == null) != (proxyPassword == null)) {
        String msg = "Proxy error: " + PROXY_USERNAME + " or " +
            PROXY_PASSWORD + " set without the other.";
        LOG.error(msg);
        throw new IllegalArgumentException(msg);
      }
      proxyConfigBuilder.username(proxyUsername);
      proxyConfigBuilder.password(proxyPassword);
      proxyConfigBuilder.ntlmDomain(conf.getTrimmed(PROXY_DOMAIN));
      proxyConfigBuilder.ntlmWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Using proxy server {}:{} as user {} with password {} on "
                + "domain {} as workstation {}", proxyHost, proxyPort, proxyUsername, proxyPassword,
            PROXY_DOMAIN, PROXY_WORKSTATION);
      }
    } else if (proxyPort >= 0) {
      String msg =
          "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
      LOG.error(msg);
      throw new IllegalArgumentException(msg);
    }

    return proxyConfigBuilder.build();

  }

  /**
   * Configures the proxy for the async http client.
   * <p>
   * Although this is netty specific, it is part of the AWS SDK public API
   * and not any shaded netty internal class.
   * @param conf The Hadoop configuration
   * @param bucket Optional bucket to use to look up per-bucket proxy secrets
   * @return Proxy configuration
   * @throws IOException on any IO problem
   */
  public static software.amazon.awssdk.http.nio.netty.ProxyConfiguration
      createAsyncProxyConfiguration(Configuration conf,
          String bucket) throws IOException {

    software.amazon.awssdk.http.nio.netty.ProxyConfiguration.Builder proxyConfigBuilder =
        software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder();

    String proxyHost = conf.getTrimmed(PROXY_HOST, "");
    int proxyPort = conf.getInt(PROXY_PORT, -1);

    if (!proxyHost.isEmpty()) {
      if (proxyPort >= 0) {
        String scheme = conf.getBoolean(PROXY_SECURED, false) ? "https" : "http";
        proxyConfigBuilder.host(proxyHost);
        proxyConfigBuilder.port(proxyPort);
        proxyConfigBuilder.scheme(scheme);
      } else {
        if (conf.getBoolean(PROXY_SECURED, false)) {
          LOG.warn("Proxy host set without port. Using HTTPS default 443");
          proxyConfigBuilder.host(proxyHost);
          proxyConfigBuilder.port(443);
          proxyConfigBuilder.scheme("https");
        } else {
          LOG.warn("Proxy host set without port. Using HTTP default 80");
          proxyConfigBuilder.host(proxyHost);
          proxyConfigBuilder.port(80);
          proxyConfigBuilder.scheme("http");
        }
      }
      final String proxyUsername = S3AUtils.lookupPassword(bucket, conf, PROXY_USERNAME,
          null, null);
      final String proxyPassword = S3AUtils.lookupPassword(bucket, conf, PROXY_PASSWORD,
          null, null);
      if ((proxyUsername == null) != (proxyPassword == null)) {
        String msg = "Proxy error: " + PROXY_USERNAME + " or " +
            PROXY_PASSWORD + " set without the other.";
        LOG.error(msg);
        throw new IllegalArgumentException(msg);
      }
      proxyConfigBuilder.username(proxyUsername);
      proxyConfigBuilder.password(proxyPassword);
      // TODO: check NTLM support
      // proxyConfigBuilder.ntlmDomain(conf.getTrimmed(PROXY_DOMAIN));
      // proxyConfigBuilder.ntlmWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Using proxy server {}:{} as user {} with password {} on "
                + "domain {} as workstation {}", proxyHost, proxyPort, proxyUsername, proxyPassword,
            PROXY_DOMAIN, PROXY_WORKSTATION);
      }
    } else if (proxyPort >= 0) {
      String msg =
          "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
      LOG.error(msg);
      throw new IllegalArgumentException(msg);
    } else {
      return null;
    }

    return proxyConfigBuilder.build();
  }

  /**
   * Builds a URI, throws an IllegalArgumentException in case of errors.
   *
   * @param host proxy host
   * @param port proxy port
   * @return uri with host and port
   */
  private static URI buildURI(String scheme, String host, int port) {
    try {
      return new URI(scheme, null, host, port, null, null, null);
    } catch (URISyntaxException e) {
      String msg =
          "Proxy error: incorrect " + PROXY_HOST + " or " + PROXY_PORT;
      LOG.error(msg);
      throw new IllegalArgumentException(msg);
    }
  }

  /**
   * Initializes the User-Agent header to send in HTTP requests to AWS
   * services.  We always include the Hadoop version number.  The user also
   * may set an optional custom prefix to put in front of the Hadoop version
   * number.  The AWS SDK internally appends its own information, which seems
   * to include the AWS SDK version, OS and JVM version.
   *
   * @param conf Hadoop configuration
   * @param clientConfig AWS SDK configuration to update
   */
  private static void initUserAgent(Configuration conf,
      ClientOverrideConfiguration.Builder clientConfig) {
    String userAgent = "Hadoop " + VersionInfo.getVersion();
    String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
    if (!userAgentPrefix.isEmpty()) {
      userAgent = userAgentPrefix + ", " + userAgent;
    }
    LOG.debug("Using User-Agent: {}", userAgent);
    clientConfig.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, userAgent);
  }

  /**
   * Initializes the signer override for the given service.
   * @param conf hadoop configuration
   * @param clientConfig client configuration to update
   * @param awsServiceIdentifier service name
   * @throws IOException failure.
   */
  private static void initSigner(Configuration conf,
      ClientOverrideConfiguration.Builder clientConfig, String awsServiceIdentifier)
      throws IOException {
    String configKey = null;
    switch (awsServiceIdentifier) {
    case AWS_SERVICE_IDENTIFIER_S3:
      configKey = SIGNING_ALGORITHM_S3;
      break;
    case AWS_SERVICE_IDENTIFIER_STS:
      configKey = SIGNING_ALGORITHM_STS;
      break;
    default:
      // Nothing to do. The original signer override is already setup
    }
    if (configKey != null) {
      String signerOverride = conf.getTrimmed(configKey, "");
      if (!signerOverride.isEmpty()) {
        LOG.debug("Signer override for {} = {}", awsServiceIdentifier, signerOverride);
        clientConfig.putAdvancedOption(SdkAdvancedClientOption.SIGNER,
            SignerFactory.createSigner(signerOverride, configKey));
      }
    }
  }

  /**
   * Initialize custom request headers for AWS clients.
   * @param conf hadoop configuration
   * @param clientConfig client configuration to update
   * @param awsServiceIdentifier service name
   */
  private static void initRequestHeaders(Configuration conf,
      ClientOverrideConfiguration.Builder clientConfig, String awsServiceIdentifier) {
    String configKey = null;
    switch (awsServiceIdentifier) {
    case AWS_SERVICE_IDENTIFIER_S3:
      configKey = CUSTOM_HEADERS_S3;
      break;
    case AWS_SERVICE_IDENTIFIER_STS:
      configKey = CUSTOM_HEADERS_STS;
      break;
    default:
      // No known service.
    }
    if (configKey != null) {
      Map<String, String> awsClientCustomHeadersMap =
              S3AUtils.getTrimmedStringCollectionSplitByEquals(conf, configKey);
      awsClientCustomHeadersMap.forEach((header, valueString) -> {
        List<String> headerValues = Arrays.stream(valueString.split(";"))
                        .map(String::trim)
                        .filter(v -> !v.isEmpty())
                        .collect(Collectors.toList());
        if (!headerValues.isEmpty()) {
          clientConfig.putHeader(header, headerValues);
        } else {
          LOG.warn("Ignoring header '{}' for {} client because no values were provided",
                  header, awsServiceIdentifier);
        }
      });
      LOG.debug("headers for {} client = {}", awsServiceIdentifier, clientConfig.headers());
    }
  }

  /**
   * Configures request timeout in the client configuration.
   * This is independent of the timeouts set in the sync and async HTTP clients;
   * the same method
   *
   * @param conf Hadoop configuration
   * @param clientConfig AWS SDK configuration to update
   */
  private static void initRequestTimeout(Configuration conf,
      ClientOverrideConfiguration.Builder clientConfig) {
    // Get the connection settings
    final Duration callTimeout = createApiConnectionSettings(conf).getApiCallTimeout();

    if (callTimeout.toMillis() > 0) {
      clientConfig.apiCallAttemptTimeout(callTimeout);
      clientConfig.apiCallTimeout(callTimeout);
    }
  }

  /**
   * Reset the minimum operation duration to the default.
   * For test use only; Logs at INFO.
   * <p>
   * This MUST be called in test teardown in any test suite which
   * called {@link #setMinimumOperationDuration(Duration)}.
   */
  @VisibleForTesting
  public static void resetMinimumOperationDuration() {
    setMinimumOperationDuration(MINIMUM_NETWORK_OPERATION_DURATION);
  }

  /**
   * Set the minimum operation duration.
   * This is for testing and will log at info; does require a non-negative duration.
   * <p>
   * Test suites must call {@link #resetMinimumOperationDuration()} in their teardown
   * to avoid breaking subsequent tests in the same process.
   * @param duration non-negative duration
   * @throws IllegalArgumentException if the duration is negative.
   */
  @VisibleForTesting
  public static void setMinimumOperationDuration(Duration duration) {
    LOG.info("Setting minimum operation duration to {}ms", duration.toMillis());
    checkArgument(duration.compareTo(Duration.ZERO) >= 0,
        "Duration must be positive: %sms", duration.toMillis());
    minimumOperationDuration = duration;
  }

  /**
   * Get the current minimum operation duration.
   * @return current duration.
   */
  public static Duration getMinimumOperationDuration() {
    return minimumOperationDuration;
  }

  /**
   * Settings for the AWS client, rather than the http client.
   */
  static class ClientSettings {
    private final Duration apiCallTimeout;

    private ClientSettings(final Duration apiCallTimeout) {
      this.apiCallTimeout = apiCallTimeout;
    }

    Duration getApiCallTimeout() {
      return apiCallTimeout;
    }

    @Override
    public String toString() {
      return "ClientSettings{" +
          "apiCallTimeout=" + apiCallTimeout +
          '}';
    }
  }

  /**
   * All the connection settings, wrapped as a class for use by
   * both the sync and async client.
   */
  static final class ConnectionSettings {
    private final int maxConnections;
    private final boolean keepAlive;
    private final Duration acquisitionTimeout;
    private final Duration connectionTTL;
    private final Duration establishTimeout;
    private final Duration maxIdleTime;
    private final Duration socketTimeout;
    private final boolean expectContinueEnabled;

    private ConnectionSettings(
        final int maxConnections,
        final boolean keepAlive,
        final Duration acquisitionTimeout,
        final Duration connectionTTL,
        final Duration establishTimeout,
        final Duration maxIdleTime,
        final Duration socketTimeout,
        final boolean expectContinueEnabled) {
      this.maxConnections = maxConnections;
      this.keepAlive = keepAlive;
      this.acquisitionTimeout = acquisitionTimeout;
      this.connectionTTL = connectionTTL;
      this.establishTimeout = establishTimeout;
      this.maxIdleTime = maxIdleTime;
      this.socketTimeout = socketTimeout;
      this.expectContinueEnabled = expectContinueEnabled;
    }

    int getMaxConnections() {
      return maxConnections;
    }

    boolean isKeepAlive() {
      return keepAlive;
    }

    Duration getAcquisitionTimeout() {
      return acquisitionTimeout;
    }

    Duration getConnectionTTL() {
      return connectionTTL;
    }

    Duration getEstablishTimeout() {
      return establishTimeout;
    }

    Duration getMaxIdleTime() {
      return maxIdleTime;
    }

    Duration getSocketTimeout() {
      return socketTimeout;
    }

    boolean isExpectContinueEnabled() {
      return expectContinueEnabled;
    }

    @Override
    public String toString() {
      return "ConnectionSettings{" +
          "maxConnections=" + maxConnections +
          ", keepAlive=" + keepAlive +
          ", acquisitionTimeout=" + acquisitionTimeout +
          ", connectionTTL=" + connectionTTL +
          ", establishTimeout=" + establishTimeout +
          ", maxIdleTime=" + maxIdleTime +
          ", socketTimeout=" + socketTimeout +
          ", expectContinueEnabled=" + expectContinueEnabled +
          '}';
    }
  }

  /**
   * Build a client settings object.
   * @param conf configuration to evaluate
   * @return connection settings.
   */
  static ClientSettings createApiConnectionSettings(Configuration conf) {

    Duration apiCallTimeout = getDuration(conf, REQUEST_TIMEOUT,
        DEFAULT_REQUEST_TIMEOUT_DURATION, TimeUnit.MILLISECONDS, Duration.ZERO);

    // if the API call timeout is set, it must be at least the minimum duration
    if (apiCallTimeout.compareTo(Duration.ZERO) > 0) {
      apiCallTimeout = enforceMinimumDuration(REQUEST_TIMEOUT,
          apiCallTimeout, minimumOperationDuration);
    }
    return new ClientSettings(apiCallTimeout);
  }

  /**
   * Build the HTTP connection settings object from the configuration.
   * All settings are calculated.
   * @param conf configuration to evaluate
   * @return connection settings.
   */
  static ConnectionSettings createConnectionSettings(Configuration conf) {

    int maxConnections = S3AUtils.intOption(conf, MAXIMUM_CONNECTIONS,
        DEFAULT_MAXIMUM_CONNECTIONS, 1);

    final boolean keepAlive = conf.getBoolean(CONNECTION_KEEPALIVE,
        DEFAULT_CONNECTION_KEEPALIVE);

    // time to acquire a connection from the pool
    Duration acquisitionTimeout = getDuration(conf, CONNECTION_ACQUISITION_TIMEOUT,
        DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION, TimeUnit.MILLISECONDS,
        minimumOperationDuration);

    // set the connection TTL irrespective of whether the connection is in use or not.
    // this can balance requests over different S3 servers, and avoid failed
    // connections. See HADOOP-18845.
    Duration connectionTTL = getDuration(conf, CONNECTION_TTL,
        DEFAULT_CONNECTION_TTL_DURATION, TimeUnit.MILLISECONDS,
        null);

    Duration establishTimeout = getDuration(conf, ESTABLISH_TIMEOUT,
        DEFAULT_ESTABLISH_TIMEOUT_DURATION, TimeUnit.MILLISECONDS,
        minimumOperationDuration);

    // limit on the time a connection can be idle in the pool
    Duration maxIdleTime = getDuration(conf, CONNECTION_IDLE_TIME,
        DEFAULT_CONNECTION_IDLE_TIME_DURATION, TimeUnit.MILLISECONDS, Duration.ZERO);

    Duration socketTimeout = getDuration(conf, SOCKET_TIMEOUT,
        DEFAULT_SOCKET_TIMEOUT_DURATION, TimeUnit.MILLISECONDS,
        minimumOperationDuration);

    final boolean expectContinueEnabled = conf.getBoolean(CONNECTION_EXPECT_CONTINUE,
        CONNECTION_EXPECT_CONTINUE_DEFAULT);

    return new ConnectionSettings(
        maxConnections,
        keepAlive,
        acquisitionTimeout,
        connectionTTL,
        establishTimeout,
        maxIdleTime,
        socketTimeout,
        expectContinueEnabled);
  }

  /**
   * Set a custom ApiCallTimeout for a single request.
   * This allows for a longer timeout to be used in data upload
   * requests than that for all other S3 interactions;
   * This does not happen by default in the V2 SDK
   * (see HADOOP-19295).
   * <p>
   * If the timeout is zero, the request is not patched.
   * @param builder builder to patch.
   * @param timeout timeout
   */
  public static void setRequestTimeout(AwsRequest.Builder builder, Duration timeout) {
    if (!timeout.isZero()) {
      builder.overrideConfiguration(
          AwsRequestOverrideConfiguration.builder()
              .apiCallTimeout(timeout)
              .apiCallAttemptTimeout(timeout)
              .build());
    }
  }
}