RequestFactoryImpl.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.impl;

import java.time.Duration;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.MetadataDirective;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.SdkPartType;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.StorageClass;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.utils.Md5Utils;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.IF_NONE_MATCH_STAR;
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_C;
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_MATCH;
import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.IF_NONE_MATCH;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.Preconditions.checkNotNull;
import static software.amazon.awssdk.services.s3.model.StorageClass.UNKNOWN_TO_SDK_VERSION;

/**
 * The standard implementation of the request factory.
 * This creates AWS SDK request classes for the specific bucket,
 * with standard options/headers set.
 * It is also where custom setting parameters can take place.
 *
 * All creation of AWS S3 requests MUST be through this class so that
 * common options (encryption etc.) can be added here,
 * and so that any chained transformation of requests can be applied.
 *
 * This is where audit span information is added to the requests,
 * until it is done in the AWS SDK itself.
 *
 * All created request builders will be passed to
 * {@link PrepareRequest#prepareRequest(SdkRequest.Builder)} before
 * being returned to the caller.
 */
public class RequestFactoryImpl implements RequestFactory {

  public static final Logger LOG = LoggerFactory.getLogger(
      RequestFactoryImpl.class);

  /**
   * Target bucket.
   */
  private final String bucket;

  /**
   * Encryption secrets.
   */
  private EncryptionSecrets encryptionSecrets;

  /**
   * ACL For new objects.
   */
  private final String cannedACL;

  /**
   * Max number of multipart entries allowed in a large
   * upload. Tunable for testing only.
   */
  private final long multipartPartCountLimit;

  /**
   * Callback to prepare requests.
   */
  private final PrepareRequest requestPreparer;

  /**
   * Content encoding (null for none).
   */
  private final String contentEncoding;

  /**
   * Storage class.
   */
  private final StorageClass storageClass;

  /**
   * Is multipart upload enabled.
   */
  private final boolean isMultipartUploadEnabled;

  /**
   * Timeout for uploading objects/parts.
   * This will be set on data put/post operations only.
   */
  private final Duration partUploadTimeout;

  /**
   * Indicates the algorithm used to create the checksum for the object to be uploaded to S3.
   */
  private final ChecksumAlgorithm checksumAlgorithm;

  /**
   * Constructor.
   * @param builder builder with all the configuration.
   */
  protected RequestFactoryImpl(
      final RequestFactoryBuilder builder) {
    this.bucket = builder.bucket;
    this.cannedACL = builder.cannedACL;
    this.encryptionSecrets = builder.encryptionSecrets;
    this.multipartPartCountLimit = builder.multipartPartCountLimit;
    this.requestPreparer = builder.requestPreparer;
    this.contentEncoding = builder.contentEncoding;
    this.storageClass = builder.storageClass;
    this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
    this.partUploadTimeout = builder.partUploadTimeout;
    this.checksumAlgorithm = builder.checksumAlgorithm;
  }

  /**
   * Preflight preparation of AWS request.
   * @param <T> web service request builder
   * @return prepared builder.
   */
  @Retries.OnceRaw
  private <T extends SdkRequest.Builder> T prepareRequest(T t) {
    if (requestPreparer != null) {
      requestPreparer.prepareRequest(t);
    }
    return t;
  }

  /**
   * Get the canned ACL of this FS.
   * @return an ACL, if any
   */
  @Override
  public String getCannedACL() {
    return cannedACL;
  }

  /**
   * Get the target bucket.
   * @return the bucket.
   */
  protected String getBucket() {
    return bucket;
  }

  /**
   * Get the encryption algorithm of this endpoint.
   * @return the encryption algorithm.
   */
  @Override
  public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
    return encryptionSecrets.getEncryptionMethod();
  }

  /**
   * Get the content encoding (e.g. gzip) or return null if none.
   * @return content encoding
   */
  @Override
  public String getContentEncoding() {
    return contentEncoding;
  }

  /**
   * Get the object storage class, return null if none.
   * @return storage class
   */
  @Override
  public StorageClass getStorageClass() {
    return storageClass;
  }

  /**
   * Sets server side encryption parameters to the part upload
   * request when encryption is enabled.
   * @param builder upload part request builder
   */
  protected void uploadPartEncryptionParameters(
      UploadPartRequest.Builder builder) {
    // need to set key to get objects encrypted with SSE_C
    EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets).ifPresent(base64customerKey -> {
      builder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
          .sseCustomerKey(base64customerKey)
          .sseCustomerKeyMD5(Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey)));
    });
  }

  private CopyObjectRequest.Builder buildCopyObjectRequest() {

    CopyObjectRequest.Builder copyObjectRequestBuilder = CopyObjectRequest.builder();

    if (contentEncoding != null) {
      copyObjectRequestBuilder.contentEncoding(contentEncoding);
    }

    if (checksumAlgorithm != null) {
      copyObjectRequestBuilder.checksumAlgorithm(checksumAlgorithm);
    }

    return copyObjectRequestBuilder;
  }

  @Override
  public CopyObjectRequest.Builder newCopyObjectRequestBuilder(String srcKey,
      String dstKey,
      HeadObjectResponse srcom) {

    CopyObjectRequest.Builder copyObjectRequestBuilder = buildCopyObjectRequest();

    Map<String, String> dstom = new HashMap<>();
    HeaderProcessing.cloneObjectMetadata(srcom, dstom, copyObjectRequestBuilder);
    copyEncryptionParameters(srcom, copyObjectRequestBuilder);

    copyObjectRequestBuilder
        .metadata(dstom)
        .metadataDirective(MetadataDirective.REPLACE)
        .acl(cannedACL);

    if (srcom.storageClass() != null && srcom.storageClass() != UNKNOWN_TO_SDK_VERSION) {
      copyObjectRequestBuilder.storageClass(srcom.storageClass());
    }

    copyObjectRequestBuilder.destinationBucket(getBucket())
        .destinationKey(dstKey).sourceBucket(getBucket()).sourceKey(srcKey);

    return prepareRequest(copyObjectRequestBuilder);
  }

  /**
   * Propagate encryption parameters from source file if set else use the
   * current filesystem encryption settings.
   * @param copyObjectRequestBuilder copy object request builder.
   * @param srcom source object metadata.
   */
  protected void copyEncryptionParameters(HeadObjectResponse srcom,
      CopyObjectRequest.Builder copyObjectRequestBuilder) {

    final S3AEncryptionMethods algorithm = getServerSideEncryptionAlgorithm();

    String sourceKMSId = srcom.ssekmsKeyId();
    if (isNotEmpty(sourceKMSId)) {
      // source KMS ID is propagated
      LOG.debug("Propagating SSE-KMS settings from source {}",
          sourceKMSId);
      copyObjectRequestBuilder.ssekmsKeyId(sourceKMSId);
      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
          .ifPresent(copyObjectRequestBuilder::ssekmsEncryptionContext);
      return;
    }

    switch (algorithm) {
    case SSE_S3:
      copyObjectRequestBuilder.serverSideEncryption(algorithm.getMethod());
      break;
    case SSE_KMS:
      copyObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
      // Set the KMS key if present, else S3 uses AWS managed key.
      EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
          .ifPresent(copyObjectRequestBuilder::ssekmsKeyId);
      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
              .ifPresent(copyObjectRequestBuilder::ssekmsEncryptionContext);
      break;
    case DSSE_KMS:
      copyObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS_DSSE);
      EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
          .ifPresent(copyObjectRequestBuilder::ssekmsKeyId);
      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
              .ifPresent(copyObjectRequestBuilder::ssekmsEncryptionContext);
      break;
    case SSE_C:
      EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
          .ifPresent(base64customerKey -> copyObjectRequestBuilder
              .copySourceSSECustomerAlgorithm(ServerSideEncryption.AES256.name())
              .copySourceSSECustomerKey(base64customerKey)
              .copySourceSSECustomerKeyMD5(
                  Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey)))
              .sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
              .sseCustomerKey(base64customerKey).sseCustomerKeyMD5(
                  Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey))));
      break;
    case CSE_KMS:
    case CSE_CUSTOM:
    case NONE:
      break;
    default:
      LOG.warn(UNKNOWN_ALGORITHM + ": " + algorithm);
    }
  }
  /**
   * Create a putObject request.
   * Adds the ACL, storage class and metadata
   * @param key key of object
   * @param options options for the request, including headers
   * @param length length of object to be uploaded
   * @param isDirectoryMarker true if object to be uploaded is a directory marker
   * @return the request builder
   */
  @Override
  public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
      final PutObjectOptions options,
      long length,
      boolean isDirectoryMarker) {

    Preconditions.checkArgument(isNotEmpty(key), "Null/empty key");

    PutObjectRequest.Builder putObjectRequestBuilder =
        buildPutObjectRequest(length, isDirectoryMarker);
    putObjectRequestBuilder.bucket(getBucket()).key(key);

    if (options != null) {
      putObjectRequestBuilder.metadata(options.getHeaders());
    }

    putEncryptionParameters(putObjectRequestBuilder);

    if (storageClass != null) {
      putObjectRequestBuilder.storageClass(storageClass);
    }

    // Set the timeout for object uploads but not directory markers.
    if (!isDirectoryMarker) {
      setRequestTimeout(putObjectRequestBuilder, partUploadTimeout);
    }

    if (options != null) {
      if (options.isNoObjectOverwrite()) {
        LOG.debug("setting If-None-Match");
        putObjectRequestBuilder.overrideConfiguration(
                override -> override.putHeader(IF_NONE_MATCH, IF_NONE_MATCH_STAR));
      }
      if (options.hasFlag(WriteObjectFlags.ConditionalOverwriteEtag)) {
        LOG.debug("setting If-Match");
        putObjectRequestBuilder.overrideConfiguration(
                override -> override.putHeader(IF_MATCH, options.getEtagOverwrite()));
      }
    }

    return prepareRequest(putObjectRequestBuilder);
  }

  private PutObjectRequest.Builder buildPutObjectRequest(long length, boolean isDirectoryMarker) {

    PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder();

    putObjectRequestBuilder.acl(cannedACL);

    if (length >= 0) {
      putObjectRequestBuilder.contentLength(length);
    }

    if (contentEncoding != null && !isDirectoryMarker) {
      putObjectRequestBuilder.contentEncoding(contentEncoding);
    }

    if (checksumAlgorithm != null) {
      putObjectRequestBuilder.checksumAlgorithm(checksumAlgorithm);
    }

    return putObjectRequestBuilder;
  }

  private void putEncryptionParameters(PutObjectRequest.Builder putObjectRequestBuilder) {
    final S3AEncryptionMethods algorithm
        = getServerSideEncryptionAlgorithm();

    switch (algorithm) {
    case SSE_S3:
      putObjectRequestBuilder.serverSideEncryption(algorithm.getMethod());
      break;
    case SSE_KMS:
      putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
      // Set the KMS key if present, else S3 uses AWS managed key.
      EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
          .ifPresent(putObjectRequestBuilder::ssekmsKeyId);
      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
              .ifPresent(putObjectRequestBuilder::ssekmsEncryptionContext);
      break;
    case DSSE_KMS:
      putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS_DSSE);
      EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
          .ifPresent(putObjectRequestBuilder::ssekmsKeyId);
      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
              .ifPresent(putObjectRequestBuilder::ssekmsEncryptionContext);
      break;
    case SSE_C:
      EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
          .ifPresent(base64customerKey -> putObjectRequestBuilder
              .sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
              .sseCustomerKey(base64customerKey)
              .sseCustomerKeyMD5(Md5Utils.md5AsBase64(
                  Base64.getDecoder().decode(base64customerKey))));
      break;
    case CSE_KMS:
    case CSE_CUSTOM:
    case NONE:
      break;
    default:
      LOG.warn(UNKNOWN_ALGORITHM + ": " + algorithm);
    }
  }

  @Override
  public PutObjectRequest.Builder newDirectoryMarkerRequest(String directory) {
    String key = directory.endsWith("/")
        ? directory
        : (directory + "/");

    // preparation happens in here
    PutObjectRequest.Builder putObjectRequestBuilder = buildPutObjectRequest(0L, true);

    putObjectRequestBuilder.bucket(getBucket()).key(key)
        .contentType(HeaderProcessing.CONTENT_TYPE_X_DIRECTORY);

    putEncryptionParameters(putObjectRequestBuilder);

    return prepareRequest(putObjectRequestBuilder);
  }

  @Override
  public ListMultipartUploadsRequest.Builder
      newListMultipartUploadsRequestBuilder(String prefix) {

    ListMultipartUploadsRequest.Builder requestBuilder = ListMultipartUploadsRequest.builder();

    requestBuilder.bucket(getBucket());
    if (prefix != null) {
      requestBuilder.prefix(prefix);
    }
    return prepareRequest(requestBuilder);
  }

  @Override
  public AbortMultipartUploadRequest.Builder newAbortMultipartUploadRequestBuilder(
      String destKey,
      String uploadId) {
    AbortMultipartUploadRequest.Builder requestBuilder =
        AbortMultipartUploadRequest.builder().bucket(getBucket()).key(destKey).uploadId(uploadId);

    return prepareRequest(requestBuilder);
  }

  private void multipartUploadEncryptionParameters(
      CreateMultipartUploadRequest.Builder mpuRequestBuilder) {
    final S3AEncryptionMethods algorithm = getServerSideEncryptionAlgorithm();

    switch (algorithm) {
    case SSE_S3:
      mpuRequestBuilder.serverSideEncryption(algorithm.getMethod());
      break;
    case SSE_KMS:
      mpuRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
      // Set the KMS key if present, else S3 uses AWS managed key.
      EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
          .ifPresent(mpuRequestBuilder::ssekmsKeyId);
      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
              .ifPresent(mpuRequestBuilder::ssekmsEncryptionContext);
      break;
    case DSSE_KMS:
      mpuRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS_DSSE);
      EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
          .ifPresent(mpuRequestBuilder::ssekmsKeyId);
      EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
              .ifPresent(mpuRequestBuilder::ssekmsEncryptionContext);
      break;
    case SSE_C:
      EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
          .ifPresent(base64customerKey -> mpuRequestBuilder
              .sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
              .sseCustomerKey(base64customerKey)
              .sseCustomerKeyMD5(
                  Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey))));
      break;
    case CSE_KMS:
    case CSE_CUSTOM:
    case NONE:
      break;
    default:
      LOG.warn(UNKNOWN_ALGORITHM + ": " + algorithm);
    }
  }

  @Override
  public CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(
      final String destKey,
      @Nullable final PutObjectOptions options) throws PathIOException {
    if (!isMultipartUploadEnabled) {
      throw new PathIOException(destKey, "Multipart uploads are disabled.");
    }

    CreateMultipartUploadRequest.Builder requestBuilder = CreateMultipartUploadRequest.builder();

    if (contentEncoding != null) {
      requestBuilder.contentEncoding(contentEncoding);
    }

    if (options != null) {
      requestBuilder.metadata(options.getHeaders());
    }

    requestBuilder.bucket(getBucket()).key(destKey).acl(cannedACL);

    multipartUploadEncryptionParameters(requestBuilder);

    if (storageClass != null) {
      requestBuilder.storageClass(storageClass);
    }

    if (checksumAlgorithm != null) {
      requestBuilder.checksumAlgorithm(checksumAlgorithm);
    }

    return prepareRequest(requestBuilder);
  }

  @Override
  public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder(
      String destKey,
      String uploadId,
      List<CompletedPart> partETags,
      PutObjectOptions putOptions) {

    // a copy of the list is required, so that the AWS SDK doesn't
    // attempt to sort an unmodifiable list.
    CompleteMultipartUploadRequest.Builder requestBuilder;
    requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
            .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());

    if (putOptions.isNoObjectOverwrite()) {
      LOG.debug("setting If-None-Match");
      requestBuilder.overrideConfiguration(
              override -> override.putHeader(IF_NONE_MATCH, IF_NONE_MATCH_STAR));
    }
    if (!isEmpty(putOptions.getEtagOverwrite())) {
      LOG.debug("setting if If-Match");
      requestBuilder.overrideConfiguration(
              override -> override.putHeader(IF_MATCH, putOptions.getEtagOverwrite()));
    }

    // Correct SSE-C request parameters are required for this request when
    // specifying checksums for each part
    if (checksumAlgorithm != null && getServerSideEncryptionAlgorithm() == SSE_C) {
      EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
          .ifPresent(base64customerKey -> requestBuilder
              .sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
              .sseCustomerKey(base64customerKey)
              .sseCustomerKeyMD5(
                  Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey))));
    }

    return prepareRequest(requestBuilder);
  }

  @Override
  public HeadObjectRequest.Builder newHeadObjectRequestBuilder(String key) {

    HeadObjectRequest.Builder headObjectRequestBuilder =
        HeadObjectRequest.builder().bucket(getBucket()).key(key);

    // need to set key to get metadata for objects encrypted with SSE_C
    EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets).ifPresent(base64customerKey -> {
      headObjectRequestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
          .sseCustomerKey(base64customerKey)
          .sseCustomerKeyMD5(Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey)));
    });

    return prepareRequest(headObjectRequestBuilder);
  }

  @Override
  public HeadBucketRequest.Builder newHeadBucketRequestBuilder(String bucketName) {

    HeadBucketRequest.Builder headBucketRequestBuilder =
        HeadBucketRequest.builder().bucket(bucketName);

    return prepareRequest(headBucketRequestBuilder);
  }

  @Override
  public GetObjectRequest.Builder newGetObjectRequestBuilder(String key) {
    GetObjectRequest.Builder builder = GetObjectRequest.builder()
        .bucket(bucket)
        .key(key);

    // need to set key to get objects encrypted with SSE_C
    EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets).ifPresent(base64customerKey -> {
      builder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
          .sseCustomerKey(base64customerKey)
          .sseCustomerKeyMD5(Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey)));
    });

    return prepareRequest(builder);
  }

  @Override
  public UploadPartRequest.Builder newUploadPartRequestBuilder(
      String destKey,
      String uploadId,
      int partNumber,
      boolean isLastPart,
      long size) throws PathIOException {
    checkNotNull(uploadId);
    checkArgument(size >= 0, "Invalid partition size %s", size);
    checkArgument(partNumber > 0,
        "partNumber must be between 1 and %s inclusive, but is %s",
        multipartPartCountLimit, partNumber);

    LOG.debug("Creating part upload request for {} #{} size {}",
        uploadId, partNumber, size);
    final String pathErrorMsg = "Number of parts in multipart upload exceeded."
        + " Current part count = %s, Part count limit = %s ";
    if (partNumber > multipartPartCountLimit) {
      throw new PathIOException(destKey,
          String.format(pathErrorMsg, partNumber, multipartPartCountLimit));
    }
    UploadPartRequest.Builder builder = UploadPartRequest.builder()
        .bucket(getBucket())
        .key(destKey)
        .uploadId(uploadId)
        .partNumber(partNumber)
        .contentLength(size);
    if (isLastPart) {
      builder.sdkPartType(SdkPartType.LAST);
    }
    uploadPartEncryptionParameters(builder);

    // Set the request timeout for the part upload
    setRequestTimeout(builder, partUploadTimeout);

    if (checksumAlgorithm != null) {
      builder.checksumAlgorithm(checksumAlgorithm);
    }

    return prepareRequest(builder);
  }

  @Override
  public ListObjectsRequest.Builder newListObjectsV1RequestBuilder(
      final String key,
      final String delimiter,
      final int maxKeys) {

    ListObjectsRequest.Builder requestBuilder =
        ListObjectsRequest.builder().bucket(bucket).maxKeys(maxKeys).prefix(key);

    if (delimiter != null) {
      requestBuilder.delimiter(delimiter);
    }

    return prepareRequest(requestBuilder);
  }

  @Override
  public ListObjectsV2Request.Builder newListObjectsV2RequestBuilder(
      final String key,
      final String delimiter,
      final int maxKeys) {

    final ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder()
        .bucket(bucket)
        .maxKeys(maxKeys)
        .prefix(key);

    if (delimiter != null) {
      requestBuilder.delimiter(delimiter);
    }

    return prepareRequest(requestBuilder);
  }

  @Override
  public DeleteObjectRequest.Builder newDeleteObjectRequestBuilder(String key) {
    return prepareRequest(DeleteObjectRequest.builder().bucket(bucket).key(key));
  }

  @Override
  public DeleteObjectsRequest.Builder newBulkDeleteRequestBuilder(
          List<ObjectIdentifier> keysToDelete) {
    return prepareRequest(DeleteObjectsRequest
        .builder()
        .bucket(bucket)
        .delete(d -> d.objects(keysToDelete).quiet(!LOG.isTraceEnabled())));
  }

  @Override
  public void setEncryptionSecrets(final EncryptionSecrets secrets) {
    encryptionSecrets = secrets;
  }

  /**
   * Create a builder.
   * @return new builder.
   */
  public static RequestFactoryBuilder builder() {
    return new RequestFactoryBuilder();
  }

  /**
   * Builder.
   */
  public static final class RequestFactoryBuilder {

    /**
     * Target bucket.
     */
    private String bucket;

    /**
     * Encryption secrets.
     */
    private EncryptionSecrets encryptionSecrets = new EncryptionSecrets();

    /**
     * ACL For new objects.
     */
    private String cannedACL = null;

    /** Content Encoding. */
    private String contentEncoding;

    /**
     * Storage class.
     */
    private StorageClass storageClass;

    /**
     * Multipart limit.
     */
    private long multipartPartCountLimit = DEFAULT_UPLOAD_PART_COUNT_LIMIT;

    /**
     * Callback to prepare requests.
     */
    private PrepareRequest requestPreparer;

    /**
     * Is Multipart Enabled on the path.
     */
    private boolean isMultipartUploadEnabled = true;

    /**
     * Timeout for uploading objects/parts.
     * This will be set on data put/post operations only.
     * A zero value means "no custom timeout"
     */
    private Duration partUploadTimeout = DEFAULT_PART_UPLOAD_TIMEOUT;

    /**
     * Indicates the algorithm used to create the checksum for the object to be uploaded to S3.
     */
    private ChecksumAlgorithm checksumAlgorithm;

    private RequestFactoryBuilder() {
    }

    /**
     * Build the request factory.
     * @return the factory
     */
    public RequestFactory build() {
      return new RequestFactoryImpl(this);
    }

    /**
     * Content encoding.
     * @param value new value
     * @return the builder
     */
    public RequestFactoryBuilder withContentEncoding(final String value) {
      contentEncoding = value;
      return this;
    }

    /**
     * Storage class.
     * @param value new value
     * @return the builder
     */
    public RequestFactoryBuilder withStorageClass(final StorageClass value) {
      storageClass = value;
      return this;
    }

    /**
     * Target bucket.
     * @param value new value
     * @return the builder
     */
    public RequestFactoryBuilder withBucket(final String value) {
      bucket = value;
      return this;
    }

    /**
     * Encryption secrets.
     * @param value new value
     * @return the builder
     */
    public RequestFactoryBuilder withEncryptionSecrets(
        final EncryptionSecrets value) {
      encryptionSecrets = value;
      return this;
    }

    /**
     * ACL For new objects.
     * @param value new value
     * @return the builder
     */
    public RequestFactoryBuilder withCannedACL(
        final String value) {
      cannedACL = value;
      return this;
    }

    /**
     * Multipart limit.
     * @param value new value
     * @return the builder
     */
    public RequestFactoryBuilder withMultipartPartCountLimit(
        final long value) {
      multipartPartCountLimit = value;
      return this;
    }

    /**
     * Callback to prepare requests.
     *
     * @param value new value
     * @return the builder
     */
    public RequestFactoryBuilder withRequestPreparer(
        final PrepareRequest value) {
      this.requestPreparer = value;
      return this;
    }

    /**
     * Multipart upload enabled.
     *
     * @param value new value
     * @return the builder
     */
    public RequestFactoryBuilder withMultipartUploadEnabled(
        final boolean value) {
      this.isMultipartUploadEnabled = value;
      return this;
    }

    /**
     * Timeout for uploading objects/parts.
     * This will be set on data put/post operations only.
     * A zero value means "no custom timeout"
     * @param value new value
     * @return the builder
     */
    public RequestFactoryBuilder withPartUploadTimeout(final Duration value) {
      partUploadTimeout = value;
      return this;
    }

    /**
     * Indicates the algorithm used to create the checksum for the object to be uploaded to S3.
     * @param value new value
     * @return the builder
     */
    public RequestFactoryBuilder withChecksumAlgorithm(final ChecksumAlgorithm value) {
      checksumAlgorithm = value;
      return this;
    }
  }

  /**
   * This is a callback for anything to "prepare" every request
   * after creation. The S3AFileSystem's Audit Manager is expected
   * to be wired up via this call so can audit/prepare requests
   * after their creation.
   */
  @FunctionalInterface
  public interface PrepareRequest {

    /**
     * Post-creation preparation of AWS request.
     * @param t request builder
     */
    @Retries.OnceRaw
    void prepareRequest(SdkRequest.Builder t);
  }
}