EncryptionS3ClientFactory.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.s3a.impl;

import java.io.IOException;
import java.net.URI;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.kms.KmsClientBuilder;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.encryption.s3.S3AsyncEncryptionClient;
import software.amazon.encryption.s3.S3EncryptionClient;
import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
import software.amazon.encryption.s3.materials.Keyring;
import software.amazon.encryption.s3.materials.KmsKeyring;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.functional.LazyAtomicReference;

import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;

/**
 * Factory class to create encrypted s3 client and encrypted async s3 client.
 *
 * <p>Both encrypted clients wrap the plain clients built by
 * {@link DefaultS3ClientFactory}. The plain sync and async clients are created and
 * stored in this factory by {@link #createS3Client(URI, S3ClientCreationParameters)},
 * so that method must be invoked before
 * {@link #createS3AsyncClient(URI, S3ClientCreationParameters)}; otherwise the
 * async creation fails its "S3 async client not initialized" precondition.
 */
public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {

  /**
   * Encryption client class name.
   * value: {@value}
   */
  private static final String ENCRYPTION_CLIENT_CLASSNAME =
      "software.amazon.encryption.s3.S3EncryptionClient";

  /**
   * Encryption client availability; evaluated on demand through
   * {@link #checkForEncryptionClient()}.
   */
  private static final LazyAtomicReference<Boolean> ENCRYPTION_CLIENT_AVAILABLE =
      LazyAtomicReference.lazyAtomicReferenceFromSupplier(
          EncryptionS3ClientFactory::checkForEncryptionClient
      );


  /**
   * S3Client to be wrapped by encryption client.
   * Set by {@link #createS3Client(URI, S3ClientCreationParameters)}.
   */
  private S3Client s3Client;

  /**
   * S3AsyncClient to be wrapped by encryption client.
   * Set by {@link #createS3Client(URI, S3ClientCreationParameters)}.
   */
  private S3AsyncClient s3AsyncClient;

  /**
   * Checks if {@link #ENCRYPTION_CLIENT_CLASSNAME} is available in the class path.
   * A class which is present but cannot be loaded because one of its own
   * dependencies is missing (a {@link LinkageError} such as
   * {@code NoClassDefFoundError}) is reported as unavailable rather than
   * letting the error escape.
   * @return true if available, false otherwise.
   */
  private static boolean checkForEncryptionClient() {
    try {
      ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
      cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
      LOG.debug("encryption client class {} found", ENCRYPTION_CLIENT_CLASSNAME);
      return true;
    } catch (Exception | LinkageError e) {
      LOG.debug("encryption client class {} not found", ENCRYPTION_CLIENT_CLASSNAME, e);
      return false;
    }
  }

  /**
   * Is the Encryption client available?
   * @return true if it was found in the classloader
   */
  private static synchronized boolean isEncryptionClientAvailable() {
    return ENCRYPTION_CLIENT_AVAILABLE.get();
  }

  /**
   * Creates both synchronous and asynchronous encrypted s3 clients.
   * The plain synchronous and asynchronous clients are built by the superclass
   * and stored in this factory; the synchronous encryption client wrapping both
   * is then created and returned. The stored asynchronous client is wrapped
   * later by {@link #createS3AsyncClient(URI, S3ClientCreationParameters)}.
   * @param uri S3A file system URI
   * @param parameters parameter object
   * @return encrypted s3 client
   * @throws IOException IO failures, including the encryption client library
   * not being available.
   */
  @Override
  public S3Client createS3Client(URI uri, S3ClientCreationParameters parameters)
      throws IOException {
    if (!isEncryptionClientAvailable()) {
      throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
          "No encryption client available");
    }

    s3Client = super.createS3Client(uri, parameters);
    s3AsyncClient = super.createS3AsyncClient(uri, parameters);

    return createS3EncryptionClient(parameters);
  }

  /**
   * Create async encrypted s3 client.
   * This wraps the async client stored by a previous call to
   * {@link #createS3Client(URI, S3ClientCreationParameters)}; it does not
   * build a new underlying client.
   * @param uri S3A file system URI
   * @param parameters parameter object
   * @return async encrypted s3 client
   * @throws IOException IO failures, including the encryption client library
   * not being available.
   */
  @Override
  public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters parameters)
      throws IOException {
    if (!isEncryptionClientAvailable()) {
      throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
          "No encryption client available");
    }
    return createS3AsyncEncryptionClient(parameters);
  }

  /**
   * Creates an S3EncryptionClient instance based on the provided parameters.
   *
   * @param parameters The S3ClientCreationParameters containing the necessary configuration.
   * @return An instance of S3EncryptionClient.
   * @throws IOException If an error occurs during the creation of the S3EncryptionClient.
   * @throws IllegalArgumentException If the parameters or either wrapped client is null.
   */
  private S3Client createS3EncryptionClient(S3ClientCreationParameters parameters)
      throws IOException {
    // validate the parameters before anything dereferences them
    Preconditions.checkArgument(parameters != null,
        "S3ClientCreationParameters is not initialized");
    Preconditions.checkArgument(s3AsyncClient != null,
        "S3 async client not initialized");
    Preconditions.checkArgument(s3Client != null,
        "S3 client not initialized");

    S3EncryptionClient.Builder s3EncryptionClientBuilder =
        S3EncryptionClient.builder()
            .wrappedAsyncClient(s3AsyncClient)
            .wrappedClient(s3Client)
            // this is required for doing S3 ranged GET calls
            .enableLegacyUnauthenticatedModes(true)
            // this is required for backward compatibility with older encryption clients
            .enableLegacyWrappingAlgorithms(true);

    CryptographicMaterialsManager materialsManager = createCryptoMaterialsManager(parameters);
    if (materialsManager != null) {
      s3EncryptionClientBuilder.cryptoMaterialsManager(materialsManager);
    }
    return s3EncryptionClientBuilder.build();
  }

  /**
   * Creates the CryptographicMaterialsManager matching the configured CSE key type.
   * Shared by the synchronous and asynchronous encryption client builders so that
   * both resolve their keyring in exactly the same way.
   *
   * @param parameters The S3ClientCreationParameters containing the necessary configuration.
   * @return A materials manager backed by a KMS keyring or a custom keyring;
   * null if the key type is neither KMS nor CUSTOM, in which case the caller
   * leaves its builder without a materials manager.
   * @throws IOException If the custom keyring provider cannot be instantiated.
   * @throws IllegalArgumentException If no client side encryption materials are set.
   */
  private CryptographicMaterialsManager createCryptoMaterialsManager(
      S3ClientCreationParameters parameters) throws IOException {
    CSEMaterials cseMaterials = parameters.getClientSideEncryptionMaterials();
    Preconditions.checkArgument(cseMaterials != null,
        "Client side encryption materials are not initialized");

    Keyring keyring;
    switch (cseMaterials.getCseKeyType()) {
    case KMS:
      keyring = createKmsKeyring(parameters, cseMaterials);
      break;
    case CUSTOM:
      try {
        keyring =
            getKeyringProvider(cseMaterials.getCustomKeyringClassName(), cseMaterials.getConf());
      } catch (RuntimeException e) {
        // surface reflection/validation failures as the IOException callers expect
        throw new IOException("Failed to instantiate a custom keyring provider: " + e, e);
      }
      break;
    default:
      return null;
    }
    return DefaultCryptoMaterialsManager.builder()
        .keyring(keyring)
        .build();
  }

  /**
   * Creates KmsKeyring instance based on the provided S3ClientCreationParameters and CSEMaterials.
   * The KMS client uses the credential set from the parameters when one is present.
   * Its location is resolved in order of preference: the KMS region, the S3 region,
   * then the S3 endpoint as an endpoint override. If none is set, nothing is
   * configured on the KMS client builder.
   *
   * @param parameters The S3ClientCreationParameters containing the necessary configuration.
   * @param cseMaterials The CSEMaterials containing the KMS key ID and other encryption materials.
   * @return A KmsKeyring instance configured with the appropriate KMS client and wrapping key ID.
   */
  private Keyring createKmsKeyring(S3ClientCreationParameters parameters,
      CSEMaterials cseMaterials) {
    KmsClientBuilder kmsClientBuilder = KmsClient.builder();
    if (parameters.getCredentialSet() != null) {
      kmsClientBuilder.credentialsProvider(parameters.getCredentialSet());
    }
    // check if kms region is configured.
    if (parameters.getKmsRegion() != null) {
      kmsClientBuilder.region(Region.of(parameters.getKmsRegion()));
    } else if (parameters.getRegion() != null) {
      // fallback to s3 region if kms region is not configured.
      kmsClientBuilder.region(Region.of(parameters.getRegion()));
    } else if (parameters.getEndpoint() != null) {
      // fallback to s3 endpoint config if both kms region and s3 region is not set.
      String endpointStr = parameters.getEndpoint();
      URI endpoint = getS3Endpoint(endpointStr, cseMaterials.getConf());
      kmsClientBuilder.endpointOverride(endpoint);
    }
    return KmsKeyring.builder()
        .kmsClient(kmsClientBuilder.build())
        .wrappingKeyId(cseMaterials.getKmsKeyId())
        // this is required for backward compatibility with older encryption clients
        .enableLegacyWrappingAlgorithms(true)
        .build();
  }

  /**
   * Creates an S3AsyncEncryptionClient instance based on the provided parameters.
   *
   * @param parameters The S3ClientCreationParameters containing the necessary configuration.
   * @return An instance of S3AsyncEncryptionClient.
   * @throws IOException If an error occurs during the creation of the S3AsyncEncryptionClient.
   * @throws IllegalArgumentException If the parameters or the wrapped async client is null.
   */
  private S3AsyncClient createS3AsyncEncryptionClient(S3ClientCreationParameters parameters)
      throws IOException {
    Preconditions.checkArgument(s3AsyncClient != null,
        "S3 async client not initialized");
    Preconditions.checkArgument(parameters != null,
        "S3ClientCreationParameters is not initialized");

    S3AsyncEncryptionClient.Builder s3EncryptionAsyncClientBuilder =
        S3AsyncEncryptionClient.builder()
            .wrappedClient(s3AsyncClient)
            // this is required for doing S3 ranged GET calls
            .enableLegacyUnauthenticatedModes(true)
            // this is required for backward compatibility with older encryption clients
            .enableLegacyWrappingAlgorithms(true);

    CryptographicMaterialsManager materialsManager = createCryptoMaterialsManager(parameters);
    if (materialsManager != null) {
      s3EncryptionAsyncClientBuilder.cryptoMaterialsManager(materialsManager);
    }
    return s3EncryptionAsyncClientBuilder.build();
  }

  /**
   * Creates and returns a Keyring provider instance based on the given class name.
   *
   * <p>This method attempts to instantiate a Keyring provider using reflection. It first tries
   * to create an instance using the standard ReflectionUtils.newInstance method. If that fails,
   * it falls back to an alternative instantiation method, which is primarily used for testing
   * purposes (specifically for CustomKeyring.java).
   *
   * @param className The fully qualified class name of the Keyring provider to instantiate.
   * @param conf The Configuration object to be passed to the Keyring provider constructor.
   * @return An instance of the specified Keyring provider.
   * @throws IllegalArgumentException If the class name is null or empty, the class is not
   * found, or the class is not a Keyring.
   * @throws RuntimeException If unable to create the Keyring provider instance.
   */
  private Keyring getKeyringProvider(String className, Configuration conf)  {
    Class<? extends Keyring> keyringProviderClass = getCustomKeyringProviderClass(className);
    try {
      return ReflectionUtils.newInstance(keyringProviderClass, conf);
    } catch (Exception e) {
      LOG.warn("Failed to create Keyring provider", e);
      // This is for testing purposes to support CustomKeyring.java
      try {
        return ReflectionUtils.newInstance(keyringProviderClass, conf,
            new Class[] {Configuration.class}, conf);
      } catch (Exception ex) {
        throw new RuntimeException("Failed to create Keyring provider", ex);
      }
    }
  }

  /**
   * Retrieves the Class object for the custom Keyring provider based on the provided class name.
   *
   * @param className The fully qualified class name of the custom Keyring provider implementation.
   * @return The Class object representing the custom Keyring provider implementation.
   * @throws IllegalArgumentException If the provided class name is null or empty,
   * if the specified class is not found, or if it is not a subclass of Keyring.
   */
  private Class<? extends Keyring> getCustomKeyringProviderClass(String className) {
    Preconditions.checkArgument(className != null && !className.isEmpty(),
        "Custom Keyring class name is null or empty");
    try {
      return Class.forName(className).asSubclass(Keyring.class);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException(
          "Custom Keyring class " + className + " not found", e);
    } catch (ClassCastException e) {
      // asSubclass rejects classes which do not implement/extend Keyring
      throw new IllegalArgumentException(
          "Custom Keyring class " + className + " is not a subclass of "
              + Keyring.class.getName(), e);
    }
  }
}