S3AStoreImpl.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.impl;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionException;
import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
import org.apache.hadoop.fs.s3a.S3AStore;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.UploadInfo;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.RateLimiting;
import org.apache.hadoop.util.functional.Tuples;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.S3AUtils.extractException;
import static org.apache.hadoop.fs.s3a.S3AUtils.getPutRequestLength;
import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_HEAD_REQUEST;
import static org.apache.hadoop.fs.s3a.Statistic.IGNORED_ERRORS;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_PART_PUT;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_BULK_DELETE_REQUEST;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_OBJECTS;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_REQUEST;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_METADATA_REQUESTS;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_BYTES;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_BYTES_PENDING;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS_ACTIVE;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS_COMPLETED;
import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_RATE_LIMITED;
import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_RETRY;
import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLED;
import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLE_RATE;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
import static org.apache.hadoop.util.Preconditions.checkArgument;

/**
 * Store Layer.
 * This is where lower-level storage operations are intended
 * to move.
 */
public class S3AStoreImpl implements S3AStore {

  private static final Logger LOG = LoggerFactory.getLogger(S3AStoreImpl.class);

  /**
   * Progress logger; fairly noisy.
   */
  private static final Logger PROGRESS =
      LoggerFactory.getLogger(InternalConstants.UPLOAD_PROGRESS_LOG_NAME);

  /** Factory to create store contexts. */
  private final StoreContextFactory storeContextFactory;

  /** Source of the S3 clients. */
  private final ClientManager clientManager;

  /** The S3 bucket to communicate with. */
  private final String bucket;

  /** Request factory for creating requests. */
  private final RequestFactory requestFactory;

  /** Duration tracker factory. */
  private final DurationTrackerFactory durationTrackerFactory;

  /** The core instrumentation. */
  private final S3AInstrumentation instrumentation;

  /** Accessors to statistics for this FS. */
  private final S3AStatisticsContext statisticsContext;

  /** Storage statistics, bonded to the instrumentation. */
  private final S3AStorageStatistics storageStatistics;

  /** Rate limiter for read operations. */
  private final RateLimiting readRateLimiter;

  /** Rate limiter for write operations. */
  private final RateLimiting writeRateLimiter;

  /** Store context. */
  private final StoreContext storeContext;

  /** Invoker for retry operations. */
  private final Invoker invoker;

  /** Audit span source. */
  private final AuditSpanSource<AuditSpanS3A> auditSpanSource;

  /**
   * The original file system statistics: fairly minimal, but broadly
   * collected, so important to pick up.
   * This may be null.
   */
  private final FileSystem.Statistics fsStatistics;

  /**
   * Constructor to create the S3A store.
   * @param storeContextFactory factory to create store contexts.
   * @param clientManager source of the S3 clients.
   * @param durationTrackerFactory duration tracker factory.
   * @param instrumentation the core instrumentation.
   * @param statisticsContext accessors to statistics for this FS.
   * @param storageStatistics storage statistics bonded to the instrumentation.
   * @param readRateLimiter rate limiter for read operations.
   * @param writeRateLimiter rate limiter for write operations.
   * @param auditSpanSource source of audit spans.
   * @param fsStatistics the original file system statistics; may be null.
   */
  S3AStoreImpl(StoreContextFactory storeContextFactory,
      ClientManager clientManager,
      DurationTrackerFactory durationTrackerFactory,
      S3AInstrumentation instrumentation,
      S3AStatisticsContext statisticsContext,
      S3AStorageStatistics storageStatistics,
      RateLimiting readRateLimiter,
      RateLimiting writeRateLimiter,
      AuditSpanSource<AuditSpanS3A> auditSpanSource,
      @Nullable FileSystem.Statistics fsStatistics) {
    this.storeContextFactory = requireNonNull(storeContextFactory);
    this.clientManager = requireNonNull(clientManager);
    this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
    this.instrumentation = requireNonNull(instrumentation);
    this.statisticsContext = requireNonNull(statisticsContext);
    this.storageStatistics = requireNonNull(storageStatistics);
    this.readRateLimiter = requireNonNull(readRateLimiter);
    this.writeRateLimiter = requireNonNull(writeRateLimiter);
    this.auditSpanSource = requireNonNull(auditSpanSource);
    this.storeContext = requireNonNull(storeContextFactory.createStoreContext());
    this.fsStatistics = fsStatistics;
    this.invoker = storeContext.getInvoker();
    this.bucket = storeContext.getBucket();
    this.requestFactory = storeContext.getRequestFactory();
  }

  @Override
  public void close() {
    clientManager.close();
  }

  /**
   * Acquire write capacity from the rate limiter,
   * blocking until it is available.
   * {@inheritDoc}
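   * <p>
   * A sketch mirroring the bulk delete path in this class:
   * <pre>{@code
   * Duration wait = acquireWriteCapacity(keyCount);
   * instrumentation.recordDuration(STORE_IO_RATE_LIMITED, true, wait);
   * }</pre>
   */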
  @Override
  public Duration acquireWriteCapacity(final int capacity) {
    return writeRateLimiter.acquire(capacity);
  }

  /** Acquire read capacity from the rate limiter, blocking until it is available. {@inheritDoc} */
  @Override
  public Duration acquireReadCapacity(final int capacity) {
    return readRateLimiter.acquire(capacity);
  }

  /**
   * Create a new store context.
   * @return a new store context.
   */
  private StoreContext createStoreContext() {
    return storeContextFactory.createStoreContext();
  }

  @Override
  public StoreContext getStoreContext() {
    return storeContext;
  }

  /**
   * Get the S3 client.
   * @return the S3 client.
   * @throws UncheckedIOException on any failure to create the client.
   */
  private S3Client getS3Client() throws UncheckedIOException {
    return clientManager.getOrCreateS3ClientUnchecked();
  }

  @Override
  public S3TransferManager getOrCreateTransferManager() throws IOException {
    return clientManager.getOrCreateTransferManager();
  }

  @Override
  public S3Client getOrCreateS3Client() throws IOException {
    return clientManager.getOrCreateS3Client();
  }

  @Override
  public S3AsyncClient getOrCreateAsyncClient() throws IOException {
    return clientManager.getOrCreateAsyncClient();
  }

  @Override
  public S3Client getOrCreateS3ClientUnchecked() throws UncheckedIOException {
    return clientManager.getOrCreateS3ClientUnchecked();
  }

  @Override
  public S3AsyncClient getOrCreateAsyncS3ClientUnchecked() throws UncheckedIOException {
    return clientManager.getOrCreateAsyncS3ClientUnchecked();
  }

  @Override
  public S3Client getOrCreateUnencryptedS3Client() throws IOException {
    return clientManager.getOrCreateUnencryptedS3Client();
  }

  @Override
  public DurationTrackerFactory getDurationTrackerFactory() {
    return durationTrackerFactory;
  }

  private S3AInstrumentation getInstrumentation() {
    return instrumentation;
  }

  @Override
  public S3AStatisticsContext getStatisticsContext() {
    return statisticsContext;
  }

  private S3AStorageStatistics getStorageStatistics() {
    return storageStatistics;
  }

  @Override
  public RequestFactory getRequestFactory() {
    return requestFactory;
  }

  /**
   * Get the client manager.
   * @return the client manager.
   */
  @Override
  public ClientManager clientManager() {
    return clientManager;
  }

  /**
   * Increment a statistic by 1.
   * This increments both the instrumentation and storage statistics.
   * @param statistic The operation to increment
   */
  protected void incrementStatistic(Statistic statistic) {
    incrementStatistic(statistic, 1);
  }

  /**
   * Increment a statistic by a specific value.
   * This increments both the instrumentation and storage statistics.
   * @param statistic The operation to increment
   * @param count the count to increment
   */
  protected void incrementStatistic(Statistic statistic, long count) {
    statisticsContext.incrementCounter(statistic, count);
  }

  /**
   * Decrement a gauge by a specific value.
   * @param statistic The operation to decrement
   * @param count the count to decrement
   */
  protected void decrementGauge(Statistic statistic, long count) {
    statisticsContext.decrementGauge(statistic, count);
  }

  /**
   * Increment a gauge by a specific value.
   * @param statistic The operation to increment
   * @param count the count to increment
   */
  protected void incrementGauge(Statistic statistic, long count) {
    statisticsContext.incrementGauge(statistic, count);
  }

  /**
   * Callback when an operation was retried.
   * Increments the statistics of ignored errors or throttled requests,
   * depending on the exception class.
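   * <p>
   * A sketch of how this callback is wired into a retrying call,
   * mirroring the delete path in this class ({@code key},
   * {@code request} and {@code body} are illustrative):
   * <pre>{@code
   * invoker.retryUntranslated("PUT " + key, true,
   *     (text, e, retries, idempotent) -> operationRetried(e),
   *     () -> getS3Client().putObject(request, body));
   * }</pre>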
   * @param ex exception.
   */
  public void operationRetried(Exception ex) {
    if (isThrottleException(ex)) {
      LOG.debug("Request throttled");
      incrementStatistic(STORE_IO_THROTTLED);
      statisticsContext.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
    } else {
      incrementStatistic(STORE_IO_RETRY);
      incrementStatistic(IGNORED_ERRORS);
    }
  }

  /**
   * Callback from {@link Invoker} when an operation is retried.
   * @param text text of the operation
   * @param ex exception
   * @param retries number of retries
   * @param idempotent is the method idempotent
   */
  public void operationRetried(String text, Exception ex, int retries, boolean idempotent) {
    operationRetried(ex);
  }

  /**
   * Get the instrumentation's IOStatistics.
   * @return statistics
   */
  @Override
  public IOStatistics getIOStatistics() {
    return instrumentation.getIOStatistics();
  }

  /**
   * Increment read operations.
   */
  @Override
  public void incrementReadOperations() {
    if (fsStatistics != null) {
      fsStatistics.incrementReadOps(1);
    }
  }

  /**
   * Increment the write operation counter.
   * This is somewhat inaccurate, as it appears to be invoked more
   * often than needed in progress callbacks.
   */
  @Override
  public void incrementWriteOperations() {
    if (fsStatistics != null) {
      fsStatistics.incrementWriteOps(1);
    }
  }

  /**
   * Increment the bytes written statistic.
   * @param bytes number of bytes written.
   */
  private void incrementBytesWritten(final long bytes) {
    if (fsStatistics != null) {
      fsStatistics.incrementBytesWritten(bytes);
    }
  }

  /**
   * At the start of a put/multipart upload operation, update the
   * relevant counters.
   *
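   * <p>
   * A sketch of the expected pairing with
   * {@link #incrementPutCompletedStatistics(boolean, long)}, following
   * the pattern used by {@code uploadPart()} below:
   * <pre>{@code
   * incrementPutStartStatistics(len);
   * boolean success = false;
   * try {
   *   // ... issue the PUT ...
   *   success = true;
   * } finally {
   *   incrementPutCompletedStatistics(success, len);
   * }
   * }</pre>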
   * @param bytes bytes in the request.
   */
  @Override
  public void incrementPutStartStatistics(long bytes) {
    LOG.debug("PUT start {} bytes", bytes);
    incrementWriteOperations();
    incrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1);
    if (bytes > 0) {
      incrementGauge(OBJECT_PUT_BYTES_PENDING, bytes);
    }
  }

  /**
   * At the end of a put/multipart upload operation, update the
   * relevant counters and gauges.
   *
   * @param success did the operation succeed?
   * @param bytes bytes in the request.
   */
  @Override
  public void incrementPutCompletedStatistics(boolean success, long bytes) {
    LOG.debug("PUT completed success={}; {} bytes", success, bytes);
    if (bytes > 0) {
      incrementStatistic(OBJECT_PUT_BYTES, bytes);
      decrementGauge(OBJECT_PUT_BYTES_PENDING, bytes);
    }
    incrementStatistic(OBJECT_PUT_REQUESTS_COMPLETED);
    decrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1);
  }

  /**
   * Callback for use in progress callbacks from put/multipart upload events.
   * Increments those statistics which are expected to be updated during
   * the ongoing upload operation.
   * @param key key to file that is being written (for logging)
   * @param bytes bytes successfully uploaded.
   */
  @Override
  public void incrementPutProgressStatistics(String key, long bytes) {
    PROGRESS.debug("PUT {}: {} bytes", key, bytes);
    incrementWriteOperations();
    if (bytes > 0) {
      incrementBytesWritten(bytes);
    }
  }

  /**
   * Given a possibly null duration tracker factory, return a non-null
   * one for use in tracking durations: either the supplied factory
   * or the FS tracker factory itself.
   *
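   * <p>
   * A usage sketch, as in {@code uploadPart()} below
   * ({@code maybeNullFactory} is illustrative):
   * <pre>{@code
   * trackDurationOfSupplier(
   *     nonNullDurationTrackerFactory(maybeNullFactory),
   *     MULTIPART_UPLOAD_PART_PUT.getSymbol(),
   *     () -> getS3Client().uploadPart(request, body));
   * }</pre>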
   * @param factory factory.
   * @return a non-null factory.
   */
  @Override
  public DurationTrackerFactory nonNullDurationTrackerFactory(
      DurationTrackerFactory factory) {
    return factory != null
        ? factory
        : getDurationTrackerFactory();
  }

  /**
   * Start an operation; this informs the audit service of the event
   * and then sets it as the active span.
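   * <p>
   * A usage sketch, assuming the caller deactivates the span when done
   * ({@code path} is illustrative):
   * <pre>{@code
   * try (AuditSpanS3A span = createSpan("delete", path, null)) {
   *   // store operations here are attributed to the span
   * }
   * }</pre>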
   * @param operation operation name.
   * @param path1 first path of operation
   * @param path2 second path of operation
   * @return a span for the audit
   * @throws IOException failure
   */
  public AuditSpanS3A createSpan(String operation, @Nullable String path1, @Nullable String path2)
      throws IOException {

    return auditSpanSource.createSpan(operation, path1, path2);
  }

  /**
   * Reject any request to delete an object where the key is root.
   * @param key key to validate
   * @throws IllegalArgumentException if the request was rejected due to
   * a mistaken attempt to delete the root directory.
   */
  private void blockRootDelete(String key) throws IllegalArgumentException {
    checkArgument(!key.isEmpty() && !"/".equals(key), "Bucket %s cannot be deleted", bucket);
  }

  /**
   * {@inheritDoc}.
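   * <p>
   * A sketch of building a request with the plain SDK builder
   * ({@code keys} is an illustrative list of {@code ObjectIdentifier}s):
   * <pre>{@code
   * DeleteObjectsRequest request = DeleteObjectsRequest.builder()
   *     .bucket(bucket)
   *     .delete(d -> d.objects(keys))
   *     .build();
   * Map.Entry<Duration, DeleteObjectsResponse> result =
   *     deleteObjects(request);
   * }</pre>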
   */
  @Override
  @Retries.RetryRaw
  public Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(
      final DeleteObjectsRequest deleteRequest)
      throws SdkException {

    DeleteObjectsResponse response;
    BulkDeleteRetryHandler retryHandler = new BulkDeleteRetryHandler(createStoreContext());

    final List<ObjectIdentifier> keysToDelete = deleteRequest.delete().objects();
    int keyCount = keysToDelete.size();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Initiating delete operation for {} objects", keysToDelete.size());
      keysToDelete.stream().forEach(objectIdentifier -> {
        LOG.debug(" \"{}\" {}", objectIdentifier.key(),
            objectIdentifier.versionId() != null ? objectIdentifier.versionId() : "");
      });
    }
    // block root calls
    keysToDelete.stream().map(ObjectIdentifier::key).forEach(this::blockRootDelete);

    try (DurationInfo d = new DurationInfo(LOG, false, "DELETE %d keys", keyCount)) {
      response =
          invoker.retryUntranslated("delete",
              DELETE_CONSIDERED_IDEMPOTENT, (text, e, r, i) -> {
                // handle the failure
                retryHandler.bulkDeleteRetried(deleteRequest, e);
              },
              // duration is tracked in the bulk delete counters
              trackDurationOfOperation(getDurationTrackerFactory(),
                  OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> {
                    // acquire the write capacity for the number of keys to delete
                    // and record the duration.
                    Duration durationToAcquireWriteCapacity = acquireWriteCapacity(keyCount);
                    instrumentation.recordDuration(STORE_IO_RATE_LIMITED,
                        true,
                        durationToAcquireWriteCapacity);
                    incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
                    return getS3Client().deleteObjects(deleteRequest);
                  }));
      if (!response.errors().isEmpty()) {
        // one or more of the keys could not be deleted.
        // log the failures; the caller must check the response for errors.
        List<S3Error> errors = response.errors();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Partial failure of delete, {} errors", errors.size());
          for (S3Error error : errors) {
            LOG.debug("{}: \"{}\" - {}", error.key(), error.code(), error.message());
          }
        }
      }
      d.close();
      return Tuples.pair(d.asDuration(), response);

    } catch (IOException e) {
      // convert to unchecked.
      throw new UncheckedIOException(e);
    }
  }

  /**
   * Performs a HEAD request on an S3 object to retrieve its metadata.
   *
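   * <p>
   * A hedged invocation sketch ({@code tracker}, {@code invoker}
   * and {@code handler} are illustrative):
   * <pre>{@code
   * HeadObjectResponse head =
   *     headObject(key, tracker, invoker, handler, "open");
   * long length = head.contentLength();
   * }</pre>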
   * @param key           The S3 object key to perform the HEAD operation on
   * @param changeTracker Tracks changes to the object's metadata across operations
   * @param changeInvoker The invoker responsible for executing the HEAD request with retries
   * @param fsHandler     Handler for filesystem-level operations and configurations
   * @param operation     Description of the operation being performed for tracking purposes
   * @return              HeadObjectResponse containing the object's metadata
   * @throws IOException  If the HEAD request fails, object doesn't exist, or other I/O errors occur
   */
  @Override
  @Retries.RetryRaw
  public HeadObjectResponse headObject(String key,
      ChangeTracker changeTracker,
      Invoker changeInvoker,
      S3AFileSystemOperations fsHandler,
      String operation) throws IOException {
    HeadObjectResponse response = getStoreContext().getInvoker()
        .retryUntranslated("HEAD " + key, true,
            () -> {
              HeadObjectRequest.Builder requestBuilder =
                  getRequestFactory().newHeadObjectRequestBuilder(key);
              incrementStatistic(OBJECT_METADATA_REQUESTS);
              DurationTracker duration =
                  getDurationTrackerFactory().trackDuration(ACTION_HTTP_HEAD_REQUEST.getSymbol());
              try {
                LOG.debug("HEAD {} with change tracker {}", key, changeTracker);
                if (changeTracker != null) {
                  changeTracker.maybeApplyConstraint(requestBuilder);
                }
                HeadObjectResponse headObjectResponse =
                    getS3Client().headObject(requestBuilder.build());
                if (fsHandler != null) {
                  long length =
                      fsHandler.getS3ObjectSize(key, headObjectResponse.contentLength(), this,
                          headObjectResponse);
                  // overwrite the content length
                  headObjectResponse = headObjectResponse.toBuilder().contentLength(length).build();
                }
                if (changeTracker != null) {
                  changeTracker.processMetadata(headObjectResponse, operation);
                }
                return headObjectResponse;
              } catch (AwsServiceException ase) {
                if (!isObjectNotFound(ase)) {
                  // file not found is not considered a failure of the call,
                  // so only switch the duration tracker to update failure
                  // metrics on other exception outcomes.
                  duration.failed();
                }
                throw ase;
              } finally {
                // update the tracker.
                duration.close();
              }
            });
    incrementReadOperations();
    return response;
  }

  /**
   * Retrieves a specific byte range of an S3 object as a stream.
   *
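   * <p>
   * For example, to read the first kilobyte of an object:
   * <pre>{@code
   * try (ResponseInputStream<GetObjectResponse> in =
   *          getRangedS3Object(key, 0, 1023)) {
   *   byte[] buffer = new byte[1024];
   *   int bytesRead = in.read(buffer);
   * }
   * }</pre>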
   * @param key    The S3 object key to retrieve
   * @param start  The starting byte position (inclusive) of the range to retrieve
   * @param end    The ending byte position (inclusive) of the range to retrieve
   * @return       A ResponseInputStream containing the requested byte range of the S3 object
   * @throws IOException  If the object cannot be retrieved or other I/O errors occur
   * @see GetObjectResponse  For additional metadata about the retrieved object
   */
  @Override
  @Retries.RetryRaw
  public ResponseInputStream<GetObjectResponse> getRangedS3Object(String key,
      long start,
      long end) throws IOException {
    final GetObjectRequest request = getRequestFactory().newGetObjectRequestBuilder(key)
        .range(S3AUtils.formatRange(start, end))
        .build();
    DurationTracker duration = getDurationTrackerFactory()
        .trackDuration(ACTION_HTTP_GET_REQUEST);
    ResponseInputStream<GetObjectResponse> objectRange;
    try {
      objectRange = getStoreContext().getInvoker()
          .retryUntranslated("GET Ranged Object " + key, true,
              () -> getS3Client().getObject(request));

    } catch (IOException ex) {
      duration.failed();
      throw ex;
    } finally {
      duration.close();
    }
    return objectRange;
  }

  /**
   * {@inheritDoc}.
   */
  @Override
  @Retries.RetryRaw
  public Map.Entry<Duration, Optional<DeleteObjectResponse>> deleteObject(
      final DeleteObjectRequest request)
      throws SdkException {

    String key = request.key();
    blockRootDelete(key);
    DurationInfo d = new DurationInfo(LOG, false, "deleting %s", key);
    try {
      DeleteObjectResponse response =
          invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key),
              DELETE_CONSIDERED_IDEMPOTENT,
              trackDurationOfOperation(getDurationTrackerFactory(),
                  OBJECT_DELETE_REQUEST.getSymbol(), () -> {
                    incrementStatistic(OBJECT_DELETE_OBJECTS);
                    // We try to acquire write capacity just before delete call.
                    Duration durationToAcquireWriteCapacity = acquireWriteCapacity(1);
                    instrumentation.recordDuration(STORE_IO_RATE_LIMITED,
                        true, durationToAcquireWriteCapacity);
                    return getS3Client().deleteObject(request);
                  }));
      d.close();
      return Tuples.pair(d.asDuration(), Optional.of(response));
    } catch (AwsServiceException ase) {
      // 404 errors get swallowed; this can be raised by
      // third party stores (GCS).
      if (!isObjectNotFound(ase)) {
        throw ase;
      }
      d.close();
      return Tuples.pair(d.asDuration(), Optional.empty());
    } catch (IOException e) {
      // convert to unchecked.
      throw new UncheckedIOException(e);
    }
  }

  /**
   * Upload a single part of a multipart upload.
   * Increments the write and put counters.
   * <i>Important: this call does not close any input stream in the body.</i>
   * <p>
   * Retry Policy: none.
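   * <p>
   * A hedged sketch using the plain SDK request builder
   * ({@code bucket}, {@code key}, {@code uploadId} and
   * {@code partFile} are illustrative):
   * <pre>{@code
   * UploadPartRequest request = UploadPartRequest.builder()
   *     .bucket(bucket)
   *     .key(key)
   *     .uploadId(uploadId)
   *     .partNumber(1)
   *     .contentLength(partFile.length())
   *     .build();
   * UploadPartResponse response =
   *     uploadPart(request, RequestBody.fromFile(partFile), null);
   * }</pre>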
   * @param trackerFactory duration tracker factory for operation
   * @param request the upload part request.
   * @param body the request body.
   * @return the result of the operation.
   * @throws AwsServiceException on problems
   * @throws UncheckedIOException failure to instantiate the S3 client
   */
  @Override
  @Retries.OnceRaw
  public UploadPartResponse uploadPart(
      final UploadPartRequest request,
      final RequestBody body,
      @Nullable final DurationTrackerFactory trackerFactory)
      throws AwsServiceException, UncheckedIOException {
    long len = request.contentLength();
    incrementPutStartStatistics(len);
    try {
      UploadPartResponse uploadPartResponse = trackDurationOfSupplier(
          nonNullDurationTrackerFactory(trackerFactory),
          MULTIPART_UPLOAD_PART_PUT.getSymbol(), () ->
              getS3Client().uploadPart(request, body));
      incrementPutCompletedStatistics(true, len);
      return uploadPartResponse;
    } catch (AwsServiceException e) {
      incrementPutCompletedStatistics(false, len);
      throw e;
    }
  }

  /**
   * Start a transfer-manager managed async PUT of an object,
   * incrementing the put requests and put bytes
   * counters.
   * <p>
   * It does not update the other counters,
   * as existing code does that as progress callbacks come in.
   * Byte length is calculated from the file length, or, if there is no
   * file, from the content length of the header.
   * <p>
   * Because the operation is async, any stream supplied in the request
   * must reference data (files, buffers) which stay valid until the upload
   * completes.
   * Retry policy: N/A: the transfer manager is performing the upload.
   * Auditing: must be inside an audit span.
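   * <p>
   * A usage sketch, pairing this call with
   * {@link #waitForUploadCompletion(String, UploadInfo)}:
   * <pre>{@code
   * UploadInfo info = putObject(putObjectRequest, file, listener);
   * CompletedFileUpload result =
   *     waitForUploadCompletion(putObjectRequest.key(), info);
   * }</pre>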
   * @param putObjectRequest the request
   * @param file the file to be uploaded
   * @param listener the progress listener for the request
   * @return the upload initiated
   * @throws IOException if transfer manager creation failed.
   */
  @Override
  @Retries.OnceRaw
  public UploadInfo putObject(
      PutObjectRequest putObjectRequest,
      File file,
      ProgressableProgressListener listener) throws IOException {
    long len = getPutRequestLength(putObjectRequest);
    LOG.debug("PUT {} bytes to {} via transfer manager ", len, putObjectRequest.key());
    incrementPutStartStatistics(len);

    FileUpload upload = getOrCreateTransferManager().uploadFile(
            UploadFileRequest.builder()
                .putObjectRequest(putObjectRequest)
                .source(file)
                .addTransferListener(listener)
                .build());

    return new UploadInfo(upload, len);
  }

  /**
   * Wait for an upload to complete.
   * If the upload (or its result collection) failed, this is where
   * the failure is raised as an AWS exception.
   * Calls {@link S3AStore#incrementPutCompletedStatistics(boolean, long)}
   * to update the statistics.
   * @param key destination key
   * @param uploadInfo upload to wait for
   * @return the upload result
   * @throws IOException IO failure
   * @throws CancellationException if the wait() was cancelled
   */
  @Override
  @Retries.OnceTranslated
  public CompletedFileUpload waitForUploadCompletion(String key, UploadInfo uploadInfo)
      throws IOException {
    FileUpload upload = uploadInfo.getFileUpload();
    try {
      CompletedFileUpload result = upload.completionFuture().join();
      incrementPutCompletedStatistics(true, uploadInfo.getLength());
      return result;
    } catch (CompletionException e) {
      LOG.info("Interrupted: aborting upload");
      incrementPutCompletedStatistics(false, uploadInfo.getLength());
      throw extractException("upload", key, e);
    }
  }

  /**
   * Complete a multipart upload.
   * @param request request
   * @return the response
   */
  @Override
  @Retries.OnceRaw
  public CompleteMultipartUploadResponse completeMultipartUpload(
      CompleteMultipartUploadRequest request) {
    return getS3Client().completeMultipartUpload(request);
  }

}