ITestAbfsClient.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.net.ProtocolException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.regex.Pattern;

import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.test.ReflectionUtils;
import org.apache.http.HttpResponse;

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.ITestAzureBlobFileSystemListStatus.TEST_CONTINUATION_TOKEN;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.APACHE_HTTP_CLIENT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.JDK_HTTP_URL_CONNECTION;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APN_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CLIENT_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VENDOR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_ARCH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_VERSION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SEMICOLON;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;

/**
 * Test useragent of abfs client.
 *
 */
@RunWith(Parameterized.class)
public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {

  private static final String ACCOUNT_NAME = "bogusAccountName.dfs.core.windows.net";
  private static final String FS_AZURE_USER_AGENT_PREFIX = "Partner Service";
  private static final String FNS_BLOB_USER_AGENT_IDENTIFIER = "FNS";
  private static final String HUNDRED_CONTINUE_USER_AGENT = SINGLE_WHITE_SPACE + HUNDRED_CONTINUE + SEMICOLON;
  private static final String TEST_PATH = "/testfile";
  public static final int REDUCED_RETRY_COUNT = 2;
  public static final int REDUCED_BACKOFF_INTERVAL = 100;
  public static final int BUFFER_LENGTH = 5;
  public static final int BUFFER_OFFSET = 0;

  private final Pattern userAgentStringPattern;

  @Parameterized.Parameter
  public HttpOperationType httpOperationType;

  @Parameterized.Parameters(name = "{0}")
  public static Iterable<Object[]> params() {
    return Arrays.asList(new Object[][]{
        {HttpOperationType.JDK_HTTP_URL_CONNECTION},
        {APACHE_HTTP_CLIENT}
    });
  }

  public ITestAbfsClient() throws Exception {
    StringBuilder regEx = new StringBuilder();
    regEx.append("^");
    regEx.append(APN_VERSION);
    regEx.append(SINGLE_WHITE_SPACE);
    regEx.append(CLIENT_VERSION);
    regEx.append(SINGLE_WHITE_SPACE);
    regEx.append("\\(");
    regEx.append(System.getProperty(JAVA_VENDOR)
        .replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING));
    regEx.append(SINGLE_WHITE_SPACE);
    regEx.append("JavaJRE");
    regEx.append(SINGLE_WHITE_SPACE);
    regEx.append(System.getProperty(JAVA_VERSION));
    regEx.append(SEMICOLON);
    regEx.append(SINGLE_WHITE_SPACE);
    regEx.append(System.getProperty(OS_NAME)
        .replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING));
    regEx.append(SINGLE_WHITE_SPACE);
    regEx.append(System.getProperty(OS_VERSION));
    regEx.append(FORWARD_SLASH);
    regEx.append(System.getProperty(OS_ARCH));
    regEx.append(SEMICOLON);
    regEx.append("([a-zA-Z].*; )?");      // Regex for sslProviderName
    regEx.append("([a-zA-Z].*; )?");      // Regex for tokenProvider
    regEx.append(" ?");
    regEx.append(".+");                   // cluster name
    regEx.append(FORWARD_SLASH);
    regEx.append(".+");            // cluster type
    regEx.append("\\)");
    regEx.append("( .*)?");        //  Regex for user agent prefix
    regEx.append("$");
    this.userAgentStringPattern = Pattern.compile(regEx.toString());
  }

  private String getUserAgentString(AbfsConfiguration config,
      boolean includeSSLProvider) throws IOException, URISyntaxException {
    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
    AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build();
    AbfsClient client;
    if (AbfsServiceType.DFS.equals(config.getFsConfiguredServiceType())) {
      client = new AbfsDfsClient(new URL("https://azure.com"), null,
          config, (AccessTokenProvider) null, null, abfsClientContext);
    } else {
      client = new AbfsBlobClient(new URL("https://azure.com"), null,
          config, (AccessTokenProvider) null, null, abfsClientContext);
    }
    String sslProviderName = null;
    if (includeSSLProvider) {
      sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory()
          .getProviderName();
    }
    return client.initializeUserAgent(config, sslProviderName);
  }

  @Test
  public void verifyBasicInfo() throws Exception {
    Assume.assumeTrue(JDK_HTTP_URL_CONNECTION == httpOperationType);
    final Configuration configuration = new Configuration();
    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
        ACCOUNT_NAME);
    verifyBasicInfo(getUserAgentString(abfsConfiguration, false));
  }

  private void verifyBasicInfo(String userAgentStr) {
    Assertions.assertThat(userAgentStr)
        .describedAs("User-Agent string [" + userAgentStr
            + "] should be of the pattern: " + this.userAgentStringPattern.pattern())
        .matches(this.userAgentStringPattern)
        .describedAs("User-Agent string should contain java vendor")
        .contains(System.getProperty(JAVA_VENDOR)
            .replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING))
        .describedAs("User-Agent string should contain java version")
        .contains(System.getProperty(JAVA_VERSION))
        .describedAs("User-Agent string should contain  OS name")
        .contains(System.getProperty(OS_NAME)
            .replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING))
        .describedAs("User-Agent string should contain OS version")
        .contains(System.getProperty(OS_VERSION))
        .describedAs("User-Agent string should contain OS arch")
        .contains(System.getProperty(OS_ARCH));
  }

  @Test
  public void verifyUserAgentPrefix()
      throws IOException, IllegalAccessException, URISyntaxException {
    Assume.assumeTrue(JDK_HTTP_URL_CONNECTION == httpOperationType);
    final Configuration configuration = new Configuration();
    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
    configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, FS_AZURE_USER_AGENT_PREFIX);
    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
        ACCOUNT_NAME);
    String userAgentStr = getUserAgentString(abfsConfiguration, false);

    verifyBasicInfo(userAgentStr);
    Assertions.assertThat(userAgentStr)
      .describedAs("User-Agent string should contain " + FS_AZURE_USER_AGENT_PREFIX)
      .contains(FS_AZURE_USER_AGENT_PREFIX);

    configuration.unset(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY);
    abfsConfiguration = new AbfsConfiguration(configuration,
        ACCOUNT_NAME);
    userAgentStr = getUserAgentString(abfsConfiguration, false);

    verifyBasicInfo(userAgentStr);
    Assertions.assertThat(userAgentStr)
      .describedAs("User-Agent string should not contain " + FS_AZURE_USER_AGENT_PREFIX)
      .doesNotContain(FS_AZURE_USER_AGENT_PREFIX);
  }

  /**
   * This method represents a unit test for verifying the behavior of the User-Agent header
   * with respect to the "Expect: 100-continue" header setting in the Azure Blob File System (ABFS) configuration.
   *
   * The test ensures that the User-Agent string includes or excludes specific information based on whether the
   * "Expect: 100-continue" header is enabled or disabled in the configuration.
   *
   */
  @Test
  public void verifyUserAgentExpectHeader()
          throws IOException, IllegalAccessException, URISyntaxException {
    Assume.assumeTrue(JDK_HTTP_URL_CONNECTION == httpOperationType);
    final Configuration configuration = new Configuration();
    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
    configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, FS_AZURE_USER_AGENT_PREFIX);
    configuration.setBoolean(ConfigurationKeys.FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, true);
    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
            ACCOUNT_NAME);
    String userAgentStr = getUserAgentString(abfsConfiguration, false);

    verifyBasicInfo(userAgentStr);
    Assertions.assertThat(userAgentStr)
            .describedAs("User-Agent string should contain " + HUNDRED_CONTINUE_USER_AGENT)
            .contains(HUNDRED_CONTINUE_USER_AGENT);

    configuration.setBoolean(ConfigurationKeys.FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, false);
    abfsConfiguration = new AbfsConfiguration(configuration,
            ACCOUNT_NAME);
    userAgentStr = getUserAgentString(abfsConfiguration, false);

    verifyBasicInfo(userAgentStr);
    Assertions.assertThat(userAgentStr)
            .describedAs("User-Agent string should not contain " + HUNDRED_CONTINUE_USER_AGENT)
            .doesNotContain(HUNDRED_CONTINUE_USER_AGENT);
  }

  @Test
  public void verifyUserAgentWithoutSSLProvider() throws Exception {
    Assume.assumeTrue(JDK_HTTP_URL_CONNECTION == httpOperationType);
    final Configuration configuration = new Configuration();
    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
    configuration.set(ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY,
        DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE.name());
    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
        ACCOUNT_NAME);
    String userAgentStr = getUserAgentString(abfsConfiguration, true);

    verifyBasicInfo(userAgentStr);
    Assertions.assertThat(userAgentStr)
      .describedAs("User-Agent string should contain sslProvider")
      .contains(DelegatingSSLSocketFactory.getDefaultFactory().getProviderName());

    userAgentStr = getUserAgentString(abfsConfiguration, false);

    verifyBasicInfo(userAgentStr);
    Assertions.assertThat(userAgentStr)
      .describedAs("User-Agent string should not contain sslProvider")
      .doesNotContain(DelegatingSSLSocketFactory.getDefaultFactory().getProviderName());
  }

  @Test
  public void verifyUserAgentClusterName() throws Exception {
    Assume.assumeTrue(JDK_HTTP_URL_CONNECTION == httpOperationType);
    final String clusterName = "testClusterName";
    final Configuration configuration = new Configuration();
    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
    configuration.set(FS_AZURE_CLUSTER_NAME, clusterName);
    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
        ACCOUNT_NAME);
    String userAgentStr = getUserAgentString(abfsConfiguration, false);

    verifyBasicInfo(userAgentStr);
    Assertions.assertThat(userAgentStr)
      .describedAs("User-Agent string should contain cluster name")
      .contains(clusterName);

    configuration.unset(FS_AZURE_CLUSTER_NAME);
    abfsConfiguration = new AbfsConfiguration(configuration,
        ACCOUNT_NAME);
    userAgentStr = getUserAgentString(abfsConfiguration, false);

    verifyBasicInfo(userAgentStr);
    Assertions.assertThat(userAgentStr)
      .describedAs("User-Agent string should not contain cluster name")
      .doesNotContain(clusterName)
      .describedAs("User-Agent string should contain UNKNOWN as cluster name config is absent")
      .contains(DEFAULT_VALUE_UNKNOWN);
  }

  @Test
  public void verifyUserAgentClusterType() throws Exception {
    Assume.assumeTrue(JDK_HTTP_URL_CONNECTION == httpOperationType);
    final String clusterType = "testClusterType";
    final Configuration configuration = new Configuration();
    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
    configuration.set(FS_AZURE_CLUSTER_TYPE, clusterType);
    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
        ACCOUNT_NAME);
    String userAgentStr = getUserAgentString(abfsConfiguration, false);

    verifyBasicInfo(userAgentStr);
    Assertions.assertThat(userAgentStr)
      .describedAs("User-Agent string should contain cluster type")
      .contains(clusterType);

    configuration.unset(FS_AZURE_CLUSTER_TYPE);
    abfsConfiguration = new AbfsConfiguration(configuration,
        ACCOUNT_NAME);
    userAgentStr = getUserAgentString(abfsConfiguration, false);

    verifyBasicInfo(userAgentStr);
    Assertions.assertThat(userAgentStr)
      .describedAs("User-Agent string should not contain cluster type")
      .doesNotContain(clusterType)
      .describedAs("User-Agent string should contain UNKNOWN as cluster type config is absent")
      .contains(DEFAULT_VALUE_UNKNOWN);
  }

  @Test
  // Test to verify the unique identifier in user agent string for FNS-Blob accounts
  public void verifyUserAgentForFNSBlob() throws Exception {
    assumeHnsDisabled();
    assumeBlobServiceType();
    final AzureBlobFileSystem fs = getFileSystem();
    final AbfsConfiguration configuration = fs.getAbfsStore()
        .getAbfsConfiguration();

    String userAgentStr = getUserAgentString(configuration, false);
    verifyBasicInfo(userAgentStr);
    Assertions.assertThat(userAgentStr)
        .describedAs(
            "User-Agent string for FNS accounts on Blob endpoint should contain "
                + FNS_BLOB_USER_AGENT_IDENTIFIER)
        .contains(FNS_BLOB_USER_AGENT_IDENTIFIER);
  }

  @Test
  // Test to verify that the user agent string for non-FNS-Blob accounts
  // does not contain the FNS identifier.
  public void verifyUserAgentForDFS() throws Exception {
    assumeDfsServiceType();
    final AzureBlobFileSystem fs = getFileSystem();
    final AbfsConfiguration configuration = fs.getAbfsStore()
        .getAbfsConfiguration();

    String userAgentStr = getUserAgentString(configuration, false);
    verifyBasicInfo(userAgentStr);
    Assertions.assertThat(userAgentStr)
        .describedAs(
            "User-Agent string for non-FNS-Blob accounts should not contain"
                + FNS_BLOB_USER_AGENT_IDENTIFIER)
        .doesNotContain(FNS_BLOB_USER_AGENT_IDENTIFIER);
  }

  public static AbfsClient createTestClientFromCurrentContext(
      AbfsClient baseAbfsClientInstance,
      AbfsConfiguration abfsConfig) throws IOException, URISyntaxException {
    AuthType currentAuthType = abfsConfig.getAuthType(
        abfsConfig.getAccountName());

    AbfsPerfTracker tracker = new AbfsPerfTracker("test",
        abfsConfig.getAccountName(),
        abfsConfig);
    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));

    AbfsClientContext abfsClientContext =
        new AbfsClientContextBuilder().withAbfsPerfTracker(tracker)
                                .withExponentialRetryPolicy(
                                    new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries()))
                                .withAbfsCounters(abfsCounters)
                                .build();

    // Create test AbfsClient
    AbfsClient testClient;
    if (AbfsServiceType.DFS.equals(abfsConfig.getFsConfiguredServiceType())) {
      testClient = new AbfsDfsClient(
          baseAbfsClientInstance.getBaseUrl(),
          (currentAuthType == AuthType.SharedKey
              ? new SharedKeyCredentials(
              abfsConfig.getAccountName().substring(0,
                  abfsConfig.getAccountName().indexOf(DOT)),
              abfsConfig.getStorageAccountKey())
              : null),
          abfsConfig,
          (currentAuthType == AuthType.OAuth
              ? abfsConfig.getTokenProvider()
              : null),
          null,
          abfsClientContext);
    } else {
      testClient = new AbfsBlobClient(
          baseAbfsClientInstance.getBaseUrl(),
          (currentAuthType == AuthType.SharedKey
              ? new SharedKeyCredentials(
              abfsConfig.getAccountName().substring(0,
                  abfsConfig.getAccountName().indexOf(DOT)),
              abfsConfig.getStorageAccountKey())
              : null),
          abfsConfig,
          (currentAuthType == AuthType.OAuth
              ? abfsConfig.getTokenProvider()
              : null),
          null,
          abfsClientContext);
    }

    return testClient;
  }

  public static AbfsClient createBlobClientFromCurrentContext(
      AbfsClient baseAbfsClientInstance,
      AbfsConfiguration abfsConfig) throws IOException, URISyntaxException {
    AuthType currentAuthType = abfsConfig.getAuthType(
        abfsConfig.getAccountName());

    AbfsPerfTracker tracker = new AbfsPerfTracker("test",
        abfsConfig.getAccountName(),
        abfsConfig);
    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));

    AbfsClientContext abfsClientContext =
        new AbfsClientContextBuilder().withAbfsPerfTracker(tracker)
            .withExponentialRetryPolicy(
                new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries()))
            .withAbfsCounters(abfsCounters)
            .build();

    AbfsClient testClient = new AbfsBlobClient(
        baseAbfsClientInstance.getBaseUrl(),
        (currentAuthType == AuthType.SharedKey
            ? new SharedKeyCredentials(
            abfsConfig.getAccountName().substring(0,
                abfsConfig.getAccountName().indexOf(DOT)),
            abfsConfig.getStorageAccountKey())
            : null),
        abfsConfig,
        (currentAuthType == AuthType.OAuth
            ? abfsConfig.getTokenProvider()
            : null),
        null,
        abfsClientContext);

    return testClient;
  }

  public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
      AbfsConfiguration abfsConfig) throws Exception {
    AuthType currentAuthType = abfsConfig.getAuthType(
        abfsConfig.getAccountName());
    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));

    org.junit.Assume.assumeTrue(
        (currentAuthType == AuthType.SharedKey)
        || (currentAuthType == AuthType.OAuth));

    AbfsClient client;
    if (AbfsServiceType.DFS.equals(abfsConfig.getFsConfiguredServiceType())) {
      client = mock(AbfsDfsClient.class);
    } else {
      client = mock(AbfsBlobClient.class);
    }
    AbfsPerfTracker tracker = new AbfsPerfTracker(
        "test",
        abfsConfig.getAccountName(),
        abfsConfig);

    when(client.getAbfsPerfTracker()).thenReturn(tracker);
    when(client.getAuthType()).thenReturn(currentAuthType);
    when(client.getExponentialRetryPolicy()).thenReturn(
        new ExponentialRetryPolicy(1));
    when(client.getRetryPolicy(any())).thenReturn(
        new ExponentialRetryPolicy(1));

    when(client.createDefaultUriQueryBuilder()).thenCallRealMethod();
    when(client.createRequestUrl(any(), any())).thenCallRealMethod();
    when(client.createRequestUrl(any(), any(), any())).thenCallRealMethod();
    when(client.getAccessToken()).thenCallRealMethod();
    when(client.getSharedKeyCredentials()).thenCallRealMethod();
    when(client.createDefaultHeaders()).thenCallRealMethod();
    when(client.getAbfsConfiguration()).thenReturn(abfsConfig);

    when(client.getIntercept()).thenReturn(
        AbfsThrottlingInterceptFactory.getInstance(
            abfsConfig.getAccountName().substring(0,
                abfsConfig.getAccountName().indexOf(DOT)), abfsConfig));
    when(client.getAbfsCounters()).thenReturn(abfsCounters);
    Mockito.doReturn(baseAbfsClientInstance.getAbfsApacheHttpClient()).when(client).getAbfsApacheHttpClient();

    // override baseurl
    ReflectionUtils.setFinalField(AbfsClient.class, client, "abfsConfiguration", abfsConfig);

    // override baseurl
    ReflectionUtils.setFinalField(AbfsClient.class, client, "baseUrl", baseAbfsClientInstance.getBaseUrl());

    // override xMsVersion
    ReflectionUtils.setFinalField(AbfsClient.class, client, "xMsVersion", baseAbfsClientInstance.getxMsVersion());

    // override auth provider
    if (currentAuthType == AuthType.SharedKey) {
      ReflectionUtils.setFinalField(AbfsClient.class, client, "sharedKeyCredentials", new SharedKeyCredentials(
              abfsConfig.getAccountName().substring(0,
                  abfsConfig.getAccountName().indexOf(DOT)),
              abfsConfig.getStorageAccountKey()));
    } else {
      ReflectionUtils.setFinalField(AbfsClient.class, client, "tokenProvider", abfsConfig.getTokenProvider());
    }

    // override user agent
    String userAgent = "APN/1.0 Azure Blob FS/3.5.0-SNAPSHOT (PrivateBuild "
        + "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; "
        + "UNKNOWN/UNKNOWN) MSFT";
    ReflectionUtils.setFinalField(AbfsClient.class, client, "userAgent", userAgent);

    return client;
  }

  /**
   * Test helper method to access private createRequestUrl method.
   * @param client test AbfsClient instace
   * @param path path to generate Url
   * @return return store path url
   * @throws AzureBlobFileSystemException
   */
  public static URL getTestUrl(AbfsClient client, String path) throws
      AzureBlobFileSystemException {
    final AbfsUriQueryBuilder abfsUriQueryBuilder
        = client.createDefaultUriQueryBuilder();
    return client.createRequestUrl(path, abfsUriQueryBuilder.toString());
  }

  /**
   * Test helper method to access private createDefaultHeaders method.
   * @param client test AbfsClient instance
   * @return List of AbfsHttpHeaders
   */
  public static List<AbfsHttpHeader> getTestRequestHeaders(AbfsClient client) {
    return client.createDefaultHeaders();
  }

  /**
   * Test helper method to create an AbfsRestOperation instance.
   * @param type RestOpType
   * @param client AbfsClient
   * @param method HttpMethod
   * @param url Test path url
   * @param requestHeaders request headers
   * @return instance of AbfsRestOperation
   */
  public static AbfsRestOperation getRestOp(AbfsRestOperationType type,
      AbfsClient client,
      String method,
      URL url,
      List<AbfsHttpHeader> requestHeaders, AbfsConfiguration abfsConfiguration) {
    return new AbfsRestOperation(
        type,
        client,
        method,
        url,
        requestHeaders,
        abfsConfiguration);
  }

  public static AccessTokenProvider getAccessTokenProvider(AbfsClient client) {
    return client.getTokenProvider();
  }

  /**
   * Test helper method to get random bytes array.
   * @param length The length of byte buffer.
   * @return byte buffer.
   */
  private byte[] getRandomBytesArray(int length) {
    final byte[] b = new byte[length];
    new Random().nextBytes(b);
    return b;
  }

  @Override
  public AzureBlobFileSystem getFileSystem(final Configuration configuration)
      throws Exception {
    Configuration conf = new Configuration(configuration);
    conf.set(ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY, httpOperationType.toString());
    return (AzureBlobFileSystem) FileSystem.newInstance(conf);
  }

  /**
   * Test to verify that client retries append request without
   * expect header enabled if append with expect header enabled fails
   * with 4xx kind of error.
   * @throws Exception
   */
  @Test
  public void testExpectHundredContinue() throws Exception {
    // Get the filesystem.
    final AzureBlobFileSystem fs = getFileSystem(getRawConfiguration());

    final Configuration configuration = fs.getAbfsStore().getAbfsConfiguration()
        .getRawConfiguration();
    configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
    AbfsClient abfsClient = fs.getAbfsStore().getClient();

    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
        configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));

    // Update the configuration with reduced retry count and reduced backoff interval.
    AbfsConfiguration abfsConfig
        = TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
        abfsConfiguration,
        REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);

    // Gets the client.
    AbfsClient testClient = Mockito.spy(
        ITestAbfsClient.createTestClientFromCurrentContext(
            abfsClient,
            abfsConfig));

    // Create the append request params with expect header enabled initially.
    AppendRequestParameters appendRequestParameters
        = new AppendRequestParameters(
        BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
        AppendRequestParameters.Mode.APPEND_MODE, false, null, true, null);

    byte[] buffer = getRandomBytesArray(BUFFER_LENGTH);

    // Create a test container to upload the data.
    Path testPath = path(TEST_PATH);
    fs.create(testPath);
    String finalTestPath = testPath.toString()
        .substring(testPath.toString().lastIndexOf("/"));

    // Creates a list of request headers.
    final List<AbfsHttpHeader> requestHeaders
        = ITestAbfsClient.getTestRequestHeaders(testClient);
    requestHeaders.add(
        new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
    if (appendRequestParameters.isExpectHeaderEnabled()) {
      requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
    }

    // Updates the query parameters.
    final AbfsUriQueryBuilder abfsUriQueryBuilder
        = testClient.createDefaultUriQueryBuilder();
    abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
    abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION,
        Long.toString(appendRequestParameters.getPosition()));

    // Creates the url for the specified path.
    URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());

    // Create a mock of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation.
    AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
        AbfsRestOperationType.Append,
        testClient,
        HTTP_METHOD_PUT,
        url,
        requestHeaders, buffer,
        appendRequestParameters.getoffset(),
        appendRequestParameters.getLength(), null, abfsConfig));

    Mockito.doAnswer(answer -> {
      AbfsHttpOperation httpOperation = Mockito.spy((AbfsHttpOperation) answer.callRealMethod());
      // Sets the expect request property if expect header is enabled.
      if (appendRequestParameters.isExpectHeaderEnabled()) {
        Mockito.doReturn(HUNDRED_CONTINUE).when(httpOperation)
            .getConnProperty(EXPECT);
      }
      Mockito.doNothing().when(httpOperation).setRequestProperty(Mockito
          .any(), Mockito.any());
      Mockito.doReturn(url).when(httpOperation).getConnUrl();

      // Give user error code 404 when processResponse is called.
      Mockito.doReturn(HTTP_METHOD_PUT).when(httpOperation).getMethod();
      Mockito.doReturn(HTTP_NOT_FOUND).when(httpOperation).getStatusCode();
      Mockito.doReturn(HTTP_NOT_FOUND).when(httpOperation).getConnResponseCode();
      Mockito.doReturn("Resource Not Found")
          .when(httpOperation)
          .getConnResponseMessage();

      if (httpOperation instanceof AbfsJdkHttpOperation) {
        // Make the getOutputStream throw IOException to see it returns from the sendRequest correctly.
        Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
            .when((AbfsJdkHttpOperation) httpOperation)
            .getConnOutputStream();
      }

      if (httpOperation instanceof AbfsAHCHttpOperation) {
        Mockito.doNothing()
            .when((AbfsAHCHttpOperation) httpOperation)
            .parseResponseHeaderAndBody(Mockito.any(byte[].class),
                Mockito.anyInt(), Mockito.anyInt());
        Mockito.doReturn(HTTP_NOT_FOUND)
            .when((AbfsAHCHttpOperation) httpOperation)
            .parseStatusCode(Mockito.nullable(
                HttpResponse.class));
        Mockito.doThrow(
                new AbfsApacheHttpExpect100Exception(Mockito.mock(HttpResponse.class)))
            .when((AbfsAHCHttpOperation) httpOperation)
            .executeRequest();
      }
      return httpOperation;
    }).when(op).createHttpOperation();

    // Mock the restOperation for the client.
    Mockito.doReturn(op)
        .when(testClient)
        .getAbfsRestOperation(eq(AbfsRestOperationType.Append),
            Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
            Mockito.nullable(int.class), Mockito.nullable(int.class),
            Mockito.any());

    TracingContext tracingContext = Mockito.spy(new TracingContext("abcd",
        "abcde", FSOperationType.APPEND,
        TracingHeaderFormat.ALL_ID_FORMAT, null));

    // Check that expect header is enabled before the append call.
    Assertions.assertThat(appendRequestParameters.isExpectHeaderEnabled())
            .describedAs("The expect header is not true before the append call")
            .isTrue();

    intercept(AzureBlobFileSystemException.class,
        () -> testClient.append(finalTestPath, buffer, appendRequestParameters, null, null, tracingContext));

    // Verify that the request was not exponentially retried because of user error.
    Assertions.assertThat(tracingContext.getRetryCount())
        .describedAs("The retry count is incorrect")
        .isEqualTo(0);

    // Verify that the same request was retried with expect header disabled.
    Assertions.assertThat(appendRequestParameters.isExpectHeaderEnabled())
            .describedAs("The expect header is not false")
            .isFalse();
  }

  @Test
  public void testIsNonEmptyDirectory() throws IOException {
    testIsNonEmptyDirectoryInternal(EMPTY_STRING, true, EMPTY_STRING,
        true, 1, false);
    testIsNonEmptyDirectoryInternal(EMPTY_STRING, false, EMPTY_STRING,
        false, 1, true);

    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING,
        true, 2, false);
    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING,
        false, 2, true);
    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN + 1, true, TEST_CONTINUATION_TOKEN + 2,
        true, 3, false);
    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN + 1, true, TEST_CONTINUATION_TOKEN + 2,
        false, 2, true);

    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING,
        true, 1, true);
    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING,
        false, 1, true);
    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN + 1, false, TEST_CONTINUATION_TOKEN + 2,
        true, 1, true);
    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN + 1, false, TEST_CONTINUATION_TOKEN + 2,
        false, 1, true);
  }

  private void testIsNonEmptyDirectoryInternal(String firstCT,
      boolean isfirstEmpty, String secondCT, boolean isSecondEmpty,
      int expectedInvocations, boolean isNonEmpty) throws IOException {

    assumeBlobServiceType();
    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
    AbfsBlobClient spiedClient = Mockito.spy(spiedStore.getClientHandler().getBlobClient());
    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
    VersionedFileStatus status1 = new VersionedFileStatus(
        "owner", "group", null, false, 0, false, 0, 0, 0,
        new Path("/testPath/file1"), "version", "encryptionContext");
    VersionedFileStatus status2 = new VersionedFileStatus(
        "owner", "group", null, false, 0, false, 0, 0, 0,
        new Path("/testPath/file2"), "version", "encryptionContext");

    List<VersionedFileStatus> mockedList1 = new ArrayList<>();
    mockedList1.add(status1);
    List<VersionedFileStatus> mockedList2 = new ArrayList<>();
    mockedList2.add(status2);

    ListResponseData listResponseData1 = new ListResponseData();
    listResponseData1.setContinuationToken(firstCT);
    listResponseData1.setFileStatusList(isfirstEmpty ? new ArrayList<>() : mockedList1);
    listResponseData1.setOp(Mockito.mock(AbfsRestOperation.class));

    ListResponseData listResponseData2 = new ListResponseData();
    listResponseData2.setContinuationToken(secondCT);
    listResponseData2.setFileStatusList(isSecondEmpty ? new ArrayList<>() : mockedList2);
    listResponseData2.setOp(Mockito.mock(AbfsRestOperation.class));

    ListResponseData listResponseData3 = new ListResponseData();
    listResponseData3.setContinuationToken(EMPTY_STRING);
    listResponseData3.setFileStatusList(new ArrayList<>());
    listResponseData3.setOp(Mockito.mock(AbfsRestOperation.class));

    Mockito.doReturn(listResponseData1).doReturn(listResponseData2).doReturn(listResponseData3)
        .when(spiedClient).listPath(eq("/testPath"), eq(false), eq(1),
            any(), any(), any());

    final int[] itr = new int[1];
    final String[] continuationTokenUsed = new String[3];

    Mockito.doAnswer(invocationOnMock -> {
      if (itr[0] == 0) {
        itr[0]++;
        continuationTokenUsed[0] = invocationOnMock.getArgument(3);
        return listResponseData1;
      } else if (itr[0] == 1) {
        itr[0]++;
        continuationTokenUsed[1] = invocationOnMock.getArgument(3);
        return listResponseData2;
      }
      continuationTokenUsed[2] = invocationOnMock.getArgument(3);
      return listResponseData3;
    }).when(spiedClient).listPath(eq("/testPath"), eq(false), eq(1),
        any(), any(TracingContext.class), any());

    Assertions.assertThat(spiedClient.isNonEmptyDirectory("/testPath",
        Mockito.mock(TracingContext.class)))
        .describedAs("isNonEmptyDirectory in client giving unexpected results")
        .isEqualTo(isNonEmpty);

    Assertions.assertThat(continuationTokenUsed[0])
        .describedAs("First continuation token used is not as expected")
        .isNull();

    if (expectedInvocations > 1) {
      Assertions.assertThat(continuationTokenUsed[1])
          .describedAs("Second continuation token used is not as expected")
          .isEqualTo(firstCT);
    }

    if (expectedInvocations > 2) {
      Assertions.assertThat(continuationTokenUsed[2])
          .describedAs("Third continuation token used is not as expected")
          .isEqualTo(secondCT);
    }

    Mockito.verify(spiedClient, times(expectedInvocations))
        .listPath(eq("/testPath"), eq(false), eq(1),
            any(), any(TracingContext.class), any());
  }
}