/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.AccessDeniedException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
import software.amazon.awssdk.services.s3.model.MultipartUpload;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.StorageClass;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
import software.amazon.awssdk.transfer.s3.model.Copy;
import software.amazon.awssdk.transfer.s3.model.CopyRequest;

import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.Globber;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.impl.FlagSet;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.s3a.api.PerformanceFlagEnum;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.s3a.auth.SignerManager;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
import org.apache.hadoop.fs.s3a.impl.AWSCannedACL;
import org.apache.hadoop.fs.s3a.impl.BaseS3AFileSystemOperations;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl;
import org.apache.hadoop.fs.s3a.impl.CSES3AFileSystemOperations;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ClientManager;
import org.apache.hadoop.fs.s3a.impl.ClientManagerImpl;
import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
import org.apache.hadoop.fs.s3a.impl.CreateFileBuilder;
import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations;
import org.apache.hadoop.fs.s3a.impl.CSEV1CompatibleS3AFileSystemOperations;
import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
import org.apache.hadoop.fs.s3a.impl.GetContentSummaryOperation;
import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.MkdirOperation;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
import org.apache.hadoop.fs.s3a.impl.OpenFileSupport;
import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.impl.RenameOperation;
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder;
import org.apache.hadoop.fs.s3a.impl.S3AStoreBuilder;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
import org.apache.hadoop.fs.s3a.impl.CSEUtils;
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.FileSystemStatisticNames;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.fs.store.audit.AuditEntryPoint;
import org.apache.hadoop.fs.store.audit.ActiveThreadSpanSource;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.audit.AuditManagerS3A;
import org.apache.hadoop.fs.s3a.audit.AuditIntegration;
import org.apache.hadoop.fs.s3a.audit.OperationAuditor;
import org.apache.hadoop.fs.s3a.auth.RoleModel;
import org.apache.hadoop.fs.s3a.auth.delegation.AWSPolicyProvider;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens;
import org.apache.hadoop.fs.s3a.auth.delegation.AbstractS3ATokenIdentifier;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.PutTracker;
import org.apache.hadoop.fs.s3a.commit.MagicCommitIntegration;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.s3a.statistics.impl.BondedS3AStatisticsContext;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.fs.store.EtagChecksum;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.RateLimitingFactory;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.functional.CallableRaisingIOE;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT;
import static org.apache.hadoop.fs.impl.FlagSet.buildFlagSet;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Invoker.*;
import static org.apache.hadoop.fs.s3a.Listing.toLocatedFileStatusIterator;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.INITIALIZE_SPAN;
import static org.apache.hadoop.fs.s3a.auth.CredentialProviderListFactory.createAWSCredentialProviderList;
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.STATEMENT_ALLOW_KMS_RW;
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME;
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_REQUIRED_EXCEPTION;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403_FORBIDDEN;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.isS3ExpressStore;
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.checkNoS3Guard;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_CONTINUE_LIST_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.pairedTrackerFactory;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.RateLimitingFactory.unlimitedRate;
import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator;

/**
 * The core S3A Filesystem implementation.
 *
 * This subclass is marked as private as code should not be creating it
 * directly; use {@link FileSystem#get(Configuration)} and variants to
 * create one.
 *
 * If cast to {@code S3AFileSystem}, extra methods and features may be accessed.
 * Consider those private and unstable.
 *
 * Because it prints some of the state of the instrumentation,
 * the output of {@link #toString()} must also be considered unstable.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    AWSPolicyProvider, DelegationTokenProvider, IOStatisticsSource,
    AuditSpanSource<AuditSpanS3A>, ActiveThreadSpanSource<AuditSpanS3A>,
        StoreContextFactory {
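
  // Illustrative usage only: instances are obtained through the FileSystem
  // factory methods, never by direct construction. The bucket name below is
  // a placeholder.
  //
  //   Configuration conf = new Configuration();
  //   FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
  //   if (fs instanceof S3AFileSystem) {
  //     // extra methods are available, but consider them private and unstable
  //   }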

  /**
   * Default blocksize as used in blocksize and FS status queries.
   */
  public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;

  private URI uri;

  private Path workingDir;

  private String username;

  /**
   * Store back end.
   */
  private S3AStore store;

  /**
   * The core S3 client is created and managed by the ClientManager.
   * It is copied here within {@link #initialize(URI, Configuration)}.
   * Some mocking tests modify this, so take care with changes.
   */
  private S3Client s3Client;

  // initial callback policy is fail-once; it's there just to assist
  // some mock tests and other codepaths trying to call the low level
  // APIs on an uninitialized filesystem.
  private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
      Invoker.LOG_EVENT);

  private final Retried onRetry = this::operationRetried;

  /**
   * Represents the bucket name for all S3 operations. If a per-bucket override
   * for the {@link InternalConstants#ARN_BUCKET_OPTION} property is set, the
   * bucket is updated to point at the configured ARN.
   */
  private String bucket;
  private int maxKeys;
  private Listing listing;
  private long partSize;
  private boolean enableMultiObjectsDelete;
  private ExecutorService boundedThreadPool;
  private ThreadPoolExecutor unboundedThreadPool;

  // S3 reads are prefetched asynchronously using this future pool.
  private ExecutorServiceFuturePool futurePool;

  // If true, the prefetching input stream is used for reads.
  private boolean prefetchEnabled;

  // Size in bytes of a single prefetch block.
  private int prefetchBlockSize;

  // Size of prefetch queue (in number of blocks).
  private int prefetchBlockCount;

  private int executorCapacity;
  private long multiPartThreshold;
  public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);

  /** Log to warn of storage class configuration problems. */
  private static final LogExactlyOnce STORAGE_CLASS_WARNING = new LogExactlyOnce(LOG);

  private LocalDirAllocator directoryAllocator;
  private String cannedACL;

  /**
   * This must never be null; until initialized it just declares that there
   * is no encryption.
   */
  private EncryptionSecrets encryptionSecrets = new EncryptionSecrets();
  /** The core instrumentation. */
  private S3AInstrumentation instrumentation;
  /** Accessors to statistics for this FS. */
  private S3AStatisticsContext statisticsContext;
  /** Storage Statistics Bonded to the instrumentation. */
  private S3AStorageStatistics storageStatistics;

  /**
   * Performance flags.
   */
  private FlagSet<PerformanceFlagEnum> performanceFlags;

  /**
   * Default input policy; may be overridden in
   * {@code openFile()}.
   */
  private S3AInputPolicy inputPolicy;
  /** Vectored IO context. */
  private VectoredIOContext vectoredIOContext;

  /**
   * Maximum number of active range read operation a single
   * input stream can have.
   */
  private int vectoredActiveRangeReads;

  private long readAhead;
  private ChangeDetectionPolicy changeDetectionPolicy;
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private volatile boolean isClosed = false;

  /** Delegation token integration; non-empty when DT support is enabled. */
  private Optional<S3ADelegationTokens> delegationTokens = Optional.empty();

  /** Principal who created the FS; recorded during initialization. */
  private UserGroupInformation owner;

  private String blockOutputBuffer;
  private S3ADataBlocks.BlockFactory blockFactory;
  private int blockOutputActiveBlocks;
  private boolean useListV1;
  private MagicCommitIntegration committerIntegration;

  private AWSCredentialProviderList credentials;
  private SignerManager signerManager;
  private S3AInternals s3aInternals;

  /**
   * Do directory operations purge pending uploads?
   */
  private boolean dirOperationsPurgeUploads;

  /**
   * Page size for deletions.
   */
  private int pageSize;

  private final ListingOperationCallbacks listingOperationCallbacks =
          new ListingOperationCallbacksImpl();

  /**
   * Helper for the openFile() method.
   */
  private OpenFileSupport openFileHelper;

  /**
   * Context accessors for re-use.
   */
  private final ContextAccessors contextAccessors = new ContextAccessorsImpl();

  /**
   * Factory for AWS requests.
   */
  private RequestFactory requestFactory;

  /**
   * Audit manager (service lifecycle).
   * Creates the audit service and manages the binding of different audit spans
   * to different threads.
   * Initially this is a no-op manager; once the service is initialized it will
   * be replaced with a configured one.
   */
  private AuditManagerS3A auditManager =
      AuditIntegration.stubAuditManager();

  /**
   * Is this S3A FS instance using S3 client side encryption?
   */
  private boolean isCSEEnabled;

  /**
   * Bucket AccessPoint.
   */
  private ArnResource accessPoint;

  /**
   * Handler for certain filesystem operations.
   */
  private S3AFileSystemOperations fsHandler;


  /**
   * Does this S3A FS instance have multipart upload enabled?
   */
  private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED;

  /**
   * Should file copy operations use the S3 transfer manager?
   * True unless multipart upload is disabled.
   */
  private boolean isMultipartCopyEnabled;

  /**
   * Is FIPS enabled?
   */
  private boolean fipsEnabled;

  /**
   * A cache of files that should be deleted when the FileSystem is closed
   * or the JVM is exited.
   */
  private final Set<Path> deleteOnExit = new TreeSet<>();

  /**
   * Scheme for the current filesystem.
   */
  private String scheme = FS_S3A;

  /**
   * Flag to indicate that the higher performance copyFromLocalFile implementation
   * should be used.
   */
  private boolean optimizedCopyFromLocal;

  /**
   * Is this an S3 Express store?
   */
  private boolean s3ExpressStore;

  /**
   * Store endpoint from configuration info or access point ARN.
   */
  private String endpoint;

  /**
   * Region from configuration info or access point ARN.
   */
  private String configuredRegion;

  /**
   * Are S3 Access Grants Enabled?
   */
  private boolean s3AccessGrantsEnabled;

  /** Add any deprecated keys. */
  @SuppressWarnings("deprecation")
  private static void addDeprecatedKeys() {
    Configuration.DeprecationDelta[] deltas = {
        new Configuration.DeprecationDelta(
            FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS,
            FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS),
        new Configuration.DeprecationDelta(
            SERVER_SIDE_ENCRYPTION_ALGORITHM,
            S3_ENCRYPTION_ALGORITHM),
        new Configuration.DeprecationDelta(
            SERVER_SIDE_ENCRYPTION_KEY,
            S3_ENCRYPTION_KEY)
    };

    if (deltas.length > 0) {
      Configuration.addDeprecations(deltas);
      Configuration.reloadExistingConfigurations();
    }
  }

  static {
    addDeprecatedKeys();
  }
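
  // Illustrative effect of the deprecation mapping above: a value set under
  // a deprecated key becomes visible under its replacement. The key strings
  // are those behind the constants used above.
  //
  //   Configuration c = new Configuration(false);
  //   c.set("fs.s3a.server-side-encryption-algorithm", "AES256");
  //   String v = c.get("fs.s3a.encryption.algorithm");  // "AES256"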

  /** Called after a new FileSystem instance is constructed.
   * @param name a uri whose authority section names the host, port, etc.
   *   for this FileSystem
   * @param originalConf the configuration to use for the FS. The
   * bucket-specific options are patched over the base ones before any use is
   * made of the config.
   */
  public void initialize(URI name, Configuration originalConf)
      throws IOException {
    // get the host; this is guaranteed to be non-null, non-empty
    bucket = name.getHost();
    AuditSpan span = null;
    // track initialization duration; will only be set after
    // statistics are set up.
    Optional<DurationTracker> trackInitialization = Optional.empty();
    try {
      LOG.debug("Initializing S3AFileSystem for {}", bucket);
      if (LOG.isTraceEnabled()) {
        // log a full trace for deep diagnostics of where an object is created,
        // for tracking down memory leak issues.
        LOG.trace("Filesystem for {} created; fs.s3a.impl.disable.cache = {}",
            name, originalConf.getBoolean("fs.s3a.impl.disable.cache", false),
            new RuntimeException(super.toString()));
      }
      // clone the configuration into one with propagated bucket options
      Configuration conf = propagateBucketOptions(originalConf, bucket);
      // HADOOP-17894. remove references to s3a stores in JCEKS credentials.
      conf = ProviderUtils.excludeIncompatibleCredentialProviders(
          conf, S3AFileSystem.class);
      String arn = String.format(ARN_BUCKET_OPTION, bucket);
      String configuredArn = conf.getTrimmed(arn, "");
      if (!configuredArn.isEmpty()) {
        accessPoint = ArnResource.accessPointFromArn(configuredArn);
        LOG.info("Using AccessPoint ARN \"{}\" for bucket {}", configuredArn, bucket);
        bucket = accessPoint.getFullArn();
      } else if (conf.getBoolean(AWS_S3_ACCESSPOINT_REQUIRED, false)) {
        LOG.warn("Access Point usage is required because \"{}\" is enabled," +
            " but not configured for the bucket: {}", AWS_S3_ACCESSPOINT_REQUIRED, bucket);
        throw new PathIOException(bucket, AP_REQUIRED_EXCEPTION);
      }
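
      // Illustrative per-bucket setting for the ARN option checked above;
      // the bucket name, account and access point are placeholders.
      //
      //   <property>
      //     <name>fs.s3a.bucket.example-bucket.accesspoint.arn</name>
      //     <value>arn:aws:s3:eu-west-1:123456789012:accesspoint/example-ap</value>
      //   </property>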

      // fix up the classloader of the configuration to be whatever
      // classloader loaded this filesystem.
      // See: HADOOP-17372 and follow-up on HADOOP-18993
      S3AUtils.maybeIsolateClassloader(conf, this.getClass().getClassLoader());

      // patch the Hadoop security providers
      patchSecurityCredentialProviders(conf);
      // look for delegation token support early.
      boolean delegationTokensEnabled = hasDelegationTokenBinding(conf);
      if (delegationTokensEnabled) {
        LOG.debug("Using delegation tokens");
      }
      // set the URI, this will do any fixup of the URI to remove secrets,
      // canonicalize.
      setUri(name, delegationTokensEnabled);
      super.initialize(uri, conf);
      setConf(conf);

      // initialize statistics, after which statistics
      // can be collected.
      instrumentation = new S3AInstrumentation(uri);
      initializeStatisticsBinding();

      // track initialization duration.
      // this should really be done in a onceTrackingDuration() call,
      // but then all methods below would need to be in the lambda and
      // it would create a merge/backport headache for all.
      trackInitialization = Optional.of(
          instrumentation.trackDuration(FileSystemStatisticNames.FILESYSTEM_INITIALIZATION));

      s3aInternals = createS3AInternals();

      // look for encryption data
      // DT Bindings may override this
      setEncryptionSecrets(
          buildEncryptionSecrets(bucket, conf));

      invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);

      // If encryption method is set to CSE-KMS or CSE-CUSTOM then CSE is enabled.
      isCSEEnabled = CSEUtils.isCSEEnabled(getS3EncryptionAlgorithm().getMethod());

      // Create the appropriate fsHandler instance using a factory method
      fsHandler = createFileSystemHandler();
      fsHandler.setCSEGauge((IOStatisticsStore) getIOStatistics());
      // Username is the current user at the time the FS was instantiated.
      owner = UserGroupInformation.getCurrentUser();
      username = owner.getShortUserName();
      workingDir = new Path("/user", username)
          .makeQualified(this.uri, this.getWorkingDirectory());

      maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
      partSize = getMultipartSizeProperty(conf,
          MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
      multiPartThreshold = getMultipartSizeProperty(conf,
          MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD);

      // check but do not store the block size
      longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
      enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);

      // determine and cache the endpoints
      endpoint = accessPoint == null
          ? conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)
          : accessPoint.getEndpoint();

      configuredRegion = accessPoint == null
          ? conf.getTrimmed(AWS_REGION)
          : accessPoint.getRegion();

      fipsEnabled = conf.getBoolean(FIPS_ENDPOINT, ENDPOINT_FIPS_DEFAULT);

      // is this an S3Express store?
      s3ExpressStore = isS3ExpressStore(bucket, endpoint);

      // should the delete also purge uploads?
      // happens if explicitly enabled, or if the store is S3Express storage.
      dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
          s3ExpressStore);

      this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
      long prefetchBlockSizeLong =
          longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, 1);
      if (prefetchBlockSizeLong > (long) Integer.MAX_VALUE) {
        throw new IOException("S3A prefatch block size exceeds int limit");
      }
      this.prefetchBlockSize = (int) prefetchBlockSizeLong;
      this.prefetchBlockCount =
          intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
      this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
          DEFAULT_MULTIPART_UPLOAD_ENABLED);
      // multipart copy and upload are the same; this just makes it explicit
      this.isMultipartCopyEnabled = isMultipartUploadEnabled;

      initThreadPools(conf);

      int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
      if (listVersion < 1 || listVersion > 2) {
        LOG.warn("Configured fs.s3a.list.version {} is invalid, forcing " +
            "version 2", listVersion);
      }
      useListV1 = (listVersion == 1);
      if (accessPoint != null && useListV1) {
        LOG.warn("V1 list configured in fs.s3a.list.version. This is not supported in by" +
            " access points. Upgrading to V2");
        useListV1 = false;
      }

      signerManager = new SignerManager(bucket, this, conf, owner);
      signerManager.initCustomSigners();

      // start auditing
      initializeAuditService();

      // create the requestFactory.
      // requires the audit manager to be initialized.
      requestFactory = createRequestFactory();

      // create an initial span for all other operations.
      span = createSpan(INITIALIZE_SPAN, bucket, null);

      // creates the AWS client, including overriding auth chain if
      // the FS came with a DT
      // this may do some patching of the configuration (e.g. setting
      // the encryption algorithms)
      ClientManager clientManager = createClientManager(name, delegationTokensEnabled);

      inputPolicy = S3AInputPolicy.getPolicy(
          conf.getTrimmed(INPUT_FADVISE,
              Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT),
          S3AInputPolicy.Normal);
      LOG.debug("Input fadvise policy = {}", inputPolicy);
      changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf);
      LOG.debug("Change detection policy = {}", changeDetectionPolicy);
      boolean magicCommitterEnabled = conf.getBoolean(
          CommitConstants.MAGIC_COMMITTER_ENABLED,
          CommitConstants.DEFAULT_MAGIC_COMMITTER_ENABLED);
      LOG.debug("Filesystem support for magic committers {} enabled",
          magicCommitterEnabled ? "is" : "is not");
      committerIntegration = new MagicCommitIntegration(
          this, magicCommitterEnabled);

      boolean blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, true);

      if (!blockUploadEnabled) {
        LOG.warn("The \"slow\" output stream is no longer supported");
      }
      blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
          DEFAULT_FAST_UPLOAD_BUFFER);
      blockFactory = S3ADataBlocks.createFactory(createStoreContext(), blockOutputBuffer);
      blockOutputActiveBlocks = intOption(conf,
          FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
      // If CSE is enabled, do multipart uploads serially.
      if (isCSEEnabled) {
        blockOutputActiveBlocks = 1;
      }
      LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
              " queue limit={}; multipart={}",
          blockOutputBuffer, partSize, blockOutputActiveBlocks, isMultipartUploadEnabled);
      // verify there's no S3Guard in the store config.
      checkNoS3Guard(this.getUri(), getConf());

      // read in performance options and parse them to a list of flags.
      performanceFlags = buildFlagSet(
          PerformanceFlagEnum.class,
          conf,
          FS_S3A_PERFORMANCE_FLAGS,
          true);
      // performance creation flag for code which wants performance
      // at the risk of overwrites.
      // this uses the performance flags as the default and then
      // updates the performance flags to match.
      // a bit convoluted.
      boolean performanceCreation = conf.getBoolean(FS_S3A_CREATE_PERFORMANCE,
          performanceFlags.enabled(PerformanceFlagEnum.Create));
      performanceFlags.set(PerformanceFlagEnum.Create, performanceCreation);
      // freeze.
      performanceFlags.makeImmutable();

      LOG.debug("{} = {}", FS_S3A_CREATE_PERFORMANCE, performanceCreation);

      pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
          BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
      checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
              "page size out of range: %s", pageSize);
      listing = new Listing(listingOperationCallbacks, createStoreContext());
      // now the open file logic
      openFileHelper = new OpenFileSupport(
          changeDetectionPolicy,
          longBytesOption(conf, READAHEAD_RANGE,
              DEFAULT_READAHEAD_RANGE, 0),
          username,
          intOption(conf, IO_FILE_BUFFER_SIZE_KEY,
              IO_FILE_BUFFER_SIZE_DEFAULT, 0),
          longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
                        DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
          inputPolicy);
      vectoredActiveRangeReads = intOption(conf,
              AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
      vectoredIOContext = populateVectoredIOContext(conf);
      scheme = (this.uri != null && this.uri.getScheme() != null) ? this.uri.getScheme() : FS_S3A;
      optimizedCopyFromLocal = conf.getBoolean(OPTIMIZED_COPY_FROM_LOCAL,
          OPTIMIZED_COPY_FROM_LOCAL_DEFAULT);
      LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal);
      s3AccessGrantsEnabled = conf.getBoolean(AWS_S3_ACCESS_GRANTS_ENABLED, false);

      int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
      // now create the store
      store = createS3AStore(clientManager, rateLimitCapacity);
      // the s3 client is created through the store, rather than
      // directly through the client manager.
      // this is to aid mocking.
      s3Client = store.getOrCreateS3Client();
      // The filesystem is now ready to perform operations against
      // S3
      // This initiates a probe against S3 for the bucket existing.
      doBucketProbing();
      initMultipartUploads(conf);
      trackInitialization.ifPresent(DurationTracker::close);
    } catch (SdkException e) {
      // amazon client exception: stop all services then throw the translation
      cleanupWithLogger(LOG, span);
      stopAllServices();
      trackInitialization.ifPresent(DurationTracker::failed);
      throw translateException("initializing ", new Path(name), e);
    } catch (IOException | RuntimeException e) {
      // other exceptions: stop the services.
      cleanupWithLogger(LOG, span);
      stopAllServices();
      trackInitialization.ifPresent(DurationTracker::failed);
      throw e;
    }
  }

  /**
   * Creates and returns an instance of the appropriate S3AFileSystemOperations.
   * Creation is based on the client-side encryption (CSE) settings.
   *
   * @return An instance of the appropriate S3AFileSystemOperations implementation.
   */
  private S3AFileSystemOperations createFileSystemHandler() {
    if (isCSEEnabled) {
      if (getConf().getBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED,
          S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED_DEFAULT)) {
        return new CSEV1CompatibleS3AFileSystemOperations();
      } else {
        return new CSES3AFileSystemOperations();
      }
    } else {
      return new BaseS3AFileSystemOperations();
    }
  }
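
  // A minimal configuration sketch driving the selection above; the key
  // strings are assumptions based on the constants used, values are
  // placeholders:
  //
  //   fs.s3a.encryption.algorithm = CSE-KMS                    // enables CSE
  //   fs.s3a.encryption.cse.v1.compatibility.enabled = true    // V1-compatible handler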


  /**
   * Create the S3AStore instance.
   * This is protected so that tests can override it.
   * @param clientManager client manager
   * @param rateLimitCapacity rate limit
   * @return a new store instance
   */
  @VisibleForTesting
  protected S3AStore createS3AStore(final ClientManager clientManager,
      final int rateLimitCapacity) {
    return new S3AStoreBuilder()
        .withAuditSpanSource(getAuditManager())
        .withClientManager(clientManager)
        .withDurationTrackerFactory(getDurationTrackerFactory())
        .withFsStatistics(getFsStatistics())
        .withInstrumentation(getInstrumentation())
        .withStatisticsContext(statisticsContext)
        .withStoreContextFactory(this)
        .withStorageStatistics(getStorageStatistics())
        .withReadRateLimiter(unlimitedRate())
        .withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
        .build();
  }
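
  // Sketch of a test override of the factory method above; the stub store
  // field is hypothetical:
  //
  //   @Override
  //   protected S3AStore createS3AStore(ClientManager clientManager,
  //       int rateLimitCapacity) {
  //     return stubStore;
  //   }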

  /**
   * Populates the configuration related to vectored IO operations
   * in the context which has to be passed down to input streams.
   * @param conf configuration object.
   * @return VectoredIOContext.
   */
  private VectoredIOContext populateVectoredIOContext(Configuration conf) {
    final int minSeekVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
            DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 0);
    final int maxReadSizeVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
            DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 0);
    return new VectoredIOContext()
            .setMinSeekForVectoredReads(minSeekVectored)
            .setMaxReadSizeForVectoredReads(maxReadSizeVectored)
            .build();
  }
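
  // Illustrative tuning of the two options read above; both take byte sizes,
  // and the key strings are assumed from the constant names:
  //
  //   fs.s3a.vectored.read.min.seek.size = 4K
  //   fs.s3a.vectored.read.max.merged.size = 1M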

  /**
   * Test bucket existence in S3.
   * When the value of {@link Constants#S3A_BUCKET_PROBE} is set to 0,
   * the bucket existence check is skipped to improve the performance of
   * S3AFileSystem initialization. When set to 1 or 2, the (potentially slow)
   * bucket existence check is performed.
   * If 3 or higher: warn and skip the check.
   * The DNS address of the S3 endpoint is also logged when the probe runs
   * (a value of 1 or 2); otherwise the lookup is skipped for performance.
   * @throws UnknownStoreException the bucket is absent
   * @throws IOException any other problem talking to S3
   */
  @Retries.RetryTranslated
  private void doBucketProbing() throws IOException {
    int bucketProbe = getConf()
            .getInt(S3A_BUCKET_PROBE, S3A_BUCKET_PROBE_DEFAULT);
    Preconditions.checkArgument(bucketProbe >= 0,
            "Value of " + S3A_BUCKET_PROBE + " should be >= 0");
    switch (bucketProbe) {
    case 0:
      LOG.debug("skipping check for bucket existence");
      break;
    case 1:
    case 2:
      logDnsLookup(getConf());
      verifyBucketExists();
      break;
    default:
      // we have no idea what this is, assume it is from a later release.
      LOG.warn("Unknown bucket probe option {}: {}; skipping check for bucket existence",
          S3A_BUCKET_PROBE, bucketProbe);
      break;
    }
  }
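
  // Illustrative setting to skip the probe entirely for the fastest startup:
  //
  //   <property>
  //     <name>fs.s3a.bucket.probe</name>
  //     <value>0</value>
  //   </property>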

  /**
   * Initialize the statistics binding.
   * This is done by creating an {@code IntegratedS3AStatisticsContext}
   * with callbacks to get the FS's instrumentation and FileSystem.statistics
   * field; the latter may change after {@link #initialize(URI, Configuration)},
   * so needs to be dynamically adapted.
   * Protected so that (mock) subclasses can replace it with a
   * different statistics binding, if desired.
   */
  protected void initializeStatisticsBinding() {
    storageStatistics = createStorageStatistics(
        requireNonNull(getIOStatistics()));
    statisticsContext = new BondedS3AStatisticsContext(
        new BondedS3AStatisticsContext.S3AFSStatisticsSource() {

          @Override
          public S3AInstrumentation getInstrumentation() {
            return S3AFileSystem.this.getInstrumentation();
          }

          @Override
          public Statistics getInstanceStatistics() {
            return S3AFileSystem.this.statistics;
          }
        });
  }

  /**
   * Initialize the thread pool.
   * This must be re-invoked after replacing the S3Client during test
   * runs.
   * @param conf configuration.
   */
  private void initThreadPools(Configuration conf) {
    final String name = "s3a-transfer-" + getBucket();
    int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
    if (maxThreads < 2) {
      LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
      maxThreads = 2;
    }
    int totalTasks = intOption(conf,
        MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
    // keepalive time takes a time suffix; default unit is seconds
    long keepAliveTime = ConfigurationHelper.getDuration(conf,
            KEEPALIVE_TIME,
            Duration.ofSeconds(DEFAULT_KEEPALIVE_TIME),
            TimeUnit.SECONDS,
            Duration.ZERO).getSeconds();

    int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0;

    int activeTasksForBoundedThreadPool = maxThreads;
    int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads;
    boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
        activeTasksForBoundedThreadPool,
        waitingTasksForBoundedThreadPool,
        keepAliveTime, TimeUnit.SECONDS,
        name + "-bounded");
    unboundedThreadPool = new ThreadPoolExecutor(
        maxThreads, Integer.MAX_VALUE,
        keepAliveTime, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(),
        BlockingThreadPoolExecutorService.newDaemonThreadFactory(
            name + "-unbounded"));
    unboundedThreadPool.allowCoreThreadTimeOut(true);
    executorCapacity = intOption(conf,
        EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
    if (prefetchEnabled) {
      final S3AInputStreamStatistics s3AInputStreamStatistics =
          statisticsContext.newInputStreamStatistics();
      futurePool = new ExecutorServiceFuturePool(
          new SemaphoredDelegatingExecutor(
              boundedThreadPool,
              activeTasksForBoundedThreadPool + waitingTasksForBoundedThreadPool,
              true,
              s3AInputStreamStatistics));
    }
  }
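
  // The keepalive option read above accepts a time suffix; an illustrative
  // setting (key string assumed from the constant name):
  //
  //   fs.s3a.threads.keepalivetime = 60s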

  /**
   * Create the storage statistics or bind to an existing one.
   * @param ioStatistics IOStatistics to build the storage statistics from.
   * @return a storage statistics instance; expected to be that of the FS.
   */
  protected static S3AStorageStatistics createStorageStatistics(
      final IOStatistics ioStatistics) {
    return (S3AStorageStatistics)
        GlobalStorageStatistics.INSTANCE
            .put(S3AStorageStatistics.NAME,
                () -> new S3AStorageStatistics(ioStatistics));
  }

  /**
   * Verify that the bucket exists.
   * Retry policy: retrying, translated.
   * @throws UnknownStoreException the bucket is absent
   * @throws IOException any other problem talking to S3
   */
  @Retries.RetryTranslated
  protected void verifyBucketExists() throws UnknownStoreException, IOException {

    if (!trackDurationAndSpan(
        STORE_EXISTS_PROBE, bucket, null, () ->
            invoker.retry("doesBucketExist", bucket, true, () -> {
              try {
                getS3Client().headBucket(HeadBucketRequest.builder().bucket(bucket).build());
                return true;
              } catch (AwsServiceException ex) {
                int statusCode = ex.statusCode();
                if (statusCode == SC_404_NOT_FOUND ||
                    (statusCode == SC_403_FORBIDDEN && accessPoint != null)) {
                  return false;
                }
              }

              return true;
            }))) {

      throw new UnknownStoreException("s3a://" + bucket + "/",
          " Bucket does not exist. Accessing with " + ENDPOINT + " set to "
              + getConf().getTrimmed(ENDPOINT, null));
    }
  }

  /**
   * Get S3A Instrumentation. For test purposes.
   * @return this instance's instrumentation.
   */
  @VisibleForTesting
  public S3AInstrumentation getInstrumentation() {
    return instrumentation;
  }

  /**
   * Get the FS statistics for this S3AFS instance.
   *
   * @return FS statistic instance.
   */
  @VisibleForTesting
  public FileSystem.Statistics getFsStatistics() {
    return statistics;
  }

  /**
   * Get current listing instance.
   * @return this instance's listing.
   */
  public Listing getListing() {
    return listing;
  }

  /**
   * Set up the client bindings.
   * If delegation tokens are enabled, the FS first looks for a DT
   * ahead of any other bindings.
   * If there is a DT it uses that to do the auth
   * and switches to the DT authenticator automatically (and exclusively).
   * <p>
   * Delegation tokens are configured and started, but the actual
   * S3 clients are not: instead a {@link ClientManager} is created
   * and returned, from which they can be created on demand.
   * This is to reduce delays in FS initialization, especially
   * for features (transfer manager, async client) which are not
   * always used.
   * @param fsURI URI of the FS
   * @param dtEnabled are delegation tokens enabled?
   * @return the client manager which can generate the clients.
   * @throws IOException failure.
   */
  private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws IOException {
    Configuration conf = getConf();
    credentials = null;
    String uaSuffix = "";

    if (dtEnabled) {
      // Delegation support.
      // Create and start the DT integration.
      // Then look for an existing DT for this bucket, switch to authenticating
      // with it if so.

      LOG.debug("Using delegation tokens");
      S3ADelegationTokens tokens = new S3ADelegationTokens();
      this.delegationTokens = Optional.of(tokens);
      tokens.bindToFileSystem(getCanonicalUri(),
          createStoreContext(),
          createDelegationOperations());
      tokens.init(conf);
      tokens.start();
      // switch to the DT provider and bypass all other configured
      // providers.
      if (tokens.isBoundToDT()) {
        // A DT was retrieved.
        LOG.debug("Using existing delegation token");
        // and use the encryption settings from that token, whatever they were
      } else {
        LOG.debug("No delegation token for this instance");
      }
      // Get new credential chain
      credentials = tokens.getCredentialProviders();
      // and any encryption secrets which came from a DT
      tokens.getEncryptionSecrets()
          .ifPresent(this::setEncryptionSecrets);
      // and update the UA field with any diagnostics provided by
      // the DT binding.
      uaSuffix = tokens.getUserAgentField();
    } else {
      // DT support is disabled, so create the normal credential chain
      credentials = createAWSCredentialProviderList(fsURI, conf);
    }
    LOG.debug("Using credential provider {}", credentials);

    S3ClientFactory clientFactory = fsHandler.getS3ClientFactory(conf);
    S3ClientFactory unencryptedClientFactory = fsHandler.getUnencryptedS3ClientFactory(conf);
    CSEMaterials cseMaterials = fsHandler.getClientSideEncryptionMaterials(conf, bucket,
        getS3EncryptionAlgorithm());

    S3ClientFactory.S3ClientCreationParameters parameters =
        new S3ClientFactory.S3ClientCreationParameters()
        .withCredentialSet(credentials)
        .withPathUri(fsURI)
        .withEndpoint(endpoint)
        .withMetrics(statisticsContext.newStatisticsFromAwsSdk())
        .withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
        .withUserAgentSuffix(uaSuffix)
        .withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
        .withExecutionInterceptors(auditManager.createExecutionInterceptors())
        .withMinimumPartSize(partSize)
        .withMultipartCopyEnabled(isMultipartCopyEnabled)
        .withMultipartThreshold(multiPartThreshold)
        .withTransferManagerExecutor(unboundedThreadPool)
        .withRegion(configuredRegion)
        .withFipsEnabled(fipsEnabled)
        .withExpressCreateSession(
            conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT))
        .withChecksumValidationEnabled(
            conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT))
        .withClientSideEncryptionEnabled(isCSEEnabled)
        .withClientSideEncryptionMaterials(cseMaterials)
        .withKMSRegion(conf.get(S3_ENCRYPTION_CSE_KMS_REGION));

    // this is where clients and the transfer manager are created on demand.
    return createClientManager(clientFactory, unencryptedClientFactory, parameters,
        getDurationTrackerFactory());
  }
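
  // Sketch of switching on the delegation-token path above; the key string
  // is assumed, the binding class is one of the shipped implementations:
  //
  //   fs.s3a.delegation.token.binding =
  //       org.apache.hadoop.fs.s3a.auth.delegation.SessionTokenBinding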

  /**
   * Create the Client Manager; protected to allow for mocking.
   * Requires {@link #unboundedThreadPool} to be initialized.
   * @param clientFactory (reflection-bonded) client factory.
   * @param unencryptedClientFactory (reflection-bonded) client factory.
   * @param clientCreationParameters parameters for client creation.
   * @param durationTrackerFactory factory for duration tracking.
   * @return a client manager instance.
   */
  @VisibleForTesting
  protected ClientManager createClientManager(
      final S3ClientFactory clientFactory,
      final S3ClientFactory unencryptedClientFactory,
      final S3ClientFactory.S3ClientCreationParameters clientCreationParameters,
      final DurationTrackerFactory durationTrackerFactory) {
    return new ClientManagerImpl(clientFactory,
        unencryptedClientFactory,
        clientCreationParameters,
        durationTrackerFactory
    );
  }

  /**
   * Initialize and launch the audit manager and service.
   * As this takes the FS IOStatistics store, it must be invoked
   * after instrumentation is initialized.
   * @throws IOException failure to instantiate/initialize.
   */
  protected void initializeAuditService() throws IOException {
    auditManager = AuditIntegration.createAndStartAuditManager(
        getConf(),
        instrumentation.createMetricsUpdatingStore());
  }

  /**
   * The audit manager.
   * @return the audit manager
   */
  @InterfaceAudience.Private
  public AuditManagerS3A getAuditManager() {
    return auditManager;
  }

  /**
   * Get the auditor; valid once initialized.
   * @return the auditor.
   */
  @InterfaceAudience.Private
  public OperationAuditor getAuditor() {
    return getAuditManager().getAuditor();
  }

  /**
   * Get the active audit span.
   * @return the span.
   */
  @InterfaceAudience.Private
  @Override
  public AuditSpanS3A getActiveAuditSpan() {
    return getAuditManager().getActiveAuditSpan();
  }

  /**
   * Get the audit span source; allows for components like the committers
   * to have a source of spans without being hard coded to the FS only.
   * @return the source of spans; the base implementation is this instance.
   */
  @InterfaceAudience.Private
  public AuditSpanSource getAuditSpanSource() {
    return this;
  }

  /**
   * Start an operation; this informs the audit service of the event
   * and then sets it as the active span.
   * @param operation operation name.
   * @param path1 first path of operation
   * @param path2 second path of operation
   * @return a span for the audit
   * @throws IOException failure
   */
  public AuditSpanS3A createSpan(String operation,
      @Nullable String path1,
      @Nullable String path2)
      throws IOException {

    return getAuditManager().createSpan(operation, path1, path2);
  }
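
  // Typical call pattern (a sketch; the operation name is illustrative).
  // As AuditSpan is Closeable, a span can bound a block of work:
  //
  //   try (AuditSpanS3A span = createSpan("op_example", path.toString(), null)) {
  //     // operations here are attributed to the span
  //   }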

  /**
   * Build the request factory.
   * MUST be called after reading encryption secrets from the settings or
   * delegation token.
   * Protected, in case test/mock classes want to implement their
   * own variants.
   * @return request factory.
   */
  protected RequestFactory createRequestFactory() {
    long partCountLimit = longOption(getConf(),
        UPLOAD_PART_COUNT_LIMIT,
        DEFAULT_UPLOAD_PART_COUNT_LIMIT,
        1);
    if (partCountLimit != DEFAULT_UPLOAD_PART_COUNT_LIMIT) {
      LOG.warn("Configuration property {} shouldn't be overridden by client",
          UPLOAD_PART_COUNT_LIMIT);
    }

    // ACLs; this is passed to the
    // request factory.
    initCannedAcls(getConf());

    // Any encoding type
    String contentEncoding = getConf().getTrimmed(CONTENT_ENCODING, null);
    if (contentEncoding != null) {
      LOG.debug("Using content encoding set in {} = {}", CONTENT_ENCODING,  contentEncoding);
    }

    String storageClassConf = getConf()
        .getTrimmed(STORAGE_CLASS, "")
        .toUpperCase(Locale.US);
    StorageClass storageClass = null;
    if (!storageClassConf.isEmpty()) {
      storageClass = StorageClass.fromValue(storageClassConf);
      LOG.debug("Using storage class {}", storageClass);
      if (storageClass.equals(StorageClass.UNKNOWN_TO_SDK_VERSION)) {
        STORAGE_CLASS_WARNING.warn("Unknown storage class \"{}\" from option: {};"
                + " falling back to default storage class",
            storageClassConf, STORAGE_CLASS);
        storageClass = null;
      }

    } else {
      LOG.debug("Unset storage class property {}; falling back to default storage class",
          STORAGE_CLASS);
    }

    // optional custom timeout for bulk uploads
    Duration partUploadTimeout = ConfigurationHelper.getDuration(getConf(),
        PART_UPLOAD_TIMEOUT,
        DEFAULT_PART_UPLOAD_TIMEOUT,
        TimeUnit.MILLISECONDS,
        Duration.ZERO);

    return RequestFactoryImpl.builder()
        .withBucket(requireNonNull(bucket))
        .withCannedACL(getCannedACL())
        .withEncryptionSecrets(requireNonNull(encryptionSecrets))
        .withMultipartPartCountLimit(partCountLimit)
        .withRequestPreparer(getAuditManager()::requestCreated)
        .withContentEncoding(contentEncoding)
        .withStorageClass(storageClass)
        .withMultipartUploadEnabled(isMultipartUploadEnabled)
        .withPartUploadTimeout(partUploadTimeout)
        .build();
  }
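
  // Illustrative storage class setting consumed above; the value must map to
  // a StorageClass enum entry, and the key string is assumed:
  //
  //   <property>
  //     <name>fs.s3a.create.storage.class</name>
  //     <value>REDUCED_REDUNDANCY</value>
  //   </property>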

  /**
   * Get the request factory which uses this store's audit span.
   * @return the request factory.
   */
  @VisibleForTesting
  public RequestFactory getRequestFactory() {
    return requestFactory;
  }

  /**
   * Get the performance flags.
   * @return performance flags.
   */
  public FlagSet<PerformanceFlagEnum> getPerformanceFlags() {
    return performanceFlags;
  }

  /**
   * Implementation of all operations used by delegation tokens.
   */
  private class DelegationOperationsImpl implements DelegationOperations {

    @Override
    public List<RoleModel.Statement> listAWSPolicyRules(final Set<AccessLevel> access) {
      return S3AFileSystem.this.listAWSPolicyRules(access);
    }
  }

  /**
   * Create an instance of the delegation operations.
   * @return callbacks for DT support.
   */
  @VisibleForTesting
  public DelegationOperations createDelegationOperations() {
    return new DelegationOperationsImpl();
  }

  /**
   * Set the encryption secrets for requests.
   * @param secrets secrets
   */
  protected void setEncryptionSecrets(final EncryptionSecrets secrets) {
    this.encryptionSecrets = secrets;
    if (requestFactory != null) {
      requestFactory.setEncryptionSecrets(secrets);
    }
  }

  /**
   * Get the encryption secrets.
   * This is potentially sensitive information and must be treated with care.
   * @return the current encryption secrets.
   */
  public EncryptionSecrets getEncryptionSecrets() {
    return encryptionSecrets;
  }

  private void initCannedAcls(Configuration conf) {
    String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
    if (!cannedACLName.isEmpty()) {
      cannedACL = AWSCannedACL.valueOf(cannedACLName).toString();
    } else {
      cannedACL = null;
    }
  }

  @Retries.RetryTranslated
  private void initMultipartUploads(Configuration conf) throws IOException {
    boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
        DEFAULT_PURGE_EXISTING_MULTIPART);

    if (purgeExistingMultipart) {
      try {
        Duration purgeDuration = ConfigurationHelper.getDuration(conf,
            PURGE_EXISTING_MULTIPART_AGE,
            Duration.ofSeconds(DEFAULT_PURGE_EXISTING_MULTIPART_AGE),
            TimeUnit.SECONDS,
            Duration.ZERO);
        abortOutstandingMultipartUploads(purgeDuration.getSeconds());
      } catch (AccessDeniedException e) {
        instrumentation.errorIgnored();
        LOG.debug("Failed to purge multipart uploads against {}," +
            " FS may be read only", bucket);
      }
    }
  }

  /**
   * Abort all outstanding MPUs older than a given age.
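   * <p>
   * Usage sketch (illustrative):
   * <pre>{@code
   * // abort any multipart upload initiated more than a day ago
   * fs.abortOutstandingMultipartUploads(TimeUnit.DAYS.toSeconds(1));
   * }</pre>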
   * @param seconds time in seconds
   * @throws IOException on any failure, other than 403 "permission denied"
   */
  @Retries.RetryTranslated
  public void abortOutstandingMultipartUploads(long seconds)
      throws IOException {
    Preconditions.checkArgument(seconds >= 0);
    Instant purgeBefore =
        Instant.now().minusSeconds(seconds);
    LOG.debug("Purging outstanding multipart uploads older than {}",
        purgeBefore);
    invoker.retry("Purging multipart uploads", bucket, true,
        () -> {
          RemoteIterator<MultipartUpload> uploadIterator =
              MultipartUtils.listMultipartUploads(createStoreContext(),
                  getS3Client(), null, maxKeys);

          while (uploadIterator.hasNext()) {
            MultipartUpload upload = uploadIterator.next();
            if (upload.initiated().compareTo(purgeBefore) < 0) {
              abortMultipartUpload(upload);
            }
          }
        });
  }

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return "s3a"
   */
  @Override
  public String getScheme() {
    return this.scheme;
  }

  /**
   * Returns a URI whose scheme and authority identify this FileSystem.
   */
  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * Set the URI field through {@link S3xLoginHelper} and
   * optionally {@link #canonicalizeUri(URI)}
   * Exported for testing.
   * @param fsUri filesystem URI.
   * @param canonicalize if false, the URI is canonicalized through
   * {@link #canonicalizeUri(URI)}; if true it is used as built.
   */
  @VisibleForTesting
  protected void setUri(URI fsUri, boolean canonicalize) {
    URI u = S3xLoginHelper.buildFSURI(fsUri);
    this.uri = canonicalize ? u : canonicalizeUri(u);
  }

  /**
   * Get the canonical URI.
   * @return the canonical URI of this FS.
   */
  public URI getCanonicalUri() {
    return uri;
  }

  @VisibleForTesting
  @Override
  public int getDefaultPort() {
    return 0;
  }

  /**
   * Set the client; used in mocking tests to force in a different client.
   * @param client client.
   */
  @VisibleForTesting
  protected void setAmazonS3Client(S3Client client) {
    Preconditions.checkNotNull(client, "clientV2");
    LOG.debug("Setting S3V2 client to {}", client);
    s3Client = client;
  }

  /**
   * Get the S3 client created in {@link #initialize(URI, Configuration)}.
   * @return the s3Client
   * @throws UncheckedIOException if the client could not be created.
   */
  @VisibleForTesting
  protected S3Client getS3Client() {
    return s3Client;
  }

  /**
   * S3AInternals method.
   * {@inheritDoc}.
   */
  @AuditEntryPoint
  @Retries.RetryTranslated
  public String getBucketLocation() throws IOException {
    return s3aInternals.getBucketLocation(bucket);
  }

  /**
   * Create the S3AInternals; left as something mocking
   * subclasses may want to override.
   * @return the internal implementation
   */
  protected S3AInternals createS3AInternals() {
    return new S3AInternalsImpl();
  }

  /**
   * Get the S3AInternals.
   * @return the internal implementation
   */
  public S3AInternals getS3AInternals() {
    return s3aInternals;
  }

  /**
   * Implementation of the S3A Internals operations; pulled out of S3AFileSystem to
   * force code accessing it to call {@link #getS3AInternals()}.
   */
  private final class S3AInternalsImpl implements S3AInternals {

    @Override
    public S3Client getAmazonS3Client(String reason) {
      LOG.debug("Access to S3 client requested, reason {}", reason);
      return getS3Client();
    }

    @Override
    public S3AStore getStore() {
      return store;
    }

    /**
     * S3AInternals method.
     * {@inheritDoc}.
     */
    @Override
    @AuditEntryPoint
    @Retries.RetryTranslated
    public String getBucketLocation() throws IOException {
      return s3aInternals.getBucketLocation(bucket);
    }

    /**
     * S3AInternals method.
     * {@inheritDoc}.
     */
    @Override
    @AuditEntryPoint
    @Retries.RetryTranslated
    public String getBucketLocation(String bucketName) throws IOException {
      final String region = trackDurationAndSpan(
          STORE_EXISTS_PROBE, bucketName, null, () ->
              invoker.retry("getBucketLocation()", bucketName, true, () ->
                  // If accessPoint then region is known from Arn
                  accessPoint != null
                      ? accessPoint.getRegion()
                      : getS3Client().getBucketLocation(GetBucketLocationRequest.builder()
                          .bucket(bucketName)
                          .build())
                      .locationConstraintAsString()));
      return fixBucketRegion(region);
    }

    /**
     * S3AInternals method.
     * {@inheritDoc}.
     */
    @Override
    @AuditEntryPoint
    @Retries.RetryTranslated
    public HeadObjectResponse getObjectMetadata(Path path) throws IOException {
      return trackDurationAndSpan(INVOCATION_GET_FILE_STATUS, path, () ->
          S3AFileSystem.this.getObjectMetadata(makeQualified(path), null, invoker,
              "getObjectMetadata"));
    }

    /**
     * S3AInternals method.
     * {@inheritDoc}.
     */
    @Override
    @AuditEntryPoint
    @Retries.RetryTranslated
    public HeadBucketResponse getBucketMetadata() throws IOException {
      return S3AFileSystem.this.getBucketMetadata();
    }

    /**
     * Get a shared copy of the AWS credentials, with its reference
     * counter updated.
     * Caller is required to call {@code close()} on this after
     * they have finished using it.
     * @param purpose what the shared credentials are for; initially used for logging
     * @return a reference to shared credentials.
     */
    public AWSCredentialProviderList shareCredentials(final String purpose) {
      LOG.debug("Sharing credentials for: {}", purpose);
      return credentials.share();
    }

    @Override
    public boolean isMultipartCopyEnabled() {
      return S3AFileSystem.this.isMultipartUploadEnabled;
    }

    @Override
    public long abortMultipartUploads(final Path path) throws IOException {
      final String prefix = pathToKey(path);
      try (AuditSpan span = createSpan("object_multipart_bulk_abort", prefix, null)) {
        return S3AFileSystem.this.abortMultipartUploadsUnderPrefix(
            createStoreContext(),
            span,
            prefix);
      }
    }

  } // end S3AInternals

  /**
   * Get the input policy for this FS instance.
   * @return the input policy
   */
  @InterfaceStability.Unstable
  public S3AInputPolicy getInputPolicy() {
    return inputPolicy;
  }

  /**
   * Get the change detection policy for this FS instance.
   * Only public to allow access in tests in other packages.
   * @return the change detection policy
   */
  @VisibleForTesting
  public ChangeDetectionPolicy getChangeDetectionPolicy() {
    return changeDetectionPolicy;
  }

  /**
   * Get the encryption algorithm of this connector.
   * @return the encryption algorithm.
   */
  public S3AEncryptionMethods getS3EncryptionAlgorithm() {
    return encryptionSecrets.getEncryptionMethod();
  }

  /**
   * Demand create the directory allocator, then create a temporary file.
   * This does not mark the file for deletion when a process exits.
   * {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}.
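   * <p>
   * A sketch of intended use (prefix and size are illustrative):
   * <pre>{@code
   * File block = createTmpFileForWrite("s3ablock-", partSize, getConf());
   * }</pre>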
   * @param pathStr prefix for the temporary file
   * @param size the size of the file that is going to be written
   * @param conf the Configuration object
   * @return a unique temporary file
   * @throws IOException IO problems
   */
  File createTmpFileForWrite(String pathStr, long size,
      Configuration conf) throws IOException {
    initLocalDirAllocatorIfNotInitialized(conf);
    Path path = directoryAllocator.getLocalPathForWrite(pathStr,
        size, conf);
    File dir = new File(path.getParent().toUri().getPath());
    String prefix = path.getName();
    // create a temp file on this directory
    return File.createTempFile(prefix, null, dir);
  }

  /**
   * Initialize dir allocator if not already initialized.
   *
   * @param conf The Configuration object.
   */
  private void initLocalDirAllocatorIfNotInitialized(Configuration conf) {
    if (directoryAllocator == null) {
      synchronized (this) {
        // re-check under the lock, so that two threads racing past the
        // unsynchronized probe do not both create an allocator.
        if (directoryAllocator == null) {
          String bufferDir = conf.get(BUFFER_DIR) != null
              ? BUFFER_DIR : HADOOP_TMP_DIR;
          directoryAllocator = new LocalDirAllocator(bufferDir);
        }
      }
    }
  }

  /**
   * Get the bucket of this filesystem.
   * @return the bucket
   */
  @InterfaceAudience.Public
  @InterfaceStability.Stable
  public String getBucket() {
    return bucket;
  }

  /**
   * Set the bucket.
   * @param bucket the bucket
   */
  @VisibleForTesting
  protected void setBucket(String bucket) {
    this.bucket = bucket;
  }

  /**
   * Get the canned ACL of this FS.
   * @return an ACL, if any
   */
  String getCannedACL() {
    return cannedACL;
  }

  /**
   * Change the input policy for this FS.
   * This is now a no-op, retained in case some application
   * or external test invokes it.
   *
   * @deprecated use openFile() options
   * @param inputPolicy new policy
   */
  @InterfaceStability.Unstable
  @Deprecated
  public void setInputPolicy(S3AInputPolicy inputPolicy) {
    LOG.warn("setInputPolicy is no longer supported");
  }

  /**
   * Turns a path (relative or otherwise) into an S3 key.
   *
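   * Illustrative mappings, assuming a filesystem bound to {@code s3a://bucket/}:
   * <pre>{@code
   * pathToKey(new Path("s3a://bucket/a/b")); // => "a/b"
   * pathToKey(new Path("s3a://bucket/"));    // => ""
   * }</pre>
   *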
   * @param path input path, may be relative to the working dir
   * @return a key excluding the leading "/", or, if it is the root path, ""
   */
  @VisibleForTesting
  public String pathToKey(Path path) {
    if (!path.isAbsolute()) {
      path = new Path(workingDir, path);
    }

    if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
      return "";
    }

    return path.toUri().getPath().substring(1);
  }

  /**
   * Add a trailing "/" to a key if it is not the root key <i>and</i>
   * does not already have a "/" at the end.
   *
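   * For example (illustrative): {@code "a/b"} becomes {@code "a/b/"},
   * while {@code "a/b/"} and the root key {@code ""} are returned unchanged.
   *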
   * @param key s3 key or ""
   * @return the key with a trailing "/", or, if it is the root key, "".
   */
  @InterfaceAudience.Private
  public String maybeAddTrailingSlash(String key) {
    return S3AUtils.maybeAddTrailingSlash(key);
  }

  /**
   * Convert a key back to a path.
   * @param key input key
   * @return the path from this key
   */
  Path keyToPath(String key) {
    return new Path("/" + key);
  }

  /**
   * Convert a key to a fully qualified path.
   * This includes fixing up the URI so that if it ends with a trailing slash,
   * that is corrected, similar to {@code Path.normalizePath()}.
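   * <p>
   * For example (illustrative), on a filesystem bound to {@code s3a://bucket/}:
   * {@code keyToQualifiedPath("a/b")} returns {@code s3a://bucket/a/b}.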
   * @param key input key
   * @return the fully qualified path including URI scheme and bucket name.
   */
  public Path keyToQualifiedPath(String key) {
    return qualify(keyToPath(key));
  }

  @Override
  public Path makeQualified(final Path path) {
    Path q = super.makeQualified(path);
    if (!q.isRoot()) {
      String urlString = q.toUri().toString();
      if (urlString.endsWith(Path.SEPARATOR)) {
        // this is a path which needs root stripping off to avoid
        // confusion, See HADOOP-15430
        LOG.debug("Stripping trailing '/' from {}", q);
        // deal with an empty "/" at the end by mapping to the parent and
        // creating a new path from it
        q = new Path(urlString.substring(0, urlString.length() - 1));
      }
    }
    if (!q.isRoot() && q.getName().isEmpty()) {
      q = q.getParent();
    }
    return q;
  }

  /**
   * Qualify a path.
   * This includes fixing up the URI so that if it ends with a trailing slash,
   * that is corrected, similar to {@code Path.normalizePath()}.
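   * <p>
   * For example (illustrative): with the working directory at
   * {@code s3a://bucket/}, {@code qualify(new Path("dir/"))} returns
   * {@code s3a://bucket/dir}, the trailing slash stripped.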
   * @param path path to qualify
   * @return a qualified path.
   */
  public Path qualify(Path path) {
    return makeQualified(path);
  }

  /**
   * Check that a Path belongs to this FileSystem.
   * Unlike the superclass, this version does not look at authority,
   * only hostnames.
   * @param path to check
   * @throws IllegalArgumentException if there is an FS mismatch
   */
  @Override
  public void checkPath(Path path) {
    S3xLoginHelper.checkPath(getConf(), getUri(), path, getDefaultPort());
  }

  /**
   * Override the base canonicalization logic and relay to
   * {@link S3xLoginHelper#canonicalizeUri(URI, int)}.
   * This allows for the option of changing this logic for better DT handling.
   * @param rawUri raw URI.
   * @return the canonical URI to use in delegation tokens and file context.
   */
  @Override
  protected URI canonicalizeUri(URI rawUri) {
    return S3xLoginHelper.canonicalizeUri(rawUri, getDefaultPort());
  }

  /**
   * Opens an FSDataInputStream at the indicated Path.
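   * <p>
   * Usage sketch (path and buffer size are illustrative):
   * <pre>{@code
   * try (FSDataInputStream in = fs.open(new Path("s3a://bucket/data.bin"), 4096)) {
   *   int firstByte = in.read();
   * }
   * }</pre>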
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   * @return an input stream for the file.
   * @throws IOException IO failure.
   */
  @Retries.RetryTranslated
  public FSDataInputStream open(Path f, int bufferSize)
      throws IOException {
    return executeOpen(qualify(f),
        openFileHelper.openSimpleFile(bufferSize));
  }

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * The {@code fileInformation} parameter controls how the file
   * is opened, e.g. whether a HEAD request can be skipped.
   * @param path the file to open
   * @param fileInformation information about the file to open
   * @return an input stream for the file
   * @throws IOException IO failure.
   */
  @AuditEntryPoint
  @Retries.RetryTranslated
  private FSDataInputStream executeOpen(
      final Path path,
      final OpenFileSupport.OpenFileInformation fileInformation)
      throws IOException {
    // create the input stream statistics before opening
    // the file so that the time to prepare to open the file is included.
    S3AInputStreamStatistics inputStreamStats =
        statisticsContext.newInputStreamStatistics();
    // this span is passed into the stream.
    final AuditSpan auditSpan = entryPoint(INVOCATION_OPEN, path);
    final S3AFileStatus fileStatus =
        trackDuration(inputStreamStats,
            ACTION_FILE_OPENED.getSymbol(), () ->
            extractOrFetchSimpleFileStatus(path, fileInformation));
    S3AReadOpContext readContext = createReadContext(
        fileStatus,
        auditSpan);
    fileInformation.applyOptions(readContext);
    LOG.debug("Opening '{}'", readContext);

    if (this.prefetchEnabled) {
      Configuration configuration = getConf();
      initLocalDirAllocatorIfNotInitialized(configuration);
      return new FSDataInputStream(
          new S3APrefetchingInputStream(
              readContext.build(),
              createObjectAttributes(path, fileStatus),
              createInputStreamCallbacks(auditSpan),
              inputStreamStats,
              configuration,
              directoryAllocator));
    } else {
      return new FSDataInputStream(
          new S3AInputStream(
              readContext.build(),
              createObjectAttributes(path, fileStatus),
              createInputStreamCallbacks(auditSpan),
                  inputStreamStats,
                  new SemaphoredDelegatingExecutor(
                          boundedThreadPool,
                          vectoredActiveRangeReads,
                          true,
                          inputStreamStats)));
    }
  }

  /**
   * Override point: create the callbacks for S3AInputStream.
   * @param auditSpan the audit span to activate before each call
   * @return an implementation of the InputStreamCallbacks.
   */
  private S3AInputStream.InputStreamCallbacks createInputStreamCallbacks(
      final AuditSpan auditSpan) {
    return new InputStreamCallbacksImpl(auditSpan);
  }

  /**
   * Operations needed by S3AInputStream to read data.
   */
  private final class InputStreamCallbacksImpl implements
      S3AInputStream.InputStreamCallbacks {

    /**
     * Audit span to activate before each call.
     */
    private final AuditSpan auditSpan;

    /**
     * Create.
     * @param auditSpan Audit span to activate before each call.
     */
    private InputStreamCallbacksImpl(final AuditSpan auditSpan) {
      this.auditSpan = requireNonNull(auditSpan);
    }

    /**
     * Closes the audit span.
     */
    @Override
    public void close()  {
      auditSpan.close();
    }

    @Override
    public GetObjectRequest.Builder newGetRequestBuilder(final String key) {
      // activate the audit span used for the operation
      try (AuditSpan span = auditSpan.activate()) {
        return getRequestFactory().newGetObjectRequestBuilder(key);
      }
    }

    @Override
    public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) throws
        IOException {
      // activate the audit span used for the operation
      try (AuditSpan span = auditSpan.activate()) {
        return fsHandler.getObject(store, request, getRequestFactory());
      }
    }

    @Override
    public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
      CompletableFuture<T> result = new CompletableFuture<>();
      unboundedThreadPool.submit(() ->
          LambdaUtils.eval(result, () -> {
            LOG.debug("Starting submitted operation in {}", auditSpan.getSpanId());
            try (AuditSpan span = auditSpan.activate()) {
              return operation.apply();
            } finally {
              LOG.debug("Completed submitted operation in {}", auditSpan.getSpanId());
            }
          }));
      return result;
    }
  }

  /**
   * Callbacks for WriteOperationHelper.
   */
  private final class WriteOperationHelperCallbacksImpl
      implements WriteOperationHelper.WriteOperationHelperCallbacks {

    @Override
    @Retries.OnceRaw
    public CompleteMultipartUploadResponse completeMultipartUpload(
        CompleteMultipartUploadRequest request) {
      return store.completeMultipartUpload(request);
    }

    @Override
    @Retries.OnceRaw
    public UploadPartResponse uploadPart(
        final UploadPartRequest request,
        final RequestBody body,
        final DurationTrackerFactory durationTrackerFactory)
        throws AwsServiceException, UncheckedIOException {
      return store.uploadPart(request, body, durationTrackerFactory);
    }

  }

  /**
   * Create the read context for reading from the referenced file,
   * using FS state as well as the status.
   * @param fileStatus file status.
   * @param auditSpan audit span.
   * @return a context for read operations.
   */
  @VisibleForTesting
  protected S3AReadOpContext createReadContext(
      final FileStatus fileStatus,
      final AuditSpan auditSpan) {
    final S3AReadOpContext roc = new S3AReadOpContext(
        fileStatus.getPath(),
        invoker,
        statistics,
        statisticsContext,
        fileStatus,
        vectoredIOContext,
        IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator(),
        futurePool,
        prefetchBlockSize,
        prefetchBlockCount)
        .withAuditSpan(auditSpan);
    openFileHelper.applyDefaultOptions(roc);
    return roc.build();
  }

  /**
   * Create the attributes of an object for subsequent use.
   * @param f path of the request.
   * @param eTag the eTag of the S3 object
   * @param versionId S3 object version ID
   * @param len length of the file
   * @return attributes to use when building the query.
   */
  private S3ObjectAttributes createObjectAttributes(
      final Path f,
      final String eTag,
      final String versionId,
      final long len) {
    return new S3ObjectAttributes(bucket,
        f,
        pathToKey(f),
        getS3EncryptionAlgorithm(),
        encryptionSecrets.getEncryptionKey(),
        eTag,
        versionId,
        len);
  }

  /**
   * Create the attributes of an object for subsequent use.
   * @param path path; used in preference to the file status path.
   * @param fileStatus file status to build from.
   * @return attributes to use when building the query.
   */
  private S3ObjectAttributes createObjectAttributes(
      final Path path,
      final S3AFileStatus fileStatus) {
    return createObjectAttributes(
        path,
        fileStatus.getEtag(),
        fileStatus.getVersionId(),
        fileStatus.getLen());
  }

  /**
   * Create an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * Retry policy: retrying, translated on the getFileStatus() probe.
   * No data is uploaded to S3 in this call, so there are no retry issues
   * related to that.
   * @param f the file name to open
   * @param permission the permission to set.
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file.
   * @param blockSize the requested block size.
   * @param progress the progress reporter.
   * @return an output stream to which the file data can be written.
   * @throws IOException in the event of IO related errors.
   * @see #setPermission(Path, FsPermission)
   */
  @Override
  @AuditEntryPoint
  @SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    final Path path = qualify(f);

    // work out the options to pass down
    CreateFileBuilder.CreateFileOptions options;
    if (getPerformanceFlags().enabled(PerformanceFlagEnum.Create)) {
      options = OPTIONS_CREATE_FILE_PERFORMANCE;
    } else {
      options = overwrite
          ? OPTIONS_CREATE_FILE_OVERWRITE
          : OPTIONS_CREATE_FILE_NO_OVERWRITE;
    }

    // the span will be picked up inside the output stream
    return trackDurationAndSpan(INVOCATION_CREATE, path, () ->
        innerCreateFile(path,
            progress,
            getActiveAuditSpan(),
            options));
  }

  /**
   * Create an FSDataOutputStream at the indicated Path with write-progress
   * reporting; in the active span.
   * Retry policy: retrying, translated on the getFileStatus() probe.
   * No data is uploaded to S3 in this call, so no retry issues related to that.
   * The "performance" flag disables safety checks for the path being a file,
   * or parent directory existing.
   * If true, this method call does no IO at all.
   * @param path the file name to open
   * @param progress the progress reporter.
   * @param auditSpan audit span
   * @param options options for the file
   * @return an output stream for the new file.
   * @throws IOException in the event of IO related errors.
   */
  @SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
  @Retries.RetryTranslated
  private FSDataOutputStream innerCreateFile(
      final Path path,
      final Progressable progress,
      final AuditSpan auditSpan,
      final CreateFileBuilder.CreateFileOptions options) throws IOException {
    auditSpan.activate();
    String key = pathToKey(path);
    if (key.isEmpty()) {
      // no matter the creation options, root cannot be written to.
      throw new PathIOException("/", "Can't create root path");
    }
    EnumSet<CreateFlag> flags = options.getFlags();

    boolean skipProbes = options.isPerformance() || isUnderMagicCommitPath(path);
    if (skipProbes) {
      LOG.debug("Skipping existence/overwrite checks");
    } else {
      try {
        boolean overwrite = flags.contains(CreateFlag.OVERWRITE);

        // get the status or throw an FNFE.
        // when overwriting, there is no need to look for any existing file,
        // just a directory (for safety)
        FileStatus status = innerGetFileStatus(path, false,
            overwrite
                ? StatusProbeEnum.DIRECTORIES
                : StatusProbeEnum.ALL);

        // if the thread reaches here, there is something at the path
        if (status.isDirectory()) {
          // path references a directory: automatic error
          throw new FileAlreadyExistsException(path + " is a directory");
        }
        if (!overwrite) {
          // path references a file and overwrite is disabled
          throw new FileAlreadyExistsException(path + " already exists");
        }
        LOG.debug("Overwriting file {}", path);
      } catch (FileNotFoundException e) {
        // this means there is nothing at the path; all good.
      }
    }
    instrumentation.fileCreated();
    final BlockOutputStreamStatistics outputStreamStatistics
        = statisticsContext.newOutputStreamStatistics();
    PutTracker putTracker =
        committerIntegration.createTracker(path, key, outputStreamStatistics);
    String destKey = putTracker.getDestKey();

    // put options are derived from the option builder.
    final PutObjectOptions putOptions =
        new PutObjectOptions(null, options.getHeaders());

    validateOutputStreamConfiguration(path, getConf());

    final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
        S3ABlockOutputStream.builder()
        .withKey(destKey)
        .withBlockFactory(blockFactory)
        .withBlockSize(partSize)
        .withStatistics(outputStreamStatistics)
        .withProgress(progress)
        .withPutTracker(putTracker)
        .withWriteOperations(
            createWriteOperationHelper(auditSpan))
        .withExecutorService(
            new SemaphoredDelegatingExecutor(
                boundedThreadPool,
                blockOutputActiveBlocks,
                true,
                outputStreamStatistics))
        .withDowngradeSyncableExceptions(
            getConf().getBoolean(
                DOWNGRADE_SYNCABLE_EXCEPTIONS,
                DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT))
        .withCSEEnabled(isCSEEnabled)
        .withPutOptions(putOptions)
        .withIOStatisticsAggregator(
            IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
        .withMultipartEnabled(isMultipartUploadEnabled);
    return new FSDataOutputStream(
        new S3ABlockOutputStream(builder),
        null);
  }

  /**
   * Create a Write Operation Helper with the current active span.
   * All operations made through this helper will activate the
   * span before execution.
   *
   * This class permits other low-level operations against the store.
   * It is unstable and
   * only intended for code with intimate knowledge of the object store.
   * If using this, be prepared for changes even on minor point releases.
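   * <p>
   * Acquisition sketch (illustrative; assumes the caller has already
   * created an audit span):
   * <pre>{@code
   * WriteOperationHelper helper = fs.getWriteOperationHelper();
   * // operations invoked on the helper activate the span active at creation
   * }</pre>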
   * @return a new helper.
   */
  @InterfaceAudience.Private
  public WriteOperationHelper getWriteOperationHelper() {
    return createWriteOperationHelper(getActiveAuditSpan());
  }

  /**
   * Create a Write Operation Helper with the given span.
   * All operations made through this helper will activate the
   * span before execution.
   * @param auditSpan audit span
   * @return a new helper.
   */
  @InterfaceAudience.Private
  public WriteOperationHelper createWriteOperationHelper(AuditSpan auditSpan) {
    return new WriteOperationHelper(this,
        getConf(),
        statisticsContext,
        getAuditSpanSource(),
        auditSpan,
        new WriteOperationHelperCallbacksImpl());
  }

  /**
   * Create instance of an FSDataOutputStreamBuilder for
   * creating a file at the given path.
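   * <p>
   * Builder usage sketch ({@code path} and {@code bytes} are illustrative):
   * <pre>{@code
   * try (FSDataOutputStream out = fs.createFile(path)
   *     .overwrite(true)
   *     .build()) {
   *   out.write(bytes);
   * }
   * }</pre>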
   * @param path path to create
   * @return a builder.
   * @throws UncheckedIOException for problems creating the audit span
   */
  @Override
  @AuditEntryPoint
  public FSDataOutputStreamBuilder createFile(final Path path) {
    try {
      final Path qualified = qualify(path);
      final AuditSpan span = entryPoint(INVOCATION_CREATE_FILE,
          pathToKey(qualified),
          null);
      final CreateFileBuilder builder = new CreateFileBuilder(this,
          qualified,
          new CreateFileBuilderCallbacksImpl(INVOCATION_CREATE_FILE, span));
      builder
          .create()
          .overwrite(true)
          .must(FS_S3A_CREATE_PERFORMANCE,
              getPerformanceFlags().enabled(PerformanceFlagEnum.Create));
      return builder;
    } catch (IOException e) {
      // catch any IOEs raised in span creation and convert to
      // an UncheckedIOException
      throw new UncheckedIOException(e);
    }
  }

  /**
   * Callback for create file operations.
   */
  private final class CreateFileBuilderCallbacksImpl implements
      CreateFileBuilder.CreateFileBuilderCallbacks {

    private final Statistic statistic;
    /** span for operations. */
    private final AuditSpan span;

    private CreateFileBuilderCallbacksImpl(
        final Statistic statistic,
        final AuditSpan span) {
      this.statistic = statistic;
      this.span = span;
    }

    @Override
    public FSDataOutputStream createFileFromBuilder(
        final Path path,
        final Progressable progress,
        final CreateFileBuilder.CreateFileOptions options) throws IOException {
      // the span will be picked up inside the output stream
      return trackDuration(getDurationTrackerFactory(), statistic.getSymbol(), () ->
          innerCreateFile(path, progress, span, options));
    }
  }

  /**
   * {@inheritDoc}
   * The S3A implementation downgrades to the recursive creation, to avoid
   * any race conditions with parent entries "disappearing".
   */
  @Override
  @AuditEntryPoint
  public FSDataOutputStream createNonRecursive(Path p,
      FsPermission permission,
      EnumSet<CreateFlag> flags,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException {
    final Path path = makeQualified(p);

    // span is created and passed in to the callbacks.
    final AuditSpan span = entryPoint(INVOCATION_CREATE_NON_RECURSIVE,
        pathToKey(path),
        null);
    // uses the CreateFileBuilder, filling it in with the relevant arguments.
    final CreateFileBuilder builder = new CreateFileBuilder(this,
        path,
        new CreateFileBuilderCallbacksImpl(INVOCATION_CREATE_NON_RECURSIVE, span))
        .create()
        .withFlags(flags)
        .blockSize(blockSize)
        .bufferSize(bufferSize)
        .must(FS_S3A_CREATE_PERFORMANCE,
            getPerformanceFlags().enabled(PerformanceFlagEnum.Create));
    if (progress != null) {
      builder.progress(progress);
    }
    return builder.build();
  }

  /**
   * Append to an existing file (optional operation).
   * @param f the existing file to be appended.
   * @param bufferSize the size of the buffer to be used.
   * @param progress for reporting progress if it is not null.
   * @throws IOException indicating that append is not supported.
   */
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new UnsupportedOperationException("Append is not supported "
        + "by S3AFileSystem");
  }


  /**
   * Renames Path src to Path dst.  Can take place on local fs
   * or remote DFS.
   *
   * Warning: S3 does not support renames. This method does a copy which can
   * take S3 some time to execute with large files and directories. Since
   * there is no Progressable passed in, this can time out jobs.
   *
   * Note: This implementation differs from other S3 drivers. Specifically:
   * <pre>
   *       Fails if src is a file and dst is a directory.
   *       Fails if src is a directory and dst is a file.
   *       Fails if the parent of dst does not exist or is a file.
   *       Fails if dst is a directory that is not empty.
   * </pre>
   *
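   * Caller sketch (illustrative) of the false/exception split:
   * <pre>{@code
   * if (!fs.rename(src, dst)) {
   *   // one of the failure conditions above; no exception was raised
   * }
   * }</pre>
   *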
   * @param src path to be renamed
   * @param dst new path after rename
   * @throws IOException on IO failure
   * @return true if rename is successful
   */
  @AuditEntryPoint
  @Retries.RetryTranslated
  public boolean rename(Path src, Path dst) throws IOException {
    try {
      long bytesCopied = trackDurationAndSpan(
          INVOCATION_RENAME, src.toString(), dst.toString(), () ->
          innerRename(src, dst));
      LOG.debug("Copied {} bytes", bytesCopied);
      return true;
    } catch (SdkException e) {
      throw translateException("rename(" + src +", " + dst + ")", src, e);
    } catch (RenameFailedException e) {
      LOG.info("{}", e.getMessage());
      LOG.debug("rename failure", e);
      return e.getExitCode();
    }
  }

  /**
   * Validate the rename parameters and status of the filesystem;
   * returns the source and any destination File Status.
   * @param src qualified path to be renamed
   * @param dst qualified path after rename
   * @return the source and (possibly null) destination status entries.
   * @throws RenameFailedException if some criteria for a state changing
   * rename was not met. This means work didn't happen; it's not something
   * which is reported upstream to the FileSystem APIs, for which the semantics
   * of "false" are pretty vague.
   * @throws FileNotFoundException there's no source file.
   * @throws IOException on IO failure.
   */
  @Retries.RetryTranslated
  private Pair<S3AFileStatus, S3AFileStatus> initiateRename(
      final Path src,
      final Path dst) throws IOException {
    String srcKey = pathToKey(src);
    String dstKey = pathToKey(dst);

    if (srcKey.isEmpty()) {
      throw new RenameFailedException(src, dst, "source is root directory");
    }
    if (dstKey.isEmpty()) {
      throw new RenameFailedException(src, dst, "dest is root directory");
    }

    // get the source file status; this raises a FNFE if there is no source
    // file.
    S3AFileStatus srcStatus = innerGetFileStatus(src, true,
        StatusProbeEnum.ALL);

    if (srcKey.equals(dstKey)) {
      LOG.debug("rename: src and dest refer to the same file or directory: {}",
          dst);
      throw new RenameFailedException(src, dst,
          "source and dest refer to the same file or directory")
          .withExitCode(srcStatus.isFile());
    }

    S3AFileStatus dstStatus = null;
    try {
      dstStatus = innerGetFileStatus(dst, true, StatusProbeEnum.ALL);
      // if there is no destination entry, an exception is raised.
      // hence this code sequence can assume that there is something
      // at the end of the path; the only detail being what it is and
      // whether or not it can be the destination of the rename.
      if (srcStatus.isDirectory()) {
        if (dstStatus.isFile()) {
          throw new FileAlreadyExistsException(
              "Failed to rename " + src + " to " + dst
               +"; source is a directory and dest is a file");
        } else if (dstStatus.isEmptyDirectory() != Tristate.TRUE) {
          throw new RenameFailedException(src, dst,
              "Destination is a non-empty directory")
              .withExitCode(false);
        }
        // at this point the destination is an empty directory
      } else {
        // source is a file. The destination must be a directory,
        // empty or not
        if (dstStatus.isFile()) {
          throw new FileAlreadyExistsException(
              "Failed to rename " + src + " to " + dst
                  + "; destination file exists");
        }
      }

    } catch (FileNotFoundException e) {
      LOG.debug("rename: destination path {} not found", dst);
      // Parent must exist
      Path parent = dst.getParent();
      if (!pathToKey(parent).isEmpty()
          && !parent.equals(src.getParent())) {
        try {
          // make sure parent isn't a file.
          // don't look for parent being a dir as there is a risk
          // of a race between dest dir cleanup and rename in different
          // threads.
          S3AFileStatus dstParentStatus = innerGetFileStatus(parent,
              false, StatusProbeEnum.FILE);
          // if this doesn't raise an exception then
          // the parent is a file or a dir.
          if (!dstParentStatus.isDirectory()) {
            throw new RenameFailedException(src, dst,
                "destination parent is not a directory");
          }
        } catch (FileNotFoundException expected) {
          // nothing was found. Don't worry about it;
          // expect rename to implicitly create the parent dir
        }
      }
    }
    return Pair.of(srcStatus, dstStatus);
  }

  /**
   * The inner rename operation. See {@link #rename(Path, Path)} for
   * the description of the operation.
   * This operation throws an exception on any failure which needs to be
   * reported and downgraded to a failure.
   * Retries: retry translated, assuming all operations it calls do
   * so. For safety, consider catching and handling SdkException,
   * because this is such a complex method there is a risk it could surface.
   * @param source path to be renamed
   * @param dest new path after rename
   * @throws RenameFailedException if some criteria for a state changing
   * rename was not met. This means work didn't happen; it's not something
   * which is reported upstream to the FileSystem APIs, for which the semantics
   * of "false" are pretty vague.
   * @return the number of bytes copied.
   * @throws FileNotFoundException there's no source file.
   * @throws IOException on IO failure.
   * @throws SdkException on failures inside the AWS SDK
   */
  @Retries.RetryMixed
  private long innerRename(Path source, Path dest)
      throws RenameFailedException, FileNotFoundException, IOException,
      SdkException {
    Path src = qualify(source);
    Path dst = qualify(dest);

    LOG.debug("Rename path {} to {}", src, dst);

    String srcKey = pathToKey(src);
    String dstKey = pathToKey(dst);

    Pair<S3AFileStatus, S3AFileStatus> p = initiateRename(src, dst);

    // Initiate the rename.
    // this will call back into this class via the rename callbacks
    final StoreContext storeContext = createStoreContext();
    RenameOperation renameOperation = new RenameOperation(
        storeContext,
        src, srcKey, p.getLeft(),
        dst, dstKey, p.getRight(),
        new OperationCallbacksImpl(storeContext),
        pageSize,
        dirOperationsPurgeUploads);
    return renameOperation.execute();
  }

  @Override
  public Token<? extends TokenIdentifier> getFsDelegationToken()
      throws IOException {
    return getDelegationToken(null);
  }

  /**
   * The callbacks made by the rename and delete operations.
   * This separation allows the operation to be factored out and
   * still avoid knowledge of the S3AFilesystem implementation.
   * The Audit span active at the time of creation is cached and activated
   * before every call.
   */
  private final class OperationCallbacksImpl implements OperationCallbacks {

    /** Audit Span at time of creation. */
    private final AuditSpan auditSpan;

    private final StoreContext storeContext;

    private OperationCallbacksImpl(final StoreContext storeContext) {
      this.storeContext = requireNonNull(storeContext);
      this.auditSpan = storeContext.getActiveAuditSpan();
    }

    /**
     * Get the audit span.
     * @return the span
     */
    private AuditSpan getAuditSpan() {
      return auditSpan;
    }

    @Override
    public S3ObjectAttributes createObjectAttributes(final Path path,
        final String eTag,
        final String versionId,
        final long len) {
      return S3AFileSystem.this.createObjectAttributes(path, eTag, versionId,
          len);
    }

    @Override
    public S3ObjectAttributes createObjectAttributes(
        final S3AFileStatus fileStatus) {
      return S3AFileSystem.this.createObjectAttributes(
          fileStatus.getPath(),
          fileStatus);
    }

    @Override
    public S3AReadOpContext createReadContext(final FileStatus fileStatus) {
      return S3AFileSystem.this.createReadContext(fileStatus,
          auditSpan);
    }

    @Override
    @Retries.RetryTranslated
    public void deleteObjectAtPath(final Path path,
        final String key,
        final boolean isFile)
        throws IOException {
      auditSpan.activate();
      once("delete", path.toString(), () ->
          S3AFileSystem.this.deleteObjectAtPath(path, key, isFile));
    }

    @Override
    @Retries.RetryTranslated
    public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
        final Path path,
        final S3AFileStatus status,
        final boolean includeSelf) throws IOException {
      auditSpan.activate();
      return innerListFiles(
          path,
          true,
          includeSelf
              ? Listing.ACCEPT_ALL_OBJECTS
              : new Listing.AcceptAllButSelf(path),
          status
      );
    }

    @Override
    public CopyObjectResponse copyFile(final String srcKey,
        final String destKey,
        final S3ObjectAttributes srcAttributes,
        final S3AReadOpContext readContext) throws IOException {
      auditSpan.activate();
      return S3AFileSystem.this.copyFile(srcKey, destKey,
          srcAttributes.getLen(), srcAttributes, readContext);
    }

    @Override
    public void removeKeys(
            final List<ObjectIdentifier> keysToDelete,
            final boolean deleteFakeDir)
        throws MultiObjectDeleteException, SdkException, IOException {
      auditSpan.activate();
      S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir);
    }

    @Override
    public void finishRename(final Path sourceRenamed, final Path destCreated)
        throws IOException {
      auditSpan.activate();
      Path destParent = destCreated.getParent();
      if (!sourceRenamed.getParent().equals(destParent)) {
        LOG.debug("source & dest parents are different; fix up dir markers");
        maybeCreateFakeParentDirectory(sourceRenamed);
      }
    }

    @Override
    @Retries.RetryTranslated
    public RemoteIterator<S3AFileStatus> listObjects(
        final Path path,
        final String key)
        throws IOException {
      return once("listObjects", key, () ->
          listing.createFileStatusListingIterator(path,
              createListObjectsRequest(key, null),
              ACCEPT_ALL,
              Listing.ACCEPT_ALL_OBJECTS,
              auditSpan));
    }

    /**
     * Abort multipart uploads under a path.
     * @param prefix prefix for uploads to abort
     * @return a count of aborts
     * @throws IOException trouble; FileNotFoundExceptions are swallowed.
     */
    @Override
    @Retries.RetryTranslated
    public long abortMultipartUploadsUnderPrefix(String prefix)
        throws IOException {
      return S3AFileSystem.this.abortMultipartUploadsUnderPrefix(storeContext, auditSpan, prefix);
    }

  }  // end OperationCallbacksImpl

  /**
   * Abort multipart uploads under a prefix.
   * @param storeContext store context
   * @param span span for the operations
   * @param prefix prefix for uploads to abort
   * @return a count of aborts
   * @throws IOException trouble; FileNotFoundExceptions are swallowed.
   */
  private long abortMultipartUploadsUnderPrefix(StoreContext storeContext,
      AuditSpan span,
      String prefix) throws IOException {

    span.activate();
    // this deactivates the audit span somehow
    final RemoteIterator<MultipartUpload> uploads =
        listUploadsUnderPrefix(storeContext, prefix);
    // so reactivate it.
    span.activate();
    return foreach(uploads, upload ->
            invoker.retry("Aborting multipart commit", upload.key(), true, () ->
                abortMultipartUpload(upload)));
  }

  /**
   * Callbacks from {@link Listing}.
   * Auditing: the listing object is long-lived; the audit span
   * for a single listing is passed in from the listing
   * method calls and then down to the callbacks.
   */
  protected class ListingOperationCallbacksImpl implements
          ListingOperationCallbacks {

    @Override
    public CompletableFuture<S3ListResult> listObjectsAsync(
        S3ListRequest request,
        DurationTrackerFactory trackerFactory,
        AuditSpan span) {
      return submit(unboundedThreadPool, span, () ->
          listObjects(request,
              pairedTrackerFactory(trackerFactory,
                  getDurationTrackerFactory())));
    }

    @Override
    @Retries.RetryRaw
    public CompletableFuture<S3ListResult> continueListObjectsAsync(
        S3ListRequest request,
        S3ListResult prevResult,
        DurationTrackerFactory trackerFactory,
        AuditSpan span) {
      return submit(unboundedThreadPool, span,
          () -> continueListObjects(request, prevResult,
              pairedTrackerFactory(trackerFactory,
                  getDurationTrackerFactory())));
    }

    @Override
    public S3ALocatedFileStatus toLocatedFileStatus(
            S3AFileStatus status)
            throws IOException {
      return S3AFileSystem.this.toLocatedFileStatus(status);
    }

    @Override
    public S3ListRequest createListObjectsRequest(
        String key,
        String delimiter,
        AuditSpan span) {
      span.activate();
      return S3AFileSystem.this.createListObjectsRequest(key, delimiter);
    }

    @Override
    public long getDefaultBlockSize(Path path) {
      return S3AFileSystem.this.getDefaultBlockSize(path);
    }

    /**
     * Get the S3 object size.
     * If the object is encrypted, the unpadded size will be returned.
     * @param s3Object the S3 object
     * @return plaintext S3 object size
     * @throws IOException IO problems
     */
    @Override
    public long getObjectSize(S3Object s3Object) throws IOException {
      return fsHandler.getS3ObjectSize(s3Object.key(), s3Object.size(), store, null);
    }

    @Override
    public int getMaxKeys() {
      return S3AFileSystem.this.getMaxKeys();
    }

  }

  /**
   * Low-level call to get at the object metadata.
   * This method is used in some external applications and so
   * must be viewed as a public entry point.
   * @deprecated use S3AInternals API.
   * @param path path to the object. This will be qualified.
   * @return metadata
   * @throws IOException IO and object access problems.
   */
  @AuditEntryPoint
  @InterfaceAudience.LimitedPrivate("utilities")
  @Retries.RetryTranslated
  @Deprecated
  public HeadObjectResponse getObjectMetadata(Path path) throws IOException {
    return getS3AInternals().getObjectMetadata(path);
  }

  /**
   * Low-level call to get at the object metadata.
   * @param path path to the object
   * @param changeTracker the change tracker to detect version inconsistencies
   * @param changeInvoker the invoker providing the retry policy
   * @param operation the operation being performed (e.g. "read" or "copy")
   * @return metadata
   * @throws IOException IO and object access problems.
   */
  @Retries.RetryTranslated
  private HeadObjectResponse getObjectMetadata(Path path,
      ChangeTracker changeTracker, Invoker changeInvoker, String operation)
      throws IOException {
    String key = pathToKey(path);
    return once(operation, path.toString(), () ->
            // HEAD against the object
            getObjectMetadata(
                key, changeTracker, changeInvoker, operation));
  }

  /**
   * Entry point to an operation.
   * Increments the statistic; verifies the FS is active.
   * @param operation The operation being invoked
   * @param path first path of operation
   * @return a span for the audit
   * @throws IOException failure of audit service
   */
  protected AuditSpan entryPoint(Statistic operation,
      Path path) throws IOException {
    return entryPoint(operation,
        (path != null ? pathToKey(path): null),
        null);
  }

  /**
   * Entry point to an operation.
   * Increments the statistic; verifies the FS is active.
   * @param operation The operation being invoked
   * @param path1 first path of operation
   * @param path2 second path of operation
   * @return a span for the audit
   * @throws IOException failure of audit service
   */
  protected AuditSpan entryPoint(Statistic operation,
      @Nullable String path1,
      @Nullable String path2) throws IOException {
    checkNotClosed();
    incrementStatistic(operation);
    return createSpan(operation.getSymbol(),
        path1, path2);
  }

  /**
   * Given an IOException raising callable/lambda expression,
   * execute it and update the relevant statistic within a span
   * of the same statistic.
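   * <p>
   * Invocation sketch, mirroring the {@code rename()} usage:
   * <pre>{@code
   * long bytesCopied = trackDurationAndSpan(
   *     INVOCATION_RENAME, src.toString(), dst.toString(), () ->
   *     innerRename(src, dst));
   * }</pre>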
   * @param statistic statistic key
   * @param path first path for span (nullable)
   * @param path2 second path for span
   * @param input input callable.
   * @param <B> return type.
   * @return the result of the operation.
   * @throws IOException if raised in the callable
   */
  private <B> B trackDurationAndSpan(
      Statistic statistic, String path, String path2,
      CallableRaisingIOE<B> input) throws IOException {
    checkNotClosed();
    try (AuditSpan span = createSpan(statistic.getSymbol(),
        path, path2)) {
      return trackDuration(getDurationTrackerFactory(),
          statistic.getSymbol(), input);
    }
  }

  /**
   * Overloaded version of {@code trackDurationAndSpan()}.
   * Takes a single nullable path as the path parameter.
   * @param statistic statistic key
   * @param path path for span (nullable)
   * @param input input callable.
   * @param <B> return type.
   * @return the result of the operation.
   * @throws IOException if raised in the callable
   */
  private <B> B trackDurationAndSpan(
      Statistic statistic,
      @Nullable Path path,
      CallableRaisingIOE<B> input) throws IOException {
    return trackDurationAndSpan(statistic,
        path != null ? pathToKey(path): null,
        null, input);
  }

  /**
   * Increment a statistic by 1.
   * This increments both the instrumentation and storage statistics.
   * @param statistic The operation to increment
   */
  protected void incrementStatistic(Statistic statistic) {
    incrementStatistic(statistic, 1);
  }

  /**
   * Increment a statistic by a specific value.
   * This increments both the instrumentation and storage statistics.
   * @param statistic The operation to increment
   * @param count the count to increment
   */
  protected void incrementStatistic(Statistic statistic, long count) {
    statisticsContext.incrementCounter(statistic, count);
  }

  /**
   * Decrement a gauge by a specific value.
   * @param statistic The operation to decrement
   * @param count the count to decrement
   */
  protected void decrementGauge(Statistic statistic, long count) {
    statisticsContext.decrementGauge(statistic, count);
  }

  /**
   * Increment a gauge by a specific value.
   * @param statistic The operation to increment
   * @param count the count to increment
   */
  protected void incrementGauge(Statistic statistic, long count) {
    statisticsContext.incrementGauge(statistic, count);
  }

  /**
   * Callback when an operation was retried.
   * Increments the statistics of ignored errors or throttled requests,
   * depending up on the exception class.
   * @param ex exception.
   */
  public void operationRetried(Exception ex) {
    if (isThrottleException(ex)) {
      LOG.debug("Request throttled");
      incrementStatistic(STORE_IO_THROTTLED);
      statisticsContext.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
    } else {
      incrementStatistic(STORE_IO_RETRY);
      incrementStatistic(IGNORED_ERRORS);
    }
  }

  /**
   * Callback from {@link Invoker} when an operation is retried.
   * @param text text of the operation
   * @param ex exception
   * @param retries number of retries
   * @param idempotent is the method idempotent
   */
  public void operationRetried(
      String text,
      Exception ex,
      int retries,
      boolean idempotent) {
    operationRetried(ex);
  }

  /**
   * Get the storage statistics of this filesystem.
   * @return the storage statistics
   */
  @Override
  public S3AStorageStatistics getStorageStatistics() {
    return storageStatistics;
  }

  /**
   * Get the instrumentation's IOStatistics.
   * @return statistics or null if instrumentation has not yet been instantiated.
   */
  @Override
  public IOStatistics getIOStatistics() {
    return instrumentation != null
        ? instrumentation.getIOStatistics()
        : null;
  }

  /**
   * Get the factory for duration tracking.
   * @return a factory from the instrumentation.
   */
  protected DurationTrackerFactory getDurationTrackerFactory() {
    return instrumentation != null ?
        instrumentation.getDurationTrackerFactory()
        : null;
  }

  /**
   * Given a possibly null duration tracker factory, return a non-null
   * one for use in tracking durations: either that or the FS tracker
   * itself.
   *
   * @param factory factory.
   * @return a non-null factory.
   */
  protected DurationTrackerFactory nonNullDurationTrackerFactory(
      DurationTrackerFactory factory) {
    return store.nonNullDurationTrackerFactory(factory);
  }

  /**
   * Request object metadata; increments counters in the process.
   * Retry policy: retry untranslated.
   * This method is used in some external applications and so
   * must be viewed as a public entry point.
   * Auditing: this call does NOT initiate a new AuditSpan; the expectation
   * is that there is already an active span.
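   * <p>
   * Sketch (illustrative key): {@code getObjectMetadata("dir/file").contentLength()}
   * returns the object's content length.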
   * @param key key
   * @return the metadata
   * @throws IOException if the retry invocation raises one (it shouldn't).
   */
  @Retries.RetryRaw
  @VisibleForTesting
  @InterfaceAudience.LimitedPrivate("external utilities")
  HeadObjectResponse getObjectMetadata(String key) throws IOException {
    return getObjectMetadata(key, null, invoker, "getObjectMetadata");
  }

  /**
   * Request object metadata; increments counters in the process.
   * Retry policy: retry untranslated.
   * Uses changeTracker to detect an unexpected file version (eTag or versionId).
   * @param key key
   * @param changeTracker the change tracker to detect unexpected object version
   * @param changeInvoker the invoker providing the retry policy
   * @param operation the operation (e.g. "read" or "copy") triggering this call
   * @return the metadata
   * @throws IOException if the retry invocation raises one (it shouldn't).
   * @throws RemoteFileChangedException if an unexpected version is detected
   */
  @Retries.RetryRaw
  protected HeadObjectResponse getObjectMetadata(String key,
      ChangeTracker changeTracker,
      Invoker changeInvoker,
      String operation) throws IOException {
    return store.headObject(key, changeTracker, changeInvoker, fsHandler, operation);
  }

  /**
   * Request bucket metadata.
   * @return the metadata
   * @throws UnknownStoreException the bucket is absent
   * @throws IOException  any other problem talking to S3
   */
  @AuditEntryPoint
  @Retries.RetryTranslated
  protected HeadBucketResponse getBucketMetadata() throws IOException {
    final HeadBucketResponse response = trackDurationAndSpan(STORE_EXISTS_PROBE, bucket, null,
        () -> invoker.retry("getBucketMetadata()", bucket, true, () -> {
          try {
            return getS3Client().headBucket(
                getRequestFactory().newHeadBucketRequestBuilder(bucket).build());
          } catch (NoSuchBucketException e) {
            throw new UnknownStoreException("s3a://" + bucket + "/",
                "Bucket does not exist");
          }
        }));
    return response;
  }

  /**
   * Initiate a {@code listObjects} operation, incrementing metrics
   * in the process.
   *
   * Retry policy: retry untranslated.
   * @param request request to initiate
   * @param trackerFactory duration tracking
   * @return the results
   * @throws IOException if the retry invocation raises one (it shouldn't).
   */
  @Retries.RetryRaw
  protected S3ListResult listObjects(S3ListRequest request,
      @Nullable final DurationTrackerFactory trackerFactory)
      throws IOException {
    incrementReadOperations();
    LOG.debug("LIST {}", request);
    validateListArguments(request);
    try (DurationInfo ignored =
            new DurationInfo(LOG, false, "LIST")) {
      return invoker.retryUntranslated(
          request.toString(),
          true,
          trackDurationOfOperation(trackerFactory,
              OBJECT_LIST_REQUEST,
              () -> {
                if (useListV1) {
                  return S3ListResult.v1(getS3Client().listObjects(request.getV1()));
                } else {
                  return S3ListResult.v2(getS3Client().listObjectsV2(request.getV2()));
                }
              }));
    }
  }

  /**
   * Validate the list arguments with this bucket's settings.
   * @param request the request to validate
   */
  private void validateListArguments(S3ListRequest request) {
    if (useListV1) {
      Preconditions.checkArgument(request.isV1());
    } else {
      Preconditions.checkArgument(!request.isV1());
    }
  }

  /**
   * List the next set of objects.
   * Retry policy: retry untranslated.
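   * <p>
   * Pagination sketch (illustrative; assumes {@code S3ListResult.isTruncated()}):
   * <pre>{@code
   * S3ListResult result = listObjects(request, trackerFactory);
   * while (result.isTruncated()) {
   *   result = continueListObjects(request, result, trackerFactory);
   * }
   * }</pre>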
   * @param request last list objects request to continue
   * @param prevResult last paged result to continue from
   * @param trackerFactory duration tracking
   * @return the next result object
   * @throws IOException none, just there for the retryUntranslated() signature.
   */
  @Retries.RetryRaw
  protected S3ListResult continueListObjects(S3ListRequest request,
      S3ListResult prevResult,
      final DurationTrackerFactory trackerFactory) throws IOException {
    incrementReadOperations();
    validateListArguments(request);
    try (DurationInfo ignored =
            new DurationInfo(LOG, false, "LIST (continued)")) {
      return invoker.retryUntranslated(
          request.toString(),
          true,
          trackDurationOfOperation(
              trackerFactory,
              OBJECT_CONTINUE_LIST_REQUEST,
              () -> {
                if (useListV1) {
                  List<S3Object> prevListResult = prevResult.getV1().contents();

                  // Next markers are only present when a delimiter is specified.
                  String nextMarker;
                  if (prevResult.getV1().nextMarker() != null) {
                    nextMarker = prevResult.getV1().nextMarker();
                  } else {
                    nextMarker = prevListResult.get(prevListResult.size() - 1).key();
                  }

                  return S3ListResult.v1(getS3Client().listObjects(
                      request.getV1().toBuilder().marker(nextMarker).build()));
                } else {
                  return S3ListResult.v2(getS3Client().listObjectsV2(request.getV2().toBuilder()
                      .continuationToken(prevResult.getV2().nextContinuationToken()).build()));
                }
              }));
    }
  }

  /**
   * Increment read operations.
   */
  public void incrementReadOperations() {
    statistics.incrementReadOps(1);
  }

  /**
   * Increment the write operation counter.
   * This is somewhat inaccurate, as it appears to be invoked more
   * often than needed in progress callbacks.
   */
  public void incrementWriteOperations() {
    statistics.incrementWriteOps(1);
  }

  /**
   * Delete an object.
   * Increments the {@code OBJECT_DELETE_REQUESTS} and write
   * operation statistics.
   * This call does <i>not</i> create any mock parent entries.
   *
   * Retry policy: retry untranslated; delete considered idempotent.
   * @param key key of the blob to delete.
   * @throws SdkException problems working with S3
   * @throws InvalidRequestException if the request was rejected due to
   * a mistaken attempt to delete the root directory.
   */
  @VisibleForTesting
  @Retries.RetryRaw
  protected void deleteObject(String key)
      throws SdkException, IOException {
    incrementWriteOperations();
    store.deleteObject(getRequestFactory()
        .newDeleteObjectRequestBuilder(key)
        .build());
  }

  /**
   * Delete an object.
   * This call does <i>not</i> create any mock parent entries.
   * Retry policy: retry untranslated; delete considered idempotent.
   * @param f path to delete
   * @param key key of entry
   * @param isFile is the path a file (used for instrumentation only)
   * @throws SdkException problems working with S3
   * @throws IOException from the invoker signature only; should not be raised.
   */
  @Retries.RetryRaw
  void deleteObjectAtPath(Path f,
      String key,
      boolean isFile)
      throws SdkException, IOException {
    if (isFile) {
      instrumentation.fileDeleted(1);
    } else {
      instrumentation.directoryDeleted();
    }
    deleteObject(key);
  }

  /**
   * Perform a bulk object delete operation against S3.
   * Increments the {@code OBJECT_DELETE_REQUESTS} and write
   * operation statistics
   * <p></p>
   * {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
   * of objects deleted in the request.
   * <p></p>
   * Retry policy: retry untranslated; delete considered idempotent.
   * If the request is throttled, this is logged in the throttle statistics,
   * with the counter set to the number of keys, rather than the number
   * of invocations of the delete operation.
   * This is because S3 considers each key as one mutating operation on
   * the store when updating its load counters on a specific partition
   * of an S3 bucket.
   * If only the request itself were counted, this operation would under-report.
   * @param deleteRequest keys to delete on the s3-backend
   * @return the AWS response
   * @throws MultiObjectDeleteException one or more of the keys could not
   * be deleted.
   * @throws SdkException amazon-layer failure.
   */
  @Retries.RetryRaw
  private DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteRequest)
      throws MultiObjectDeleteException, SdkException, IOException {
    incrementWriteOperations();
    DeleteObjectsResponse response = store.deleteObjects(deleteRequest).getValue();
    if (!response.errors().isEmpty()) {
      throw new MultiObjectDeleteException(response.errors());
    }
    return response;
  }

  /**
   * Create a putObject request builder.
   * Adds the ACL and metadata
   * @param key key of object
   * @param length length of object to be uploaded
   * @param isDirectoryMarker true if object to be uploaded is a directory marker
   * @return the request
   */
  public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
      long length,
      boolean isDirectoryMarker) {
    return requestFactory.newPutObjectRequestBuilder(key, null, length, isDirectoryMarker);
  }

  /**
   * Start a transfer-manager managed async PUT of an object,
   * incrementing the put requests and put bytes
   * counters.
   * It does not update the other counters,
   * as existing code does that as progress callbacks come in.
   * Byte length is calculated from the file length, or, if there is no
   * file, from the content length of the header.
   * Because the operation is async, any stream supplied in the request
   * must reference data (files, buffers) which stay valid until the upload
   * completes.
   * Retry policy: N/A: the transfer manager is performing the upload.
   * Auditing: must be inside an audit span.
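   * <p>
   * An illustrative sketch, not normative; it mirrors what
   * {@code executePut()} does and assumes a local source file
   * and an in-scope key and listener:
   * <pre>{@code
   * File src = new File("/tmp/block-0001");
   * PutObjectRequest req =
   *     newPutObjectRequestBuilder(key, src.length(), false).build();
   * UploadInfo info = putObject(req, src, listener);
   * // completion: store.waitForUploadCompletion(key, info)
   * }</pre>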
   * @param putObjectRequest the request
   * @param file the file to be uploaded
   * @param listener the progress listener for the request
   * @return the upload initiated
   * @throws IOException if transfer manager creation failed.
   */
  @Retries.OnceRaw
  public UploadInfo putObject(PutObjectRequest putObjectRequest, File file,
      ProgressableProgressListener listener) throws IOException {
    return store.putObject(putObjectRequest, file, listener);
  }

  /**
   * PUT an object directly (i.e. not via the transfer manager).
   * Byte length is calculated from the file length, or, if there is no
   * file, from the content length of the header.
   *
   * Retry Policy: none.
   * Auditing: must be inside an audit span.
   * <i>Important: this call will close any input stream in the request.</i>
   * @param putObjectRequest the request
   * @param putOptions put object options
   * @param uploadData data to be uploaded
   * @param durationTrackerFactory factory for duration tracking
   * @return the upload initiated
   * @throws SdkException on problems
   */
  @VisibleForTesting
  @Retries.OnceRaw("For PUT; post-PUT actions are RetryExceptionsSwallowed")
  PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest,
      PutObjectOptions putOptions,
      S3ADataBlocks.BlockUploadData uploadData,
      DurationTrackerFactory durationTrackerFactory)
      throws SdkException {

    long len = getPutRequestLength(putObjectRequest);
    LOG.debug("PUT {} bytes to {}", len, putObjectRequest.key());
    incrementPutStartStatistics(len);
    final UploadContentProviders.BaseContentProvider provider =
        uploadData.getContentProvider();
    try {
      PutObjectResponse response =
          trackDurationOfSupplier(nonNullDurationTrackerFactory(durationTrackerFactory),
              OBJECT_PUT_REQUESTS.getSymbol(),
              () -> getS3Client().putObject(putObjectRequest,
                  RequestBody.fromContentProvider(
                      provider,
                      provider.getSize(),
                      CONTENT_TYPE_OCTET_STREAM)));
      incrementPutCompletedStatistics(true, len);
      return response;
    } catch (SdkException e) {
      incrementPutCompletedStatistics(false, len);
      throw e;
    }
  }

  /**
   * Get the length of the PUT, verifying that the length is known.
   * @param putObjectRequest a request bound to a file or a stream.
   * @return the request length
   * @throws IllegalArgumentException if the length is negative
   */
  private long getPutRequestLength(PutObjectRequest putObjectRequest) {
    long len = putObjectRequest.contentLength();

    Preconditions.checkArgument(len >= 0, "Cannot PUT object of unknown length");
    return len;
  }

  /**
   * Upload part of a multi-partition file.
   * Increments the write and put counters.
   * <i>Important: this call does not close any input stream in the body.</i>
   *
   * Retry Policy: none.
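   * <p>
   * A minimal sketch of a single part upload, assuming the request was
   * built by the request factory and the block data is in a local file:
   * <pre>{@code
   * RequestBody body = RequestBody.fromFile(blockFile);
   * UploadPartResponse response =
   *     uploadPart(partRequest, body, durationTrackerFactory);
   * String etag = response.eTag(); // needed to complete the upload
   * }</pre>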
   * @param durationTrackerFactory duration tracker factory for operation
   * @param request the upload part request.
   * @param body the request body.
   * @return the result of the operation.
   * @throws AwsServiceException on problems
   */
  @Retries.OnceRaw
  UploadPartResponse uploadPart(UploadPartRequest request, RequestBody body,
      final DurationTrackerFactory durationTrackerFactory)
      throws AwsServiceException {
    long len = request.contentLength();
    incrementPutStartStatistics(len);
    try {
      UploadPartResponse uploadPartResponse = trackDurationOfSupplier(
          nonNullDurationTrackerFactory(durationTrackerFactory),
          MULTIPART_UPLOAD_PART_PUT.getSymbol(), () ->
              getS3Client().uploadPart(request, body));
      incrementPutCompletedStatistics(true, len);
      return uploadPartResponse;
    } catch (AwsServiceException e) {
      incrementPutCompletedStatistics(false, len);
      throw e;
    }
  }

  /**
   * At the start of a put/multipart upload operation, update the
   * relevant counters.
   *
   * @param bytes bytes in the request.
   */
  protected void incrementPutStartStatistics(long bytes) {
    store.incrementPutStartStatistics(bytes);
  }

  /**
   * At the end of a put/multipart upload operation, update the
   * relevant counters and gauges.
   *
   * @param success did the operation succeed?
   * @param bytes bytes in the request.
   */
  protected void incrementPutCompletedStatistics(boolean success, long bytes) {
    store.incrementPutCompletedStatistics(success, bytes);
  }

  /**
   * Callback for use in progress callbacks from put/multipart upload events.
   * Increments those statistics which are expected to be updated during
   * the ongoing upload operation.
   * @param key key to file that is being written (for logging)
   * @param bytes bytes successfully uploaded.
   */
  protected void incrementPutProgressStatistics(String key, long bytes) {
    store.incrementPutProgressStatistics(key, bytes);
  }

  /**
   * Delete a list of keys on an s3-backend.
   * Retry policy: retry untranslated; delete considered idempotent.
   * @param keysToDelete collection of keys to delete on the s3-backend.
   *        if empty, no request is made of the object store.
   * @param deleteFakeDir indicates whether this is for deleting fake dirs
   * @throws InvalidRequestException if the request was rejected due to
   * a mistaken attempt to delete the root directory.
   * @throws MultiObjectDeleteException one or more of the keys could not
   * be deleted in a multiple object delete operation.
   * The number of rejected objects will be added to the metric
   * {@link Statistic#FILES_DELETE_REJECTED}.
   * @throws AwsServiceException other amazon-layer failure.
   */
  @Retries.RetryRaw
  private void removeKeysS3(
          List<ObjectIdentifier> keysToDelete,
          boolean deleteFakeDir)
      throws MultiObjectDeleteException, AwsServiceException, IOException {
    if (keysToDelete.isEmpty()) {
      // exit fast if there are no keys to delete
      return;
    }
    if (keysToDelete.size() == 1) {
      // single object is a single delete call.
      // this is more informative in server logs and may be more efficient.
      deleteObject(keysToDelete.get(0).key());
      noteDeleted(1, deleteFakeDir);
      return;
    }
    try {
      if (enableMultiObjectsDelete) {
        if (keysToDelete.size() <= pageSize) {
          deleteObjects(getRequestFactory()
              .newBulkDeleteRequestBuilder(keysToDelete)
              .build());
        } else {
          // Multi-object deletion of more than 1000 keys is not supported
          // by S3, so the keys are paged by the configured page size.
          LOG.debug("Partitioning the keys to delete as it is more than " +
                  "page size. Number of keys: {}, Page size: {}",
                  keysToDelete.size(), pageSize);
          for (List<ObjectIdentifier> batchOfKeysToDelete :
                  Lists.partition(keysToDelete, pageSize)) {
            deleteObjects(getRequestFactory()
                .newBulkDeleteRequestBuilder(batchOfKeysToDelete)
                .build());
          }
        }
      } else {
        for (ObjectIdentifier objectIdentifier : keysToDelete) {
          deleteObject(objectIdentifier.key());
        }
      }
    } catch (MultiObjectDeleteException ex) {
      // partial delete.
      // Update the stats with the count of the actual number of successful
      // deletions.
      int rejected = ex.errors().size();
      noteDeleted(keysToDelete.size() - rejected, deleteFakeDir);
      incrementStatistic(FILES_DELETE_REJECTED, rejected);
      throw ex;
    }
    noteDeleted(keysToDelete.size(), deleteFakeDir);
  }

  /**
   * Note the deletion of files or fake directories.
   * @param count count of keys deleted.
   * @param deleteFakeDir are the deletions fake directories?
   */
  private void noteDeleted(final int count, final boolean deleteFakeDir) {
    if (!deleteFakeDir) {
      instrumentation.fileDeleted(count);
    } else {
      instrumentation.fakeDirsDeleted(count);
    }
  }

  /**
   * Invoke {@link #removeKeysS3(List, boolean)}.
   * If a {@code MultiObjectDeleteException} is raised, the
   * relevant statistics are updated.
   *
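   * A hedged example of deleting two keys in one bulk request
   * (paths are illustrative):
   * <pre>{@code
   * List<ObjectIdentifier> keys = new ArrayList<>();
   * keys.add(ObjectIdentifier.builder().key("dir/file1").build());
   * keys.add(ObjectIdentifier.builder().key("dir/file2").build());
   * removeKeys(keys, false);
   * }</pre>
   *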
   * @param keysToDelete collection of keys to delete on the s3-backend.
   *        if empty, no request is made of the object store.
   * @param deleteFakeDir indicates whether this is for deleting fake dirs
   * @throws InvalidRequestException if the request was rejected due to
   * a mistaken attempt to delete the root directory.
   * @throws MultiObjectDeleteException one or more of the keys could not
   * be deleted in a multiple object delete operation.
   * @throws AwsServiceException amazon-layer failure.
   * @throws IOException other IO Exception.
   */
  @VisibleForTesting
  @Retries.RetryRaw
  public void removeKeys(
      final List<ObjectIdentifier> keysToDelete,
      final boolean deleteFakeDir)
      throws MultiObjectDeleteException, AwsServiceException,
      IOException {
    try (DurationInfo ignored = new DurationInfo(LOG, false,
            "Deleting %d keys", keysToDelete.size())) {
      removeKeysS3(keysToDelete, deleteFakeDir);
    }
  }

  /**
   * Delete a Path. This operation is at least {@code O(files)}, with
   * added overheads to enumerate the path. It is also not atomic.
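   * <p>
   * Usage sketch (path is illustrative):
   * <pre>{@code
   * // recursive delete of a directory tree; false if it was absent
   * boolean deleted = fs.delete(new Path("/data/tmp"), true);
   * }</pre>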
   *
   * @param f the path to delete.
   * @param recursive if the path is a directory and recursive is
   * true, the directory is deleted, else an exception is thrown. For
   * a file, recursive may be either true or false.
   * @return true if the path existed and then was deleted; false if there
   * was no path in the first place, or the corner cases of root path deletion
   * have surfaced.
   * @throws IOException due to inability to delete a directory or file.
   */
  @Override
  @Retries.RetryTranslated
  @AuditEntryPoint
  public boolean delete(Path f, boolean recursive) throws IOException {
    checkNotClosed();
    return deleteWithoutCloseCheck(f, recursive);
  }

  /**
   * Same as delete(), except that it does not check if fs is closed.
   *
   * @param f the path to delete.
   * @param recursive if the path is a directory and recursive is
   * true, the directory is deleted, else an exception is thrown. For
   * a file, recursive may be either true or false.
   * @return true if the path existed and then was deleted; false if there
   * was no path in the first place, or the corner cases of root path deletion
   * have surfaced.
   * @throws IOException due to inability to delete a directory or file.
   */
  @VisibleForTesting
  protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
    final Path path = qualify(f);
    // span covers delete, getFileStatus, fake directory operations.
    try (AuditSpan span = createSpan(INVOCATION_DELETE.getSymbol(),
        path.toString(), null)) {
      // the store context will include the active span
      final StoreContext storeContext = createStoreContext();
      boolean outcome = trackDuration(getDurationTrackerFactory(),
          INVOCATION_DELETE.getSymbol(),
          new DeleteOperation(
              storeContext,
              innerGetFileStatus(path, true, StatusProbeEnum.ALL),
              recursive,
              new OperationCallbacksImpl(storeContext),
              pageSize,
              dirOperationsPurgeUploads));
      if (outcome) {
        try {
          maybeCreateFakeParentDirectory(path);
        } catch (AccessDeniedException e) {
          LOG.warn("Cannot create directory marker at {}: {}",
              f.getParent(), e.toString());
          LOG.debug("Failed to create fake dir above {}", path, e);
        }
      }
      return outcome;
    } catch (FileNotFoundException e) {
      LOG.debug("Couldn't delete {} - does not exist: {}", path, e.toString());
      instrumentation.errorIgnored();
      return false;
    } catch (SdkException e) {
      throw translateException("delete", path, e);
    }
  }

  /**
   * Create a fake directory if required.
   * That is: it is not the root path and the path does not exist.
   * Retry policy: retrying; translated.
   * @param f path to create
   * @throws IOException IO problem
   */
  @Retries.RetryTranslated
  private void createFakeDirectoryIfNecessary(Path f)
      throws IOException, SdkException {
    String key = pathToKey(f);
    // we only make the LIST call; the codepaths to get here should not
    // be reached if there is an empty dir marker; and if they are, it
    // is mostly harmless to create a new one.
    if (!key.isEmpty() && !s3Exists(f, StatusProbeEnum.DIRECTORIES)) {
      LOG.debug("Creating new fake directory at {}", f);
      createFakeDirectory(key, PutObjectOptions.defaultOptions());
    }
  }

  /**
   * Create a fake parent directory if required.
   * That is: its parent is not the root path and does not yet exist.
   * @param path whose parent is created if needed.
   * @throws IOException IO problem
   */
  @Retries.RetryTranslated
  @VisibleForTesting
  protected void maybeCreateFakeParentDirectory(Path path)
      throws IOException, SdkException {
    Path parent = path.getParent();
    if (parent != null && !parent.isRoot() && !isUnderMagicCommitPath(parent)) {
      createFakeDirectoryIfNecessary(parent);
    }
  }

  /**
   * Override the superclass so that callers benefit from the async listing
   * done in {@code S3AFileSystem}. See {@code Listing#ObjectListingIterator}.
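   * <p>
   * Illustrative iteration; entries are consumed while the next page is
   * fetched asynchronously (path is an example):
   * <pre>{@code
   * RemoteIterator<FileStatus> it =
   *     fs.listStatusIterator(new Path("/data"));
   * while (it.hasNext()) {
   *   FileStatus st = it.next();
   *   // process st while the listing prefetches the next page
   * }
   * }</pre>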
   * {@inheritDoc}
   *
   */
  @Override
  @AuditEntryPoint
  public RemoteIterator<FileStatus> listStatusIterator(Path p)
          throws FileNotFoundException, IOException {
    Path path = qualify(p);
    return typeCastingRemoteIterator(trackDurationAndSpan(
        INVOCATION_LIST_STATUS, path, () ->
            once("listStatus", path.toString(), () ->
                innerListStatus(p))));
  }

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory.
   *
   * @param f given path
   * @return the statuses of the files/directories in the given path
   * @throws FileNotFoundException when the path does not exist;
   *         IOException see specific implementation
   */
  @Override
  @AuditEntryPoint
  public FileStatus[] listStatus(Path f) throws FileNotFoundException,
      IOException {
    Path path = qualify(f);
    return trackDurationAndSpan(INVOCATION_LIST_STATUS, path, () ->
        once("listStatus", path.toString(),
            () -> iteratorToStatuses(innerListStatus(path))));
  }

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory. The returned iterator is within the current active span.
   *
   * Auditing: This method MUST be called within a span.
   * The span is attached to the iterator. All further S3 calls
   * made by the iterator will be within the span.
   * @param f qualified path
   * @return the statuses of the files/directories in the given path
   * @throws FileNotFoundException when the path does not exist;
   * @throws IOException due to an IO problem.
   * @throws SdkException on failures inside the AWS SDK
   */
  private RemoteIterator<S3AFileStatus> innerListStatus(Path f)
          throws FileNotFoundException,
          IOException, SdkException {
    Path path = qualify(f);
    LOG.debug("List status for path: {}", path);

    final RemoteIterator<S3AFileStatus> statusIt = listing
        .getFileStatusesAssumingNonEmptyDir(path, getActiveAuditSpan());
    if (!statusIt.hasNext()) {
      // We may have an empty dir, a file, or nothing at all.
      // So call innerGetFileStatus to get the status; this may throw
      // FileNotFoundException if there is nothing.
      // Otherwise we are guaranteed to have either a dir marker or a file.
      final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
              StatusProbeEnum.ALL);
      // If it is a file return directly.
      if (fileStatus.isFile()) {
        LOG.debug("Adding: rd (not a dir): {}", path);
        S3AFileStatus[] stats = new S3AFileStatus[1];
        stats[0] = fileStatus;
        return listing.createProvidedFileStatusIterator(
                stats,
                ACCEPT_ALL,
                Listing.ACCEPT_ALL_OBJECTS);
      }
    }
    // Here we have a directory which may or may not be empty.
    return statusIt;
  }

  /**
   * Create a {@code ListObjectsRequest} request against this bucket,
   * with the maximum keys returned in a query set by {@link #maxKeys}.
   * @param key key for request
   * @param delimiter any delimiter
   * @return the request
   */
  @VisibleForTesting
  public S3ListRequest createListObjectsRequest(String key,
      String delimiter) {
    return createListObjectsRequest(key, delimiter, maxKeys);
  }

  /**
   * Create the List objects request appropriate for the
   * active list request option.
   * @param key key for request
   * @param delimiter any delimiter
   * @param limit limit of keys
   * @return the request
   */
  private S3ListRequest createListObjectsRequest(String key,
      String delimiter, int limit) {
    if (!useListV1) {
      ListObjectsV2Request.Builder requestBuilder =
          getRequestFactory().newListObjectsV2RequestBuilder(
              key, delimiter, limit);
      return S3ListRequest.v2(requestBuilder.build());
    } else {
      ListObjectsRequest.Builder requestBuilder =
          getRequestFactory().newListObjectsV1RequestBuilder(
              key, delimiter, limit);
      return S3ListRequest.v1(requestBuilder.build());
    }
  }

  /**
   * Set the current working directory for the given file system. All relative
   * paths will be resolved relative to it.
   *
   * @param newDir the current working directory.
   */
  public void setWorkingDirectory(Path newDir) {
    workingDir = makeQualified(newDir);
  }

  /**
   * Get the current working directory for the given file system.
   * @return the directory pathname
   */
  public Path getWorkingDirectory() {
    return workingDir;
  }

  /**
   * Get the username of the FS.
   * @return the short name of the user who instantiated the FS
   */
  public String getUsername() {
    return username;
  }

  /**
   * Get the owner of this FS: who created it?
   * @return the owner of the FS.
   */
  public UserGroupInformation getOwner() {
    return owner;
  }

  /**
   * Make the given path and all non-existent parents into
   * directories. Has the semantics of Unix {@code 'mkdir -p'}.
   * Existence of the directory hierarchy is not an error.
   * Parent elements are scanned to see if any are a file,
   * <i>except under "MAGIC PATH"</i> paths.
   * There the FS assumes that the destination directory creation
   * did that scan and that paths in job/task attempts are all
   * "well formed"
   * @param p path to create
   * @param permission to apply to path
   * @return true if a directory was created or already existed
   * @throws FileAlreadyExistsException there is a file at the path specified
   * or is discovered on one of its ancestors.
   * @throws IOException other IO problems
   */
  @Override
  @AuditEntryPoint
  public boolean mkdirs(Path p, FsPermission permission) throws IOException,
      FileAlreadyExistsException {
    Path path = qualify(p);
    return trackDurationAndSpan(
        INVOCATION_MKDIRS, path,
        new MkdirOperation(
            createStoreContext(),
            path,
            createMkdirOperationCallbacks(),
            isMagicCommitPath(path),
            performanceFlags.enabled(PerformanceFlagEnum.Mkdir)));
  }

  /**
   * Override point: create the callbacks for Mkdir.
   * This does not create a new span; caller must be in one.
   * @return an implementation of the MkdirCallbacks.
   */
  @VisibleForTesting
  public MkdirOperation.MkdirCallbacks createMkdirOperationCallbacks() {
    return new MkdirOperationCallbacksImpl();
  }

  /**
   * Callbacks from the {@link MkdirOperation}.
   */
  protected class MkdirOperationCallbacksImpl implements
      MkdirOperation.MkdirCallbacks {

    @Override
    public S3AFileStatus probePathStatus(final Path path,
        final Set<StatusProbeEnum> probes) throws IOException {
      return S3AFileSystem.this.innerGetFileStatus(path, false, probes);
    }

    @Override
    public void createFakeDirectory(final Path dir)
        throws IOException {
      S3AFileSystem.this.createFakeDirectory(
          pathToKey(dir),
          PutObjectOptions.defaultOptions());
    }
  }

  /**
   * This is a very slow operation against object storage.
   * Execute it as a single span with whatever optimizations
   * have been implemented.
   * {@inheritDoc}
   */
  @Override
  @Retries.RetryTranslated
  @AuditEntryPoint
  public ContentSummary getContentSummary(final Path f) throws IOException {
    final Path path = qualify(f);
    return trackDurationAndSpan(
        INVOCATION_GET_CONTENT_SUMMARY, path,
        new GetContentSummaryOperation(
            createStoreContext(),
            path,
            createGetContentSummaryCallbacks()));
  }

  /**
   * Override point: create the callbacks for getContentSummary.
   * This does not create a new span; caller must be in one.
   * @return an implementation of the GetContentSummaryCallbacks.
   */
  protected GetContentSummaryOperation.GetContentSummaryCallbacks
      createGetContentSummaryCallbacks() {
    return new GetContentSummaryCallbacksImpl();
  }

  /**
   * Callbacks from the {@link GetContentSummaryOperation}.
   */
  protected class GetContentSummaryCallbacksImpl implements
      GetContentSummaryOperation.GetContentSummaryCallbacks {

    @Override
    public S3AFileStatus probePathStatus(final Path path,
        final Set<StatusProbeEnum> probes) throws IOException {
      return S3AFileSystem.this.innerGetFileStatus(path, false, probes);
    }

    @Override
    public RemoteIterator<S3ALocatedFileStatus> listFilesIterator(final Path path,
        final boolean recursive) throws IOException {
      return S3AFileSystem.this.innerListFiles(path, recursive, Listing.ACCEPT_ALL_OBJECTS, null);
    }
  }

  /**
   * Soft check of access by forwarding to the audit manager
   * and so on to the auditor.
   * {@inheritDoc}
   */
  @Override
  @AuditEntryPoint
  public void access(final Path f, final FsAction mode)
      throws AccessControlException, FileNotFoundException, IOException {
    Path path = qualify(f);
    LOG.debug("check access mode {} for {}", path, mode);
    trackDurationAndSpan(
        INVOCATION_ACCESS, path, () -> {
          final S3AFileStatus stat = innerGetFileStatus(path, false,
              StatusProbeEnum.ALL);
          if (!getAuditManager().checkAccess(path, stat, mode)) {
            incrementStatistic(AUDIT_ACCESS_CHECK_FAILURE);
            throw new AccessControlException(String.format(
                "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s",
                getOwner().getUserName(),
                stat.getPath(),
                stat.getOwner(), stat.getGroup(),
                stat.isDirectory() ? "d" : "-", mode));
          }
          // simply for the API binding.
          return true;
        });
  }

  /**
   * Return a file status object that represents the path.
   * @param f The path we want information from
   * @return a FileStatus object
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException on other problems.
   */
  @Override
  @AuditEntryPoint
  @Retries.RetryTranslated
  public FileStatus getFileStatus(final Path f) throws IOException {
    Path path = qualify(f);
    if (isTrackMagicCommitsInMemoryEnabled(getConf()) && isMagicCommitPath(path)) {
      // Some downstream apps might call getFileStatus for a magic path to get the file size.
      // When commit data is stored in memory, construct a dummy S3AFileStatus with the correct
      // file size fetched from memory.
      if (InMemoryMagicCommitTracker.getPathToBytesWritten().containsKey(path)) {
        long len = InMemoryMagicCommitTracker.getPathToBytesWritten().get(path);
        return new S3AFileStatus(len,
            0L,
            path,
            getDefaultBlockSize(path),
            username,
            MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME,
            null);
      }
    }
    return trackDurationAndSpan(
        INVOCATION_GET_FILE_STATUS, path, () ->
            innerGetFileStatus(path, false, StatusProbeEnum.ALL));
  }

  /**
   * Get the status of a file or directory.
   * Internal version of {@link #getFileStatus(Path)}.
   * @param f The path we want information from
   * @param needEmptyDirectoryFlag if true, implementation will calculate
   *        a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()}
   * @param probes probes to make.
   * @return a S3AFileStatus object
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException on other problems.
   */
  @VisibleForTesting
  @Retries.RetryTranslated
  S3AFileStatus innerGetFileStatus(final Path f,
      final boolean needEmptyDirectoryFlag,
      final Set<StatusProbeEnum> probes) throws IOException {
    final Path path = qualify(f);
    String key = pathToKey(path);
    LOG.debug("Getting path status for {}  ({}); needEmptyDirectory={}",
        path, key, needEmptyDirectoryFlag);
    return s3GetFileStatus(path,
        key,
        probes,
        needEmptyDirectoryFlag);

  }

  /**
   * Probe store for file status with control of which probes are issued.
   * Used to implement {@link #innerGetFileStatus(Path, boolean, Set)},
   * and for direct management of empty directory blobs.
   *
   * Checks made, in order:
   * <ol>
   *   <li>
   *     Head: look for an object at the given key, provided that
   *     the key doesn't end in "/"
   *   </li>
   *   <li>
   *     DirMarker/List: issue a LIST on the key (with / if needed), require one
   *     entry to be found for the path to be considered a non-empty directory.
   *   </li>
   * </ol>
   *
   * Notes:
   * <ul>
   *   <li>
   *     Objects ending in / are treated as directory markers,
   *     irrespective of length.
   *   </li>
   *   <li>
   *     The HEAD requests require the permissions to read an object,
   *     including (we believe) the ability to decrypt the file.
   *     At the very least, for SSE-C markers, you need the same key on
   *     the client for the HEAD to work.
   *   </li>
   *   <li>
   *     The List probe needs list permission.
   *   </li>
   * </ul>
   *
   * Retry policy: retry translated.
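   * <p>
   * A hedged illustration of how the probes resolve; the keys are
   * examples and assume the objects exist:
   * <pre>{@code
   * // plain key: the HEAD probe resolves it as a file
   * s3GetFileStatus(path, "data/file.txt", StatusProbeEnum.ALL, false);
   * // directory: HEAD misses, the LIST probe finds the marker/children
   * s3GetFileStatus(dirPath, "data/dir", StatusProbeEnum.ALL, true);
   * }</pre>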
   * @param path Qualified path
   * @param key  Key string for the path
   * @param probes probes to make
   * @param needEmptyDirectoryFlag if true, implementation will calculate
   *        a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()}
   * @return Status
   * @throws FileNotFoundException the supplied probes failed.
   * @throws IOException on other problems.
   */
  @VisibleForTesting
  @Retries.RetryTranslated
  S3AFileStatus s3GetFileStatus(final Path path,
      final String key,
      final Set<StatusProbeEnum> probes,
      final boolean needEmptyDirectoryFlag) throws IOException {
    LOG.debug("S3GetFileStatus {}", path);
    // either you aren't looking for the directory flag, or you are,
    // and if you are, the probe list must contain list.
    Preconditions.checkArgument(!needEmptyDirectoryFlag
        || probes.contains(StatusProbeEnum.List),
        "s3GetFileStatus(%s) wants to know if a directory is empty but"
            + " does not request a list probe", path);
    if (key.isEmpty() && !needEmptyDirectoryFlag) {
      return new S3AFileStatus(Tristate.UNKNOWN, path, username);
    }

    if (!key.isEmpty() && !key.endsWith("/")
        && probes.contains(StatusProbeEnum.Head)) {
      try {
        // look for the simple file
        HeadObjectResponse meta = getObjectMetadata(key);
        LOG.debug("Found exact file: normal file {}", key);
        return new S3AFileStatus(meta.contentLength(),
            meta.lastModified().toEpochMilli(),
            path,
            getDefaultBlockSize(path),
            username,
            meta.eTag(),
            meta.versionId());
      } catch (AwsServiceException e) {
        // if the response is a 404 error, it just means that there is
        // no file at that path; the remaining checks will be needed.
        // But: an empty bucket is also a 404, so check for that
        // and fail.
        if (e.statusCode() != SC_404_NOT_FOUND || isUnknownBucket(e)) {
          throw translateException("getFileStatus", path, e);
        }
      } catch (SdkException e) {
        throw translateException("getFileStatus", path, e);
      }
    }

    // execute the list
    if (probes.contains(StatusProbeEnum.List)) {
      try {
        // this will find a marker dir / as well as an entry.
        // When making a simple "is this a dir?" check, all is good;
        // but when looking for an empty dir, we need to verify there are
        // no children, so ask for two entries, so as to find
        // a child.
        String dirKey = maybeAddTrailingSlash(key);
        // list size is dir marker + at least one entry
        final int listSize = 2;
        S3ListRequest request = createListObjectsRequest(dirKey, "/",
            listSize);
        // execute the request
        S3ListResult listResult = listObjects(request,
            getDurationTrackerFactory());

        if (listResult.hasPrefixesOrObjects()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Found path as directory (with /)");
            listResult.logAtDebug(LOG);
          }
          // At least one entry has been found.
          // If looking for an empty directory, the marker must exist but no
          // children.
          // So the listing must contain the marker entry only.
          if (needEmptyDirectoryFlag
              && listResult.representsEmptyDirectory(dirKey)) {
            return new S3AFileStatus(Tristate.TRUE, path, username);
          }
          // either an empty directory is not needed, or the
          // listing does not meet the requirements.
          return new S3AFileStatus(Tristate.FALSE, path, username);
        } else if (key.isEmpty()) {
          LOG.debug("Found root directory");
          return new S3AFileStatus(Tristate.TRUE, path, username);
        }
      } catch (AwsServiceException e) {
        if (e.statusCode() != SC_404_NOT_FOUND || isUnknownBucket(e)) {
          throw translateException("getFileStatus", path, e);
        }
      } catch (SdkException e) {
        throw translateException("getFileStatus", path, e);
      }
    }

    LOG.debug("Not Found: {}", path);
    throw new FileNotFoundException("No such file or directory: " + path);
  }

  /**
   * Probe S3 for a file or dir existing, with the given probe set.
   * Retry policy: retrying; translated.
   * @param path qualified path to look for
   * @param probes probes to make
   * @return true if path exists in S3
   * @throws IOException IO failure
   */
  @Retries.RetryTranslated
  private boolean s3Exists(final Path path, final Set<StatusProbeEnum> probes)
      throws IOException {
    String key = pathToKey(path);
    try {
      s3GetFileStatus(path, key, probes, false);
      return true;
    } catch (FileNotFoundException e) {
      return false;
    }
  }

  /**
   * The src file is on the local disk.  Add it to FS at
   * the given dst name.
   *
   * This version doesn't need to create a temporary file to calculate the MD5.
   * If {@link Constants#OPTIMIZED_COPY_FROM_LOCAL} is set to false,
   * the superclass implementation is used.
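   * <p>
   * Usage sketch (paths are illustrative):
   * <pre>{@code
   * // upload and delete the local source; fail if the destination exists
   * fs.copyFromLocalFile(true, false,
   *     new Path("file:///tmp/report.csv"),
   *     new Path("/reports/report.csv"));
   * }</pre>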
   *
   * @param delSrc whether to delete the src
   * @param overwrite whether to overwrite an existing file
   * @param src path
   * @param dst path
   * @throws IOException IO problem
   * @throws FileAlreadyExistsException the destination file exists and
   * overwrite==false
   */
  @Override
  @AuditEntryPoint
  public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
                                Path dst) throws IOException {
    checkNotClosed();
    LOG.debug("Copying local file from {} to {} (delSrc={} overwrite={}",
        src, dst, delSrc, overwrite);
    if (optimizedCopyFromLocal) {
      trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () ->
          new CopyFromLocalOperation(
              createStoreContext(),
              src,
              dst,
              delSrc,
              overwrite,
              createCopyFromLocalCallbacks(getActiveAuditSpan()))
              .execute());
    } else {
      // call the superclass, but still count statistics.
      // there is no overall span here, as each FS API call will
      // be in its own span.
      LOG.debug("Using base copyFromLocalFile implementation");
      trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> {
        super.copyFromLocalFile(delSrc, overwrite, src, dst);
        return null;
      });
    }
  }

  /**
   * Create the CopyFromLocalCallbacks;
   * protected to assist in mocking.
   * @param span audit span.
   * @return the callbacks
   * @throws IOException failure to get the local fs.
   */
  protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
      createCopyFromLocalCallbacks(final AuditSpanS3A span) throws IOException {
    LocalFileSystem local = getLocal(getConf());
    return new CopyFromLocalCallbacksImpl(span, local);
  }

  protected final class CopyFromLocalCallbacksImpl implements
      CopyFromLocalOperation.CopyFromLocalOperationCallbacks {

    /** Span to use for all operations. */
    private final AuditSpanS3A span;
    private final LocalFileSystem local;

    private CopyFromLocalCallbacksImpl(final AuditSpanS3A span,
        LocalFileSystem local) {
      this.span = span;
      this.local = local;
    }

    @Override
    public RemoteIterator<LocatedFileStatus> listLocalStatusIterator(
        final Path path) throws IOException {
      return local.listLocatedStatus(path);
    }

    @Override
    public File pathToLocalFile(Path path) {
      return local.pathToFile(path);
    }

    @Override
    public boolean deleteLocal(Path path, boolean recursive) throws IOException {
      return local.delete(path, recursive);
    }

    @Override
    @Retries.RetryTranslated
    public void copyLocalFileFromTo(File file, Path from, Path to) throws IOException {
      // the duration of the put is measured, but the active span is the
      // constructor-supplied one; this ensures all audit log events are grouped correctly
      span.activate();
      trackDuration(getDurationTrackerFactory(), OBJECT_PUT_REQUESTS.getSymbol(), () -> {
        final String key = pathToKey(to);
        PutObjectRequest.Builder putObjectRequestBuilder =
            newPutObjectRequestBuilder(key, file.length(), false);
        final String dest = to.toString();
        S3AFileSystem.this.invoker.retry("putObject(" + dest + ")", dest, true, () ->
            executePut(putObjectRequestBuilder.build(), null,
                PutObjectOptions.defaultOptions(), file));
        return null;
      });
    }

    @Override
    @Retries.RetryTranslated
    public FileStatus getFileStatus(Path f) throws IOException {
      return S3AFileSystem.this.getFileStatus(f);
    }

    @Override
    @Retries.RetryTranslated
    public boolean createEmptyDir(Path path, StoreContext storeContext)
        throws IOException {
      return trackDuration(getDurationTrackerFactory(),
          INVOCATION_MKDIRS.getSymbol(),
          new MkdirOperation(
              storeContext,
              path,
              createMkdirOperationCallbacks(),
              false,
              performanceFlags.enabled(PerformanceFlagEnum.Mkdir)));
    }
  }

  /**
   * Execute a PUT via the transfer manager, blocking for completion.
   * @param putObjectRequest request
   * @param progress optional progress callback
   * @param putOptions put object options
   * @return the upload result
   * @throws IOException IO failure
   * @throws CancellationException if the wait() was cancelled
   */
  @Retries.OnceTranslated("For PUT; post-PUT actions are RetrySwallowed")
  PutObjectResponse executePut(
      final PutObjectRequest putObjectRequest,
      final Progressable progress,
      final PutObjectOptions putOptions,
      final File file)
      throws IOException {
    String key = putObjectRequest.key();
    ProgressableProgressListener listener =
        new ProgressableProgressListener(store, key, progress);
    UploadInfo info = putObject(putObjectRequest, file, listener);
    PutObjectResponse result = store.waitForUploadCompletion(key, info).response();
    listener.uploadCompleted(info.getFileUpload());
    return result;
  }

  /**
   * This override bypasses checking for existence.
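   * <p>
   * Usage sketch: the path is only deleted when the filesystem is
   * closed (path is illustrative):
   * <pre>{@code
   * fs.deleteOnExit(new Path("/tmp/scratch"));
   * // ... use the scratch data ...
   * fs.close(); // triggers processDeleteOnExit()
   * }</pre>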
   *
   * @param f the path to delete; this may be unqualified.
   * @return true, always.
   * @throws IOException IO failure
   */
  @Override
  public boolean deleteOnExit(Path f) throws IOException {
    Path qualifiedPath = makeQualified(f);
    synchronized (deleteOnExit) {
      deleteOnExit.add(qualifiedPath);
    }
    return true;
  }

  /**
   * Cancel the scheduled deletion of the path when the FileSystem is closed.
   * @param f the path to cancel deletion
   * @return true if the path was found in the delete-on-exit list.
   */
  @Override
  public boolean cancelDeleteOnExit(Path f) {
    Path qualifiedPath = makeQualified(f);
    synchronized (deleteOnExit) {
      return deleteOnExit.remove(qualifiedPath);
    }
  }

  /**
   * Delete all paths that were marked as delete-on-exit. This recursively
   * deletes all files and directories in the specified paths. It checks
   * neither whether the files exist nor whether the filesystem is closed.
   *
   * The time to process this operation is {@code O(paths)}, with the actual
   * time dependent on the time for existence and deletion operations to
   * complete, successfully or not.
   */
  @Override
  protected void processDeleteOnExit() {
    synchronized (deleteOnExit) {
      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
        Path path = iter.next();
        try {
          deleteWithoutCloseCheck(path, true);
        } catch (IOException e) {
          LOG.info("Ignoring failure to deleteOnExit for path {}", path);
          LOG.debug("The exception for deleteOnExit is {}", e);
        }
        iter.remove();
      }
    }
  }

  /**
   * Close the filesystem. This shuts down all transfers.
   * @throws IOException IO problem
   */
  @Override
  public void close() throws IOException {
    if (closed.getAndSet(true)) {
      // already closed
      return;
    }
    isClosed = true;
    LOG.debug("Filesystem {} is closed", uri);
    try {
      super.close();
    } finally {
      stopAllServices();
      // log IO statistics, including of any file deletion during
      // superclass close
      if (getConf() != null) {
        String iostatisticsLoggingLevel =
            getConf().getTrimmed(IOSTATISTICS_LOGGING_LEVEL,
                IOSTATISTICS_LOGGING_LEVEL_DEFAULT);
        logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics());
      }
    }
  }

  /**
   * Stop all services.
   * This is invoked in close() and during failures of initialize();
   * make sure that all operations here are robust both to failures in
   * the expected state of this FS and to failures while being stopped.
   */
  protected synchronized void stopAllServices() {
    try {
      trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
        closeAutocloseables(LOG, store);
        store = null;
        s3Client = null;

        // At this point the S3A client is shut down,
        // now the executor pools are closed
        HadoopExecutors.shutdown(boundedThreadPool, LOG,
            THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
        boundedThreadPool = null;
        HadoopExecutors.shutdown(unboundedThreadPool, LOG,
            THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
        unboundedThreadPool = null;
        if (futurePool != null) {
          futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
          futurePool = null;
        }
        // other services are shutdown.
        cleanupWithLogger(LOG,
            delegationTokens.orElse(null),
            signerManager,
            auditManager);
        closeAutocloseables(LOG, credentials);
        delegationTokens = Optional.empty();
        signerManager = null;
        credentials = null;
        return null;
      });
    } catch (IOException e) {
      // failure during shutdown.
      // this should only be from the signature of trackDurationAndSpan().
      LOG.warn("Failure during service shutdown", e);
    }
    // and once this duration has been tracked, close the statistics.
    cleanupWithLogger(LOG, instrumentation);
  }

  /**
   * Verify that the filesystem has not been closed. Non-blocking; this gives
   * the last state of the {@code isClosed} flag.
   * @throws PathIOException if the FS is closed.
   */
  private void checkNotClosed() throws PathIOException {
    if (isClosed) {
      throw new PathIOException(uri.toString(), E_FS_CLOSED);
    }
  }

  /**
   * Get the delegation token support for this filesystem;
   * non-empty iff delegation support is enabled.
   * @return the token support, or an empty option.
   */
  @VisibleForTesting
  public Optional<S3ADelegationTokens> getDelegationTokens() {
    return delegationTokens;
  }

  /**
   * Return a service name iff delegation tokens are enabled and the
   * token binding is issuing delegation tokens.
   * @return the canonical service name or null
   */
  @Override
  public String getCanonicalServiceName() {
    // this could all be done in map statements, but it'd be harder to
    // understand and maintain.
    // Essentially: no DTs, no canonical service name.
    if (!delegationTokens.isPresent()) {
      return null;
    }
    // DTs present: ask the binding if it is willing to
    // serve tokens (or fail noisily).
    S3ADelegationTokens dt = delegationTokens.get();
    return dt.getTokenIssuingPolicy() != NoTokensAvailable
        ? dt.getCanonicalServiceName()
        : null;
  }

  /**
   * Get a delegation token if the FS is set up for them.
   * If the user already has a token, it is returned,
   * <i>even if it has expired</i>.
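   * <p>
   * A hedged sketch of token collection; the renewer name and the
   * {@code Credentials} instance are illustrative:
   * <pre>{@code
   * Token<AbstractS3ATokenIdentifier> token =
   *     fs.getDelegationToken("yarn");
   * if (token != null) {
   *   credentials.addToken(token.getService(), token);
   * }
   * }</pre>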
   * @param renewer the account name that is allowed to renew the token.
   * @return the delegation token or null
   * @throws IOException IO failure
   */
  @Override
  @AuditEntryPoint
  public Token<AbstractS3ATokenIdentifier> getDelegationToken(String renewer)
      throws IOException {
    checkNotClosed();
    LOG.debug("Delegation token requested");
    if (delegationTokens.isPresent()) {
      return trackDurationAndSpan(
          INVOCATION_GET_DELEGATION_TOKEN, null, () ->
              delegationTokens.get().getBoundOrNewDT(
                  encryptionSecrets,
                  (renewer != null ? new Text(renewer) : new Text())));
    } else {
      // Delegation token support is not set up
      LOG.debug("Token support is not enabled");
      return null;
    }
  }

  /**
   * Ask any DT plugin for any extra token issuers.
   * These do not get told of the encryption secrets and can
   * return any type of token.
   * This allows DT plugins to issue extra tokens for
   * ancillary services.
   * @return the extra token issuers; null if delegation is not enabled.
   * @throws IOException if the filesystem is closed.
   */
  @Override
  public DelegationTokenIssuer[] getAdditionalTokenIssuers()
      throws IOException {
    checkNotClosed();
    if (delegationTokens.isPresent()) {
      return delegationTokens.get().getAdditionalTokenIssuers();
    } else {
      // Delegation token support is not set up
      LOG.debug("Token support is not enabled");
      return null;
    }
  }

  /**
   * Build the AWS policy for restricted access to the resources needed
   * by this bucket, and, if needed, KMS operations.
   * @param access access level desired.
   * @return a policy for use in roles
   */
  @Override
  @InterfaceAudience.Private
  public List<RoleModel.Statement> listAWSPolicyRules(
      final Set<AccessLevel> access) {
    if (access.isEmpty()) {
      return Collections.emptyList();
    }
    List<RoleModel.Statement> statements = new ArrayList<>(
        allowS3Operations(bucket,
            access.contains(AccessLevel.WRITE)
                || access.contains(AccessLevel.ADMIN)));

    // no attempt is made to qualify KMS access; there's no
    // way to predict the read keys, and we are not worried about
    // granting too much encryption access.
    statements.add(STATEMENT_ALLOW_KMS_RW);
    if (s3ExpressStore) {
      LOG.warn("S3Express store polices not yet implemented");
    }

    return statements;
  }

  /**
   * Copy a single object in the bucket via a COPY operation.
   * There's no update of metadata, etc.; callers must implement
   * any such updates themselves.
   * @param srcKey source object path
   * @param dstKey destination object path
   * @param size object size
   * @param srcAttributes S3 attributes of the source object
   * @param readContext the read context
   * @return the result of the copy
   * @throws InterruptedIOException the operation was interrupted
   * @throws IOException Other IO problems
   */
  @Retries.RetryTranslated
  private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
      S3ObjectAttributes srcAttributes, S3AReadOpContext readContext)
      throws IOException {
    LOG.debug("copyFile {} -> {} ", srcKey, dstKey);

    ChangeTracker changeTracker = new ChangeTracker(
        keyToQualifiedPath(srcKey).toString(),
        changeDetectionPolicy,
        readContext.getS3AStatisticsContext()
            .newInputStreamStatistics()
            .getChangeTrackerStatistics(),
        srcAttributes);

    String action = "copyFile(" + srcKey + ", " + dstKey + ")";
    Invoker readInvoker = readContext.getReadInvoker();

    HeadObjectResponse srcom;
    try {
      srcom = once(action, srcKey,
          () ->
              getObjectMetadata(srcKey, changeTracker, readInvoker, "copy"));
    } catch (FileNotFoundException e) {
      // if rename fails at this point it means that the expected file was not
      // found.
      // This means the file was deleted since LIST enumerated it.
      LOG.debug("getObjectMetadata({}) failed to find an expected file",
          srcKey, e);
      throw new RemoteFileChangedException(
          keyToQualifiedPath(srcKey).toString(),
          action,
          RemoteFileChangedException.FILE_NOT_FOUND_SINGLE_ATTEMPT,
          e);
    }

    CopyObjectRequest.Builder copyObjectRequestBuilder =
        getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
    changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
    final CopyObjectRequest copyRequest = copyObjectRequestBuilder.build();
    LOG.debug("Copy Request: {}", copyRequest);
    CopyObjectResponse response;

    // transfer manager is skipped if disabled or the file is too small to worry about
    final boolean useTransferManager = isMultipartCopyEnabled && size >= multiPartThreshold;
    if (useTransferManager) {
      // use transfer manager
      response = readInvoker.retry(
          action, srcKey,
          true,
          () -> {
            incrementStatistic(OBJECT_COPY_REQUESTS);

            Copy copy = store.getOrCreateTransferManager().copy(
                CopyRequest.builder()
                    .copyObjectRequest(copyRequest)
                    .build());

            try {
              CompletedCopy completedCopy = copy.completionFuture().join();
              return completedCopy.response();
            } catch (CompletionException e) {
              Throwable cause = e.getCause();
              if (cause instanceof SdkException) {
                // if this is a 412 precondition failure, it may
                // be converted to a RemoteFileChangedException
                SdkException awsException = (SdkException)cause;
                changeTracker.processException(awsException, "copy");
                throw awsException;
              }
              throw extractException(action, srcKey, e);
            }
          });
    } else {
      // single part copy bypasses transfer manager
      // note: this helps with some mock testing, e.g. HBoss, as there is less to mock.
      response = readInvoker.retry(
          action, srcKey,
          true,
          () -> {
            LOG.debug("copyFile: single part copy {} -> {} of size {}", srcKey, dstKey, size);
            incrementStatistic(OBJECT_COPY_REQUESTS);
            try {
              return getS3Client().copyObject(copyRequest);
            } catch (SdkException awsException) {
              // if this is a 412 precondition failure, it may
              // be converted to a RemoteFileChangedException
              changeTracker.processException(awsException, "copy");
              // otherwise, rethrow
              throw awsException;
            }
          });
    }

    changeTracker.processResponse(response);
    incrementWriteOperations();
    instrumentation.filesCopied(1, size);
    return response;
  }

  /**
   * Initiate a multipart upload from the preconfigured request.
   * Retry policy: none + untranslated.
   * @param request request to initiate
   * @return the result of the call
   * @throws SdkException on failures inside the AWS SDK
   * @throws IOException Other IO problems
   */
  @Retries.OnceRaw
  CreateMultipartUploadResponse initiateMultipartUpload(
      CreateMultipartUploadRequest request) throws IOException {
    LOG.debug("Initiate multipart upload to {}", request.key());
    return trackDurationOfSupplier(getDurationTrackerFactory(),
        OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(),
        () -> getS3Client().createMultipartUpload(request));
  }

  /**
   * Perform post-write actions.
   * <p>
   * This operation MUST be called after any PUT/multipart PUT completes
   * successfully.
   * @param key key written to
   * @param length total length of file written
   * @param putOptions put object options
   */
  @InterfaceAudience.Private
  @Retries.RetryExceptionsSwallowed
  void finishedWrite(
      String key,
      long length,
      PutObjectOptions putOptions) {
    LOG.debug("Finished write to {}, len {}.",
        key, length);
    Preconditions.checkArgument(length >= 0, "content length is negative");
  }

  /**
   * Create a fake directory, always ending in "/".
   * Retry policy: retrying; translated.
   * @param objectName name of directory object.
   * @param putOptions put object options
   * @throws IOException IO failure
   */
  @Retries.RetryTranslated
  private void createFakeDirectory(final String objectName,
      final PutObjectOptions putOptions)
      throws IOException {
    createEmptyObject(objectName, putOptions);
  }

  /**
   * Used to create an empty file that represents an empty directory.
   * Retry policy: retrying; translated.
   * @param objectName object to create
   * @param putOptions put object options
   * @throws IOException IO failure
   */
  @Retries.RetryTranslated
  private void createEmptyObject(final String objectName, PutObjectOptions putOptions)
      throws IOException {

    S3ADataBlocks.BlockUploadData uploadData = new S3ADataBlocks.BlockUploadData(
        new byte[0], 0, 0, null);

    invoker.retry("PUT 0-byte object ", objectName, true,
        () -> putObjectDirect(
            getRequestFactory().newDirectoryMarkerRequest(objectName).build(),
            putOptions,
            uploadData,
            getDurationTrackerFactory()));
    incrementPutProgressStatistics(objectName, 0);
    instrumentation.directoryCreated();
  }

  /**
   * Return the number of bytes that large input files should optimally
   * be split into to minimize I/O time.
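   * <p>
   * The value comes from the {@code fs.s3a.block.size} option; a
   * configuration sketch (size illustrative; unit suffixes such as "M"
   * are parsed by {@code Configuration.getLongBytes()}):
   * <pre>{@code
   *   conf.set("fs.s3a.block.size", "64M");
   * }</pre>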
   */
  public long getDefaultBlockSize() {
    return getConf().getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder(
        "S3AFileSystem{");
    sb.append("uri=").append(uri);
    sb.append(", workingDir=").append(workingDir);
    sb.append(", partSize=").append(partSize);
    sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete);
    sb.append(", maxKeys=").append(maxKeys);
    sb.append(", performanceFlags=").append(performanceFlags);
    if (cannedACL != null) {
      sb.append(", cannedACL=").append(cannedACL);
    }
    if (openFileHelper != null) {
      sb.append(", ").append(openFileHelper);
    }
    if (getConf() != null) {
      sb.append(", blockSize=").append(getDefaultBlockSize());
    }
    sb.append(", multiPartThreshold=").append(multiPartThreshold);
    if (getS3EncryptionAlgorithm() != null) {
      sb.append(", s3EncryptionAlgorithm='")
          .append(getS3EncryptionAlgorithm())
          .append('\'');
    }
    if (blockFactory != null) {
      sb.append(", blockFactory=").append(blockFactory);
    }
    sb.append(", auditManager=").append(auditManager);
    sb.append(", useListV1=").append(useListV1);
    if (committerIntegration != null) {
      sb.append(", magicCommitter=").append(isMagicCommitEnabled());
    }
    sb.append(", boundedExecutor=").append(boundedThreadPool);
    sb.append(", unboundedExecutor=").append(unboundedThreadPool);
    sb.append(", credentials=").append(credentials);
    sb.append(", delegation tokens=")
        .append(delegationTokens.map(Objects::toString).orElse("disabled"));
    // if logging at debug, toString returns the entire IOStatistics set.
    if (getInstrumentation() != null) {
      sb.append(", instrumentation {")
          .append(getInstrumentation().toString())
          .append("}");
    }
    sb.append(", ClientSideEncryption=").append(isCSEEnabled);

    if (accessPoint != null) {
      sb.append(", arnForBucket=").append(accessPoint.getFullArn());
    }
    sb.append('}');
    return sb.toString();
  }

  /**
   * Get the partition size for multipart operations.
   * @return the value as set during initialization
   */
  public long getPartitionSize() {
    return partSize;
  }

  /**
   * Get the threshold for multipart files.
   * @return the value as set during initialization
   */
  public long getMultiPartThreshold() {
    return multiPartThreshold;
  }

  /**
   * Get the maximum key count.
   * @return a value, valid after initialization
   */
  int getMaxKeys() {
    return maxKeys;
  }

  /**
   * Is magic commit enabled?
   * @return true if magic commit support is turned on.
   */
  public boolean isMagicCommitEnabled() {
    return committerIntegration.isMagicCommitEnabled();
  }

  /**
   * Predicate: is a path a magic commit path?
   * True if magic commit is enabled and the path qualifies as special,
   * and is not a .pending or .pendingset file.
   * @param path path to examine
   * @return true if writing a file to the path triggers a "magic" write.
   */
  public boolean isMagicCommitPath(Path path) {
    return committerIntegration.isMagicCommitPath(path);
  }

  /**
   * Predicate: is a path under a magic commit path?
   * True if magic commit is enabled and the path is under "MAGIC PATH",
   * irrespective of file type.
   * @param path path to examine
   * @return true if the path is in a magic dir and the FS has magic writes enabled.
   */
  private boolean isUnderMagicCommitPath(Path path) {
    return committerIntegration.isUnderMagicPath(path);
  }

  /**
   * Increments the statistic {@link Statistic#INVOCATION_GLOB_STATUS}.
   * Override superclass so as to disable symlink resolution as symlinks
   * are not supported by S3A.
   * {@inheritDoc}
   */
  @Override
  public FileStatus[] globStatus(Path pathPattern) throws IOException {
    return globStatus(pathPattern, ACCEPT_ALL);
  }

  /**
   * Increments the statistic {@link Statistic#INVOCATION_GLOB_STATUS}.
   * Override superclass so as to disable symlink resolution as symlinks
   * are not supported by S3A.
   *
   * Although an AuditEntryPoint, the globber itself will talk to
   * the filesystem through the filesystem API, so its operations will
   * all appear as separate operations.
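   * <p>
   * A glob sketch (bucket and pattern illustrative):
   * <pre>{@code
   *   FileStatus[] parts = fs.globStatus(
   *       new Path("s3a://bucket/output/part-*"),
   *       p -> true);
   * }</pre>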
   * {@inheritDoc}
   */
  @Override
  @AuditEntryPoint
  public FileStatus[] globStatus(
      final Path pathPattern,
      final PathFilter filter)
      throws IOException {
    return trackDurationAndSpan(
        INVOCATION_GLOB_STATUS, pathPattern, () ->
            Globber.createGlobber(this)
                .withPathPattern(pathPattern)
                .withPathFilter(filter)
                .withResolveSymlinks(false)
                .build()
                .glob());
  }

  /**
   * Override superclass so as to add statistic collection.
   * {@inheritDoc}
   */
  @Override
  @AuditEntryPoint
  public boolean exists(Path f) throws IOException {
    final Path path = qualify(f);
    try {
      trackDurationAndSpan(
          INVOCATION_EXISTS, path, () ->
              innerGetFileStatus(path, false, StatusProbeEnum.ALL));
      return true;
    } catch (FileNotFoundException e) {
      return false;
    }
  }

  /**
   * Optimized probe for a path referencing a dir.
   * Even though it is optimized to a single HEAD, applications
   * should not over-use this method...it is all too common.
   * {@inheritDoc}
   */
  @Override
  @AuditEntryPoint
  @SuppressWarnings("deprecation")
  public boolean isDirectory(Path f) throws IOException {
    final Path path = qualify(f);
    try {
      return trackDurationAndSpan(
          INVOCATION_IS_DIRECTORY, path, () ->
              innerGetFileStatus(path, false, StatusProbeEnum.DIRECTORIES)
                  .isDirectory());
    } catch (FileNotFoundException e) {
      // not found or it is a file.
      return false;
    }
  }

  /**
   * Optimized probe for a path referencing a file.
   * Even though it is optimized to a single HEAD, applications
   * should not over-use this method...it is all too common.
   * {@inheritDoc}
   */
  @Override
  @AuditEntryPoint
  @SuppressWarnings("deprecation")
  public boolean isFile(Path f) throws IOException {
    final Path path = qualify(f);
    try {
      return trackDurationAndSpan(INVOCATION_IS_FILE, path, () ->
          innerGetFileStatus(path, false, StatusProbeEnum.HEAD_ONLY)
              .isFile());
    } catch (FileNotFoundException e) {
      // not found or it is a dir.
      return false;
    }
  }

  /**
   * When enabled, get the etag of an object at the path via a HEAD request and
   * return it as a checksum object.
   * <ol>
   *   <li>If a tag has not changed, consider the object unchanged.</li>
   *   <li>Two tags being different does not imply the data is different.</li>
   * </ol>
   * Different S3 implementations may offer different guarantees.
   *
   * This check is (currently) only made if
   * {@link Constants#ETAG_CHECKSUM_ENABLED} is set; turning it on
   * has caused problems with Distcp (HADOOP-15273).
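   * <p>
   * An invocation sketch, assuming etag checksums have been enabled
   * through {@code fs.s3a.etag.checksum.enabled}:
   * <pre>{@code
   *   conf.setBoolean("fs.s3a.etag.checksum.enabled", true);
   *   FileSystem fs = path.getFileSystem(conf);
   *   FileChecksum checksum = fs.getFileChecksum(path, 0);
   *   // checksum is null when disabled or when no etag was returned
   * }</pre>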
   *
   * @param f The file path
   * @param length The length of the file range for checksum calculation
   * @return The EtagChecksum or null if checksums are not enabled or supported.
   * @throws IOException IO failure
   * @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html">Common Response Headers</a>
   */
  @Override
  @Retries.RetryTranslated
  @AuditEntryPoint
  public EtagChecksum getFileChecksum(Path f, final long length)
      throws IOException {
    Preconditions.checkArgument(length >= 0);
    final Path path = qualify(f);
    if (getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
        ETAG_CHECKSUM_ENABLED_DEFAULT)) {
      return trackDurationAndSpan(INVOCATION_GET_FILE_CHECKSUM, path, () -> {
        LOG.debug("getFileChecksum({})", path);
        HeadObjectResponse headers = getObjectMetadata(path, null,
            invoker,
            "getFileChecksum are");
        String eTag = headers.eTag();
        return eTag != null ? new EtagChecksum(eTag) : null;
      });
    } else {
      // disabled
      return null;
    }
  }

  /**
   * Get header processing support.
   * @return a new header processing instance.
   */
  private HeaderProcessing getHeaderProcessing() {
    return new HeaderProcessing(createStoreContext(),
        createHeaderProcessingCallbacks());
  }

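  /**
   * {@inheritDoc}
   * <p>
   * A retrieval sketch; the attribute name is illustrative, assuming
   * object headers are surfaced as XAttrs under a {@code header.} prefix:
   * <pre>{@code
   *   byte[] etag = fs.getXAttr(path, "header.ETag");
   * }</pre>
   */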
  @Override
  @AuditEntryPoint
  public byte[] getXAttr(final Path path, final String name)
      throws IOException {
    checkNotClosed();
    try (AuditSpan span = createSpan(
        INVOCATION_XATTR_GET_NAMED.getSymbol(),
        path.toString(), null)) {
      return getHeaderProcessing().getXAttr(path, name);
    }
  }

  @Override
  @AuditEntryPoint
  public Map<String, byte[]> getXAttrs(final Path path) throws IOException {
    checkNotClosed();
    try (AuditSpan span = createSpan(
        INVOCATION_XATTR_GET_MAP.getSymbol(),
        path.toString(), null)) {
      return getHeaderProcessing().getXAttrs(path);
    }
  }

  @Override
  @AuditEntryPoint
  public Map<String, byte[]> getXAttrs(final Path path,
      final List<String> names)
      throws IOException {
    checkNotClosed();
    try (AuditSpan span = createSpan(
        INVOCATION_XATTR_GET_NAMED_MAP.getSymbol(),
        path.toString(), null)) {
      return getHeaderProcessing().getXAttrs(path, names);
    }
  }

  @Override
  @AuditEntryPoint
  public List<String> listXAttrs(final Path path) throws IOException {
    checkNotClosed();
    try (AuditSpan span = createSpan(
        INVOCATION_OP_XATTR_LIST.getSymbol(),
        path.toString(), null)) {
      return getHeaderProcessing().listXAttrs(path);
    }
  }

  /**
   * Create the callbacks.
   * @return An implementation of the header processing
   * callbacks.
   */
  protected HeaderProcessing.HeaderProcessingCallbacks
      createHeaderProcessingCallbacks() {
    return new HeaderProcessingCallbacksImpl();
  }

  /**
   * Operations needed for Header Processing.
   */
  protected final class HeaderProcessingCallbacksImpl implements
      HeaderProcessing.HeaderProcessingCallbacks {

    @Override
    public HeadObjectResponse getObjectMetadata(final String key)
        throws IOException {
      return once("getObjectMetadata", key, () ->
          S3AFileSystem.this.getObjectMetadata(key));
    }

    @Override
    public HeadBucketResponse getBucketMetadata()
        throws IOException {
      return once("getBucketMetadata", bucket, () ->
          S3AFileSystem.this.getBucketMetadata());
    }
  }

  /**
   * {@inheritDoc}.
   *
   * This implementation is optimized for S3, which can do a bulk listing
   * of all entries under a path in a single operation. Thus there is
   * no need to recursively walk the directory tree.
   *
   * Instead a {@link ListObjectsRequest} is created requesting a (windowed)
   * listing of all entries under the given path. This is used to construct
   * an {@code ObjectListingIterator} instance, iteratively returning the
   * sequence of lists of elements under the path. This is then iterated
   * over in a {@code FileStatusListingIterator}, which generates
   * {@link S3AFileStatus} instances, one per listing entry.
   * These are then translated into {@link LocatedFileStatus} instances.
   *
   * This is essentially a nested and wrapped set of iterators, with some
   * generator classes.
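   *
   * A traversal sketch:
   * <pre>{@code
   *   RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, true);
   *   while (it.hasNext()) {
   *     LocatedFileStatus status = it.next();
   *     // one entry per file found under dir
   *   }
   * }</pre>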
   * @param f a path
   * @param recursive if the subdirectories need to be traversed recursively
   *
   * @return an iterator that traverses statuses of the files/directories
   *         in the given path
   * @throws FileNotFoundException if {@code path} does not exist
   * @throws IOException if any I/O error occurred
   */
  @Override
  @Retries.RetryTranslated
  @AuditEntryPoint
  public RemoteIterator<LocatedFileStatus> listFiles(Path f,
      boolean recursive) throws FileNotFoundException, IOException {
    final Path path = qualify(f);
    return toLocatedFileStatusIterator(
        trackDurationAndSpan(INVOCATION_LIST_FILES, path, () ->
            innerListFiles(path, recursive,
                new Listing.AcceptFilesOnly(path), null)));
  }

  /**
   * Recursive List of files and empty directories.
   * @param f path to list from
   * @param recursive recursive?
   * @return an iterator.
   * @throws IOException failure
   */
  @InterfaceAudience.Private
  @Retries.RetryTranslated
  @AuditEntryPoint
  public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
      Path f, boolean recursive) throws IOException {
    final Path path = qualify(f);
    return trackDurationAndSpan(INVOCATION_LIST_FILES, path, () ->
        innerListFiles(path, recursive,
            Listing.ACCEPT_ALL_OBJECTS,
            null));
  }

  /**
   * List files under the path.
   * <ol>
   *   <li>
   *     The optional {@code status} parameter will be used to skip the
   *     initial getFileStatus call.
   *   </li>
   * </ol>
   *
   * @param f path
   * @param recursive recursive listing?
   * @param acceptor file status filter
   * @param status optional status of path to list.
   * @return an iterator over the listing.
   * @throws IOException failure
   */
  @Retries.RetryTranslated
  private RemoteIterator<S3ALocatedFileStatus> innerListFiles(
      final Path f,
      final boolean recursive,
      final Listing.FileStatusAcceptor acceptor,
      final S3AFileStatus status) throws IOException {
    Path path = qualify(f);
    LOG.debug("listFiles({}, {})", path, recursive);
    try {
      // if a status was given and it is a file.
      if (status != null && status.isFile()) {
        // simple case: File
        LOG.debug("Path is a file: {}", path);
        return listing.createSingleStatusIterator(
            toLocatedFileStatus(status));
      }
      // Assuming the path to be a directory
      // do a bulk operation.
      RemoteIterator<S3ALocatedFileStatus> listFilesAssumingDir =
              listing.getListFilesAssumingDir(path,
                      recursive,
                      acceptor,
                  getActiveAuditSpan());
      // If there are no list entries present, we
      // fallback to file existence check as the path
      // can be a file or empty directory.
      if (!listFilesAssumingDir.hasNext()) {
        // If file status was already passed, reuse it.
        final S3AFileStatus fileStatus = status != null
                ? status
                : innerGetFileStatus(path, false, StatusProbeEnum.ALL);
        if (fileStatus.isFile()) {
          return listing.createSingleStatusIterator(
                  toLocatedFileStatus(fileStatus));
        }
      }
      // If we have reached here, it means either there are files
      // in this directory or it is empty.
      return listFilesAssumingDir;
    } catch (SdkException e) {
      throw translateException("listFiles", path, e);
    }
  }

  /**
   * Override superclass so as to add statistic collection.
   * {@inheritDoc}
   */
  @Override
  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
      throws FileNotFoundException, IOException {
    return listLocatedStatus(f, ACCEPT_ALL);
  }

  /**
   * {@inheritDoc}.
   *
   * S3 Optimized directory listing. The initial operation performs the
   * first bulk listing; extra listings will take place
   * when all the current set of results are used up.
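   *
   * A filtered listing sketch (filter logic illustrative):
   * <pre>{@code
   *   RemoteIterator<LocatedFileStatus> it =
   *       fs.listLocatedStatus(dir, p -> p.getName().endsWith(".csv"));
   *   while (it.hasNext()) {
   *     LocatedFileStatus next = it.next();
   *     // process next
   *   }
   * }</pre>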
   * @param f a path
   * @param filter a path filter
   * @return an iterator that traverses statuses of the files/directories
   *         in the given path
   * @throws FileNotFoundException if {@code path} does not exist
   * @throws IOException if any I/O error occurred
   */
  @Override
  @Retries.OnceTranslated("s3guard not retrying")
  @AuditEntryPoint
  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
      final PathFilter filter)
      throws FileNotFoundException, IOException {
    Path path = qualify(f);
    AuditSpan span = entryPoint(INVOCATION_LIST_LOCATED_STATUS, path);
    LOG.debug("listLocatedStatus({}, {}", path, filter);
    RemoteIterator<? extends LocatedFileStatus> iterator =
        once("listLocatedStatus", path.toString(),
          () -> {
            // Assuming the path to be a directory,
            // trigger a list call directly.
            final RemoteIterator<S3ALocatedFileStatus>
                    locatedFileStatusIteratorForDir =
                    listing.getLocatedFileStatusIteratorForDir(path, filter,
                        span);

            // If no listing is present then path might be a file.
            if (!locatedFileStatusIteratorForDir.hasNext()) {
              final S3AFileStatus fileStatus =
                  innerGetFileStatus(path, false, StatusProbeEnum.ALL);
              if (fileStatus.isFile()) {
                // simple case: File
                LOG.debug("Path is a file");
                return listing.createSingleStatusIterator(
                        filter.accept(path)
                                ? toLocatedFileStatus(fileStatus)
                                : null);
              }
            }
            // Either empty or non-empty directory.
            return locatedFileStatusIteratorForDir;
          });
    return toLocatedFileStatusIterator(iterator);
  }

  /**
   * Build a {@link S3ALocatedFileStatus} from a {@link FileStatus} instance.
   * @param status file status
   * @return a located status with block locations set up from this FS.
   * @throws IOException IO Problems.
   */
  S3ALocatedFileStatus toLocatedFileStatus(S3AFileStatus status)
      throws IOException {
    return new S3ALocatedFileStatus(status,
        status.isFile() ?
          getFileBlockLocations(status, 0, status.getLen())
          : null);
  }

  /**
   * List any pending multipart uploads whose keys begin with prefix, using
   * an iterator that can handle an unlimited number of entries.
   * See {@link #listMultipartUploads(String)} for a non-iterator version of
   * this.
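   *
   * A scan-and-abort sketch (prefix illustrative):
   * <pre>{@code
   *   RemoteIterator<MultipartUpload> uploads = fs.listUploads("tmp/");
   *   while (uploads.hasNext()) {
   *     fs.abortMultipartUpload(uploads.next());
   *   }
   * }</pre>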
   *
   * @param prefix optional key prefix to search
   * @return Iterator over multipart uploads.
   * @throws IOException on failure
   */
  @InterfaceAudience.Private
  @Retries.RetryTranslated
  @AuditEntryPoint
  public RemoteIterator<MultipartUpload> listUploads(@Nullable String prefix)
      throws IOException {
    // span is picked up and retained in the listing.
    checkNotClosed();
    try (AuditSpan span = createSpan(MULTIPART_UPLOAD_LIST.getSymbol(),
        prefix, null)) {
      return listUploadsUnderPrefix(createStoreContext(), prefix);
    }
  }

  /**
   * List any pending multipart uploads whose keys begin with prefix, using
   * an iterator that can handle an unlimited number of entries.
   * See {@link #listMultipartUploads(String)} for a non-iterator version of
   * this.
   * @param storeContext store context.
   * @param prefix optional key prefix to search
   * @return Iterator over multipart uploads.
   * @throws IOException on failure
   */
  @InterfaceAudience.Private
  @Retries.RetryTranslated
  public RemoteIterator<MultipartUpload> listUploadsUnderPrefix(
      final StoreContext storeContext,
      final @Nullable String prefix)
      throws IOException {
    // span is picked up and retained in the listing.
    String p = prefix;
    if (prefix != null && !prefix.isEmpty() && !prefix.endsWith("/")) {
      p = prefix + "/";
    }
    // duration tracking is done in iterator.
    return MultipartUtils.listMultipartUploads(storeContext, getS3Client(), p, maxKeys);
  }

  /**
   * Listing all multipart uploads; limited to the first few hundred.
   * See {@link #listUploads(String)} for an iterator-based version that does
   * not limit the number of entries returned.
   * Retry policy: retry, translated.
   * @return a listing of multipart uploads.
   * @param prefix prefix to scan for, "" for none
   * @throws IOException IO failure, including any uprated SdkException
   */
  @InterfaceAudience.Private
  @Retries.RetryTranslated
  public List<MultipartUpload> listMultipartUploads(String prefix)
      throws IOException {
    // add a trailing / if needed.
    if (prefix != null && !prefix.isEmpty() && !prefix.endsWith("/")) {
      prefix = prefix + "/";
    }
    String p = prefix;
    return invoker.retry("listMultipartUploads", p, true, () -> {
      final ListMultipartUploadsRequest request = getRequestFactory()
          .newListMultipartUploadsRequestBuilder(p).build();
      return trackDuration(getInstrumentation(), MULTIPART_UPLOAD_LIST.getSymbol(), () ->
          getS3Client().listMultipartUploads(request).uploads());
    });
  }

  /**
   * Abort a multipart upload.
   * Retry policy: none.
   * @param destKey destination key
   * @param uploadId Upload ID
   * @throws IOException IO failure, including any uprated SdkException
   */
  @Retries.OnceTranslated
  public void abortMultipartUpload(String destKey, String uploadId) throws IOException {
    LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
    trackDuration(getInstrumentation(), OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () ->
        getS3Client().abortMultipartUpload(
            getRequestFactory().newAbortMultipartUploadRequestBuilder(
                destKey,
                uploadId).build()));
  }

  /**
   * Abort a multipart upload.
   * Retry policy: none.
   * @param upload the listed upload to abort.
   * @throws IOException IO failure, including any uprated SdkException
   */
  @Retries.OnceTranslated
  public void abortMultipartUpload(MultipartUpload upload) throws IOException {
    String destKey = upload.key();
    String uploadId = upload.uploadId();
    if (LOG.isDebugEnabled()) {
      DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}",
          uploadId, destKey, upload.initiator(),
          df.format(Date.from(upload.initiated())));
    }
    abortMultipartUpload(destKey, uploadId);
  }

  /**
   * Create a new instance of the committer statistics.
   * @return a new committer statistics instance
   */
  public CommitterStatistics newCommitterStatistics() {
    return statisticsContext.newCommitterStatistics();
  }

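  /**
   * {@inheritDoc}
   *
   * A probe sketch using one of the capability constants handled below:
   * <pre>{@code
   *   boolean etags = fs.hasPathCapability(
   *       new Path("/"), CommonPathCapabilities.ETAGS_AVAILABLE);
   * }</pre>
   */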
  @SuppressWarnings("deprecation")
  @Override
  public boolean hasPathCapability(final Path path, final String capability)
      throws IOException {
    final Path p = makeQualified(path);
    String cap = validatePathCapabilityArgs(p, capability);
    switch (cap) {

    case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER:
    case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER_OLD:
      // capability depends on FS configuration
      return isMagicCommitEnabled();

    case CommonPathCapabilities.FS_CHECKSUMS:
      // capability depends on FS configuration
      return getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
          ETAG_CHECKSUM_ENABLED_DEFAULT);

    case CommonPathCapabilities.ABORTABLE_STREAM:
      return true;
    case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
      // client side encryption doesn't support multipart uploader.
      return !isCSEEnabled;

    // this client is safe to use with buckets
    // containing directory markers anywhere in
    // the hierarchy
    case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
      return true;

    case ENABLE_MULTI_DELETE:
      return enableMultiObjectsDelete;

      // Do directory operations purge uploads.
    case DIRECTORY_OPERATIONS_PURGE_UPLOADS:
      return dirOperationsPurgeUploads;

      // this is a v2 sdk release.
    case STORE_CAPABILITY_AWS_V2:
      return true;

      // is this store S3 Express?
      // if so, note that directory listings may be inconsistent
    case STORE_CAPABILITY_S3_EXPRESS_STORAGE:
    case DIRECTORY_LISTING_INCONSISTENT:
      return s3ExpressStore;

    // etags are available in listings, but they
    // are not consistent across renames.
    // therefore, only availability is declared
    case CommonPathCapabilities.ETAGS_AVAILABLE:
      // block locations are generated locally
    case CommonPathCapabilities.VIRTUAL_BLOCK_LOCATIONS:
      return true;

    case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
    case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP:
      return true;
    // never true
    case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE:
    case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE:
      return false;

    case STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED:
      return isMultipartUploadEnabled();

    // create file options
    case FS_S3A_CREATE_PERFORMANCE:
    case FS_S3A_CREATE_HEADER:
      return true;

    // is the FS configured for create file performance
    case FS_S3A_CREATE_PERFORMANCE_ENABLED:
      return performanceFlags.enabled(PerformanceFlagEnum.Create);

      // is the optimized copy from local enabled.
    case OPTIMIZED_COPY_FROM_LOCAL:
      return optimizedCopyFromLocal;

    // probe for a fips endpoint
    case FIPS_ENDPOINT:
      return fipsEnabled;

    // is S3 Access Grants enabled
    case AWS_S3_ACCESS_GRANTS_ENABLED:
      return s3AccessGrantsEnabled;

      // stream leak detection.
    case StreamStatisticNames.STREAM_LEAKS:
      return !prefetchEnabled;

    default:
      // is it a performance flag?
      if (performanceFlags.hasCapability(capability)) {
        return true;
      }
      // fall through
    }

    // hand off to superclass
    return super.hasPathCapability(p, cap);
  }

  /**
   * Return the capabilities of this filesystem instance.
   *
   * This has been supplanted by {@link #hasPathCapability(Path, String)}.
   * @param capability string to query the stream support for.
   * @return whether the FS instance has the capability.
   */
  @Deprecated
  @Override
  public boolean hasCapability(String capability) {
    try {
      return hasPathCapability(new Path("/"), capability);
    } catch (IOException ex) {
      // should never happen, so log and downgrade.
      LOG.debug("Ignoring exception on hasCapability({}})", capability, ex);
      return false;
    }
  }

  /**
   * Get a shared copy of the AWS credentials, with its reference
   * counter updated.
   * Caller is required to call {@code close()} on this after
   * they have finished using it.
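   * <p>
   * A sharing sketch (purpose string illustrative), assuming the
   * reference is released in a try-with-resources block:
   * <pre>{@code
   *   try (AWSCredentialProviderList creds = fs.shareCredentials("committer")) {
   *     // use creds; close() decrements the reference count
   *   }
   * }</pre>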
   * @param purpose what is this for? This is initially for logging
   * @return a reference to shared credentials.
   */
  public AWSCredentialProviderList shareCredentials(final String purpose) {
    LOG.debug("Sharing credentials for: {}", purpose);
    return credentials.share();
  }

  /**
   * Get the file status of the source file.
   * If a status is present in the fileInformation parameter, return that;
   * if not, issue a HEAD request, looking for a
   * file only.
   * @param path path of the file to open
   * @param fileInformation information on the file to open
   * @return a file status
   * @throws FileNotFoundException if a HEAD request found no file
   * @throws IOException IO failure
   */
  private S3AFileStatus extractOrFetchSimpleFileStatus(
      final Path path,
      final OpenFileSupport.OpenFileInformation fileInformation)
      throws IOException {
    S3AFileStatus fileStatus = fileInformation.getStatus();
    if (fileStatus == null) {
      // no status was supplied: probe for the object
      // with a HEAD request
      fileStatus = innerGetFileStatus(path, false,
          StatusProbeEnum.HEAD_ONLY);
    }
    if (fileStatus.isDirectory()) {
      throw new FileNotFoundException(path.toString() + " is a directory");
    }

    return fileStatus;
  }

  /**
   * Initiate the open() operation.
   * This is invoked from both the FileSystem and FileContext APIs.
   * It's declared as an audit entry point but the span creation is pushed
   * down into the open operations it ultimately calls.
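   * <p>
   * A caller-side sketch through the public openFile() builder
   * (option name and value illustrative):
   * <pre>{@code
   *   CompletableFuture<FSDataInputStream> streamF = fs.openFile(path)
   *       .opt("fs.option.openfile.read.policy", "random")
   *       .build();
   *   FSDataInputStream in = streamF.get(); // or FutureIO.awaitFuture(streamF)
   * }</pre>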
   * @param rawPath path to the file
   * @param parameters open file parameters from the builder.
   * @return a future which will evaluate to the opened file.
   * @throws IOException failure to resolve the link.
   * @throws IllegalArgumentException unknown mandatory key
   */
  @Override
  @Retries.RetryTranslated
  @AuditEntryPoint
  public CompletableFuture<FSDataInputStream> openFileWithOptions(
      final Path rawPath,
      final OpenFileParameters parameters) throws IOException {
    final Path path = qualify(rawPath);
    OpenFileSupport.OpenFileInformation fileInformation =
        openFileHelper.prepareToOpenFile(
            path,
            parameters,
            getDefaultBlockSize());
    CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
    unboundedThreadPool.submit(() ->
        LambdaUtils.eval(result,
            () -> executeOpen(path, fileInformation)));
    return result;
  }

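  /**
   * Create a builder for a multipart uploader rooted at the given path.
   * Raises {@link UnsupportedOperationException} when client-side
   * encryption is enabled.
   * {@inheritDoc}
   */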
  @Override
  @AuditEntryPoint
  public S3AMultipartUploaderBuilder createMultipartUploader(
      final Path basePath)
      throws IOException {
    if (isCSEEnabled) {
      throw new UnsupportedOperationException("Multi-part uploader not "
          + "supported for Client side encryption.");
    }
    final Path path = makeQualified(basePath);
    try (AuditSpan span = entryPoint(MULTIPART_UPLOAD_INSTANTIATED, path)) {
      StoreContext ctx = createStoreContext();
      return new S3AMultipartUploaderBuilder(this,
          createWriteOperationHelper(span),
          ctx,
          path,
          statisticsContext.createMultipartUploaderStatistics());
    }
  }

  /**
   * Build an immutable store context.
   * If called while the FS is being initialized,
   * some of the context will be incomplete.
   * New store context instances should be created as appropriate.
   * @return the store context of this FS.
   */
  @Override
  @InterfaceAudience.Private
  public StoreContext createStoreContext() {

    // please keep after setFsURI() in alphabetical order
    return new StoreContextBuilder()
        .setFsURI(getUri())
        .setAuditor(getAuditor())
        .setBucket(getBucket())
        .setChangeDetectionPolicy(changeDetectionPolicy)
        .setConfiguration(getConf())
        .setContextAccessors(new ContextAccessorsImpl())
        .setEnableCSE(isCSEEnabled)
        .setExecutor(boundedThreadPool)
        .setExecutorCapacity(executorCapacity)
        .setInputPolicy(getInputPolicy())
        .setInstrumentation(statisticsContext)
        .setInvoker(invoker)
        .setMultiObjectDeleteEnabled(enableMultiObjectsDelete)
        .setOwner(owner)
        .setPerformanceFlags(performanceFlags)
        .setStorageStatistics(getStorageStatistics())
        .setUseListV1(useListV1)
        .setUsername(getUsername())
        .build();
  }

  /**
   * Create a marker tools operations binding for this store.
   * Auditing: a span is created for the marker tool scan.
   * @param target target path
   * @return callbacks for operations.
   * @throws IOException if raised during span creation
   */
  @AuditEntryPoint
  @InterfaceAudience.Private
  public MarkerToolOperations createMarkerToolOperations(final String target)
      throws IOException {
    createSpan("marker-tool-scan", target,
        null);
    return new MarkerToolOperationsImpl(new OperationCallbacksImpl(createStoreContext()));
  }

  /**
   * This is purely for testing, as it force initializes all static
   * initializers. See HADOOP-17385 for details.
   */
  @InterfaceAudience.Private
  public static void initializeClass() {
    LOG.debug("Initialize S3A class");
  }

  /**
   * The implementation of context accessors.
   */
  private class ContextAccessorsImpl implements ContextAccessors {

    @Override
    public Path keyToPath(final String key) {
      return keyToQualifiedPath(key);
    }

    @Override
    public String pathToKey(final Path path) {
      return S3AFileSystem.this.pathToKey(path);
    }

    @Override
    public File createTempFile(final String prefix, final long size)
        throws IOException {
      return createTmpFileForWrite(prefix, size, getConf());
    }

    @Override
    public String getBucketLocation() throws IOException {
      return S3AFileSystem.this.getBucketLocation();
    }

    @Override
    public Path makeQualified(final Path path) {
      return S3AFileSystem.this.makeQualified(path);
    }

    @Override
    public AuditSpan getActiveAuditSpan() {
      return S3AFileSystem.this.getActiveAuditSpan();
    }

    @Override
    public RequestFactory getRequestFactory() {
      return S3AFileSystem.this.getRequestFactory();
    }
  }

  /**
   * Probe: is client-side encryption enabled?
   * @return true if CSE is enabled.
   */
  public boolean isCSEEnabled() {
    return isCSEEnabled;
  }

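  /**
   * Is multipart upload enabled?
   * @return true if multipart uploads are enabled.
   */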
  public boolean isMultipartUploadEnabled() {
    return isMultipartUploadEnabled;
  }

  /**
   * S3A implementation: create a bulk delete operation through which
   * bulk delete calls can be made.
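   * <p>
   * A deletion sketch (paths illustrative), assuming the
   * {@code org.apache.hadoop.fs.BulkDelete} API's page-delete call:
   * <pre>{@code
   *   try (BulkDelete bulk = fs.createBulkDelete(new Path("s3a://bucket/tmp"))) {
   *     bulk.bulkDelete(Arrays.asList(new Path("s3a://bucket/tmp/part-0000")));
   *   }
   * }</pre>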
   * @param path base path for the delete operations
   * @return an implementation of the bulk delete.
   * @throws IllegalArgumentException path not valid
   * @throws IOException IO failure
   */
  @Override
  public BulkDelete createBulkDelete(final Path path)
      throws IllegalArgumentException, IOException {

    final Path p = makeQualified(path);
    final AuditSpanS3A span = createSpan("bulkdelete", p.toString(), null);
    final int size = enableMultiObjectsDelete ? pageSize : 1;
    return new BulkDeleteOperation(
        createStoreContext(),
        createBulkDeleteCallbacks(p, size, span),
        p,
        size,
        span);
  }

  /**
   * Create the callbacks for the bulk delete operation.
   * @param path path to delete.
   * @param pageSize page size.
   * @param span span for operations.
   * @return an instance of the Bulk Delete callbacks.
   */
  protected BulkDeleteOperation.BulkDeleteOperationCallbacks createBulkDeleteCallbacks(
      Path path, int pageSize, AuditSpanS3A span) {
    return new BulkDeleteOperationCallbacksImpl(store, pathToKey(path), pageSize, span);
  }

}