ITestS3AEndpointRegion.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import java.io.IOException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.awscore.AwsExecutionAttribute;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils;

import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.ERROR_ENDPOINT_WITH_FIPS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeStoreAwsHosted;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.DEFAULT_REQUESTER_PAYS_BUCKET_NAME;
import static org.apache.hadoop.io.IOUtils.closeStream;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
 * Test to check correctness of S3A endpoint regions in
 * {@link DefaultS3ClientFactory}.
 */
public class ITestS3AEndpointRegion extends AbstractS3ATestBase {

  private static final String AWS_ENDPOINT_TEST = "test-endpoint";

  private static final String US_EAST_1 = "us-east-1";

  private static final String US_EAST_2 = "us-east-2";

  private static final String US_WEST_2 = "us-west-2";

  private static final String SA_EAST_1 = "sa-east-1";

  private static final String EU_WEST_2 = "eu-west-2";

  private static final String CN_NORTHWEST_1 = "cn-northwest-1";

  private static final String US_GOV_EAST_1 = "us-gov-east-1";

  private static final String US_REGION_PREFIX = "us-";

  private static final String CA_REGION_PREFIX = "ca-";

  private static final String US_DUAL_STACK_PREFIX = "dualstack.us-";

  /**
   * If anyone were ever to create a bucket with this UUID pair it would break the tests.
   */
  public static final String UNKNOWN_BUCKET = "23FA76D4-5F17-48B8-9D7D-9050269D0E40"
      + "-8281BAF2-DBCF-47AA-8A27-F2FA3589656A";

  private static final String EU_WEST_2_ENDPOINT = "s3.eu-west-2.amazonaws.com";

  private static final String CN_ENDPOINT = "s3.cn-northwest-1.amazonaws.com.cn";

  private static final String GOV_ENDPOINT = "s3-fips.us-gov-east-1.amazonaws.com";

  private static final String VPC_ENDPOINT = "vpce-1a2b3c4d-5e6f.s3.us-west-2.vpce.amazonaws.com";

  private static final String CN_VPC_ENDPOINT = "vpce-1a2b3c4d-5e6f.s3.cn-northwest-1.vpce.amazonaws.com.cn";

  public static final String EXCEPTION_THROWN_BY_INTERCEPTOR = "Exception thrown by interceptor";

  /**
   * Text to include in assertions.
   */
  private static final AtomicReference<String> EXPECTED_MESSAGE = new AtomicReference<>();
  /**
   * New FS instance which will be closed in teardown.
   */
  private S3AFileSystem newFS;

  @AfterEach
  @Override
  public void teardown() throws Exception {
    closeStream(newFS);
    super.teardown();
  }

  @Test
  public void testWithoutRegionConfig() throws IOException {
    describe("Verify that region lookup takes place");

    Configuration conf = getConfiguration();
    removeBaseAndBucketOverrides(conf, AWS_REGION, PATH_STYLE_ACCESS);
    conf.setBoolean(PATH_STYLE_ACCESS, false);

    newFS = new S3AFileSystem();

    try {
      newFS.initialize(getFileSystem().getUri(), conf);
      newFS.getS3AInternals().getBucketMetadata();
    } catch (UnknownHostException | UnknownStoreException | AccessDeniedException allowed) {
      // these are all valid failure modes from different test environments.
    }
  }

  @Test
  public void testUnknownBucket() throws Exception {
    describe("Verify unknown bucket probe failure");
    Configuration conf = getConfiguration();
    removeBaseAndBucketOverrides(UNKNOWN_BUCKET, conf, AWS_REGION, PATH_STYLE_ACCESS);

    newFS = new S3AFileSystem();

    try {
      newFS.initialize(new URI("s3a://" + UNKNOWN_BUCKET), conf);
      newFS.getS3AInternals().getBucketMetadata();
      // expect a failure by here
      fail("Expected failure, got " + newFS);
    } catch (UnknownHostException | UnknownStoreException expected) {
      // this is good.
    }
  }

  @Test
  public void testEndpointOverride() throws Throwable {
    describe("Create a client with a configured endpoint");
    Configuration conf = getConfiguration();

    S3Client client = createS3Client(conf, AWS_ENDPOINT_TEST, null, US_EAST_2, false);

    expectInterceptorException(client);
  }

  @Test
  public void testCentralEndpoint() throws Throwable {
    describe("Create a client with the central endpoint");
    Configuration conf = getConfiguration();

    S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, null, US_EAST_2, false);

    expectInterceptorException(client);

    client = createS3Client(conf, CENTRAL_ENDPOINT, null,
        US_EAST_2, true);

    expectInterceptorException(client);
  }

  @Test
  public void testCentralEndpointWithRegion() throws Throwable {
    describe("Create a client with the central endpoint but also specify region");
    Configuration conf = getConfiguration();

    S3Client client = createS3Client(conf, CENTRAL_ENDPOINT, US_WEST_2,
        US_WEST_2, false);

    expectInterceptorException(client);

    client = createS3Client(conf, CENTRAL_ENDPOINT, US_WEST_2,
        US_WEST_2, true);

    expectInterceptorException(client);

    client = createS3Client(conf, CENTRAL_ENDPOINT, US_EAST_1,
        US_EAST_1, false);

    expectInterceptorException(client);

    client = createS3Client(conf, CENTRAL_ENDPOINT, US_EAST_1,
        US_EAST_1, true);

    expectInterceptorException(client);

  }

  @Test
  public void testWithRegionConfig() throws Throwable {
    describe("Create a client with a configured region");
    Configuration conf = getConfiguration();

    S3Client client = createS3Client(conf, null, EU_WEST_2, EU_WEST_2, false);

    expectInterceptorException(client);
  }

  @Test
  public void testWithFips() throws Throwable {
    describe("Create a client with fips enabled");

    S3Client client = createS3Client(getConfiguration(),
        null, EU_WEST_2, EU_WEST_2, true);
    expectInterceptorException(client);
  }

  /**
   * Attempting to create a client with fips enabled and an endpoint specified
   * fails during client construction.
   */
  @Test
  public void testWithFipsAndEndpoint() throws Throwable {
    describe("Create a client with fips and an endpoint");

    intercept(IllegalArgumentException.class, ERROR_ENDPOINT_WITH_FIPS, () ->
        createS3Client(getConfiguration(), US_WEST_2, null, US_EAST_1, true));
  }

  @Test
  public void testEUWest2Endpoint() throws Throwable {
    describe("Create a client with the eu west 2 endpoint");
    Configuration conf = getConfiguration();

    S3Client client = createS3Client(conf, EU_WEST_2_ENDPOINT, null, EU_WEST_2, false);

    expectInterceptorException(client);
  }

  @Test
  public void testWithRegionAndEndpointConfig() throws Throwable {
    describe("Test that when both region and endpoint are configured, region takes precedence");
    Configuration conf = getConfiguration();

    S3Client client = createS3Client(conf, EU_WEST_2_ENDPOINT, US_WEST_2, US_WEST_2, false);

    expectInterceptorException(client);
  }

  @Test
  public void testWithChinaEndpoint() throws Throwable {
    describe("Test with a china endpoint");
    Configuration conf = getConfiguration();

    S3Client client = createS3Client(conf, CN_ENDPOINT, null, CN_NORTHWEST_1, false);

    expectInterceptorException(client);
  }

  /**
   * Expect an exception to be thrown by the interceptor with the message
   * {@link #EXCEPTION_THROWN_BY_INTERCEPTOR}.
   * @param client client to issue a head request against.
   * @return the expected exception.
   * @throws Exception any other exception.
   */
  private AwsServiceException expectInterceptorException(final S3Client client)
      throws Exception {

    return intercept(AwsServiceException.class, EXCEPTION_THROWN_BY_INTERCEPTOR,
        () -> head(client));
  }

  /**
   * Issue a head request against the bucket.
   * @param client client to use
   * @return the response.
   */
  private HeadBucketResponse head(final S3Client client) {
    return client.headBucket(
        HeadBucketRequest.builder().bucket(getFileSystem().getBucket()).build());
  }

  @Test
  public void testWithGovCloudEndpoint() throws Throwable {
    describe("Test with a gov cloud endpoint; enable fips");
    Configuration conf = getConfiguration();

    S3Client client = createS3Client(conf, GOV_ENDPOINT, null, US_GOV_EAST_1, false);

    expectInterceptorException(client);
  }

  @Test
  public void testWithVPCE() throws Throwable {
    describe("Test with vpc endpoint");
    Configuration conf = getConfiguration();

    S3Client client = createS3Client(conf, VPC_ENDPOINT, null, US_WEST_2, false);

    expectInterceptorException(client);
  }

  @Test
  public void testWithChinaVPCE() throws Throwable {
    describe("Test with china vpc endpoint");
    Configuration conf = getConfiguration();

    S3Client client = createS3Client(conf, CN_VPC_ENDPOINT, null, CN_NORTHWEST_1, false);

    expectInterceptorException(client);
  }

  @Test
  public void testCentralEndpointAndDifferentRegionThanBucket() throws Throwable {
    describe("Access public bucket using central endpoint and region "
        + "different than that of the public bucket");
    final Configuration conf = getConfiguration();
    final Configuration newConf = new Configuration(conf);

    removeBaseAndBucketOverrides(
        newConf,
        ENDPOINT,
        AWS_REGION,
        ALLOW_REQUESTER_PAYS,
        KEY_REQUESTER_PAYS_FILE,
        FIPS_ENDPOINT);

    removeBaseAndBucketOverrides(
        DEFAULT_REQUESTER_PAYS_BUCKET_NAME,
        newConf,
        ENDPOINT,
        AWS_REGION,
        ALLOW_REQUESTER_PAYS,
        KEY_REQUESTER_PAYS_FILE,
        FIPS_ENDPOINT);

    newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
    newConf.set(AWS_REGION, EU_WEST_1);
    newConf.setBoolean(ALLOW_REQUESTER_PAYS, true);

    assertRequesterPaysFileExistence(newConf);
  }

  @Test
  public void testWithOutCrossRegionAccess() throws Exception {
    describe("Verify cross region access fails when disabled");
    // skip the test if the region is sa-east-1
    skipCrossRegionTest();
    final Configuration newConf = new Configuration(getConfiguration());
    removeBaseAndBucketOverrides(newConf,
        ENDPOINT,
        AWS_S3_CROSS_REGION_ACCESS_ENABLED,
        AWS_REGION);
    // disable cross region access
    newConf.setBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED, false);
    newConf.set(AWS_REGION, SA_EAST_1);
    try (S3AFileSystem fs = new S3AFileSystem()) {
      fs.initialize(getFileSystem().getUri(), newConf);
      intercept(AWSRedirectException.class,
          "does not match the AWS region containing the bucket",
          () -> fs.exists(getFileSystem().getWorkingDirectory()));
    }
  }

  @Test
  public void testWithCrossRegionAccess() throws Exception {
    describe("Verify cross region access succeed when enabled");
    // skip the test if the region is sa-east-1
    skipCrossRegionTest();
    final Configuration newConf = new Configuration(getConfiguration());
    removeBaseAndBucketOverrides(newConf,
        ENDPOINT,
        AWS_S3_CROSS_REGION_ACCESS_ENABLED,
        AWS_REGION);
    // enable cross region access
    newConf.setBoolean(AWS_S3_CROSS_REGION_ACCESS_ENABLED, true);
    newConf.set(AWS_REGION, SA_EAST_1);
    try (S3AFileSystem fs = new S3AFileSystem()) {
      fs.initialize(getFileSystem().getUri(), newConf);
      fs.exists(getFileSystem().getWorkingDirectory());
    }
  }

  @Test
  public void testCentralEndpointAndSameRegionAsBucket() throws Throwable {
    describe("Access public bucket using central endpoint and region "
        + "same as that of the public bucket");
    final Configuration conf = getConfiguration();
    final Configuration newConf = new Configuration(conf);

    removeBaseAndBucketOverrides(
        newConf,
        ENDPOINT,
        AWS_REGION,
        ALLOW_REQUESTER_PAYS,
        KEY_REQUESTER_PAYS_FILE,
        FIPS_ENDPOINT);

    removeBaseAndBucketOverrides(
        DEFAULT_REQUESTER_PAYS_BUCKET_NAME,
        newConf,
        ENDPOINT,
        AWS_REGION,
        ALLOW_REQUESTER_PAYS,
        KEY_REQUESTER_PAYS_FILE,
        FIPS_ENDPOINT);

    newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
    newConf.set(AWS_REGION, US_WEST_2);
    newConf.setBoolean(ALLOW_REQUESTER_PAYS, true);

    assertRequesterPaysFileExistence(newConf);
  }

  @Test
  public void testCentralEndpointAndFipsForPublicBucket() throws Throwable {
    describe("Access public bucket using central endpoint and region "
        + "same as that of the public bucket with fips enabled");
    final Configuration conf = getConfiguration();
    final Configuration newConf = new Configuration(conf);

    removeBaseAndBucketOverrides(
        newConf,
        ENDPOINT,
        AWS_REGION,
        ALLOW_REQUESTER_PAYS,
        KEY_REQUESTER_PAYS_FILE,
        FIPS_ENDPOINT);

    removeBaseAndBucketOverrides(
        DEFAULT_REQUESTER_PAYS_BUCKET_NAME,
        newConf,
        ENDPOINT,
        AWS_REGION,
        ALLOW_REQUESTER_PAYS,
        KEY_REQUESTER_PAYS_FILE,
        FIPS_ENDPOINT);

    newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
    newConf.set(AWS_REGION, US_WEST_2);
    newConf.setBoolean(ALLOW_REQUESTER_PAYS, true);
    newConf.setBoolean(FIPS_ENDPOINT, true);

    assertRequesterPaysFileExistence(newConf);
  }

  /**
   * Assert that the file exists on the requester pays public bucket.
   *
   * @param conf the configuration object.
   * @throws IOException if file system operations encounter errors.
   */
  private void assertRequesterPaysFileExistence(Configuration conf)
      throws IOException {
    Path filePath = new Path(PublicDatasetTestUtils
        .getRequesterPaysObject(conf));
    newFS = (S3AFileSystem) filePath.getFileSystem(conf);

    Assertions
        .assertThat(newFS.exists(filePath))
        .describedAs("Existence of path: " + filePath)
        .isTrue();
  }

  @Test
  public void testCentralEndpointAndNullRegionWithCRUD() throws Throwable {
    describe("Access the test bucket using central endpoint and"
        + " null region, perform file system CRUD operations");
    final Configuration conf = getConfiguration();

    final Configuration newConf = new Configuration(conf);

    removeBaseAndBucketOverrides(
        newConf,
        ENDPOINT,
        AWS_REGION,
        FIPS_ENDPOINT,
        S3_ENCRYPTION_ALGORITHM);

    newConf.set(ENDPOINT, CENTRAL_ENDPOINT);

    newFS = new S3AFileSystem();
    newFS.initialize(getFileSystem().getUri(), newConf);

    assertOpsUsingNewFs();
  }

  @Test
  public void testCentralEndpointAndNullRegionFipsWithCRUD() throws Throwable {
    describe("Access the test bucket using central endpoint and"
        + " null region and fips enabled, perform file system CRUD operations");
    assumeStoreAwsHosted(getFileSystem());

    final String bucketLocation = getFileSystem().getBucketLocation();
    assume("FIPS can be enabled to access buckets from US or Canada endpoints only",
        bucketLocation.startsWith(US_REGION_PREFIX)
            || bucketLocation.startsWith(CA_REGION_PREFIX)
            || bucketLocation.startsWith(US_DUAL_STACK_PREFIX));

    final Configuration conf = getConfiguration();
    final Configuration newConf = new Configuration(conf);

    removeBaseAndBucketOverrides(
        newConf,
        ENDPOINT,
        AWS_REGION,
        FIPS_ENDPOINT);

    newConf.set(ENDPOINT, CENTRAL_ENDPOINT);
    newConf.setBoolean(FIPS_ENDPOINT, true);

    newFS = new S3AFileSystem();
    newFS.initialize(getFileSystem().getUri(), newConf);

    assertOpsUsingNewFs();
  }

  /**
   * Skip the test if the region is null or sa-east-1.
   */
  private void skipCrossRegionTest() throws IOException {
    String region = getFileSystem().getS3AInternals().getBucketMetadata().bucketRegion();
    if (region == null || SA_EAST_1.equals(region)) {
      skip("Skipping test since region is null or it is set to sa-east-1");
    }
  }

  private void assertOpsUsingNewFs() throws IOException {
    final String file = getMethodName();
    final Path basePath = methodPath();
    final Path srcDir = new Path(basePath, "srcdir");
    newFS.mkdirs(srcDir);
    Path srcFilePath = new Path(srcDir, file);

    try (FSDataOutputStream out = newFS.create(srcFilePath)) {
      out.write(new byte[] {1, 2, 3});
    }

    Assertions
        .assertThat(newFS.exists(srcFilePath))
        .describedAs("Existence of file: " + srcFilePath)
        .isTrue();
    Assertions
        .assertThat(getFileSystem().exists(srcFilePath))
        .describedAs("Existence of file: " + srcFilePath)
        .isTrue();

    byte[] buffer = new byte[3];

    try (FSDataInputStream in = newFS.open(srcFilePath)) {
      in.readFully(buffer);
      Assertions
          .assertThat(buffer)
          .describedAs("Contents read from " + srcFilePath)
          .containsExactly(1, 2, 3);
    }

    newFS.delete(srcDir, true);

    Assertions
        .assertThat(newFS.exists(srcFilePath))
        .describedAs("Existence of file: " + srcFilePath + " using new FS")
        .isFalse();
    Assertions
        .assertThat(getFileSystem().exists(srcFilePath))
        .describedAs("Existence of file: " + srcFilePath + " using original FS")
        .isFalse();
  }

  private final class RegionInterceptor implements ExecutionInterceptor {
    private final String endpoint;
    private final String region;
    private final boolean isFips;

    RegionInterceptor(String endpoint, String region, final boolean isFips) {
      this.endpoint = endpoint;
      this.region = region;
      this.isFips = isFips;
    }

    @Override
    public void beforeExecution(Context.BeforeExecution context,
        ExecutionAttributes executionAttributes)  {


      // extract state from the execution attributes.
      final Boolean endpointOveridden =
          executionAttributes.getAttribute(AwsExecutionAttribute.ENDPOINT_OVERRIDDEN);
      final String clientEndpoint =
          executionAttributes.getAttribute(AwsExecutionAttribute.CLIENT_ENDPOINT).toString();
      final Boolean fipsEnabled = executionAttributes.getAttribute(
          AwsExecutionAttribute.FIPS_ENDPOINT_ENABLED);
      final String reg = executionAttributes.getAttribute(AwsExecutionAttribute.AWS_REGION).
          toString();

      String state = "SDK beforeExecution callback; "
          + "endpointOveridden=" + endpointOveridden
          + "; clientEndpoint=" + clientEndpoint
          + "; fipsEnabled=" + fipsEnabled
          + "; region=" + reg;

      if (endpoint != null && !endpoint.endsWith(CENTRAL_ENDPOINT)) {
        Assertions.assertThat(endpointOveridden)
            .describedAs("Endpoint not overridden in %s. Client Config=%s",
                state, EXPECTED_MESSAGE.get())
            .isTrue();

        Assertions.assertThat(clientEndpoint)
            .describedAs("There is an endpoint mismatch in %s. Client Config=%s",
                state, EXPECTED_MESSAGE.get())
            .isEqualTo("https://" + endpoint);
      } else {
        Assertions.assertThat(endpointOveridden)
            .describedAs("Attribute endpointOveridden is null in %s. Client Config=%s",
                state, EXPECTED_MESSAGE.get())
            .isEqualTo(false);
      }

      Assertions.assertThat(reg)
          .describedAs("Incorrect region set in %s. Client Config=%s",
              state, EXPECTED_MESSAGE.get())
          .isEqualTo(region);

      // verify the fips state matches expectation.
      Assertions.assertThat(fipsEnabled)
          .describedAs("Incorrect FIPS flag set in %s; Client Config=%s",
              state, EXPECTED_MESSAGE.get())
          .isNotNull()
          .isEqualTo(isFips);

      // We don't actually want to make a request, so exit early.
      throw AwsServiceException.builder().message(EXCEPTION_THROWN_BY_INTERCEPTOR).build();
    }
  }

  /**
   * Create an S3 client with the given conf and endpoint.
   * The region name must then match that of the expected
   * value.
   * @param conf configuration to use.
   * @param endpoint endpoint.
   * @param expectedRegion the region that should be set in the client.
   * @param isFips is this a FIPS endpoint?
   * @return the client.
   * @throws IOException IO problems
   */
  @SuppressWarnings("deprecation")
  private S3Client createS3Client(Configuration conf,
      String endpoint, String configuredRegion, String expectedRegion, boolean isFips)
      throws IOException {

    String expected =
        "endpoint=" + endpoint + "; region=" + configuredRegion
        + "; expectedRegion=" + expectedRegion + "; isFips=" + isFips;
    LOG.info("Creating S3 client with {}", expected);
    EXPECTED_MESSAGE.set(expected);
    List<ExecutionInterceptor> interceptors = new ArrayList<>();
    interceptors.add(new RegionInterceptor(endpoint, expectedRegion, isFips));

    DefaultS3ClientFactory factory
        = new DefaultS3ClientFactory();
    factory.setConf(conf);
    S3ClientFactory.S3ClientCreationParameters parameters
        = new S3ClientFactory.S3ClientCreationParameters()
        .withCredentialSet(new AnonymousAWSCredentialsProvider())
        .withEndpoint(endpoint)
        .withMetrics(new EmptyS3AStatisticsContext()
            .newStatisticsFromAwsSdk())
        .withExecutionInterceptors(interceptors)
        .withRegion(configuredRegion)
        .withFipsEnabled(isFips);

    S3Client client = factory.createS3Client(
        getFileSystem().getUri(),
        parameters);
    return client;
  }
}