MockUserBoundSASTokenProvider.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.extensions;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
import org.apache.hadoop.fs.azurebfs.services.AbfsJdkHttpOperation;
import org.apache.hadoop.fs.azurebfs.utils.Base64;
import org.apache.hadoop.fs.azurebfs.utils.DelegationSASGeneratorVersionJuly5;
import org.apache.hadoop.fs.azurebfs.utils.SASGenerator;
import org.apache.hadoop.security.AccessControlException;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_X_WWW_FORM_URLENCODED;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;

/**
 * A mock user-bound SAS token provider implementation for testing purposes.
 * Provides functionality to generate user delegation SAS tokens for Azure Blob Storage.
 */
public class MockUserBoundSASTokenProvider implements SASTokenProvider {

  // Constants for URLs and endpoints
  private static final String AZURE_BLOB_ENDPOINT_TEMPLATE = "https://%s.blob.core.windows.net/";
  private static final String AZURE_LOGIN_ENDPOINT_TEMPLATE = "https://login.microsoftonline.com/%s/oauth2/v2.0/token";
  private static final String USER_DELEGATION_QUERY_PARAMS = "?restype=service&comp=userdelegationkey";

  // Tags delimiting the Base64-encoded key inside the user delegation key response
  private static final String VALUE_OPEN_TAG = "<Value>";
  private static final String VALUE_CLOSE_TAG = "</Value>";

  // HTTP related constants
  private static final int RESPONSE_BUFFER_SIZE = 4 * 1024;

  public static final String TEST_OWNER = "325f1619-4205-432f-9fce-3fd594325ce5";
  public static final String CORRELATION_ID = "66ff4ffc-ff17-417e-a2a9-45db8c5b0b5c";
  public static final String NO_AGENT_PATH = "NoAgentPath";

  // Created by initialize(); getSASToken() must not be called before it.
  private DelegationSASGeneratorVersionJuly5 generator;

  /**
   * Initializes the SAS token provider with configuration settings.
   * Reads the test application and end-user identifiers from the
   * configuration, fetches a user delegation key and builds the SAS
   * generator used by {@link #getSASToken(String, String, String, String)}.
   *
   * @param configuration Configuration containing Azure storage settings
   * @param accountName The name of the storage account to initialize for
   * @throws IOException if there is an error during initialization
   */
  @Override
  public void initialize(Configuration configuration, String accountName) throws IOException {
    String appID = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_ID);
    String appSecret = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_SECRET);
    String sktid = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_TENANT_ID);
    String skoid = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_OBJECT_ID);

    // Sample the clock once so the start and expiry times share one reference point.
    Instant now = Instant.now();
    String skt = SASGenerator.ISO_8601_FORMATTER.format(now.minus(SASGenerator.FIVE_MINUTES));
    String ske = SASGenerator.ISO_8601_FORMATTER.format(now.plus(SASGenerator.ONE_DAY));
    String skv = SASGenerator.AuthenticationVersion.Jul5.toString();

    String skdutid = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_END_USER_TENANT_ID);
    String sduoid = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_END_USER_OBJECT_ID);

    byte[] key = getUserDelegationKey(accountName, appID, appSecret, sktid, skt, ske, skv, skdutid);

    generator = new DelegationSASGeneratorVersionJuly5(key, skoid, sktid, skt, ske, skv, skdutid, sduoid);
  }

  /**
   * Gets the authorization header for Azure AD authentication.
   * Invokes the AAD v2.0 authentication endpoint with a client credentials
   * grant to get an access token.
   * See https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-client-creds-grant-flow.
   *
   * @param accountName The storage account name (currently unused by this method)
   * @param appID The Azure AD application ID
   * @param appSecret The Azure AD application secret
   * @param sktid The service principal tenant ID
   * @return The authorization header string with bearer token
   * @throws IOException if there is an error getting the authorization token
   */
  private String getAuthorizationHeader(String accountName, String appID, String appSecret, String sktid) throws IOException {
    String authEndPoint = String.format(AZURE_LOGIN_ENDPOINT_TEMPLATE, sktid);
    ClientCredsTokenProvider provider = new ClientCredsTokenProvider(authEndPoint, appID, appSecret);
    return "Bearer " + provider.getToken().getAccessToken();
  }

  /**
   * Retrieves a user delegation key from Azure Storage.
   * Posts a {@code KeyInfo} XML document to the blob endpoint of the account
   * and decodes the Base64 content of the {@code <Value>} element of the
   * response.
   *
   * @param accountName The storage account name; everything before the first
   *                    dot is used as the account, or the whole name if it
   *                    contains no dot
   * @param appID The Azure AD application ID
   * @param appSecret The Azure AD application secret
   * @param sktid The service principal tenant ID
   * @param skt The start time for the delegation key
   * @param ske The expiry time for the delegation key
   * @param skv The API version for the request
   * @param skdutid The delegated user tenant ID
   * @return The user delegation key as a byte array
   * @throws IOException if there is an error retrieving the delegation key,
   *                     including a response that carries no key value
   */
  private byte[] getUserDelegationKey(String accountName, String appID, String appSecret,
      String sktid, String skt, String ske, String skv, String skdutid) throws IOException {

    // Tolerate an account name given without its domain suffix.
    int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
    String account = dotIndex < 0 ? accountName : accountName.substring(0, dotIndex);
    String baseUrl = String.format(AZURE_BLOB_ENDPOINT_TEMPLATE, account);
    String urlString = baseUrl + USER_DELEGATION_QUERY_PARAMS;

    URL url;
    try {
      url = new URL(urlString);
    } catch (MalformedURLException ex) {
      throw new InvalidUriException(urlString);
    }

    List<AbfsHttpHeader> requestHeaders = new ArrayList<>();
    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_VERSION, skv));
    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.CONTENT_TYPE, APPLICATION_X_WWW_FORM_URLENCODED));
    requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.AUTHORIZATION,
        getAuthorizationHeader(account, appID, appSecret, sktid)));

    final StringBuilder requestBody = new StringBuilder(512);
    requestBody.append("<?xml version=\"1.0\" encoding=\"utf-8\"?><KeyInfo><Start>");
    requestBody.append(skt);
    requestBody.append("</Start><Expiry>");
    requestBody.append(ske);
    requestBody.append("</Expiry><DelegatedUserTid>");
    requestBody.append(skdutid);
    requestBody.append("</DelegatedUserTid></KeyInfo>");

    AbfsJdkHttpOperation op = new AbfsJdkHttpOperation(url, HTTP_METHOD_POST, requestHeaders,
        Duration.ofMillis(DEFAULT_HTTP_CONNECTION_TIMEOUT),
        Duration.ofMillis(DEFAULT_HTTP_READ_TIMEOUT), null);

    byte[] requestBuffer = requestBody.toString().getBytes(StandardCharsets.UTF_8);
    op.sendPayload(requestBuffer, 0, requestBuffer.length);

    byte[] responseBuffer = new byte[RESPONSE_BUFFER_SIZE];
    op.processResponse(responseBuffer, 0, responseBuffer.length);

    String responseBody = new String(responseBuffer, 0, (int) op.getBytesReceived(),
        StandardCharsets.UTF_8);
    return Base64.decode(extractKeyValue(responseBody));
  }

  /**
   * Extracts the text between the first {@code <Value>} tag and the
   * following {@code </Value>} tag of a user delegation key response.
   *
   * @param responseBody the response body returned by the service
   * @return the text enclosed by the value element
   * @throws IOException if the response does not contain a value element
   */
  private static String extractKeyValue(String responseBody) throws IOException {
    int openIndex = responseBody.indexOf(VALUE_OPEN_TAG);
    int beginIndex = openIndex + VALUE_OPEN_TAG.length();
    int endIndex = openIndex < 0 ? -1 : responseBody.indexOf(VALUE_CLOSE_TAG, beginIndex);
    if (openIndex < 0 || endIndex < 0) {
      // Fail with a clear error rather than a StringIndexOutOfBoundsException.
      throw new IOException(
          "User delegation key response does not contain a " + VALUE_OPEN_TAG + " element");
    }
    return responseBody.substring(beginIndex, endIndex);
  }

  /**
   * {@inheritDoc}
   *
   * @param accountName the storage account name.
   * @param fileSystem the name of the file system (container).
   * @param path the file or directory path.
   * @param operation the operation to be performed on the path.
   * @return a SAS token to perform the request operation.
   * @throws IOException if there is a network error.
   * @throws AccessControlException if access is denied.
   */
  @Override
  public String getSASToken(String accountName, String fileSystem, String path,
      String operation) throws IOException, AccessControlException {
    // Except for the special case where we test without an agent,
    // the user for these tests is always TEST_OWNER.  The check access operation
    // requires suoid to check permissions for the user and will throw if the
    // user does not have access and otherwise succeed.
    String saoid = null;
    String suoid = null;
    if (path == null || !path.endsWith(NO_AGENT_PATH)) {
      // Compare by value, not by reference: callers may pass an equal but
      // distinct String instance for the operation name.
      boolean isCheckAccess = SASTokenProvider.CHECK_ACCESS_OPERATION.equals(operation);
      saoid = isCheckAccess ? null : TEST_OWNER;
      suoid = isCheckAccess ? TEST_OWNER : null;
    }

    return generator.getDelegationSAS(accountName, fileSystem, path, operation,
        saoid, suoid, CORRELATION_ID);
  }
}