ITestAzureBlobFileSystemListStatus.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Stubber;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType;
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
import org.apache.hadoop.fs.azurebfs.utils.DirectoryStateHelper;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.contract.ContractTestUtils;

import static java.net.HttpURLConnection.HTTP_OK;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_LIST_MAX_RESULTS;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_METADATA_PREFIX;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_BLOB_LIST_PARSING;
import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_JDK_MESSAGE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
import static org.apache.hadoop.fs.contract.ContractTestUtils.rename;

import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;

/**
 * Test listStatus operation.
 */
public class ITestAzureBlobFileSystemListStatus extends
    AbstractAbfsIntegrationTest {
  private static final int TEST_FILES_NUMBER = 6000;
  public static final String TEST_CONTINUATION_TOKEN = "continuation";
  private static final int TOTAL_NUMBER_OF_PATHS = 11;
  private static final int NUMBER_OF_UNIQUE_PATHS = 7;

  public ITestAzureBlobFileSystemListStatus() throws Exception {
    super();
  }

  @Test
  public void testListPath() throws Exception {
    Configuration config = new Configuration(this.getRawConfiguration());
    config.set(AZURE_LIST_MAX_RESULTS, "5000");
    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
        .newInstance(getFileSystem().getUri(), config);
      final List<Future<Void>> tasks = new ArrayList<>();

      ExecutorService es = Executors.newFixedThreadPool(10);
      for (int i = 0; i < TEST_FILES_NUMBER; i++) {
        final Path fileName = new Path("/test" + i);
        Callable<Void> callable = new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            touch(fileName);
            return null;
          }
        };

        tasks.add(es.submit(callable));
      }

      for (Future<Void> task : tasks) {
        task.get();
      }

      es.shutdownNow();
      fs.registerListener(
              new TracingHeaderValidator(getConfiguration().getClientCorrelationId(),
                      fs.getFileSystemId(), FSOperationType.LISTSTATUS, true, 0));
      FileStatus[] files = fs.listStatus(new Path("/"));
      assertEquals(TEST_FILES_NUMBER, files.length /* user directory */);
    fs.registerListener(
            new TracingHeaderValidator(getConfiguration().getClientCorrelationId(),
                    fs.getFileSystemId(), FSOperationType.GET_ATTR, true, 0));
    fs.close();
  }

  /**
   * Test to verify that each paginated call to ListBlobs uses a new tracing context.
   * Test also verifies that the retry policy is called when a SocketTimeoutException
   * Test also verifies that empty list with valid continuation token is handled.
   * @throws Exception if there is an error or test assertions fails.
   */
  @Test
  public void testListPathTracingContext() throws Exception {
    final AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
    final AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
    final AbfsClient spiedClient = Mockito.spy(spiedFs.getAbfsClient());
    final TracingContext spiedTracingContext = Mockito.spy(
        new TracingContext(
            spiedFs.getClientCorrelationId(), spiedFs.getFileSystemId(),
            FSOperationType.LISTSTATUS, true, TracingHeaderFormat.ALL_ID_FORMAT, null));

    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
    spiedFs.setWorkingDirectory(new Path("/"));

    AbfsClientTestUtil.setMockAbfsRestOperationForListOperation(spiedClient,
        (httpOperation) -> {
          Stubber stubber = Mockito.doThrow(
              new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE));
          stubber.doNothing().when(httpOperation).processResponse(
              nullable(byte[].class), nullable(int.class), nullable(int.class));

          when(httpOperation.getStatusCode()).thenReturn(-1).thenReturn(HTTP_OK);
          return httpOperation;
        });

    List<FileStatus> fileStatuses = new ArrayList<>();
    spiedStore.listStatus(new Path("/"), "", fileStatuses, true, null, spiedTracingContext);

    // Assert that there were retries due to SocketTimeoutException
    Mockito.verify(spiedClient, Mockito.times(1))
        .getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);

    // Assert that there were 2 paginated ListPath calls were made 1 and 2.
    // 1. Without continuation token
    Mockito.verify(spiedClient, times(1)).listPath(
        "/", false,
        spiedFs.getAbfsStore().getAbfsConfiguration().getListMaxResults(),
        null, spiedTracingContext, spiedFs.getAbfsStore().getUri());
    // 2. With continuation token
    Mockito.verify(spiedClient, times(1)).listPath(
        "/", false,
        spiedFs.getAbfsStore().getAbfsConfiguration().getListMaxResults(),
        TEST_CONTINUATION_TOKEN, spiedTracingContext, spiedFs.getAbfsStore().getUri());

    // Assert that none of the API calls used the same tracing header.
    Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any(), any());
  }

  @Test
  public void testListPathParsingFailure() throws Exception {
    assumeBlobServiceType();
    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
    AbfsBlobClient spiedClient = Mockito.spy(spiedStore.getClientHandler()
        .getBlobClient());
    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
    Mockito.doReturn(spiedClient).when(spiedStore).getClient();

    Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterRenamePendingFiles(any(), any());
    List<FileStatus> fileStatuses = new ArrayList<>();
    AbfsDriverException ex = intercept(AbfsDriverException.class,
      () -> {
        spiedStore.listStatus(new Path("/"), "", fileStatuses,
            true, null, getTestTracingContext(spiedFs, true));
      });
    Assertions.assertThat(ex.getStatusCode())
        .describedAs("Expecting Network Error status code")
        .isEqualTo(-1);
    Assertions.assertThat(ex.getErrorMessage())
        .describedAs("Expecting COPY_ABORTED error code")
        .contains(ERR_BLOB_LIST_PARSING);
  }

  /**
   * Creates a file, verifies that listStatus returns it,
   * even while the file is still open for writing.
   */
  @Test
  public void testListFileVsListDir() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    Path path = path("/testFile");
    try(FSDataOutputStream ignored = fs.create(path)) {
      FileStatus[] testFiles = fs.listStatus(path);
      assertEquals("length of test files", 1, testFiles.length);
      FileStatus status = testFiles[0];
      assertIsFileReference(status);
    }
  }

  @Test
  public void testListFileVsListDir2() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    Path testFolder = path("/testFolder");
    fs.mkdirs(testFolder);
    fs.mkdirs(new Path(testFolder + "/testFolder2"));
    fs.mkdirs(new Path(testFolder + "/testFolder2/testFolder3"));
    Path testFile0Path = new Path(
        testFolder + "/testFolder2/testFolder3/testFile");
    ContractTestUtils.touch(fs, testFile0Path);

    FileStatus[] testFiles = fs.listStatus(testFile0Path);
    assertEquals("Wrong listing size of file " + testFile0Path,
        1, testFiles.length);
    FileStatus file0 = testFiles[0];
    assertEquals("Wrong path for " + file0, new Path(getTestUrl(),
        testFolder + "/testFolder2/testFolder3/testFile"), file0.getPath());
    assertIsFileReference(file0);
  }

  @Test(expected = FileNotFoundException.class)
  public void testListNonExistentDir() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    fs.listStatus(new Path("/testFile/"));
  }

  @Test
  public void testListFiles() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    Path testDir = path("/test");
    fs.mkdirs(testDir);

    FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
    assertEquals(1, fileStatuses.length);

    fs.mkdirs(new Path(testDir + "/sub"));
    fileStatuses = fs.listStatus(testDir);
    assertEquals(1, fileStatuses.length);
    assertEquals("sub", fileStatuses[0].getPath().getName());
    assertIsDirectoryReference(fileStatuses[0]);
    Path childF = fs.makeQualified(new Path(testDir + "/f"));
    touch(childF);
    fileStatuses = fs.listStatus(testDir);
    assertEquals(2, fileStatuses.length);
    final FileStatus childStatus = fileStatuses[0];
    assertEquals(childF, childStatus.getPath());
    assertEquals("f", childStatus.getPath().getName());
    assertIsFileReference(childStatus);
    assertEquals(0, childStatus.getLen());
    final FileStatus status1 = fileStatuses[1];
    assertEquals("sub", status1.getPath().getName());
    assertIsDirectoryReference(status1);
    // look at the child through getFileStatus
    LocatedFileStatus locatedChildStatus = fs.listFiles(childF, false).next();
    assertIsFileReference(locatedChildStatus);

    fs.delete(testDir, true);
    intercept(FileNotFoundException.class,
        () -> fs.listFiles(childF, false).next());

    // do some final checks on the status (failing due to version checks)
    assertEquals("Path mismatch of " + locatedChildStatus,
        childF, locatedChildStatus.getPath());
    assertEquals("locatedstatus.equals(status)",
        locatedChildStatus, childStatus);
    assertEquals("status.equals(locatedstatus)",
        childStatus, locatedChildStatus);
  }

  private void assertIsDirectoryReference(FileStatus status) {
    assertTrue("Not a directory: " + status, status.isDirectory());
    assertFalse("Not a directory: " + status, status.isFile());
    assertEquals(0, status.getLen());
  }

  private void assertIsFileReference(FileStatus status) {
    assertFalse("Not a file: " + status, status.isDirectory());
    assertTrue("Not a file: " + status, status.isFile());
  }

  @Test
  public void testMkdirTrailingPeriodDirName() throws IOException {
    boolean exceptionThrown = false;
    final AzureBlobFileSystem fs = getFileSystem();

    Path nontrailingPeriodDir = path("testTrailingDir/dir");
    Path trailingPeriodDir = new Path("testMkdirTrailingDir/dir.");

    assertMkdirs(fs, nontrailingPeriodDir);

    try {
      fs.mkdirs(trailingPeriodDir);
    }
    catch(IllegalArgumentException e) {
      exceptionThrown = true;
    }
    assertTrue("Attempt to create file that ended with a dot should"
        + " throw IllegalArgumentException", exceptionThrown);
  }

  @Test
  public void testCreateTrailingPeriodFileName() throws IOException {
    boolean exceptionThrown = false;
    final AzureBlobFileSystem fs = getFileSystem();

    Path trailingPeriodFile = new Path("testTrailingDir/file.");
    Path nontrailingPeriodFile = path("testCreateTrailingDir/file");

    createFile(fs, nontrailingPeriodFile, false, new byte[0]);
    assertPathExists(fs, "Trailing period file does not exist",
        nontrailingPeriodFile);

    try {
      createFile(fs, trailingPeriodFile, false, new byte[0]);
    }
    catch(IllegalArgumentException e) {
      exceptionThrown = true;
    }
    assertTrue("Attempt to create file that ended with a dot should"
        + " throw IllegalArgumentException", exceptionThrown);
  }

  @Test
  public void testRenameTrailingPeriodFile() throws IOException {
    boolean exceptionThrown = false;
    final AzureBlobFileSystem fs = getFileSystem();

    Path nonTrailingPeriodFile = path("testTrailingDir/file");
    Path trailingPeriodFile = new Path("testRenameTrailingDir/file.");

    createFile(fs, nonTrailingPeriodFile, false, new byte[0]);
    try {
    rename(fs, nonTrailingPeriodFile, trailingPeriodFile);
    }
    catch(IllegalArgumentException e) {
      exceptionThrown = true;
    }
    assertTrue("Attempt to create file that ended with a dot should"
        + " throw IllegalArgumentException", exceptionThrown);
  }

  @Test
  public void testEmptyListingInSubsequentCall() throws IOException {
    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, true, EMPTY_STRING,
        true, 1, 0);
    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, true, EMPTY_STRING,
        false, 1, 0);
    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, true, TEST_CONTINUATION_TOKEN,
        true, 1, 0);
    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, true, TEST_CONTINUATION_TOKEN,
        false, 1, 0);

    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING,
        true, 2, 0);
    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING,
        false, 2, 1);
    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN + 1, true, TEST_CONTINUATION_TOKEN + 2,
        true, 3, 0);
    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN + 1, true, TEST_CONTINUATION_TOKEN + 2,
        false, 3, 1);

    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, EMPTY_STRING,
        true, 1, 1);
    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, EMPTY_STRING,
        false, 1, 1);
    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, TEST_CONTINUATION_TOKEN,
        true, 1, 1);
    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, TEST_CONTINUATION_TOKEN,
        false, 1, 1);

    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING,
        true, 2, 1);
    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING,
        false, 2, 2);
    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN + 1, false, TEST_CONTINUATION_TOKEN + 2,
        true, 3, 1);
    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN + 1, false, TEST_CONTINUATION_TOKEN + 2,
        false, 3, 2);
  }

  private void testEmptyListingInSubsequentCallInternal(String firstCT,
      boolean isfirstEmpty, String secondCT, boolean isSecondEmpty,
      int expectedInvocations, int expectedSize) throws IOException {
    assumeBlobServiceType();
    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
    spiedStore.getAbfsConfiguration().setListMaxResults(1);
    AbfsBlobClient spiedClient = Mockito.spy(spiedStore.getClientHandler().getBlobClient());
    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
    spiedFs.mkdirs(new Path("/testPath"));
    VersionedFileStatus status1 = new VersionedFileStatus(
        "owner", "group", null, false, 0, false, 0, 0, 0,
        new Path("/testPath/file1"), "version", "encryptionContext");
    VersionedFileStatus status2 = new VersionedFileStatus(
        "owner", "group", null, false, 0, false, 0, 0, 0,
        new Path("/testPath/file2"), "version", "encryptionContext");

    List<VersionedFileStatus> mockedList1 = new ArrayList<>();
    mockedList1.add(status1);
    List<VersionedFileStatus> mockedList2 = new ArrayList<>();
    mockedList2.add(status2);

    ListResponseData listResponseData1 = new ListResponseData();
    listResponseData1.setContinuationToken(firstCT);
    listResponseData1.setFileStatusList(isfirstEmpty ? new ArrayList<>() : mockedList1);
    listResponseData1.setOp(Mockito.mock(AbfsRestOperation.class));

    ListResponseData listResponseData2 = new ListResponseData();
    listResponseData2.setContinuationToken(secondCT);
    listResponseData2.setFileStatusList(isSecondEmpty ? new ArrayList<>() : mockedList2);
    listResponseData2.setOp(Mockito.mock(AbfsRestOperation.class));

    ListResponseData listResponseData3 = new ListResponseData();
    listResponseData3.setContinuationToken(EMPTY_STRING);
    listResponseData3.setFileStatusList(new ArrayList<>());
    listResponseData3.setOp(Mockito.mock(AbfsRestOperation.class));

    final int[] itr = new int[1];
    final String[] continuationTokenUsed = new String[3];

    Mockito.doAnswer(invocationOnMock -> {
      if (itr[0] == 0) {
        itr[0]++;
        continuationTokenUsed[0] = invocationOnMock.getArgument(3);
        return listResponseData1;
      } else if (itr[0] == 1) {
        itr[0]++;
        continuationTokenUsed[1] = invocationOnMock.getArgument(3);
        return listResponseData2;
      }
      continuationTokenUsed[2] = invocationOnMock.getArgument(3);
      return listResponseData3;
    }).when(spiedClient).listPath(eq("/testPath"), eq(false), eq(1),
        any(), any(TracingContext.class), any());

    FileStatus[] list = spiedFs.listStatus(new Path("/testPath"));

    Mockito.verify(spiedClient, times(expectedInvocations))
        .listPath(eq("/testPath"), eq(false), eq(1),
        any(), any(TracingContext.class), any());
    Mockito.verify(spiedClient, times(1))
        .postListProcessing(eq("/testPath"), any(), any(), any());
    Assertions.assertThat(list).hasSize(expectedSize);

    if (expectedSize == 0) {
      Mockito.verify(spiedClient, times(1))
          .getPathStatus(eq("/testPath"), any(), eq(null), eq(false));
    } else {
      Mockito.verify(spiedClient, times(0))
          .getPathStatus(eq("/testPath"), any(), eq(null), eq(false));
    }

    Assertions.assertThat(continuationTokenUsed[0])
        .describedAs("First continuation token used is not as expected")
        .isNull();

    if (expectedInvocations > 1) {
      Assertions.assertThat(continuationTokenUsed[1])
          .describedAs("Second continuation token used is not as expected")
          .isEqualTo(firstCT);
    }

    if (expectedInvocations > 2) {
      Assertions.assertThat(continuationTokenUsed[2])
          .describedAs("Third continuation token used is not as expected")
          .isEqualTo(secondCT);
    }
  }

  /**
   * Test to verify that listStatus returns the correct file status all types
   * of paths viz. implicit, explicit, file.
   * @throws Exception if there is an error or test assertions fails.
   */
  @Test
  public void testListStatusWithImplicitExplicitChildren() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    fs.setWorkingDirectory(new Path(ROOT_PATH));
    Path root = new Path(ROOT_PATH);

    // Create an implicit directory under root
    Path dir = new Path("a");
    Path fileInsideDir = new Path("a/file");
    createAzCopyFolder(dir);

    // Assert that implicit directory is returned
    FileStatus[] fileStatuses = fs.listStatus(root);
    Assertions.assertThat(fileStatuses.length)
        .describedAs("List size is not expected").isEqualTo(1);
    assertImplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir));

    // Create a marker blob for the directory.
    fs.create(fileInsideDir);

    // Assert that only one entry of explicit directory is returned
    fileStatuses = fs.listStatus(root);
    Assertions.assertThat(fileStatuses.length)
        .describedAs("List size is not expected").isEqualTo(1);
    assertExplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir));

    // Create a file under root
    Path file1 = new Path("b");
    fs.create(file1);

    // Assert that two entries are returned in alphabetic order.
    fileStatuses = fs.listStatus(root);
    Assertions.assertThat(fileStatuses.length)
        .describedAs("List size is not expected").isEqualTo(2);
    assertExplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir));
    assertFilePathFileStatus(fileStatuses[1], fs.makeQualified(file1));

    // Create another implicit directory under root.
    Path dir2 = new Path("c");
    createAzCopyFolder(dir2);

    // Assert that three entries are returned in alphabetic order.
    fileStatuses = fs.listStatus(root);
    Assertions.assertThat(fileStatuses.length)
        .describedAs("List size is not expected").isEqualTo(3);
    assertExplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir));
    assertFilePathFileStatus(fileStatuses[1], fs.makeQualified(file1));
    assertImplicitDirectoryFileStatus(fileStatuses[2], fs.makeQualified(dir2));
  }

  /**
   * Test to verify that listStatus returns the correct file status when called on an implicit path
   * @throws Exception if there is an error or test assertions fails.
   */
  @Test
  public void testListStatusOnImplicitDirectoryPath() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    Path implicitPath = new Path("/implicitDir");
    createAzCopyFolder(implicitPath);

    FileStatus[] statuses = fs.listStatus(implicitPath);
    Assertions.assertThat(statuses.length)
        .describedAs("List size is not expected").isGreaterThanOrEqualTo(1);
    assertImplicitDirectoryFileStatus(statuses[0], fs.makeQualified(statuses[0].getPath()));

    FileStatus[] statuses1 = fs.listStatus(new Path(statuses[0].getPath().toString()));
    Assertions.assertThat(statuses1.length)
        .describedAs("List size is not expected").isGreaterThanOrEqualTo(1);
    assertFilePathFileStatus(statuses1[0], fs.makeQualified(statuses1[0].getPath()));
  }

  @Test
  public void testListStatusOnEmptyDirectory() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    Path emptyDir = new Path("/emptyDir");
    fs.mkdirs(emptyDir);

    FileStatus[] statuses = fs.listStatus(emptyDir);
    Assertions.assertThat(statuses.length)
        .describedAs("List size is not expected").isEqualTo(0);
  }

  @Test
  public void testListStatusOnRenamePendingJsonFile() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    Path renamePendingJsonPath = new Path("/hbase/A/A-" + SUFFIX);
    fs.create(renamePendingJsonPath);

    FileStatus[] statuses = fs.listStatus(renamePendingJsonPath);
    Assertions.assertThat(statuses.length)
        .describedAs("List size is not expected").isEqualTo(1);
    assertFilePathFileStatus(statuses[0], fs.makeQualified(statuses[0].getPath()));
  }

  @Test
  public void testContinuationTokenAcrossListStatus() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    Path path = new Path("/testContinuationToken");
    fs.mkdirs(path);
    fs.create(new Path(path + "/file1"));
    fs.create(new Path(path + "/file2"));

    fs.listStatus(path);

    ListResponseData listResponseData = fs.getAbfsStore().getClient().listPath(
        "/testContinuationToken", false, 1, null, getTestTracingContext(fs, true),
        fs.getAbfsStore().getUri());

    Assertions.assertThat(listResponseData.getContinuationToken())
        .describedAs("Continuation Token Should not be null").isNotNull();
    Assertions.assertThat(listResponseData.getFileStatusList())
        .describedAs("Listing Size Not as expected").hasSize(1);

    ListResponseData listResponseData1 =  fs.getAbfsStore().getClient().listPath(
        "/testContinuationToken", false, 1, listResponseData.getContinuationToken(), getTestTracingContext(fs, true),
        fs.getAbfsStore().getUri());

    Assertions.assertThat(listResponseData1.getContinuationToken())
        .describedAs("Continuation Token Should be null").isNull();
    Assertions.assertThat(listResponseData1.getFileStatusList())
        .describedAs("Listing Size Not as expected").hasSize(1);
  }

  @Test
  public void testInvalidContinuationToken() throws Exception {
    assumeHnsDisabled();
    final AzureBlobFileSystem fs = getFileSystem();
    Path path = new Path("/testInvalidContinuationToken");
    fs.mkdirs(path);
    fs.create(new Path(path + "/file1"));
    fs.create(new Path(path + "/file2"));

    intercept(AbfsRestOperationException.class,
        () -> fs.getAbfsStore().getClient().listPath(
            "/testInvalidContinuationToken", false, 1, "invalidToken",
            getTestTracingContext(fs, true), fs.getAbfsStore().getUri()));
  }

  @Test
  public void testEmptyContinuationToken() throws Exception {
    final AzureBlobFileSystem fs = getFileSystem();
    Path path = new Path("/testInvalidContinuationToken");
    fs.mkdirs(path);
    fs.create(new Path(path + "/file1"));
    fs.create(new Path(path + "/file2"));

    ListResponseData listResponseData = fs.getAbfsStore().getClient().listPath(
        "/testInvalidContinuationToken", false, 1, "",
        getTestTracingContext(fs, true), fs.getAbfsStore().getUri());

    Assertions.assertThat(listResponseData.getContinuationToken())
        .describedAs("Continuation Token Should Not be null").isNotNull();
    Assertions.assertThat(listResponseData.getFileStatusList())
        .describedAs("Listing Size Not as expected").hasSize(1);
  }

  /**
   * Test to verify that listStatus returns the correct file status
   * after removing duplicates across multiple iterations of list blobs.
   * Also verifies that in case of non-empty explicit dir,
   * entry corresponding to marker blob is returned.
   * @throws Exception if test fails.
   */
  @Test
  public void testDuplicateEntriesAcrossListBlobIterations() throws Exception {
    AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
    store.getAbfsConfiguration().setListMaxResults(1);
    AbfsClient client = Mockito.spy(store.getClient());

    Mockito.doReturn(store).when(fs).getAbfsStore();
    Mockito.doReturn(client).when(store).getClient();

    /*
     * Following entries will be created inside the root path.
     * 0. /A - implicit directory without any marker blob
     * 1. /a - marker file for explicit directory
     * 2. /a/file1 - normal file inside explicit directory
     * 3. /b - normal file inside root
     * 4. /c - marker file for explicit directory
     * 5. /c.bak - marker file for explicit directory
     * 6. /c.bak/file2 - normal file inside explicit directory
     * 7. /c/file3 - normal file inside explicit directory
     * 8. /d - implicit directory
     * 9. /e - marker file for explicit directory
     * 10. /e/file4 - normal file inside explicit directory
     */
    // Create Path 0
    createAzCopyFolder(new Path("/A"));

    // Create Path 1 and 2.
    fs.create(new Path("/a/file1"));

    // Create Path 3
    fs.create(new Path("/b"));

    // Create Path 4 and 7
    fs.create(new Path("/c/file3"));

    // Create Path 5 and 6
    fs.create(new Path("/c.bak/file2"));

    // Create Path 8
    createAzCopyFolder(new Path("/d"));

    // Create Path 9 and 10
    fs.create(new Path("/e/file4"));

    FileStatus[] fileStatuses = fs.listStatus(new Path(ROOT_PATH));

    // Assert that client.listPath was called 11 times.
    // This will assert server returned 11 entries in total.
    Mockito.verify(client, Mockito.times(TOTAL_NUMBER_OF_PATHS))
        .listPath(eq(ROOT_PATH), eq(false), eq(1), any(), any(), any());

    // Assert that after duplicate removal, only 7 unique entries are returned.
    Assertions.assertThat(fileStatuses.length)
        .describedAs("List size is not expected").isEqualTo(NUMBER_OF_UNIQUE_PATHS);

    // Assert that for duplicates, entry corresponding to marker blob is returned.
    assertImplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(new Path("/A")));
    assertExplicitDirectoryFileStatus(fileStatuses[1], fs.makeQualified(new Path("/a")));
    assertFilePathFileStatus(fileStatuses[2], fs.makeQualified(new Path("/b")));
    assertExplicitDirectoryFileStatus(fileStatuses[3], fs.makeQualified(new Path("/c")));
    assertExplicitDirectoryFileStatus(fileStatuses[4], fs.makeQualified(new Path("/c.bak")));
    assertImplicitDirectoryFileStatus(fileStatuses[5], fs.makeQualified(new Path("/d")));
    assertExplicitDirectoryFileStatus(fileStatuses[6], fs.makeQualified(new Path("/e")));

    // Assert that there are no duplicates in the returned file statuses.
    Set<Path> uniquePaths = new HashSet<>();
    for (FileStatus fileStatus : fileStatuses) {
      Assertions.assertThat(uniquePaths.add(fileStatus.getPath()))
          .describedAs("Duplicate Entries found")
          .isTrue();
    }
  }

  private void assertFilePathFileStatus(final FileStatus fileStatus,
      final Path qualifiedPath) {
    Assertions.assertThat(fileStatus.getPath())
        .describedAs("Path Not as expected").isEqualTo(qualifiedPath);
    Assertions.assertThat(fileStatus.isFile())
        .describedAs("Expecting a File Path").isEqualTo(true);
    Assertions.assertThat(fileStatus.isDirectory())
        .describedAs("Expecting a File Path").isEqualTo(false);
    Assertions.assertThat(fileStatus.getModificationTime()).isNotEqualTo(0);
  }

  private void assertImplicitDirectoryFileStatus(final FileStatus fileStatus,
      final Path qualifiedPath) throws Exception {
    assertDirectoryFileStatus(fileStatus, qualifiedPath);
    DirectoryStateHelper.isImplicitDirectory(qualifiedPath, getFileSystem(),
        getTestTracingContext(getFileSystem(), true));
    Assertions.assertThat(fileStatus.getModificationTime())
        .describedAs("Last Modified Time Not as Expected").isEqualTo(0);
  }

  private void assertExplicitDirectoryFileStatus(final FileStatus fileStatus,
      final Path qualifiedPath) throws Exception {
    assertDirectoryFileStatus(fileStatus, qualifiedPath);
    DirectoryStateHelper.isExplicitDirectory(qualifiedPath, getFileSystem(),
        getTestTracingContext(getFileSystem(), true));
    Assertions.assertThat(fileStatus.getModificationTime())
        .describedAs("Last Modified Time Not as Expected").isNotEqualTo(0);
  }

  private void assertDirectoryFileStatus(final FileStatus fileStatus,
      final Path qualifiedPath) {
    Assertions.assertThat(fileStatus.getPath())
        .describedAs("Path Not as Expected").isEqualTo(qualifiedPath);
    Assertions.assertThat(fileStatus.isDirectory())
        .describedAs("Expecting a Directory Path").isEqualTo(true);
    Assertions.assertThat(fileStatus.isFile())
        .describedAs("Expecting a Directory Path").isEqualTo(false);
    Assertions.assertThat(fileStatus.getLen())
        .describedAs("Content Length Not as Expected").isEqualTo(0);
  }

  /**
   * Helper method to mock the AbfsRestOperation and modify the request headers.
   *
   * @param abfsBlobClient the mocked AbfsBlobClient
   * @param newHeader the header to add in place of the old one
   */
  public static void mockAbfsRestOperation(AbfsBlobClient abfsBlobClient, String... newHeader) {
    Mockito.doAnswer(invocation -> {
      List<AbfsHttpHeader> requestHeaders = invocation.getArgument(3);

      // Remove the actual HDI config header and add the new one
      requestHeaders.removeIf(header ->
          HttpHeaderConfigurations.X_MS_META_HDI_ISFOLDER.equals(header.getName()));
      for (String header : newHeader) {
        requestHeaders.add(new AbfsHttpHeader(X_MS_METADATA_PREFIX + header, TRUE));
      }

      // Call the real method
      return invocation.callRealMethod();
    }).when(abfsBlobClient).getAbfsRestOperation(eq(AbfsRestOperationType.PutBlob),
        eq(HTTP_METHOD_PUT), any(URL.class), anyList());
  }

  /**
   * Helper method to mock the AbfsBlobClient and set up the client handler.
   *
   * @param fs the AzureBlobFileSystem instance
   * @return the mocked AbfsBlobClient
   */
  public static AbfsBlobClient mockIngressClientHandler(AzureBlobFileSystem fs) {
    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
    AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
    AbfsBlobClient abfsBlobClient = (AbfsBlobClient) Mockito.spy(
        clientHandler.getClient());
    fs.getAbfsStore().setClient(abfsBlobClient);
    fs.getAbfsStore().setClientHandler(clientHandler);
    Mockito.doReturn(abfsBlobClient).when(clientHandler).getIngressClient();
    return abfsBlobClient;
  }

  /**
   * Test directory status with different HDI folder configuration,
   * verifying the correct header and directory state.
   */
  private void testIsDirectory(boolean expected, String... configName) throws Exception {
    try (AzureBlobFileSystem fs = Mockito.spy(getFileSystem())) {
      assumeBlobServiceType();
      AbfsBlobClient abfsBlobClient = mockIngressClientHandler(fs);
      // Mock the operation to modify the headers
      mockAbfsRestOperation(abfsBlobClient, configName);

      // Create the path and invoke mkdirs method
      Path path = new Path("/testPath");
      fs.mkdirs(path);

      // Assert that the response header has the updated value
      FileStatus[] fileStatus = fs.listStatus(path.getParent());

      AbfsHttpOperation op = abfsBlobClient.getPathStatus(
          path.toUri().getPath(),
          true, getTestTracingContext(fs, true),
          null).getResult();

      Assertions.assertThat(abfsBlobClient.checkIsDir(op))
          .describedAs("Directory should be marked as " + expected)
          .isEqualTo(expected);

      // Verify the header and directory state
      Assertions.assertThat(fileStatus.length)
          .describedAs("Expected directory state: " + expected)
          .isEqualTo(1);

      // Verify the header and directory state
      Assertions.assertThat(fileStatus[0].isDirectory())
          .describedAs("Expected directory state: " + expected)
          .isEqualTo(expected);

      fs.delete(path, true);
    }
  }

  /**
   * Test to verify the directory status with different HDI folder configurations.
   * Verifying the correct header and directory state.
   */
  @Test
  public void testIsDirectoryWithDifferentCases() throws Exception {
    testIsDirectory(true,  "HDI_ISFOLDER");

    testIsDirectory(true, "Hdi_ISFOLDER");

    testIsDirectory(true, "Hdi_isfolder");

    testIsDirectory(true, "hdi_isfolder");

    testIsDirectory(false, "Hdi_isfolder1");

    testIsDirectory(true, "HDI_ISFOLDER", "Hdi_ISFOLDER", "Hdi_isfolder");

    testIsDirectory(true, "HDI_ISFOLDER", "Hdi_ISFOLDER1", "Test");
  }
}