AbstractAbfsIntegrationTest.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs;

import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.utils.AzcopyToolHelper;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;

import static org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.WASB_ACCOUNT_NAME_DOMAIN_SUFFIX;
import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.FILE_SYSTEM_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assume.assumeTrue;

/**
 * Base for AzureBlobFileSystem Integration tests.
 *
 * <I>Important: This is for integration tests only.</I>
 */
public abstract class AbstractAbfsIntegrationTest extends
        AbstractAbfsTestWithTimeout {

  // Logger shared by the cleanup and helper methods of this base class.
  private static final Logger LOG =
      LoggerFactory.getLogger(AbstractAbfsIntegrationTest.class);

  // True when AZURE_ABFS_ENDPOINT is configured as "host:port"; computed in the constructor.
  private boolean isIPAddress;
  // WASB filesystem built by setup() for ABFS/WASB compatibility tests; null when not built.
  private NativeAzureFileSystem wasb;
  // ABFS filesystem under test; created by createFileSystem(), closed by teardown().
  private AzureBlobFileSystem abfs;
  // URI scheme chosen in the constructor from the configured auth type.
  private String abfsScheme;

  // Raw Hadoop configuration loaded from the test configuration resource.
  private Configuration rawConfig;
  // ABFS view of the configuration, built in the constructor from rawConfig.
  private AbfsConfiguration abfsConfig;
  // Name of the container used by the test; random by default.
  private String fileSystemName;
  // Storage account name read from the test configuration.
  private String accountName;
  // String form of the default filesystem URI used by the test.
  private String testUrl;
  // Auth type read from the configuration (defaults to SharedKey).
  private AuthType authType;
  // Set by loadConfiguredFileSystem(); teardown() then leaves the filesystem in place.
  private boolean useConfiguredFileSystem = false;
  // Set by createFilesystemWithTestFileForSASTests(); changes how teardown() deletes the filesystem.
  private boolean usingFilesystemForSASTests = false;
  // Number of trailing GUID characters appended by getUniquePath().
  public static final int SHORTENED_GUID_LEN = 12;

  /**
   * Loads the test configuration and derives everything a test needs from it:
   * a random filesystem name, the account name, the auth type, the URI scheme
   * and the default filesystem URI. The test is skipped (JUnit assumption)
   * when no account name or no valid auth configuration is present.
   *
   * @throws Exception if the ABFS configuration cannot be created.
   */
  protected AbstractAbfsIntegrationTest() throws Exception {
    fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID();
    rawConfig = new Configuration();
    rawConfig.addResource(TEST_CONFIGURATION_FILE_NAME);

    // Prefer the generic account-name key; fall back to the ABFS-specific key.
    String configuredAccount = rawConfig.get(FS_AZURE_ACCOUNT_NAME);
    if (configuredAccount == null) {
      configuredAccount = rawConfig.get(FS_AZURE_ABFS_ACCOUNT_NAME);
    }
    this.accountName = configuredAccount;
    assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT_NAME,
        accountName != null && !accountName.isEmpty());

    final String authority = getFileSystemName() + "@" + getAccountName();

    abfsConfig = new AbfsConfiguration(rawConfig, accountName,
        identifyAbfsServiceTypeFromUrl(authority));

    authType = abfsConfig.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME,
        AuthType.SharedKey);
    assumeValidAuthConfigsPresent();

    // SharedKey uses the plain scheme; every other auth type uses the secure one.
    if (authType == AuthType.SharedKey) {
      abfsScheme = FileSystemUriSchemes.ABFS_SCHEME;
    } else {
      abfsScheme = FileSystemUriSchemes.ABFS_SECURE_SCHEME;
    }

    final URI defaultUri;
    try {
      defaultUri = new URI(abfsScheme, authority, null, null, null);
    } catch (Exception ex) {
      throw new AssertionError(ex);
    }

    final String defaultUriText = defaultUri.toString();
    this.testUrl = defaultUriText;
    abfsConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUriText);
    abfsConfig.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
    if (isAppendBlobEnabled()) {
      final String appendblobDirs =
          this.testUrl + "," + abfsConfig.get(FS_AZURE_CONTRACT_TEST_URI);
      rawConfig.set(FS_AZURE_APPEND_BLOB_KEY, appendblobDirs);
    }

    // For testing purposes, an IP address and port may be provided to override
    // the host specified in the FileSystem URI.  Also note that the format of
    // the Azure Storage Service URI changes from
    // http[s]://[account][domain-suffix]/[filesystem] to
    // http[s]://[ip]:[port]/[account]/[filesystem].
    final String endPoint = abfsConfig.get(AZURE_ABFS_ENDPOINT);
    this.isIPAddress = endPoint != null
        && endPoint.contains(":")
        && endPoint.split(":").length == 2;

    // For tests, we want to enforce checksum validation so that any regressions can be caught.
    abfsConfig.setIsChecksumValidationEnabled(true);
  }

  /**
   * Asks the given filesystem whether namespace is enabled, using a test
   * tracing context without a primary request id.
   *
   * @param fs filesystem to query.
   * @return the value reported by the filesystem.
   * @throws IOException if the query fails.
   */
  protected boolean getIsNamespaceEnabled(AzureBlobFileSystem fs)
      throws IOException {
    final TracingContext tracingContext = getTestTracingContext(fs, false);
    return fs.getIsNamespaceEnabled(tracingContext);
  }

  /**
   * Builds a tracing context from fixed sample identifiers.
   *
   * @param fs not used by this method.
   * @param needsPrimaryReqId passed through to the tracing context.
   * @return a new tracing context for {@code FSOperationType.TEST_OP}.
   */
  public static TracingContext getSampleTracingContext(AzureBlobFileSystem fs,
      boolean needsPrimaryReqId) {
    return new TracingContext("test-corr-id", "test-filesystem-id",
        FSOperationType.TEST_OP, needsPrimaryReqId,
        TracingHeaderFormat.ALL_ID_FORMAT, null);
  }

  /**
   * Builds a tracing context for test operations. With a filesystem, the
   * correlation id, filesystem id and header format come from it; with
   * {@code null}, fixed sample values are used.
   *
   * @param fs filesystem supplying the identifiers, or {@code null}.
   * @param needsPrimaryReqId passed through to the tracing context.
   * @return a new tracing context for {@code FSOperationType.TEST_OP}.
   */
  public TracingContext getTestTracingContext(AzureBlobFileSystem fs,
      boolean needsPrimaryReqId) {
    if (fs == null) {
      return new TracingContext("test-corr-id", "test-filesystem-id",
          FSOperationType.TEST_OP, needsPrimaryReqId,
          TracingHeaderFormat.ALL_ID_FORMAT, null);
    }
    final AbfsConfiguration fsConfig = fs.getAbfsStore().getAbfsConfiguration();
    return new TracingContext(fsConfig.getClientCorrelationId(),
        fs.getFileSystemId(), FSOperationType.TEST_OP, needsPrimaryReqId,
        fsConfig.getTracingHeaderFormat(), null);
  }

  /**
   * Per-test setup: creates the ABFS filesystem and, when the account allows
   * it, a WASB filesystem pointing at the same container.
   *
   * @throws Exception if either filesystem cannot be created or initialized.
   */
  @Before
  public void setup() throws Exception {
    // Create the filesystem first so getWasbFileSystem() can return an existing filesystem.
    createFileSystem();

    // Only a live account without namespace support can run ABFS&WASB
    // compatibility tests; in every other case no WASB filesystem is built.
    if (isIPAddress) {
      return;
    }
    if (abfsConfig.getAuthType(accountName) == AuthType.SAS) {
      return;
    }
    if (abfs.getIsNamespaceEnabled(getTestTracingContext(getFileSystem(), false))) {
      return;
    }

    final URI wasbUri = new URI(
        abfsUrlToWasbUrl(getTestUrl(), abfsConfig.isHttpsAlwaysUsed()));
    final AzureNativeFileSystemStore wasbStore = new AzureNativeFileSystemStore();

    // Update the configuration with WASB credentials, keyed by the WASB account name.
    final String wasbAccountName =
        accountName.split("\\.")[0] + WASB_ACCOUNT_NAME_DOMAIN_SUFFIX;
    final String keyProperty = FS_AZURE_ACCOUNT_KEY + "." + wasbAccountName;
    if (rawConfig.get(keyProperty) == null) {
      rawConfig.set(keyProperty, getAccountKey());
    }
    rawConfig.set(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, TRUE);

    wasbStore.initialize(wasbUri, rawConfig,
        new AzureFileSystemInstrumentation(rawConfig));

    wasb = new NativeAzureFileSystem(wasbStore);
    wasb.initialize(wasbUri, rawConfig);
  }

  /**
   * Per-test cleanup: closes the WASB filesystem, deletes the test filesystem
   * from the account unless a pre-configured one is in use, and closes the
   * ABFS filesystem. Failures during cleanup are logged and not rethrown.
   *
   * @throws Exception declared for subclasses; cleanup errors are only logged.
   */
  @After
  public void teardown() throws Exception {
    try {
      IOUtils.closeStream(wasb);
      wasb = null;

      if (abfs == null) {
        return;
      }
      TracingContext tracingContext = getTestTracingContext(getFileSystem(), false);

      if (usingFilesystemForSASTests) {
        // Switch back to SharedKey and delete through a fresh instance
        // (presumably because the configured SAS may lack delete permission).
        abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey.name());
        AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
        try {
          tempFs.getAbfsStore().deleteFilesystem(tracingContext);
        } finally {
          // The temporary instance is owned by this method; always release it.
          IOUtils.closeStream(tempFs);
        }
      } else if (!useConfiguredFileSystem) {
        // Delete all uniquely created filesystem from the account
        final AzureBlobFileSystemStore abfsStore = abfs.getAbfsStore();
        abfsStore.deleteFilesystem(tracingContext);

        // Reading the properties of the deleted filesystem is expected to fail.
        AbfsRestOperationException ex = intercept(AbfsRestOperationException.class,
            new Callable<Hashtable<String, String>>() {
              @Override
              public Hashtable<String, String> call() throws Exception {
                return abfsStore.getFilesystemProperties(tracingContext);
              }
            });
        if (FILE_SYSTEM_NOT_FOUND.getStatusCode() != ex.getStatusCode()) {
          LOG.warn("Deleted test filesystem may still exist: {}", abfs, ex);
        }
      }
    } catch (Exception e) {
      LOG.warn("During cleanup: {}", e, e);
    } finally {
      // Runs on every path, including the early return above.
      IOUtils.closeStream(abfs);
      abfs = null;
    }
  }

  /**
   * Returns the access token provider of the client behind the given filesystem.
   *
   * @param fs filesystem whose client is inspected.
   * @return the provider returned by {@code ITestAbfsClient.getAccessTokenProvider}.
   */
  public AccessTokenProvider getAccessTokenProvider(final AzureBlobFileSystem fs) {
    final AbfsClient client = fs.getAbfsStore().getClient();
    return ITestAbfsClient.getAccessTokenProvider(client);
  }

  /**
   * Points the test at the filesystem named in {@code FS_AZURE_CONTRACT_TEST_URI}
   * instead of a freshly created one. The test is skipped when that
   * configuration is missing.
   *
   * @throws Exception if the configured URI cannot be parsed.
   */
  public void loadConfiguredFileSystem() throws Exception {
    // Disable auto-creation of the filesystem.
    abfsConfig.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
        false);

    // AbstractAbfsIntegrationTest always uses a new instance of FileSystem,
    // need to disable that and force filesystem provided in test configs.
    assumeValidTestConfigPresent(this.getRawConfiguration(), FS_AZURE_CONTRACT_TEST_URI);

    // The filesystem name is the part of the authority before the delimiter.
    final URI contractTestUri = new URI(rawConfig.get(FS_AZURE_CONTRACT_TEST_URI));
    final String[] authorityParts = contractTestUri.getRawAuthority().split(
        AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2);
    this.fileSystemName = authorityParts[0];

    // Reset the URL with the configured filesystem.
    final String authority = this.getFileSystemName() + "@" + this.getAccountName();
    final URI defaultUri = new URI(abfsScheme, authority, null, null, null);
    final String defaultUriText = defaultUri.toString();

    this.testUrl = defaultUriText;
    abfsConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUriText);

    useConfiguredFileSystem = true;
  }

  /**
   * Prepares a filesystem for SAS tests without creating any test file.
   * Filesystem creation is not done with SAS because certain types of SAS do
   * not have the required permissions, and it is not known which type of SAS
   * the user has configured.
   *
   * @throws Exception if the filesystem cannot be prepared.
   */
  protected void createFilesystemForSASTests() throws Exception {
    final Path noTestFile = null;
    createFilesystemWithTestFileForSASTests(noTestFile);
  }

  /**
   * Prepares a filesystem, and optionally one file, for SAS tests and then
   * switches the test configuration to SAS authentication.
   * Filesystem creation is not done with SAS because certain types of SAS do
   * not have the required permissions, and it is not known which type of SAS
   * the user has configured.
   *
   * @param testPath path of the test file to create; {@code null} creates no file.
   * @throws Exception if the filesystem cannot be opened, the root path does
   *                   not exist, or the file cannot be created.
   */
  protected void createFilesystemWithTestFileForSASTests(Path testPath) throws Exception {
    try (AzureBlobFileSystem setupFs =
        (AzureBlobFileSystem) FileSystem.newInstance(rawConfig)) {
      final Path root = new Path("/");
      ContractTestUtils.assertPathExists(setupFs, "This path should exist", root);
      if (testPath != null) {
        setupFs.create(testPath).close();
      }
      // From here on the configuration says SAS; teardown() checks the flag below.
      abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
      usingFilesystemForSASTests = true;
    }
  }

  /**
   * Returns the filesystem held by this test.
   *
   * @return the current filesystem; {@code null} if none has been created yet.
   * @throws IOException never thrown by this implementation.
   */
  public AzureBlobFileSystem getFileSystem() throws IOException {
    return this.abfs;
  }

  /**
   * Obtains a filesystem for the given configuration through
   * {@code FileSystem.get}.
   *
   * @param configuration configuration to resolve the filesystem from.
   * @return the filesystem, cast to {@link AzureBlobFileSystem}.
   * @throws Exception if the filesystem cannot be obtained.
   */
  public AzureBlobFileSystem getFileSystem(Configuration configuration) throws Exception{
    return (AzureBlobFileSystem) FileSystem.get(configuration);
  }

  /**
   * Sets the given URI as the default filesystem in the test configuration
   * and obtains the filesystem through {@code FileSystem.get}.
   *
   * @param abfsUri URI to install as the default filesystem.
   * @return the filesystem, cast to {@link AzureBlobFileSystem}.
   * @throws Exception if the filesystem cannot be obtained.
   */
  public AzureBlobFileSystem getFileSystem(String abfsUri) throws Exception {
    abfsConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, abfsUri);
    return (AzureBlobFileSystem) FileSystem.get(rawConfig);
  }

  /**
   * Returns the filesystem held in the {@link #abfs} field, creating a new
   * instance from the raw configuration on first use.
   *
   * @return the filesystem held by this test.
   * @throws IOException failure during create/init.
   */
  public AzureBlobFileSystem createFileSystem() throws IOException {
    if (abfs != null) {
      return abfs;
    }
    abfs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
    return abfs;
  }


  /**
   * Returns the WASB filesystem built by {@link #setup()}.
   *
   * @return the WASB filesystem; {@code null} when none was built.
   */
  protected NativeAzureFileSystem getWasbFileSystem() {
    return this.wasb;
  }

  /**
   * Returns the part of the configured endpoint before the first colon.
   * This is called only when testing against dev-fabric.
   *
   * @return the host portion of {@code AZURE_ABFS_ENDPOINT}.
   */
  protected String getHostName() {
    final String endPoint = abfsConfig.get(AZURE_ABFS_ENDPOINT);
    final String[] hostAndPort = endPoint.split(":");
    return hostAndPort[0];
  }

  /**
   * Replaces the test URL held by this test.
   *
   * @param testUrl new test URL.
   */
  protected void setTestUrl(final String testUrl) {
    this.testUrl = testUrl;
  }

  /**
   * Returns the test URL held by this test.
   *
   * @return the test URL.
   */
  protected String getTestUrl() {
    return this.testUrl;
  }

  /**
   * Replaces the filesystem name held by this test.
   *
   * @param fileSystemName new filesystem name.
   */
  protected void setFileSystemName(final String fileSystemName) {
    this.fileSystemName = fileSystemName;
  }

  /**
   * Returns the method name reported by the inherited {@code methodName} field.
   *
   * @return the method name.
   */
  protected String getMethodName() {
    final String currentMethod = methodName.getMethodName();
    return currentMethod;
  }

  /**
   * Returns the filesystem name held by this test.
   *
   * @return the filesystem name.
   */
  protected String getFileSystemName() {
    return this.fileSystemName;
  }

  /**
   * Returns the account name read from the test configuration.
   *
   * @return the account name.
   */
  protected String getAccountName() {
    return accountName;
  }

  /**
   * Looks up {@code FS_AZURE_ACCOUNT_KEY} in the ABFS configuration.
   *
   * @return the configured value, as returned by the configuration.
   */
  protected String getAccountKey() {
    final String accountKey = abfsConfig.get(FS_AZURE_ACCOUNT_KEY);
    return accountKey;
  }

  /**
   * Returns the ABFS configuration built by the constructor.
   *
   * @return the ABFS configuration of this test.
   */
  public AbfsConfiguration getConfiguration() {
    return this.abfsConfig;
  }

  /**
   * Returns the ABFS configuration of the store behind the given filesystem.
   *
   * @param fs filesystem to read the configuration from.
   * @return the configuration of the filesystem's store.
   */
  public AbfsConfiguration getConfiguration(AzureBlobFileSystem fs) {
    final AzureBlobFileSystemStore store = fs.getAbfsStore();
    return store.getAbfsConfiguration();
  }

  /**
   * Returns the instrumentation map of the given filesystem.
   *
   * @param fs filesystem to read the map from.
   * @return the map returned by the filesystem.
   */
  public Map<String, Long> getInstrumentationMap(AzureBlobFileSystem fs) {
    final Map<String, Long> instrumentation = fs.getInstrumentationMap();
    return instrumentation;
  }

  /**
   * Returns the raw configuration as exposed by the ABFS configuration.
   *
   * @return the raw Hadoop configuration.
   */
  public Configuration getRawConfiguration() {
    final Configuration raw = abfsConfig.getRawConfiguration();
    return raw;
  }

  /**
   * Returns the auth type read by the constructor.
   *
   * @return the auth type of this test.
   */
  public AuthType getAuthType() {
    return authType;
  }

  /**
   * Returns the URI scheme chosen by the constructor.
   *
   * @return the ABFS scheme of this test.
   */
  public String getAbfsScheme() {
    return abfsScheme;
  }

  /**
   * Tells whether the configured endpoint was recognised as "host:port".
   *
   * @return the flag computed by the constructor.
   */
  protected boolean isIPAddress() {
    return this.isIPAddress;
  }

  /**
   * Writes the whole buffer to a file on the test filesystem via
   * {@code ContractTestUtils.writeDataset}.
   *
   * @param path path of the file to write.
   * @param buffer data to write; all {@code buffer.length} bytes are passed on.
   * @throws IOException failure
   */
  protected void write(Path path, byte[] buffer) throws IOException {
    final AzureBlobFileSystem targetFs = getFileSystem();
    final int length = buffer.length;
    ContractTestUtils.writeDataset(targetFs, path, buffer, length,
        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, false);
  }

  /**
   * Touches a file in the test store through {@code ContractTestUtils.touch}.
   * Will overwrite any existing file.
   *
   * @param path path of the file to touch.
   * @throws IOException failure.
   */
  protected void touch(Path path) throws IOException {
    final AzureBlobFileSystem targetFs = getFileSystem();
    ContractTestUtils.touch(targetFs, path);
  }

  /**
   * Converts a WASB URL into the matching ABFS URL by swapping the scheme and
   * the DNS prefix. A non-secure scheme stays non-secure.
   *
   * @param wasbUrl URL using a WASB scheme.
   * @return the converted URL, or {@code null} if no WASB scheme prefix matched.
   */
  protected static String wasbUrlToAbfsUrl(final String wasbUrl) {
    return convertTestUrls(
        wasbUrl,
        FileSystemUriSchemes.WASB_SCHEME,
        FileSystemUriSchemes.WASB_SECURE_SCHEME,
        FileSystemUriSchemes.WASB_DNS_PREFIX,
        FileSystemUriSchemes.ABFS_SCHEME,
        FileSystemUriSchemes.ABFS_SECURE_SCHEME,
        FileSystemUriSchemes.ABFS_DNS_PREFIX,
        false);
  }

  /**
   * Converts an ABFS URL into the matching WASB URL by swapping the scheme and
   * the DNS prefix.
   *
   * @param abfsUrl URL using an ABFS scheme.
   * @param isAlwaysHttpsUsed when true, a non-secure ABFS scheme is mapped to
   *                          the secure WASB scheme.
   * @return the converted URL, or {@code null} if no ABFS scheme prefix matched.
   */
  protected static String abfsUrlToWasbUrl(final String abfsUrl, final boolean isAlwaysHttpsUsed) {
    return convertTestUrls(
        abfsUrl,
        FileSystemUriSchemes.ABFS_SCHEME,
        FileSystemUriSchemes.ABFS_SECURE_SCHEME,
        FileSystemUriSchemes.ABFS_DNS_PREFIX,
        FileSystemUriSchemes.WASB_SCHEME,
        FileSystemUriSchemes.WASB_SECURE_SCHEME,
        FileSystemUriSchemes.WASB_DNS_PREFIX,
        isAlwaysHttpsUsed);
  }

  /**
   * Picks the service type from a URL string: BLOB when the string contains
   * the blob domain name, DFS otherwise.
   *
   * @param defaultUri URL text to inspect.
   * @return the detected service type.
   */
  private AbfsServiceType identifyAbfsServiceTypeFromUrl(String defaultUri) {
    final boolean isBlobEndpoint = defaultUri.contains(ABFS_BLOB_DOMAIN_NAME);
    return isBlobEndpoint ? AbfsServiceType.BLOB : AbfsServiceType.DFS;
  }

  /**
   * Rewrites a URL from one scheme family and DNS prefix to another.
   *
   * @param url URL to convert.
   * @param fromNonSecureScheme non-secure scheme the URL may start with.
   * @param fromSecureScheme secure scheme the URL may start with.
   * @param fromDnsPrefix DNS prefix to replace (matched as ".prefix.").
   * @param toNonSecureScheme replacement for the non-secure scheme.
   * @param toSecureScheme replacement for the secure scheme.
   * @param toDnsPrefix replacement DNS prefix.
   * @param isAlwaysHttpsUsed when true, a non-secure source scheme is mapped
   *                          to the secure target scheme.
   * @return the converted URL, or {@code null} when the URL starts with
   *         neither source scheme.
   */
  private static String convertTestUrls(
      final String url,
      final String fromNonSecureScheme,
      final String fromSecureScheme,
      final String fromDnsPrefix,
      final String toNonSecureScheme,
      final String toSecureScheme,
      final String toDnsPrefix,
      final boolean isAlwaysHttpsUsed) {
    final String schemeSeparator = "://";
    final String nonSecurePrefix = fromNonSecureScheme + schemeSeparator;
    final String securePrefix = fromSecureScheme + schemeSeparator;

    final String withNewScheme;
    if (url.startsWith(nonSecurePrefix)) {
      final String targetScheme = isAlwaysHttpsUsed ? toSecureScheme : toNonSecureScheme;
      withNewScheme = url.replace(nonSecurePrefix, targetScheme + schemeSeparator);
    } else if (url.startsWith(securePrefix)) {
      withNewScheme = url.replace(securePrefix, toSecureScheme + schemeSeparator);
    } else {
      // Unknown scheme: nothing to convert.
      return null;
    }

    return withNewScheme.replace("." + fromDnsPrefix + ".", "." + toDnsPrefix + ".");
  }

  /**
   * Returns a new path built from {@code UriUtils.generateUniqueTestPath()}.
   *
   * @return the generated test path.
   */
  public Path getTestPath() {
    return new Path(UriUtils.generateUniqueTestPath());
  }

  /**
   * Returns the store behind the given filesystem.
   *
   * @param fs filesystem to read the store from.
   * @return the filesystem's store.
   */
  public AzureBlobFileSystemStore getAbfsStore(final AzureBlobFileSystem fs) {
    final AzureBlobFileSystemStore store = fs.getAbfsStore();
    return store;
  }

  /**
   * Returns the client of the given store.
   *
   * @param abfsStore store to read the client from.
   * @return the store's client.
   */
  public AbfsClient getAbfsClient(final AzureBlobFileSystemStore abfsStore) {
    final AbfsClient client = abfsStore.getClient();
    return client;
  }

  /**
   * Installs the given client on the given store.
   *
   * @param abfsStore store to update.
   * @param client client to install.
   */
  public void setAbfsClient(final AzureBlobFileSystemStore abfsStore,
      final AbfsClient client) {
    abfsStore.setClient(client);
  }

  /**
   * Qualifies a path against the test filesystem.
   *
   * @param path path to qualify.
   * @return the path as qualified by the test filesystem.
   * @throws IOException declared by {@link #getFileSystem()}.
   */
  public Path makeQualified(Path path) throws IOException {
    final AzureBlobFileSystem fs = getFileSystem();
    return fs.makeQualified(path);
  }

  /**
   * Builds a unique path under a path obtained from {@link #getTestPath()}
   * and qualifies it against the test filesystem.
   *
   * @param filepath path string; made unique through {@link #getUniquePath(String)}.
   * @return a path qualified by the test filesystem
   * @throws IOException IO problems
   */
  protected Path path(String filepath) throws IOException {
    final AzureBlobFileSystem fs = getFileSystem();
    final Path testRoot = getTestPath();
    final Path uniqueChild = getUniquePath(filepath);
    return fs.makeQualified(new Path(testRoot, uniqueChild));
  }

  /**
   * Makes a path unique by appending the last {@link #SHORTENED_GUID_LEN}
   * characters of a random GUID. The root path "/" is returned unchanged.
   *
   * @param filepath path string
   * @return unique path created from filepath and a GUID
   */
  protected Path getUniquePath(String filepath) {
    if (filepath.equals("/")) {
      return new Path(filepath);
    }
    final String guid = UUID.randomUUID().toString();
    final String guidSuffix = StringUtils.right(guid, SHORTENED_GUID_LEN);
    return new Path(filepath + guidSuffix);
  }

  /**
   * Returns the Delegation Token manager of the test filesystem, if any.
   *
   * @return the DT manager or null.
   * @throws IOException failure
   */
  protected AbfsDelegationTokenManager getDelegationTokenManager()
      throws IOException {
    final AzureBlobFileSystem fs = getFileSystem();
    return fs.getDelegationTokenManager();
  }

  /**
   * Creates a file through the filesystem's store after switching off the
   * "disable output stream flush" setting on the store's configuration.
   *
   * @param fs   AzureBlobFileSystem that is initialised in the test.
   * @param path Path of the file to be created.
   * @return AbfsOutputStream for writing.
   * @throws IOException if the file cannot be created.
   */
  protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled(
      AzureBlobFileSystem fs,
      Path path) throws IOException {
    final AzureBlobFileSystemStore store = fs.getAbfsStore();
    // Flush must not be disabled for the stream returned below.
    store.getAbfsConfiguration().setDisableOutputStreamFlush(false);

    return (AbfsOutputStream) store.createFile(
        path,
        fs.getFsStatistics(),
        true,
        FsPermission.getDefault(),
        FsPermission.getUMask(fs.getConf()),
        getTestTracingContext(fs, false));
  }

  /**
   * Custom assertion for AbfsStatistics: checks that the value stored in the
   * metric map under the statistic's name equals the expected value.
   * A statistic that is absent from the map fails the assertion.
   *
   * @param statistic the AbfsStatistics which needs to be asserted.
   * @param expectedValue the expected value of the statistics.
   * @param metricMap map of (String, Long) with statistics name as key and
   *                  statistics value as map value.
   * @return the expected value, once the assertion has passed.
   */
  protected long assertAbfsStatistics(AbfsStatistic statistic,
      long expectedValue, Map<String, Long> metricMap) {
    final String statName = statistic.getStatName();
    // Compare as boxed values so that a missing entry is reported as a
    // readable assertion failure rather than an unboxing NullPointerException.
    final Long actualValue = metricMap.get(statName);
    assertEquals("Mismatch in " + statName, Long.valueOf(expectedValue),
        actualValue);
    return expectedValue;
  }

  /**
   * Skips the test unless the given key has a non-empty value. The
   * account-specific form of the key is looked up first, then the plain key.
   *
   * @param conf configuration to inspect.
   * @param key configuration key that must be set.
   */
  protected void assumeValidTestConfigPresent(final Configuration conf, final String key) {
    final String accountSpecificKey = accountProperty(key, accountName);
    final String plainValue = conf.get(key, "");
    final String configuredValue = conf.get(accountSpecificKey, plainValue);
    final String skipMessage =
        String.format("Missing Required Test Config: %s.", key);
    Assume.assumeTrue(skipMessage, !configuredValue.isEmpty());
  }

  /**
   * Skips the test when the auth type is SAS, or when the configuration the
   * auth type needs is missing: the account key for SharedKey, the token
   * provider type for every other auth type.
   */
  protected void assumeValidAuthConfigsPresent() {
    final AuthType configuredAuthType = getAuthType();
    Assume.assumeFalse(
        "SAS Based Authentication Not Allowed For Integration Tests",
        configuredAuthType == AuthType.SAS);

    final String requiredKey = configuredAuthType == AuthType.SharedKey
        ? FS_AZURE_ACCOUNT_KEY
        : FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME;
    assumeValidTestConfigPresent(getRawConfiguration(), requiredKey);
  }

  /**
   * Reads {@code FS_AZURE_TEST_APPENDBLOB_ENABLED} from the raw configuration.
   *
   * @return the configured flag; false when it is not set.
   */
  protected boolean isAppendBlobEnabled() {
    final Configuration raw = getRawConfiguration();
    return raw.getBoolean(FS_AZURE_TEST_APPENDBLOB_ENABLED, false);
  }

  /**
   * Returns the service type configured for the filesystem.
   *
   * @return the service type reported by the ABFS configuration.
   */
  protected AbfsServiceType getAbfsServiceType() {
    final AbfsServiceType configuredType = abfsConfig.getFsConfiguredServiceType();
    return configuredType;
  }

  /**
   * Returns the service type to be used for Ingress Operations irrespective of account type.
   * Default value is the same as the service type configured for the file system.
   *
   * @return the ingress service type reported by the ABFS configuration.
   */
  public AbfsServiceType getIngressServiceType() {
    final AbfsServiceType ingressType = abfsConfig.getIngressServiceType();
    return ingressType;
  }

  /**
   * Create directory with implicit parent directory, using the Azcopy helper.
   * The test is skipped unless the service type is Blob and a fixed SAS
   * token is configured.
   *
   * @param path path to create. Can be relative or absolute.
   * @throws Exception if the Azcopy helper fails.
   */
  protected void createAzCopyFolder(Path path) throws Exception {
    // Use the shared helper so that a skipped test reports why it was skipped.
    assumeBlobServiceType();
    assumeValidTestConfigPresent(getRawConfiguration(), FS_AZURE_TEST_FIXED_SAS_TOKEN);
    String sasToken = getRawConfiguration().get(FS_AZURE_TEST_FIXED_SAS_TOKEN);
    AzcopyToolHelper azcopyHelper = AzcopyToolHelper.getInstance(sasToken);
    azcopyHelper.createFolderUsingAzcopy(getAzcopyAbsolutePath(path));
  }

  /**
   * Create file with implicit parent directory, using the Azcopy helper.
   * The test is skipped unless the service type is Blob and a fixed SAS
   * token is configured.
   *
   * @param path path to create. Can be relative or absolute.
   * @throws Exception if the Azcopy helper fails.
   */
  protected void createAzCopyFile(Path path) throws Exception {
    // Use the shared helper so that a skipped test reports why it was skipped.
    assumeBlobServiceType();
    assumeValidTestConfigPresent(getRawConfiguration(), FS_AZURE_TEST_FIXED_SAS_TOKEN);
    String sasToken = getRawConfiguration().get(FS_AZURE_TEST_FIXED_SAS_TOKEN);
    AzcopyToolHelper azcopyHelper = AzcopyToolHelper.getInstance(sasToken);
    azcopyHelper.createFileUsingAzcopy(getAzcopyAbsolutePath(path));
  }

  /**
   * Builds the HTTPS URL handed to Azcopy: scheme, account name, filesystem
   * name and the path component of the qualified path.
   *
   * @param path path to convert. Qualified against the test filesystem first.
   * @return the absolute URL as a string.
   * @throws IOException declared by {@link #getFileSystem()}.
   */
  private String getAzcopyAbsolutePath(Path path) throws IOException {
    final Path qualified = getFileSystem().makeQualified(path);
    final String pathFromContainerRoot = qualified.toUri().getPath();
    final String accountEndpoint =
        HTTPS_SCHEME + COLON + FORWARD_SLASH + FORWARD_SLASH + accountName;
    return accountEndpoint + FORWARD_SLASH + fileSystemName + pathFromContainerRoot;
  }

  /**
   * Assumes that the configured service type is Blob.
   * Otherwise, the test will be skipped.
   */
  protected void assumeBlobServiceType() {
    final boolean isBlob = getAbfsServiceType() == AbfsServiceType.BLOB;
    Assume.assumeTrue("Blob service type is required for this test", isBlob);
  }

  /**
   * Assumes that the configured service type is DFS.
   * Otherwise, the test will be skipped.
   */
  protected void assumeDfsServiceType() {
    final boolean isDfs = getAbfsServiceType() == AbfsServiceType.DFS;
    Assume.assumeTrue("DFS service type is required for this test", isDfs);
  }

  /**
   * Assumes that the test is running against a HNS Enabled account, using a
   * default skip message. Otherwise, the test will be skipped.
   *
   * @throws IOException if an error occurs while checking the account type.
   */
  protected void assumeHnsEnabled() throws IOException {
    final String defaultMessage = "HNS-Enabled account must be used for this test";
    assumeHnsEnabled(defaultMessage);
  }

  /**
   * Assumes that the test is running against a HNS Enabled account.
   * Otherwise, the test will be skipped.
   *
   * @param errorMessage error message to be displayed if the test is skipped.
   * @throws IOException if an error occurs while checking the account type.
   */
  protected void assumeHnsEnabled(String errorMessage) throws IOException {
    final boolean namespaceEnabled = getIsNamespaceEnabled(getFileSystem());
    Assume.assumeTrue(errorMessage, namespaceEnabled);
  }

  /**
   * Assumes that the test is running against a HNS Disabled account, using a
   * default skip message. Otherwise, the test will be skipped.
   *
   * @throws IOException if an error occurs while checking the account type.
   */
  protected void assumeHnsDisabled() throws IOException {
    final String defaultMessage = "HNS-Enabled account must not be used for this test";
    assumeHnsDisabled(defaultMessage);
  }

  /**
   * Assumes that the test is running against a HNS Disabled account.
   * Otherwise, the test will be skipped.
   *
   * @param message error message to be displayed if the test is skipped.
   * @throws IOException if an error occurs while checking the account type.
   */
  protected void assumeHnsDisabled(String message) throws IOException {
    final boolean namespaceEnabled = getIsNamespaceEnabled(getFileSystem());
    Assume.assumeFalse(message, namespaceEnabled);
  }

  /**
   * Assert that the path contains the expected DNS suffix: the blob domain
   * name when the service type is Blob, the DFS domain name otherwise.
   *
   * @param path to be asserted.
   */
  protected void assertPathDns(Path path) {
    final String expectedDns;
    if (getAbfsServiceType() == AbfsServiceType.BLOB) {
      expectedDns = ABFS_BLOB_DOMAIN_NAME;
    } else {
      expectedDns = ABFS_DFS_DOMAIN_NAME;
    }
    final String pathText = path.toString();
    Assertions.assertThat(pathText)
        .describedAs("Path does not contain expected DNS")
        .contains(expectedDns);
  }

  /**
   * Waits for every future and checks how many of them failed with a
   * RuntimeException.
   *
   * Each future is awaited in turn. A task that failed with a
   * RuntimeException is counted; a task that failed with any other cause is
   * logged and not counted. If the waiting thread is interrupted, the
   * interrupt flag is restored and the remaining futures are still checked.
   * Finally, the count is asserted to equal {@code exceptionVal}.
   *
   * @param futures The list of futures to check for exceptions.
   * @param exceptionVal The expected number of futures that failed with a
   *                     RuntimeException.
   */
  protected void checkFuturesForExceptions(List<Future<?>> futures, int exceptionVal) {
    int exceptionCaught = 0;
    for (Future<?> future : futures) {
      try {
        future.get(); // wait for the task to complete and surface its failure, if any
      } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof RuntimeException) {
          exceptionCaught++;
        } else {
          LOG.warn("Unexpected exception caught: {}", cause, cause);
        }
      } catch (InterruptedException e) {
        // Do not swallow the interrupt: restore the flag for the caller.
        Thread.currentThread().interrupt();
        LOG.warn("Interrupted while waiting for a future: {}", e, e);
      }
    }
    assertEquals("Mismatch in number of futures that failed with a RuntimeException",
        exceptionVal, exceptionCaught);
  }

  /**
   * Skips the test unless recovery through client transaction ID can be
   * exercised: the client transaction ID must be enabled in the
   * configuration, the service type must be DFS and namespace must be
   * enabled. For create operations the ingress service type must also be DFS
   * and append blob must not be enabled.
   *
   * @param isCreate whether the additional create-specific assumptions apply.
   * @throws IOException in case of an error
   */
  protected void assumeRecoveryThroughClientTransactionID(boolean isCreate)
      throws IOException {
    Assume.assumeTrue("Recovery through client transaction ID is not enabled",
        getConfiguration().getIsClientTransactionIdEnabled());
    assumeDfsServiceType();
    assumeHnsEnabled();

    if (!isCreate) {
      return;
    }
    // The create client must be the DFS client ...
    Assume.assumeTrue("Ingress service type is not DFS",
        getIngressServiceType() == AbfsServiceType.DFS);
    // ... and append blob must not be enabled in it.
    Assume.assumeFalse("Append blob is enabled in DFS client",
        isAppendBlobEnabled());
  }
}