/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a;

import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.AbortedException;
import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException;
import software.amazon.awssdk.core.exception.ApiCallTimeoutException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.retry.RetryUtils;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.impl.S3AEncryption;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.Preconditions;

import org.apache.hadoop.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.hadoop.fs.s3a.AWSCredentialProviderList.maybeTranslateCredentialException;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.audit.AuditIntegration.maybeTranslateAuditException;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeProcessEncryptionClientException;
import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.instantiationException;
import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.isAbstract;
import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.isNotInstanceOf;
import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unsupportedConstructor;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.*;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeExtractIOException;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;

/**
 * Utility methods for S3A code.
 * Some methods are marked LimitedPrivate since they are being used in an
 * external project.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class S3AUtils {

  private static final Logger LOG = LoggerFactory.getLogger(S3AUtils.class);

  static final String ENDPOINT_KEY = "Endpoint";

  /** Filesystem is closed; kept here to keep the errors close. */
  static final String E_FS_CLOSED = "FileSystem is closed!";

  /**
   * Core property for provider path. Duplicated here for consistent
   * code across Hadoop versions: {@value}.
   */
  static final String CREDENTIAL_PROVIDER_PATH =
      "hadoop.security.credential.provider.path";

  /**
   * Encryption SSE-C used but the config lacks an encryption key.
   */
  public static final String SSE_C_NO_KEY_ERROR =
      S3AEncryptionMethods.SSE_C.getMethod()
          + " is enabled but no encryption key was declared in "
          + Constants.S3_ENCRYPTION_KEY;
  /**
   * Encryption SSE-S3 is used but the caller also set an encryption key.
   */
  public static final String SSE_S3_WITH_KEY_ERROR =
      S3AEncryptionMethods.SSE_S3.getMethod()
          + " is enabled but an encryption key was set in "
          + Constants.S3_ENCRYPTION_KEY;
  public static final String EOF_MESSAGE_IN_XML_PARSER
      = "Failed to sanitize XML document destined for handler class";

  public static final String EOF_READ_DIFFERENT_LENGTH
      = "Data read has a different length than the expected";

  private static final String BUCKET_PATTERN = FS_S3A_BUCKET_PREFIX + "%s.%s";

  private S3AUtils() {
  }

  /**
   * Translate an exception raised in an operation into an IOException.
   * The specific type of IOException depends on the class of
   * {@link SdkException} passed in, and any status codes included
   * in the operation. That is: HTTP error codes are examined and can be
   * used to build a more specific response.
   *
   * @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html">S3 Error responses</a>
   * @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/dev/ErrorBestPractices.html">Amazon S3 Error Best Practices</a>
   * @param operation operation
   * @param path path operated on (must not be null)
   * @param exception amazon exception raised
   * @return an IOE which wraps the caught exception.
   */
  public static IOException translateException(String operation,
      Path path,
      SdkException exception) {
    return translateException(operation, path.toString(), exception);
  }

  /**
   * Translate an exception raised in an operation into an IOException.
   * The specific type of IOException depends on the class of
   * {@link SdkException} passed in, and any status codes included
   * in the operation. That is: HTTP error codes are examined and can be
   * used to build a more specific response.
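   * <p>
   * A minimal usage sketch (the operation name, path and S3 call are
   * hypothetical):
   * <pre>{@code
   * try {
   *   s3Client.headObject(request);
   * } catch (SdkException e) {
   *   throw translateException("HEAD", "s3a://bucket/key", e);
   * }
   * }</pre>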
   * @param operation operation
   * @param path path operated on (may be null)
   * @param exception amazon exception raised
   * @return an IOE which wraps the caught exception.
   */
  @SuppressWarnings("ThrowableInstanceNeverThrown")
  public static IOException translateException(@Nullable String operation,
      @Nullable String path,
      SdkException exception) {
    String message = String.format("%s%s: %s",
        operation,
        StringUtils.isNotEmpty(path) ? (" on " + path) : "",
        exception);

    if (path == null || path.isEmpty()) {
      // handle null path by giving it a stub value.
      // not ideal/informative, but ensures that the path is never null in
      // exceptions constructed.
      path = "/";
    }

    exception = maybeProcessEncryptionClientException(exception);

    if (!(exception instanceof AwsServiceException)) {
      // exceptions raised client-side: connectivity, auth, network problems...
      Exception innerCause = containsInterruptedException(exception);
      if (innerCause != null) {
        // interrupted IO, or a socket exception underneath that class
        return translateInterruptedException(exception, innerCause, message);
      }
      if (isMessageTranslatableToEOF(exception)) {
        // the call is considered a sign of connectivity failure
        return (EOFException)new EOFException(message).initCause(exception);
      }
      // if the exception came from the auditor, hand off translation
      // to it.
      IOException ioe = maybeTranslateAuditException(path, exception);
      if (ioe != null) {
        return ioe;
      }
      ioe = maybeTranslateCredentialException(path, exception);
      if (ioe != null) {
        return ioe;
      }
      // network problems covered by an IOE inside the exception chain.
      ioe = maybeExtractIOException(path, exception, message);
      if (ioe != null) {
        return ioe;
      }
      // timeout issues
      // ApiCallAttemptTimeoutException: a single HTTP request attempt failed.
      // ApiCallTimeoutException: a request with any configured retries failed.
      // The ApiCallTimeoutException exception should be the only one seen in
      // the S3A code, but for due diligence both are handled and mapped to
      // our own AWSApiCallTimeoutException.
      if (exception instanceof ApiCallTimeoutException
          || exception instanceof ApiCallAttemptTimeoutException) {
        // An API call to an AWS service timed out.
        // This is a subclass of ConnectTimeoutException so
        // all retry logic for that exception is handled without
        // having to look down the stack for a wrapped timeout exception.
        return new AWSApiCallTimeoutException(message, exception);
      }
      // no custom handling.
      return new AWSClientIOException(message, exception);
    } else {
      // "error response returned by an S3 or other service."
      // These contain more details and should be translated based
      // on the HTTP status code and other details.
      IOException ioe;
      AwsServiceException ase = (AwsServiceException) exception;
      // this exception is non-null if the service exception is an s3 one
      S3Exception s3Exception = ase instanceof S3Exception
          ? (S3Exception) ase
          : null;
      int status = ase.statusCode();
      if (ase.awsErrorDetails() != null) {
        message = message + ":" + ase.awsErrorDetails().errorCode();
      }

      // big switch on the HTTP status code.
      switch (status) {

      case SC_301_MOVED_PERMANENTLY:
      case SC_307_TEMPORARY_REDIRECT:
        if (s3Exception != null) {
          message = String.format("Received permanent redirect response to "
                  + "region %s.  This likely indicates that the S3 region "
                  + "configured in %s does not match the AWS region containing " + "the bucket.",
              s3Exception.awsErrorDetails().sdkHttpResponse().headers().get(BUCKET_REGION_HEADER),
              AWS_REGION);
          ioe = new AWSRedirectException(message, s3Exception);
        } else {
          ioe = new AWSRedirectException(message, ase);
        }
        break;

      case SC_400_BAD_REQUEST:
        ioe = new AWSBadRequestException(message, ase);
        break;

      // permissions
      case SC_401_UNAUTHORIZED:
      case SC_403_FORBIDDEN:
        ioe = new AccessDeniedException(path, null, message);
        ioe.initCause(ase);
        break;

      // the object isn't there
      case SC_404_NOT_FOUND:
        if (isUnknownBucket(ase)) {
          // this is a missing bucket
          ioe = new UnknownStoreException(path, message, ase);
        } else {
          // a normal unknown object.
          // Can also be raised by third-party stores when aborting an unknown multipart upload
          ioe = new FileNotFoundException(message);
          ioe.initCause(ase);
        }
        break;

      // Caused by duplicate create bucket call.
      case SC_409_CONFLICT:
        ioe = new AWSBadRequestException(message, ase);
        break;

      // this also surfaces sometimes and is considered to
      // be ~ a not found exception.
      case SC_410_GONE:
        ioe = new FileNotFoundException(message);
        ioe.initCause(ase);
        break;

      // errors which stores can return from requests which
      // the store does not support.
      case SC_405_METHOD_NOT_ALLOWED:
      case SC_415_UNSUPPORTED_MEDIA_TYPE:
      case SC_501_NOT_IMPLEMENTED:
        ioe = new AWSUnsupportedFeatureException(message, ase);
        break;

      // precondition failure: the object is there, but the precondition
      // (e.g. etag) didn't match. Assume remote file change during
      // rename or status passed in to openfile had an etag which didn't match.
      case SC_412_PRECONDITION_FAILED:
        ioe = new RemoteFileChangedException(path, message, "", ase);
        break;

      // out of range. This may happen if an object is overwritten with
      // a shorter one while it is being read or openFile() was invoked
      // passing a FileStatus or file length less than that of the object.
      // although the HTTP specification says that the response should
      // include a range header specifying the actual range available,
      // this isn't picked up here.
      case SC_416_RANGE_NOT_SATISFIABLE:
        ioe = new RangeNotSatisfiableEOFException(message, ase);
        break;

      // this has surfaced as a "no response from server" message.
      // so rare we haven't replicated it.
      // Treating as an idempotent proxy error.
      case SC_443_NO_RESPONSE:
      case SC_444_NO_RESPONSE:
        ioe = new AWSNoResponseException(message, ase);
        break;

      // throttling
      case SC_429_TOO_MANY_REQUESTS_GCS:    // Google Cloud Storage via this connector
      case SC_503_SERVICE_UNAVAILABLE:      // AWS
        ioe = new AWSServiceThrottledException(message, ase);
        break;

      // gateway timeout
      case SC_504_GATEWAY_TIMEOUT:
        ioe = new AWSApiCallTimeoutException(message, ase);
        break;

      // internal error
      case SC_500_INTERNAL_SERVER_ERROR:
        ioe = new AWSStatus500Exception(message, ase);
        break;

      case SC_200_OK:
        if (exception instanceof MultiObjectDeleteException) {
          // failure during a bulk delete
          return ((MultiObjectDeleteException) exception)
              .translateException(message);
        }
        // other 200: FALL THROUGH

      default:
        // no specifically handled exit code.

        // convert all other 5xx errors to a 500 exception
        if (status > SC_500_INTERNAL_SERVER_ERROR) {
          ioe = new AWSStatus500Exception(message, ase);
          break;
        }

        // Choose an IOE subclass based on the class of the caught exception
        ioe = s3Exception != null
            ? new AWSS3IOException(message, s3Exception)
            : new AWSServiceIOException(message, ase);
        break;
      }
      return ioe;
    }
  }

  /**
   * Extract an exception from a failed future, and convert to an IOE.
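   * <p>
   * A sketch of use when waiting on a future (names are illustrative):
   * <pre>{@code
   * try {
   *   uploadFuture.get();
   * } catch (ExecutionException e) {
   *   throw extractException("upload", dest.toString(), e);
   * }
   * }</pre>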
   * @param operation operation which failed
   * @param path path operated on (may be null)
   * @param ee execution exception
   * @return an IOE which can be thrown
   */
  public static IOException extractException(String operation,
      String path,
      ExecutionException ee) {
    return convertExceptionCause(operation, path, ee.getCause());
  }

  /**
   * Extract an exception from a failed future, and convert to an IOE.
   * @param operation operation which failed
   * @param path path operated on (may be null)
   * @param ce completion exception
   * @return an IOE which can be thrown
   */
  public static IOException extractException(String operation,
      String path,
      CompletionException ce) {
    return convertExceptionCause(operation, path, ce.getCause());
  }

  /**
   * Convert the cause of a concurrent exception to an IOE.
   * @param operation operation which failed
   * @param path path operated on (may be null)
   * @param cause cause of a concurrent exception
   * @return an IOE which can be thrown
   */
  private static IOException convertExceptionCause(String operation,
      String path,
      Throwable cause) {
    IOException ioe;
    if (cause instanceof SdkException) {
      ioe = translateException(operation, path, (SdkException) cause);
    } else if (cause instanceof IOException) {
      ioe = (IOException) cause;
    } else {
      ioe = new IOException(operation + " failed: " + cause, cause);
    }
    return ioe;
  }

  /**
   * Recurse down the exception chain looking for any inner details about
   * an interrupted exception.
   * @param thrown exception thrown
   * @return the actual exception if the operation was an interrupt
   */
  static Exception containsInterruptedException(Throwable thrown) {
    if (thrown == null) {
      return null;
    }
    if (thrown instanceof InterruptedException ||
        thrown instanceof InterruptedIOException ||
        thrown instanceof AbortedException) {
      return (Exception)thrown;
    }
    // tail recurse
    return containsInterruptedException(thrown.getCause());
  }

  /**
   * Handles translation of interrupted exceptions. This includes
   * preserving the class of the fault for better retry logic.
   * @param exception outer exception
   * @param innerCause inner cause (which is guaranteed to be some form
   * of interrupted exception)
   * @param message message for the new exception.
   * @return an IOE which can be rethrown
   */
  private static InterruptedIOException translateInterruptedException(
      SdkException exception,
      final Exception innerCause,
      String message) {
    InterruptedIOException ioe;
    if (innerCause instanceof SocketTimeoutException) {
      ioe = new SocketTimeoutException(message);
    } else {
      String name = innerCause.getClass().getName();
      if (name.endsWith(".ConnectTimeoutException")
          || name.endsWith(".ConnectionPoolTimeoutException")
          || name.endsWith("$ConnectTimeoutException")) {
        // TODO: review in v2
        // TCP connection HTTP timeout from the shaded or unshaded class names
        // com.amazonaws.thirdparty.apache.http.conn.ConnectTimeoutException
        ioe = new ConnectTimeoutException(message);
      } else {
        // any other exception
        ioe = new InterruptedIOException(message);
      }
    }
    ioe.initCause(exception);
    return ioe;
  }

  /**
   * Is the exception an instance of a throttling exception? That
   * is an AwsServiceException with a 503 response, an
   * {@link AWSServiceThrottledException},
   * or anything which the AWS SDK's RetryUtils considers to be
   * a throttling exception.
   * @param ex exception to examine
   * @return true if it is considered a throttling exception
   */
  public static boolean isThrottleException(Exception ex) {
    return ex instanceof AWSServiceThrottledException
        || (ex instanceof AwsServiceException
            && 503 == ((AwsServiceException) ex).statusCode())
        || (ex instanceof SdkException
            && RetryUtils.isThrottlingException((SdkException) ex));
  }

  /**
   * Hint that an AWS exception is likely to be an EOF Exception based
   * on the message coming back from the client. This is likely to be
   * brittle, so only a hint.
   * @param ex exception
   * @return true if this is believed to be a sign the connection was broken.
   */
  public static boolean isMessageTranslatableToEOF(SdkException ex) {
    // TODO: review in v2
    return ex.toString().contains(EOF_MESSAGE_IN_XML_PARSER) ||
            ex.toString().contains(EOF_READ_DIFFERENT_LENGTH);
  }

  /**
   * Get low-level details of an Amazon exception for logging; multi-line.
   * @param e exception
   * @return string details
   */
  public static String stringify(AwsServiceException e) {
    StringBuilder builder = new StringBuilder(
        String.format("%s error %d: %s; %s%s%n",
            e.awsErrorDetails().serviceName(),
            e.statusCode(),
            e.awsErrorDetails().errorCode(),
            e.awsErrorDetails().errorMessage(),
            (e.retryable() ? " (retryable)": "")
        ));
    String rawResponseContent = e.awsErrorDetails().rawResponse().asUtf8String();
    if (rawResponseContent != null) {
      builder.append(rawResponseContent);
    }
    return builder.toString();
  }

  /**
   * Create a file status instance from a listing.
   * @param keyPath path to entry
   * @param s3Object s3Object entry
   * @param blockSize block size to declare.
   * @param owner owner of the file
   * @param eTag S3 object eTag or null if unavailable
   * @param versionId S3 object versionId or null if unavailable
   * @param size s3 object size
   * @return a status entry
   */
  public static S3AFileStatus createFileStatus(Path keyPath,
      S3Object s3Object,
      long blockSize,
      String owner,
      String eTag,
      String versionId,
      long size) {
    return createFileStatus(keyPath,
        objectRepresentsDirectory(s3Object.key()),
        size, Date.from(s3Object.lastModified()), blockSize, owner, eTag, versionId);
  }

  /**
   * Create a file status for object we just uploaded.  For files, we use
   * current time as modification time, since s3a uses S3's service-based
   * modification time, which will not be available until we do a
   * getFileStatus() later on.
   * @param keyPath path for created object
   * @param isDir true iff directory
   * @param size file length
   * @param blockSize block size for file status
   * @param owner Hadoop username
   * @param eTag S3 object eTag or null if unavailable
   * @param versionId S3 object versionId or null if unavailable
   * @return a status entry
   */
  public static S3AFileStatus createUploadFileStatus(Path keyPath,
      boolean isDir, long size, long blockSize, String owner,
      String eTag, String versionId) {
    Date date = isDir ? null : new Date();
    return createFileStatus(keyPath, isDir, size, date, blockSize, owner,
        eTag, versionId);
  }

  /* Date 'modified' is ignored when isDir is true. */
  private static S3AFileStatus createFileStatus(Path keyPath, boolean isDir,
      long size, Date modified, long blockSize, String owner,
      String eTag, String versionId) {
    if (isDir) {
      return new S3AFileStatus(Tristate.UNKNOWN, keyPath, owner);
    } else {
      return new S3AFileStatus(size, dateToLong(modified), keyPath, blockSize,
          owner, eTag, versionId);
    }
  }

  /**
   * Predicate: does the object represent a directory?
   * @param name object name
   * @return true if it meets the criteria for being a directory marker
   */
  public static boolean objectRepresentsDirectory(final String name) {
    return !name.isEmpty()
        && name.charAt(name.length() - 1) == '/';
  }

  /**
   * Date to long conversion.
   * Handles null Dates that can be returned by AWS by returning 0.
   * @param date date from AWS query
   * @return timestamp of the object
   */
  public static long dateToLong(final Date date) {
    if (date == null) {
      return 0L;
    }

    return date.getTime();
  }

  /**
   * Creates an instance of a class using reflection. The
   * class must implement one of the following means of construction, which are
   * attempted in order:
   *
   * <ol>
   * <li>a public constructor accepting java.net.URI and
   *     org.apache.hadoop.conf.Configuration</li>
   * <li>a public constructor accepting
   *    org.apache.hadoop.conf.Configuration</li>
   * <li>a public static method named as per methodName, that accepts no
   *    arguments and returns an instance of
   *    specified type, or</li>
   * <li>a public default constructor.</li>
   * </ol>
   *
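   * For example, a sketch with a hypothetical interface, implementation
   * class and config key:
   * <pre>{@code
   * MyInterface instance = getInstanceFromReflection(
   *     "org.example.MyImplementation", conf, fsUri,
   *     MyInterface.class, "getInstance", "fs.s3a.example.option");
   * }</pre>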
   * @param className name of class for which instance is to be created
   * @param conf configuration
   * @param uri URI of the FS
   * @param interfaceImplemented interface that this class implements
   * @param methodName name of factory method to be invoked
   * @param configKey config key under which this class is specified
   * @param <InstanceT> type of the instance to create
   * @return instance of the specified class
   * @throws IOException on any problem
   */
  @SuppressWarnings("unchecked")
  public static <InstanceT> InstanceT getInstanceFromReflection(String className,
      Configuration conf,
      @Nullable URI uri,
      Class<? extends InstanceT> interfaceImplemented,
      String methodName,
      String configKey) throws IOException {
    try {
      Class<?> instanceClass = S3AUtils.class.getClassLoader().loadClass(className);
      if (Modifier.isAbstract(instanceClass.getModifiers())) {
        throw isAbstract(uri, className, configKey);
      }
      if (!interfaceImplemented.isAssignableFrom(instanceClass)) {
        throw isNotInstanceOf(uri, className, interfaceImplemented.getName(), configKey);
      }
      Constructor<?> cons;
      if (conf != null) {
        // new X(uri, conf)
        cons = getConstructor(instanceClass, URI.class, Configuration.class);

        if (cons != null) {
          return (InstanceT) cons.newInstance(uri, conf);
        }
        // new X(conf)
        cons = getConstructor(instanceClass, Configuration.class);
        if (cons != null) {
          return (InstanceT) cons.newInstance(conf);
        }
      }

      // X.methodName()
      Method factory = getFactoryMethod(instanceClass, interfaceImplemented, methodName);
      if (factory != null) {
        return (InstanceT) factory.invoke(null);
      }

      // new X()
      cons = getConstructor(instanceClass);
      if (cons != null) {
        return (InstanceT) cons.newInstance();
      }

      // no supported constructor or factory method found
      throw unsupportedConstructor(uri, className, configKey);
    } catch (InvocationTargetException e) {
      Throwable targetException = e.getTargetException();
      if (targetException == null) {
        targetException = e;
      }
      if (targetException instanceof IOException) {
        throw (IOException) targetException;
      } else if (targetException instanceof SdkException) {
        throw translateException("Instantiate " + className, "/", (SdkException) targetException);
      } else {
        // supported constructor or factory method found, but the call failed
        throw instantiationException(uri, className, configKey, targetException);
      }
    } catch (ReflectiveOperationException | IllegalArgumentException e) {
      // supported constructor or factory method found, but the call failed
      throw instantiationException(uri, className, configKey, e);
    }
  }


  /**
   * Set a key if the value is non-empty.
   * @param config config to patch
   * @param key key to set
   * @param val value to probe and set
   * @param origin origin
   * @return true if the property was set
   */
  public static boolean setIfDefined(Configuration config, String key,
      String val, String origin) {
    if (StringUtils.isNotEmpty(val)) {
      config.set(key, val, origin);
      return true;
    } else {
      return false;
    }
  }

  /**
   * Return the access key and secret for S3 API use.
   * These may come from the configuration or be indicated in the
   * UserInfo of the name URI param.
   * @param name the URI for which we need the access keys; may be null
   * @param conf the Configuration object to interrogate for keys.
   * @return AWSAccessKeys
   * @throws IOException problems retrieving passwords from KMS.
   */
  public static S3xLoginHelper.Login getAWSAccessKeys(URI name,
      Configuration conf) throws IOException {
    Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
        conf, S3AFileSystem.class);
    String bucket = name != null ? name.getHost() : "";

    // get the secrets from the configuration

    // get the access key
    String accessKey = lookupPassword(bucket, c, ACCESS_KEY);

    // and the secret
    String secretKey = lookupPassword(bucket, c, SECRET_KEY);

    return new S3xLoginHelper.Login(accessKey, secretKey);
  }

  /**
   * Get a password from a configuration, including JCEKS files, handling both
   * the absolute key and bucket override.
   * @param bucket bucket or "" if none known
   * @param conf configuration
   * @param baseKey base key to look up, e.g. "fs.s3a.secret.key"
   * @param overrideVal override value: if non empty this is used instead of
   * querying the configuration.
   * @return a password or "".
   * @throws IOException on any IO problem
   * @throws IllegalArgumentException bad arguments
   */
  @Deprecated
  public static String lookupPassword(
      String bucket,
      Configuration conf,
      String baseKey,
      String overrideVal)
      throws IOException {
    return lookupPassword(bucket, conf, baseKey, overrideVal, "");
  }

  /**
   * Get a password from a configuration, including JCEKS files, handling both
   * the absolute key and bucket override.
   * @param bucket bucket or "" if none known
   * @param conf configuration
   * @param baseKey base key to look up, e.g. "fs.s3a.secret.key"
   * @return a password or "".
   * @throws IOException on any IO problem
   * @throws IllegalArgumentException bad arguments
   */
  public static String lookupPassword(
      String bucket,
      Configuration conf,
      String baseKey)
      throws IOException {
    return lookupPassword(bucket, conf, baseKey, null, "");
  }

  /**
   * Get a password from a configuration, including JCEKS files, handling both
   * the absolute key and bucket override.
   * <br>
   * <i>Note:</i> LimitedPrivate for ranger repository to get secrets.
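   * <p>
   * For example, with {@code baseKey} "fs.s3a.secret.key" and a
   * (hypothetical) bucket "example", the lookup order is: the override
   * value if non-empty, then
   * {@code fs.s3a.bucket.example.fs.s3a.secret.key}, then
   * {@code fs.s3a.bucket.example.secret.key}, then the base key itself,
   * before falling back to {@code defVal}.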
   * @param bucket bucket or "" if none known
   * @param conf configuration
   * @param baseKey base key to look up, e.g. "fs.s3a.secret.key"
   * @param overrideVal override value: if non empty this is used instead of
   * querying the configuration.
   * @param defVal value to return if there is no password
   * @return a password or the value of defVal.
   * @throws IOException on any IO problem
   * @throws IllegalArgumentException bad arguments
   */
  @InterfaceAudience.LimitedPrivate("Ranger")
  public static String lookupPassword(
      String bucket,
      Configuration conf,
      String baseKey,
      String overrideVal,
      String defVal)
      throws IOException {
    String initialVal;
    Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX),
        "%s does not start with $%s", baseKey, FS_S3A_PREFIX);
    // if there's a bucket, work with it
    if (StringUtils.isNotEmpty(bucket)) {
      String subkey = baseKey.substring(FS_S3A_PREFIX.length());
      String shortBucketKey = String.format(
          BUCKET_PATTERN, bucket, subkey);
      String longBucketKey = String.format(
          BUCKET_PATTERN, bucket, baseKey);

      // set from the long key unless overridden.
      initialVal = getPassword(conf, longBucketKey, overrideVal);
      // then override from the short one if it is set
      initialVal = getPassword(conf, shortBucketKey, initialVal);
    } else {
      // no bucket, make the initial value the override value
      initialVal = overrideVal;
    }
    return getPassword(conf, baseKey, initialVal, defVal);
  }

  /**
   * Get a password from a configuration, or, if a value is passed in,
   * pick that up instead.
   * @param conf configuration
   * @param key key to look up
   * @param val current value: if non empty this is used instead of
   * querying the configuration.
   * @return a password or "".
   * @throws IOException on any problem
   */
  private static String getPassword(Configuration conf, String key, String val)
      throws IOException {
    return getPassword(conf, key, val, "");
  }

  /**
   * Get a password from a configuration, or, if a value is passed in,
   * pick that up instead.
   * @param conf configuration
   * @param key key to look up
   * @param val current value: if non empty this is used instead of
   * querying the configuration.
   * @param defVal default value if nothing is set
   * @return a password or "".
   * @throws IOException on any problem
   */
  private static String getPassword(Configuration conf,
      String key,
      String val,
      String defVal) throws IOException {
    return isEmpty(val)
        ? lookupPassword(conf, key, defVal)
        : val;
  }

  /**
   * Get a password from a configuration/configured credential providers.
   * @param conf configuration
   * @param key key to look up
   * @param defVal value to return if there is no password
   * @return a password or the value in {@code defVal}
   * @throws IOException on any problem
   */
  static String lookupPassword(Configuration conf, String key, String defVal)
      throws IOException {
    try {
      final char[] pass = conf.getPassword(key);
      return pass != null ?
          new String(pass).trim()
          : defVal;
    } catch (IOException ioe) {
      throw new IOException("Cannot find password option " + key, ioe);
    }
  }

  /**
   * String information about a summary entry for debug messages.
   * @param s3Object s3Object entry
   * @return string value
   */
  public static String stringify(S3Object s3Object) {
    StringBuilder builder = new StringBuilder(s3Object.key().length() + 100);
    builder.append("\"").append(s3Object.key()).append("\" ");
    builder.append("size=").append(s3Object.size());
    return builder.toString();
  }

  /**
   * Get an integer option &gt;= the minimum allowed value.
   * @param conf configuration
   * @param key key to look up
   * @param defVal default value
   * @param min minimum value
   * @return the value
   * @throws IllegalArgumentException if the value is below the minimum
   */
  public static int intOption(Configuration conf, String key, int defVal, int min) {
    int v = conf.getInt(key, defVal);
    Preconditions.checkArgument(v >= min,
        String.format("Value of %s: %d is below the minimum value %d",
            key, v, min));
    LOG.debug("Value of {} is {}", key, v);
    return v;
  }

  /**
   * Get a long option &gt;= the minimum allowed value.
   * @param conf configuration
   * @param key key to look up
   * @param defVal default value
   * @param min minimum value
   * @return the value
   * @throws IllegalArgumentException if the value is below the minimum
   */
  public static long longOption(Configuration conf,
      String key,
      long defVal,
      long min) {
    long v = conf.getLong(key, defVal);
    Preconditions.checkArgument(v >= min,
        String.format("Value of %s: %d is below the minimum value %d",
            key, v, min));
    LOG.debug("Value of {} is {}", key, v);
    return v;
  }

  /**
   * Get a long option &gt;= the minimum allowed value, supporting memory
   * prefixes K,M,G,T,P.
   * @param conf configuration
   * @param key key to look up
   * @param defVal default value
   * @param min minimum value
   * @return the value
   * @throws IllegalArgumentException if the value is below the minimum
   */
  public static long longBytesOption(Configuration conf,
                             String key,
                             long defVal,
                             long min) {
    long v = conf.getLongBytes(key, defVal);
    Preconditions.checkArgument(v >= min,
            String.format("Value of %s: %d is below the minimum value %d",
                    key, v, min));
    LOG.debug("Value of {} is {}", key, v);
    return v;
  }

  /**
   * Get a size property from the configuration: this property must
   * be at least equal to {@link Constants#MULTIPART_MIN_SIZE}.
   * If it is too small, it is rounded up to that minimum, and a warning
   * printed.
   * @param conf configuration
   * @param property property name
   * @param defVal default value
   * @return the value, guaranteed to be above the minimum size
   */
  public static long getMultipartSizeProperty(Configuration conf,
      String property, long defVal) {
    long partSize = conf.getLongBytes(property, defVal);
    if (partSize < MULTIPART_MIN_SIZE) {
      LOG.warn("{} must be at least 5 MB; configured value is {}",
          property, partSize);
      partSize = MULTIPART_MIN_SIZE;
    }
    return partSize;
  }

  /**
   * Validates the output stream configuration.
   * @param path path: for error messages
   * @param conf configuration object for the given context
   * @throws PathIOException Unsupported configuration.
   */
  public static void validateOutputStreamConfiguration(final Path path,
      Configuration conf) throws PathIOException {
    if (!checkDiskBuffer(conf)) {
      throw new PathIOException(path.toString(),
          "Unable to create OutputStream with the given"
          + " multipart upload and buffer configuration.");
    }
  }

  /**
   * Check whether the configuration for S3ABlockOutputStream is
   * consistent or not. Multipart uploads allow all kinds of fast buffers to
   * be supported. When the option is disabled only disk buffers are allowed to
   * be used as the file size might be bigger than the buffer size that can be
   * allocated.
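   * <p>
   * For example, a configuration with {@code fs.s3a.multipart.uploads.enabled}
   * set to false and {@code fs.s3a.fast.upload.buffer} set to "array" is
   * rejected, as the whole object would have to be buffered in memory.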
   * @param conf configuration object for the given context
   * @return true if the disk buffer and the multipart settings are supported
   */
  public static boolean checkDiskBuffer(Configuration conf) {
    boolean isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
        DEFAULT_MULTIPART_UPLOAD_ENABLED);
    return isMultipartUploadEnabled
        || FAST_UPLOAD_BUFFER_DISK.equals(
            conf.get(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER));
  }

  /**
   * Ensure that the long value is in the range of an integer.
   * @param name property name for error messages
   * @param size original size
   * @return the size, guaranteed to be less than or equal to the max
   * value of an integer.
   */
  public static int ensureOutputParameterInRange(String name, long size) {
    if (size > Integer.MAX_VALUE) {
      LOG.warn("s3a: {} capped to ~2.14GB" +
          " (maximum allowed size with current output mechanism)", name);
      return Integer.MAX_VALUE;
    } else {
      return (int)size;
    }
  }

  /**
   * Returns the public constructor of {@code cl} specified by the list of
   * {@code args} or {@code null} if {@code cl} has no public constructor that
   * matches that specification.
   * @param cl class
   * @param args constructor argument types
   * @return constructor or null
   */
  private static Constructor<?> getConstructor(Class<?> cl, Class<?>... args) {
    try {
      Constructor<?> cons = cl.getDeclaredConstructor(args);
      return Modifier.isPublic(cons.getModifiers()) ? cons : null;
    } catch (NoSuchMethodException | SecurityException e) {
      return null;
    }
  }

  /**
   * Returns the public static method of {@code cl} named {@code methodName}
   * that accepts no arguments and returns {@code returnType}, or
   * {@code null} if {@code cl} has no public static method matching that
   * specification.
   * @param cl class
   * @param returnType return type
   * @param methodName method name
   * @return method or null
   */
  private static Method getFactoryMethod(Class<?> cl, Class<?> returnType,
      String methodName) {
    try {
      Method m = cl.getDeclaredMethod(methodName);
      if (Modifier.isPublic(m.getModifiers()) &&
          Modifier.isStatic(m.getModifiers()) &&
          returnType.isAssignableFrom(m.getReturnType())) {
        return m;
      } else {
        return null;
      }
    } catch (NoSuchMethodException | SecurityException e) {
      return null;
    }
  }

  /**
   * Propagates bucket-specific settings into generic S3A configuration keys.
   * This is done by propagating the values of the form
   * {@code fs.s3a.bucket.${bucket}.key} to
   * {@code fs.s3a.key}, for all values of "key" other than a small set
   * of unmodifiable values.
   *
   * The source of the updated property is set to the key name of the bucket
   * property, to aid in diagnostics of where things came from.
   *
   * Returns a new configuration. Why the clone?
   * So that the same conf can be reused for different filesystems
   * without the original values being updated.
   *
   * The {@code fs.s3a.impl} property cannot be set, nor can
   * any with the prefix {@code fs.s3a.bucket}.
   *
   * This method does not propagate security provider path information from
   * the S3A property into the Hadoop common provider: callers must call
   * {@link #patchSecurityCredentialProviders(Configuration)} explicitly.
   *
   * <br>
   * <i>Note:</i> LimitedPrivate for ranger repository to set up
   * per-bucket configurations.
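   * <p>
   * For example, with a (hypothetical) bucket "example",
   * {@code fs.s3a.bucket.example.endpoint} is copied to
   * {@code fs.s3a.endpoint} in the returned configuration:
   * <pre>{@code
   * Configuration patched = propagateBucketOptions(conf, "example");
   * String endpoint = patched.get("fs.s3a.endpoint");
   * }</pre>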
   * @param source Source Configuration object.
   * @param bucket bucket name. Must not be empty.
   * @return a (potentially) patched clone of the original.
   */
  @InterfaceAudience.LimitedPrivate("Ranger")
  public static Configuration propagateBucketOptions(Configuration source,
      String bucket) {

    Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "bucket is null/empty");
    final String bucketPrefix = FS_S3A_BUCKET_PREFIX + bucket + '.';
    LOG.debug("Propagating entries under {}", bucketPrefix);
    final Configuration dest = new Configuration(source);
    for (Map.Entry<String, String> entry : source) {
      final String key = entry.getKey();
      // get the (unexpanded) value.
      final String value = entry.getValue();
      if (!key.startsWith(bucketPrefix) || bucketPrefix.equals(key)) {
        continue;
      }
      // there's a bucket prefix, so strip it
      final String stripped = key.substring(bucketPrefix.length());
      if (stripped.startsWith("bucket.") || "impl".equals(stripped)) {
        // log and skip: these options must not be propagated
        LOG.debug("Ignoring bucket option {}", key);
      }  else {
        // propagate the value, building a new origin field.
        // to track overwrites, the generic key is overwritten even if
        // already matches the new one.
        String origin = "[" + StringUtils.join(
            source.getPropertySources(key), ", ") +"]";
        final String generic = FS_S3A_PREFIX + stripped;
        LOG.debug("Updating {} from {}", generic, origin);
        dest.set(generic, value, key + " via " + origin);
      }
    }
    return dest;
  }


  /**
   * Delete a path quietly: failures are logged at DEBUG.
   * @param fs filesystem
   * @param path path
   * @param recursive recursive?
   */
  public static void deleteQuietly(FileSystem fs,
      Path path,
      boolean recursive) {
    try {
      fs.delete(path, recursive);
    } catch (IOException e) {
      LOG.debug("Failed to delete {}", path, e);
    }
  }

  /**
   * Delete a path: failures are logged at WARN.
   * @param fs filesystem
   * @param path path
   * @param recursive recursive?
   */
  public static void deleteWithWarning(FileSystem fs,
      Path path,
      boolean recursive) {
    try {
      fs.delete(path, recursive);
    } catch (IOException e) {
      LOG.warn("Failed to delete {}", path, e);
    }
  }

  /**
   * Convert the data of an iterator of {@link S3AFileStatus} to
   * an array.
   * @param iterator a non-null iterator
   * @return a possibly-empty array of file status entries
   * @throws IOException failure
   */
  public static S3AFileStatus[] iteratorToStatuses(
      RemoteIterator<S3AFileStatus> iterator)
      throws IOException {
    S3AFileStatus[] statuses = RemoteIterators
        .toArray(iterator, new S3AFileStatus[0]);
    return statuses;
  }

  /**
   * Get the length of the PUT, verifying that the length is known.
   * @param putObjectRequest a request bound to a file or a stream.
   * @return the request length
   * @throws IllegalStateException if the length is negative
   */
  public static long getPutRequestLength(PutObjectRequest putObjectRequest) {
    long len = putObjectRequest.contentLength();

    Preconditions.checkState(len >= 0, "Cannot PUT object of unknown length");
    return len;
  }

  /**
   * An interface for use in lambda-expressions working with
   * directory tree listings.
   */
  @FunctionalInterface
  public interface CallOnLocatedFileStatus {
    void call(LocatedFileStatus status) throws IOException;
  }

  /**
   * An interface for use in lambda-expressions working with
   * directory tree listings.
   */
  @FunctionalInterface
  public interface LocatedFileStatusMap<T> {
    T call(LocatedFileStatus status) throws IOException;
  }

  /**
   * Apply an operation to every {@link LocatedFileStatus} in a remote
   * iterator.
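   * <p>
   * A sketch of use, counting bytes under a directory (names illustrative):
   * <pre>{@code
   * AtomicLong bytes = new AtomicLong();
   * applyLocatedFiles(fs.listFiles(dir, true),
   *     st -> bytes.addAndGet(st.getLen()));
   * }</pre>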
   * @param iterator iterator from a list
   * @param eval closure to evaluate
   * @return the number of files processed
   * @throws IOException anything in the closure, or iteration logic.
   */
  public static long applyLocatedFiles(
      RemoteIterator<? extends LocatedFileStatus> iterator,
      CallOnLocatedFileStatus eval) throws IOException {
    return RemoteIterators.foreach(iterator, eval::call);
  }

  /**
   * Map an operation to every {@link LocatedFileStatus} in a remote
   * iterator, returning a list of the results.
   * @param <T> return type of map
   * @param iterator iterator from a list
   * @param eval closure to evaluate
   * @return the list of mapped results.
   * @throws IOException anything in the closure, or iteration logic.
   */
  public static <T> List<T> mapLocatedFiles(
      RemoteIterator<? extends LocatedFileStatus> iterator,
      LocatedFileStatusMap<T> eval) throws IOException {
    final List<T> results = new ArrayList<>();
    applyLocatedFiles(iterator,
        (s) -> results.add(eval.call(s)));
    return results;
  }

  /**
   * Map an operation to every {@link LocatedFileStatus} in a remote
   * iterator, returning a list of all results which were not empty.
   * @param <T> return type of map
   * @param iterator iterator from a list
   * @param eval closure to evaluate
   * @return the flattened list of mapped results.
   * @throws IOException anything in the closure, or iteration logic.
   */
  public static <T> List<T> flatmapLocatedFiles(
      RemoteIterator<LocatedFileStatus> iterator,
      LocatedFileStatusMap<Optional<T>> eval) throws IOException {
    final List<T> results = new ArrayList<>();
    applyLocatedFiles(iterator,
        (s) -> eval.call(s).map(r -> results.add(r)));
    return results;
  }

  /**
   * List located files and filter them as a classic listFiles(path, filter)
   * would do.
   * This will be incremental, fetching pages async.
   * While it is rare for a job to have many thousands of files, jobs
   * against versioned buckets may return earlier if there are many
   * non-visible objects.
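   * <p>
   * A usage sketch (hypothetical path and suffix filter):
   * <pre>{@code
   * RemoteIterator<LocatedFileStatus> csvFiles = listAndFilter(fs,
   *     new Path("s3a://bucket/data"), true,
   *     p -> p.getName().endsWith(".csv"));
   * }</pre>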
   * @param fileSystem filesystem
   * @param path path to list
   * @param recursive recursive listing?
   * @param filter filter for the filename
   * @return iterator over the entries.
   * @throws IOException IO failure.
   */
  public static RemoteIterator<LocatedFileStatus> listAndFilter(FileSystem fileSystem,
      Path path, boolean recursive, PathFilter filter) throws IOException {
    return filteringRemoteIterator(
        fileSystem.listFiles(path, recursive),
        status -> filter.accept(status.getPath()));
  }

  /**
   * Convert a value into a non-empty Optional instance if
   * the value of {@code include} is true.
   * @param include flag to indicate the value is to be included.
   * @param value value to return
   * @param <T> type of option.
   * @return if include is false, Optional.empty. Otherwise, the value.
   */
  public static <T> Optional<T> maybe(boolean include, T value) {
    return include ? Optional.of(value) : Optional.empty();
  }

  /**
   * Patch the security credential provider information in
   * {@link #CREDENTIAL_PROVIDER_PATH}
   * with the providers listed in
   * {@link Constants#S3A_SECURITY_CREDENTIAL_PROVIDER_PATH}.
   *
   * This allows different buckets to use different credential files.
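   * For example, if {@code fs.s3a.security.credential.provider.path} is set
   * to a (hypothetical) "jceks://file/tmp/s3a.jceks", that provider is
   * prepended to any existing value of
   * {@code hadoop.security.credential.provider.path}.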
   * @param conf configuration to patch
   */
  static void patchSecurityCredentialProviders(Configuration conf) {
    Collection<String> customCredentials = conf.getStringCollection(
        S3A_SECURITY_CREDENTIAL_PROVIDER_PATH);
    Collection<String> hadoopCredentials = conf.getStringCollection(
        CREDENTIAL_PROVIDER_PATH);
    if (!customCredentials.isEmpty()) {
      List<String> all = Lists.newArrayList(customCredentials);
      all.addAll(hadoopCredentials);
      String joined = StringUtils.join(all, ',');
      LOG.debug("Setting {} to {}", CREDENTIAL_PROVIDER_PATH,
          joined);
      conf.set(CREDENTIAL_PROVIDER_PATH, joined,
          "patch of " + S3A_SECURITY_CREDENTIAL_PROVIDER_PATH);
    }
  }

  /**
   * Lookup a per-bucket-secret from a configuration including JCEKS files.
   * No attempt is made to look up the global configuration value.
   * @param bucket bucket or "" if none known
   * @param conf configuration
   * @param baseKey base key to look up, e.g. "fs.s3a.secret.key"
   * @return the secret or null.
   * @throws IOException on any IO problem
   * @throws IllegalArgumentException bad arguments
   */
  public static String lookupBucketSecret(
      String bucket,
      Configuration conf,
      String baseKey)
      throws IOException {

    Preconditions.checkArgument(!isEmpty(bucket), "null/empty bucket argument");
    Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX),
        "%s does not start with $%s", baseKey, FS_S3A_PREFIX);
    String subkey = baseKey.substring(FS_S3A_PREFIX.length());

    // set from the long key unless overridden.
    String longBucketKey = String.format(
        BUCKET_PATTERN, bucket, baseKey);
    String initialVal = getPassword(conf, longBucketKey, null, null);
    // then override from the short one if it is set
    String shortBucketKey = String.format(
        BUCKET_PATTERN, bucket, subkey);
    return getPassword(conf, shortBucketKey, initialVal, null);
  }

  /**
   * Get any S3 encryption key, without propagating exceptions from
   * JCEKs files.
   * @param bucket bucket to query for
   * @param conf configuration to examine
   * @return the encryption key or ""
   * @throws IllegalArgumentException bad arguments.
   */
  public static String getS3EncryptionKey(
      String bucket,
      Configuration conf) {
    try {
      return getS3EncryptionKey(bucket, conf, false);
    } catch (IOException e) {
      // never going to happen, but to make sure, convert to
      // a runtime exception
      throw new UncheckedIOException(e);
    }
  }

  /**
   * Get any SSE/CSE key from a configuration/credential provider.
   * This operation handles the case where the option has been
   * set in the provider or configuration under the older name
   * {@code SERVER_SIDE_ENCRYPTION_KEY}.
   * IOExceptions raised during retrieval are swallowed unless
   * {@code propagateExceptions} is true.
   * @param bucket bucket to query for
   * @param conf configuration to examine
   * @param propagateExceptions should IO exceptions be rethrown?
   * @return the encryption key or ""
   * @throws IllegalArgumentException bad arguments.
   * @throws IOException if propagateExceptions==true and reading a JCEKS file raised an IOE
   */
  @SuppressWarnings("deprecation")
  public static String getS3EncryptionKey(
      String bucket,
      Configuration conf,
      boolean propagateExceptions) throws IOException {
    try {
      // look up the per-bucket value of the new key,
      // which implicitly includes the deprecation remapping
      String key = lookupBucketSecret(bucket, conf, S3_ENCRYPTION_KEY);
      if (key == null) {
        // old key in bucket, jceks
        key = lookupBucketSecret(bucket, conf, SERVER_SIDE_ENCRYPTION_KEY);
      }
      if (key == null) {
        // new key, global; implicit translation of old key in XML files.
        key = lookupPassword(null, conf, S3_ENCRYPTION_KEY);
      }
      if (key == null) {
        // old key, JCEKS
        key = lookupPassword(null, conf, SERVER_SIDE_ENCRYPTION_KEY);
      }
      if (key == null) {
        // no key, return ""
        key = "";
      }
      return key;
    } catch (IOException e) {
      if (propagateExceptions) {
        throw e;
      }
      LOG.warn("Cannot retrieve {} for bucket {}",
          S3_ENCRYPTION_KEY, bucket, e);
      return "";
    }
  }

  /**
   * Get the server-side encryption or client side encryption algorithm.
   * This includes validation of the configuration, checking the state of
   * the encryption key given the chosen algorithm.
   *
   * @param bucket bucket to query for
   * @param conf configuration to scan
   * @return the encryption mechanism (which will be {@code NONE} unless
   * one is set).
   * @throws IOException on JCEKS lookup or invalid method/key configuration.
   */
  public static S3AEncryptionMethods getEncryptionAlgorithm(String bucket,
      Configuration conf) throws IOException {
    return buildEncryptionSecrets(bucket, conf).getEncryptionMethod();
  }

  /**
   * Get the server-side encryption or client side encryption algorithm.
   * This includes validation of the configuration, checking the state of
   * the encryption key given the chosen algorithm.
   *
   * @param bucket bucket to query for
   * @param conf configuration to scan
   * @return the encryption mechanism (which will be {@code NONE} unless
   * one is set) and the secrets.
   * @throws IOException on JCEKS lookup or invalid method/key configuration.
   */
  @SuppressWarnings("deprecation")
  public static EncryptionSecrets buildEncryptionSecrets(String bucket,
      Configuration conf) throws IOException {

    // new key, per-bucket
    // this will include fixup of the old key in config XML entries
    String algorithm = lookupBucketSecret(bucket, conf, S3_ENCRYPTION_ALGORITHM);
    if (algorithm == null) {
      // try the old key, per-bucket setting, which will find JCEKS values
      algorithm = lookupBucketSecret(bucket, conf, SERVER_SIDE_ENCRYPTION_ALGORITHM);
    }
    if (algorithm == null) {
      // new key, global setting
      // this will include fixup of the old key in config XML entries
      algorithm = lookupPassword(null, conf, S3_ENCRYPTION_ALGORITHM);
    }
    if (algorithm == null) {
      // old key, global setting, for JCEKS entries.
      algorithm = lookupPassword(null, conf, SERVER_SIDE_ENCRYPTION_ALGORITHM);
    }
    // now determine the algorithm
    final S3AEncryptionMethods encryptionMethod = S3AEncryptionMethods.getMethod(algorithm);

    // look up the encryption key
    String encryptionKey = getS3EncryptionKey(bucket, conf,
        encryptionMethod.requiresSecret());
    int encryptionKeyLen =
        StringUtils.isBlank(encryptionKey) ? 0 : encryptionKey.length();
    String diagnostics = passwordDiagnostics(encryptionKey, "key");
    String encryptionContext = S3AEncryption.getS3EncryptionContextBase64Encoded(bucket, conf,
        encryptionMethod.requiresSecret());
    switch (encryptionMethod) {
    case SSE_C:
      LOG.debug("Using SSE-C with {}", diagnostics);
      if (encryptionKeyLen == 0) {
        throw new IOException(SSE_C_NO_KEY_ERROR);
      }
      break;

    case SSE_S3:
      if (encryptionKeyLen != 0) {
        throw new IOException(SSE_S3_WITH_KEY_ERROR
            + " (" + diagnostics + ")");
      }
      break;

    case SSE_KMS:
      LOG.debug("Using SSE-KMS with {}",
          diagnostics);
      break;

    case CSE_KMS:
      LOG.debug("Using CSE-KMS with {}",
          diagnostics);
      break;

    case DSSE_KMS:
      LOG.debug("Using DSSE-KMS with {}",
          diagnostics);
      break;

    case NONE:
    default:
      LOG.debug("Data is unencrypted");
      break;
    }
    return new EncryptionSecrets(encryptionMethod, encryptionKey, encryptionContext);
  }

  /**
   * Provide a password diagnostics string.
   * This aims to help diagnostics without revealing significant password details.
   * @param pass password
   * @param description description for text, e.g. "key" or "password"
   * @return text for use in messages.
   */
  private static String passwordDiagnostics(String pass, String description) {
    if (pass == null) {
      return "null " + description;
    }
    int len = pass.length();
    switch (len) {
    case 0:
      return "empty " + description;
    case 1:
      return description + " of length 1";

    default:
      return description + " of length " + len + " ending with "
          + pass.charAt(len - 1);
    }
  }

  /**
   * Close the Closeable objects and <b>ignore</b> any Exception or
   * null pointers.
   * This is obsolete: use
   * {@link org.apache.hadoop.io.IOUtils#cleanupWithLogger(Logger, Closeable...)}
   * @param log the log to log at debug level. Can be null.
   * @param closeables the objects to close
   */
  @Deprecated
  public static void closeAll(Logger log,
      Closeable... closeables) {
    cleanupWithLogger(log, closeables);
  }

  /**
   * Close the Closeable objects and <b>ignore</b> any Exception or
   * null pointers.
   * (This is the SLF4J equivalent of that in {@code IOUtils}).
   * @param log the log to log at debug level. Can be null.
   * @param closeables the objects to close
   */
  public static void closeAutocloseables(Logger log,
      AutoCloseable... closeables) {
    if (log == null) {
      log = LOG;
    }
    for (AutoCloseable c : closeables) {
      if (c != null) {
        try {
          log.debug("Closing {}", c);
          c.close();
        } catch (Exception e) {
          log.debug("Exception in closing {}", c, e);
        }
      }
    }
  }
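
  /*
   * Usage sketch (the stream is a hypothetical stand-in; note that null
   * entries are skipped, and any exception raised in close() is logged at
   * debug level and swallowed):
   *
   *   InputStream in = new ByteArrayInputStream(new byte[0]);
   *   closeAutocloseables(LOG, in, null);
   */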

  /**
   * Set a bucket-specific property to a particular value.
   * If the generic key passed in has an {@code fs.s3a.} prefix,
   * that's stripped off, so that when the bucket properties are propagated
   * down to the generic values, that value gets copied down.
   * @param conf configuration to set
   * @param bucket bucket name
   * @param genericKey key; can start with "fs.s3a."
   * @param value value to set
   */
  public static void setBucketOption(Configuration conf, String bucket,
      String genericKey, String value) {
    final String baseKey = genericKey.startsWith(FS_S3A_PREFIX) ?
        genericKey.substring(FS_S3A_PREFIX.length())
        : genericKey;
    conf.set(FS_S3A_BUCKET_PREFIX + bucket + '.' + baseKey, value, "S3AUtils");
  }

  /**
   * Clear a bucket-specific property.
   * If the generic key passed in has an {@code fs.s3a.} prefix,
   * that's stripped off, so that when the bucket properties are propagated
   * down to the generic values, that value gets copied down.
   * @param conf configuration to update
   * @param bucket bucket name
   * @param genericKey key; can start with "fs.s3a."
   */
  public static void clearBucketOption(Configuration conf, String bucket,
      String genericKey) {
    final String baseKey = genericKey.startsWith(FS_S3A_PREFIX) ?
        genericKey.substring(FS_S3A_PREFIX.length())
        : genericKey;
    String k = FS_S3A_BUCKET_PREFIX + bucket + '.' + baseKey;
    LOG.debug("Unset {}", k);
    conf.unset(k);
  }

  /**
   * Get a bucket-specific property.
   * If the generic key passed in has an {@code fs.s3a.} prefix,
   * that's stripped off.
   * @param conf configuration to examine
   * @param bucket bucket name
   * @param genericKey key; can start with "fs.s3a."
   * @return the bucket option, null if there is none
   */
  public static String getBucketOption(Configuration conf, String bucket,
      String genericKey) {
    final String baseKey = genericKey.startsWith(FS_S3A_PREFIX) ?
        genericKey.substring(FS_S3A_PREFIX.length())
        : genericKey;
    return conf.get(FS_S3A_BUCKET_PREFIX + bucket + '.' + baseKey);
  }
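
  /*
   * Round-trip sketch for the three bucket-option helpers above (the bucket
   * name and endpoint value are hypothetical):
   *
   *   Configuration conf = new Configuration();
   *   setBucketOption(conf, "example-bucket",
   *       "fs.s3a.endpoint", "s3.eu-west-1.amazonaws.com");
   *   // stored under fs.s3a.bucket.example-bucket.endpoint
   *   getBucketOption(conf, "example-bucket", "fs.s3a.endpoint");
   *   // returns "s3.eu-west-1.amazonaws.com"
   *   clearBucketOption(conf, "example-bucket", "fs.s3a.endpoint");
   *   // a subsequent getBucketOption(...) returns null
   */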

  /**
   * Turns a path (relative or otherwise) into an S3 key, adding a trailing
   * "/" if the path is not the root <i>and</i> does not already have a "/"
   * at the end.
   *
   * @param key s3 key or ""
   * @return the key with a trailing "/", or, if it is the root key, "".
   */
  public static String maybeAddTrailingSlash(String key) {
    if (!key.isEmpty() && !key.endsWith("/")) {
      return key + '/';
    } else {
      return key;
    }
  }
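
  /*
   * For example: maybeAddTrailingSlash("data/2024") returns "data/2024/",
   * while maybeAddTrailingSlash("data/2024/") and maybeAddTrailingSlash("")
   * return their input unchanged.
   */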

  /**
   * Path filter which ignores any file which starts with . or _.
   */
  public static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path path) {
      String name = path.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }

    @Override
    public String toString() {
      return "HIDDEN_FILE_FILTER";
    }
  };

  /**
   * A Path filter which accepts all filenames.
   */
  public static final PathFilter ACCEPT_ALL = new PathFilter() {
    @Override
    public boolean accept(Path file) {
      return true;
    }

    @Override
    public String toString() {
      return "ACCEPT_ALL";
    }
  };
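
  /*
   * Filter sketch (the path is hypothetical): these filters can be passed
   * to listing operations to skip "hidden" entries such as _SUCCESS markers:
   *
   *   Path p = new Path("s3a://example-bucket/output/_SUCCESS");
   *   HIDDEN_FILE_FILTER.accept(p);  // false: the name starts with '_'
   *   ACCEPT_ALL.accept(p);          // always true
   */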

  /**
   * Format a byte range for a request header.
   * See https://www.rfc-editor.org/rfc/rfc9110.html#section-14.1.2
   *
   * @param rangeStart the start byte offset
   * @param rangeEnd the end byte offset (inclusive)
   * @return a formatted byte range
   */
  public static String formatRange(long rangeStart, long rangeEnd) {
    return String.format("bytes=%d-%d", rangeStart, rangeEnd);
  }
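
  /*
   * For example, formatRange(0, 1023) returns "bytes=0-1023", requesting
   * the first 1024 bytes of an object, as the end offset is inclusive.
   */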

  /**
   * Get the equals (=) delimited key-value pairs of the <code>name</code>
   * property as a collection of <code>String</code> pairs, trimmed of leading
   * and trailing whitespace after splitting the value on comma and newline
   * separators. If no such property is specified, an empty <code>Map</code>
   * is returned.
   *
   * @param configuration the configuration object.
   * @param name property name.
   * @return property value as a <code>Map</code> of <code>String</code>s, or
   * an empty <code>Map</code>.
   */
  public static Map<String, String> getTrimmedStringCollectionSplitByEquals(
      final Configuration configuration,
      final String name) {
    String valueString = configuration.get(name);
    return getTrimmedStringCollectionSplitByEquals(valueString);
  }

  /**
   * Get the equals (=) delimited key-value pairs of the given string as
   * a collection of <code>String</code> pairs, trimmed of leading and
   * trailing whitespace after splitting the string on comma and newline
   * separators. If the string is null, an empty <code>Map</code> is returned.
   *
   * @param valueString the string containing the key-value pairs.
   * @return the pairs as a <code>Map</code> of <code>String</code>s, or an
   * empty <code>Map</code>.
   */
  public static Map<String, String> getTrimmedStringCollectionSplitByEquals(
      final String valueString) {
    if (null == valueString) {
      return new HashMap<>();
    }
    return org.apache.hadoop.util.StringUtils
        .getTrimmedStringCollectionSplitByEquals(valueString);
  }
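
  /*
   * Parsing sketch (the property value is hypothetical):
   *
   *   Map<String, String> m =
   *       getTrimmedStringCollectionSplitByEquals(" k1=v1 ,\n k2=v2 ");
   *   // m is {k1=v1, k2=v2}: entries are split on commas and newlines,
   *   // then trimmed of surrounding whitespace
   */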


  /**
   * If classloader isolation is enabled through
   * {@link Constants#AWS_S3_CLASSLOADER_ISOLATION}, or that option is not
   * explicitly set (it defaults to enabled), the classloader of the input
   * configuration object is set to the supplied classloader; otherwise the
   * configuration is left unchanged.
   * @param conf configuration object.
   * @param classLoader isolated classLoader.
   */
  static void maybeIsolateClassloader(Configuration conf, ClassLoader classLoader) {
    if (conf.getBoolean(Constants.AWS_S3_CLASSLOADER_ISOLATION,
            Constants.DEFAULT_AWS_S3_CLASSLOADER_ISOLATION)) {
      LOG.debug("Configuration classloader set to S3AFileSystem classloader: {}", classLoader);
      conf.setClassLoader(classLoader);
    } else {
      LOG.debug("Configuration classloader not changed, support classes needed will be loaded " +
                      "from the classloader that instantiated the Configuration object: {}",
              conf.getClassLoader());
    }
  }
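
  /*
   * Effect sketch (this method is package-private; the property name below
   * is the value of Constants.AWS_S3_CLASSLOADER_ISOLATION):
   *
   *   Configuration conf = new Configuration();
   *   // unset counts as enabled: conf's classloader is replaced
   *   maybeIsolateClassloader(conf, S3AFileSystem.class.getClassLoader());
   *
   *   // explicitly disabled: conf keeps its existing classloader
   *   conf.setBoolean("fs.s3a.classloader.isolation", false);
   *   maybeIsolateClassloader(conf, S3AFileSystem.class.getClassLoader());
   */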

}