DefaultS3ClientFactory.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.s3a.impl.AWSClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.util.AwsHostNameUtils;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.auth.spi.scheme.AuthScheme;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.metrics.LoggingMetricPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.s3accessgrants.plugin.S3AccessGrantsPlugin;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.multipart.MultipartConfiguration;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.s3a.statistics.impl.AwsStatisticsCollector;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESS_GRANTS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION;
import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS;
import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3;
import static org.apache.hadoop.fs.s3a.auth.SignerFactory.createHttpSigner;
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.REQUESTER_PAYS_HEADER;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AUTH_SCHEME_AWS_SIGV_4;
import static org.apache.hadoop.util.Preconditions.checkArgument;
/**
* The default {@link S3ClientFactory} implementation.
* This calls the AWS SDK to configure and create an
* {@code AmazonS3Client} that communicates with the S3 service.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class DefaultS3ClientFactory extends Configured
implements S3ClientFactory {
private static final String REQUESTER_PAYS_HEADER_VALUE = "requester";
private static final String S3_SERVICE_NAME = "s3";
private static final Pattern VPC_ENDPOINT_PATTERN =
Pattern.compile("^(?:.+\\.)?([a-z0-9-]+)\\.vpce\\.amazonaws\\.(?:com|com\\.cn)$");
/**
* Subclasses refer to this.
*/
protected static final Logger LOG =
LoggerFactory.getLogger(DefaultS3ClientFactory.class);
/**
* A one-off warning of default region chains in use.
*/
private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN =
new LogExactlyOnce(LOG);
/**
* Warning message printed when the SDK Region chain is in use.
*/
private static final String SDK_REGION_CHAIN_IN_USE =
"S3A filesystem client is using"
+ " the SDK region resolution chain.";
/** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
/**
* Error message when an endpoint is set with FIPS enabled: {@value}.
*/
@VisibleForTesting
public static final String ERROR_ENDPOINT_WITH_FIPS =
"Non central endpoint cannot be set when " + FIPS_ENDPOINT + " is true";
/**
* A one-off log stating whether S3 Access Grants are enabled.
*/
private static final LogExactlyOnce LOG_S3AG_ENABLED = new LogExactlyOnce(LOG);
@Override
public S3Client createS3Client(
final URI uri,
final S3ClientCreationParameters parameters) throws IOException {
Configuration conf = getConf();
String bucket = uri.getHost();
ApacheHttpClient.Builder httpClientBuilder = AWSClientConfig
.createHttpClientBuilder(conf)
.proxyConfiguration(AWSClientConfig.createProxyConfiguration(conf, bucket));
return configureClientBuilder(S3Client.builder(), parameters, conf, bucket)
.httpClientBuilder(httpClientBuilder)
.build();
}
@Override
public S3AsyncClient createS3AsyncClient(
final URI uri,
final S3ClientCreationParameters parameters) throws IOException {
Configuration conf = getConf();
String bucket = uri.getHost();
NettyNioAsyncHttpClient.Builder httpClientBuilder = AWSClientConfig
.createAsyncHttpClientBuilder(conf)
.proxyConfiguration(AWSClientConfig.createAsyncProxyConfiguration(conf, bucket));
MultipartConfiguration multipartConfiguration = MultipartConfiguration.builder()
.minimumPartSizeInBytes(parameters.getMinimumPartSize())
.thresholdInBytes(parameters.getMultiPartThreshold())
.build();
S3AsyncClientBuilder s3AsyncClientBuilder =
configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
.httpClientBuilder(httpClientBuilder);
// multipart upload pending with HADOOP-19326.
if (!parameters.isClientSideEncryptionEnabled() &&
!parameters.isAnalyticsAcceleratorEnabled()) {
s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration)
.multipartEnabled(parameters.isMultipartCopy());
}
return s3AsyncClientBuilder.build();
}
@Override
public S3TransferManager createS3TransferManager(final S3AsyncClient s3AsyncClient) {
return S3TransferManager.builder()
.s3Client(s3AsyncClient)
.build();
}
/**
* Configure a sync or async S3 client builder.
* This method handles all shared configuration, including
* path style access, credentials and whether or not to use S3Express
* CreateSession.
* @param builder S3 client builder
* @param parameters parameter object
* @param conf configuration object
* @param bucket bucket name
* @return the builder object
* @param <BuilderT> S3 client builder type
* @param <ClientT> S3 client type
*/
private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> BuilderT configureClientBuilder(
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf, String bucket)
throws IOException {
configureEndpointAndRegion(builder, parameters, conf);
maybeApplyS3AccessGrantsConfigurations(builder, conf);
S3Configuration serviceConfiguration = S3Configuration.builder()
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
.checksumValidationEnabled(parameters.isChecksumValidationEnabled())
.build();
final ClientOverrideConfiguration.Builder override =
createClientOverrideConfiguration(parameters, conf);
S3BaseClientBuilder<BuilderT, ClientT> s3BaseClientBuilder = builder
.overrideConfiguration(override.build())
.credentialsProvider(parameters.getCredentialSet())
.disableS3ExpressSessionAuth(!parameters.isExpressCreateSession())
.serviceConfiguration(serviceConfiguration);
if (LOG.isTraceEnabled()) {
// if this log is set to "trace" then we turn on logging of SDK metrics.
// The metrics itself will log at info; it is just that reflection work
// would be needed to change that setting safely for shaded and unshaded aws artifacts.
s3BaseClientBuilder.overrideConfiguration(o ->
o.addMetricPublisher(LoggingMetricPublisher.create()));
}
if (conf.getBoolean(HTTP_SIGNER_ENABLED, HTTP_SIGNER_ENABLED_DEFAULT)) {
// use an http signer through an AuthScheme
final AuthScheme<AwsCredentialsIdentity> signer =
createHttpSigner(conf, AUTH_SCHEME_AWS_SIGV_4, HTTP_SIGNER_CLASS_NAME);
builder.putAuthScheme(signer);
}
return (BuilderT) s3BaseClientBuilder;
}
/**
* Create an override configuration for an S3 client.
* @param parameters parameter object
* @param conf configuration object
* @throws IOException any IOE raised, or translated exception
* @throws RuntimeException some failures creating an http signer
* @return the override configuration
* @throws IOException any IOE raised, or translated exception
*/
protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
S3ClientCreationParameters parameters, Configuration conf) throws IOException {
final ClientOverrideConfiguration.Builder clientOverrideConfigBuilder =
AWSClientConfig.createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3);
// add any headers
parameters.getHeaders().forEach((h, v) -> clientOverrideConfigBuilder.putHeader(h, v));
if (parameters.isRequesterPays()) {
// All calls must acknowledge requester will pay via header.
clientOverrideConfigBuilder.putHeader(REQUESTER_PAYS_HEADER, REQUESTER_PAYS_HEADER_VALUE);
}
if (!StringUtils.isEmpty(parameters.getUserAgentSuffix())) {
clientOverrideConfigBuilder.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX,
parameters.getUserAgentSuffix());
}
if (parameters.getExecutionInterceptors() != null) {
for (ExecutionInterceptor interceptor : parameters.getExecutionInterceptors()) {
clientOverrideConfigBuilder.addExecutionInterceptor(interceptor);
}
}
if (parameters.getMetrics() != null) {
clientOverrideConfigBuilder.addMetricPublisher(
new AwsStatisticsCollector(parameters.getMetrics()));
}
final RetryPolicy.Builder retryPolicyBuilder = AWSClientConfig.createRetryPolicyBuilder(conf);
clientOverrideConfigBuilder.retryPolicy(retryPolicyBuilder.build());
return clientOverrideConfigBuilder;
}
/**
* This method configures the endpoint and region for a S3 client.
* The order of configuration is:
*
* <ol>
* <li>If region is configured via fs.s3a.endpoint.region, use it.</li>
* <li>If endpoint is configured via via fs.s3a.endpoint, set it.
* If no region is configured, try to parse region from endpoint. </li>
* <li> If no region is configured, and it could not be parsed from the endpoint,
* set the default region as US_EAST_2</li>
* <li> If configured region is empty, fallback to SDK resolution chain. </li>
* <li> S3 cross region is enabled by default irrespective of region or endpoint
* is set or not.</li>
* </ol>
*
* @param builder S3 client builder.
* @param parameters parameter object
* @param conf conf configuration object
* @param <BuilderT> S3 client builder type
* @param <ClientT> S3 client type
* @throws IllegalArgumentException if endpoint is set when FIPS is enabled.
*/
private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void configureEndpointAndRegion(
BuilderT builder, S3ClientCreationParameters parameters, Configuration conf) {
final String endpointStr = parameters.getEndpoint();
final URI endpoint = getS3Endpoint(endpointStr, conf);
final String configuredRegion = parameters.getRegion();
Region region = null;
String origin = "";
// If the region was configured, set it.
if (configuredRegion != null && !configuredRegion.isEmpty()) {
origin = AWS_REGION;
region = Region.of(configuredRegion);
}
// FIPs? Log it, then reject any attempt to set an endpoint
final boolean fipsEnabled = parameters.isFipsEnabled();
if (fipsEnabled) {
LOG.debug("Enabling FIPS mode");
}
// always setting it guarantees the value is non-null,
// which tests expect.
builder.fipsEnabled(fipsEnabled);
if (endpoint != null) {
boolean endpointEndsWithCentral =
endpointStr.endsWith(CENTRAL_ENDPOINT);
checkArgument(!fipsEnabled || endpointEndsWithCentral, "%s : %s",
ERROR_ENDPOINT_WITH_FIPS,
endpoint);
// No region was configured,
// determine the region from the endpoint.
if (region == null) {
region = getS3RegionFromEndpoint(endpointStr,
endpointEndsWithCentral);
if (region != null) {
origin = "endpoint";
}
}
// No need to override endpoint with "s3.amazonaws.com".
// Let the client take care of endpoint resolution. Overriding
// the endpoint with "s3.amazonaws.com" causes 400 Bad Request
// errors for non-existent buckets and objects.
// ref: https://github.com/aws/aws-sdk-java-v2/issues/4846
if (!endpointEndsWithCentral) {
builder.endpointOverride(endpoint);
LOG.debug("Setting endpoint to {}", endpoint);
} else {
origin = "central endpoint with cross region access";
LOG.debug("Enabling cross region access for endpoint {}",
endpointStr);
}
}
if (region != null) {
builder.region(region);
} else if (configuredRegion == null) {
// no region is configured, and none could be determined from the endpoint.
// Use US_EAST_2 as default.
region = Region.of(AWS_S3_DEFAULT_REGION);
builder.region(region);
origin = "cross region access fallback";
} else if (configuredRegion.isEmpty()) {
// region configuration was set to empty string.
// allow this if people really want it; it is OK to rely on this
// when deployed in EC2.
WARN_OF_DEFAULT_REGION_CHAIN.warn(SDK_REGION_CHAIN_IN_USE);
LOG.debug(SDK_REGION_CHAIN_IN_USE);
origin = "SDK region chain";
}
boolean isCrossRegionAccessEnabled = conf.getBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED,
AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT);
// s3 cross region access
if (isCrossRegionAccessEnabled) {
builder.crossRegionAccessEnabled(true);
}
LOG.debug("Setting region to {} from {} with cross region access {}",
region, origin, isCrossRegionAccessEnabled);
}
/**
* Given a endpoint string, create the endpoint URI.
*
* @param endpoint possibly null endpoint.
* @param conf config to build the URI from.
* @return an endpoint uri
*/
protected static URI getS3Endpoint(String endpoint, final Configuration conf) {
boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS);
String protocol = secureConnections ? "https" : "http";
if (endpoint == null || endpoint.isEmpty()) {
// don't set an endpoint if none is configured, instead let the SDK figure it out.
return null;
}
if (!endpoint.contains("://")) {
endpoint = String.format("%s://%s", protocol, endpoint);
}
try {
return new URI(endpoint);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
/**
* Parses the endpoint to get the region.
* If endpoint is the central one, use US_EAST_2.
*
* @param endpoint the configure endpoint.
* @param endpointEndsWithCentral true if the endpoint is configured as central.
* @return the S3 region, null if unable to resolve from endpoint.
*/
@VisibleForTesting
static Region getS3RegionFromEndpoint(final String endpoint,
final boolean endpointEndsWithCentral) {
if (!endpointEndsWithCentral) {
// S3 VPC endpoint parsing
Matcher matcher = VPC_ENDPOINT_PATTERN.matcher(endpoint);
if (matcher.find()) {
LOG.debug("Mapping to VPCE");
LOG.debug("Endpoint {} is vpc endpoint; parsing region as {}", endpoint, matcher.group(1));
return Region.of(matcher.group(1));
}
LOG.debug("Endpoint {} is not the default; parsing", endpoint);
return AwsHostNameUtils.parseSigningRegion(endpoint, S3_SERVICE_NAME).orElse(null);
}
// Select default region here to enable cross-region access.
// If both "fs.s3a.endpoint" and "fs.s3a.endpoint.region" are empty,
// Spark sets "fs.s3a.endpoint" to "s3.amazonaws.com".
// This applies to Spark versions with the changes of SPARK-35878.
// ref:
// https://github.com/apache/spark/blob/v3.5.0/core/
// src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L528
// If we do not allow cross region access, Spark would not be able to
// access any bucket that is not present in the given region.
// Hence, we should use default region us-east-2 to allow cross-region
// access.
return Region.of(AWS_S3_DEFAULT_REGION);
}
private static <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void
maybeApplyS3AccessGrantsConfigurations(BuilderT builder, Configuration conf) {
boolean isS3AccessGrantsEnabled = conf.getBoolean(AWS_S3_ACCESS_GRANTS_ENABLED, false);
if (!isS3AccessGrantsEnabled){
LOG.debug("S3 Access Grants plugin is not enabled.");
return;
}
boolean isFallbackEnabled =
conf.getBoolean(AWS_S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED, false);
S3AccessGrantsPlugin accessGrantsPlugin =
S3AccessGrantsPlugin.builder()
.enableFallback(isFallbackEnabled)
.build();
builder.addPlugin(accessGrantsPlugin);
LOG_S3AG_ENABLED.info(
"S3 Access Grants plugin is enabled with IAM fallback set to {}", isFallbackEnabled);
}
}