ITestAzureBlobFileSystemRenameRecovery.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.hadoop.test.LambdaTestUtils;

import static java.net.HttpURLConnection.HTTP_OK;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
import static org.apache.hadoop.fs.azurebfs.utils.AbfsTestUtils.createFiles;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
 * Test rename recovery operation.
 */
public class ITestAzureBlobFileSystemRenameRecovery extends
    AbstractAbfsIntegrationTest {

  // Counter value at which the mocked copy/delete hooks in this class stop
  // delegating to the real client and start throwing instead.
  private static final int FAILED_CALL = 15;

  // Count handed to createFiles for the source directory; also used as the
  // upper bound for the operation counter after a crashed rename.
  private static final int TOTAL_FILES = 25;

  public ITestAzureBlobFileSystemRenameRecovery() throws Exception {
    super();
  }

  /**
   * Renames a directory while the copy step is made to fail part-way through.
   *
   * The copy hook installed by {@code renameCrashInBetween} starts throwing
   * once {@code FAILED_CALL} copies have gone through; that helper then
   * verifies the crashed state and the subsequent recovery.
   *
   * @throws Exception if any error occurs during the test execution
   */
  @Test
  public void testRenameCopyFailureInBetween() throws Exception {
    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
      assumeBlobServiceType();
      AbfsBlobClient blobClient = (AbfsBlobClient) addSpyHooksOnClient(fs);
      fs.getAbfsStore().setClient(blobClient);

      final Path source = new Path("/hbase/A1/A2");
      final Path destination = new Path("/hbase/A1/A3");

      // Populate the source directory before the rename is attempted.
      createFiles(fs, source, TOTAL_FILES);

      // A fresh counter records how many copy calls the hook has let through.
      renameCrashInBetween(fs, source, destination, blobClient,
          new AtomicInteger(0));
    }
  }

  /**
   * Renames a directory while the delete step is made to fail part-way
   * through.
   *
   * The first {@code FAILED_CALL} calls to {@code deleteBlobPath} are passed
   * on to the real client; every later call throws an
   * {@link AbfsRestOperationException} built from {@code BLOB_PATH_NOT_FOUND}
   * until the counter is reset. The crashed state and the recovery are then
   * verified by {@code renameOperationWithRecovery}.
   *
   * @throws Exception if any error occurs during the test execution
   */
  @Test
  public void testRenameDeleteFailureInBetween() throws Exception {
    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
      assumeBlobServiceType();
      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
      fs.getAbfsStore().setClient(client);
      Path src = new Path("/hbase/A1/A2");
      Path dst = new Path("/hbase/A1/A3");

      // Create sample files in the source directory
      createFiles(fs, src, TOTAL_FILES);

      // Track the number of delete operations
      AtomicInteger deleteCall = new AtomicInteger(0);
      Mockito.doAnswer(deleteRequest -> {
        // Test-and-increment as ONE atomic step. A separate get() followed by
        // incrementAndGet() lets two concurrent callers both step past
        // FAILED_CALL, in which case the failure would never be injected
        // (getConfig() asks for more than one rename thread).
        int callsSoFar = deleteCall.getAndUpdate(
            count -> count == FAILED_CALL ? count : count + 1);
        if (callsSoFar == FAILED_CALL) {
          throw new AbfsRestOperationException(
              BLOB_PATH_NOT_FOUND.getStatusCode(),
              BLOB_PATH_NOT_FOUND.getErrorCode(),
              BLOB_PATH_NOT_FOUND.getErrorMessage(),
              new Exception());
        }
        return deleteRequest.callRealMethod();
      }).when(client).deleteBlobPath(Mockito.any(Path.class),
          Mockito.anyString(), Mockito.any(TracingContext.class));

      renameOperationWithRecovery(fs, src, dst, deleteCall);
    }
  }

  /**
   * Runs the copy-failure crash scenario when the destination directory has
   * been created before the rename starts.
   *
   * Before the rename the test asserts that source and destination both exist
   * and that no pending JSON file is present. The crash and the recovery are
   * then driven and verified by {@code renameCrashInBetween}, whose copy hook
   * starts throwing once {@code FAILED_CALL} copies have gone through.
   *
   * @throws Exception if any error occurs during the test execution
   */
  @Test
  public void testRenameRecoveryWhenDestAlreadyExist() throws Exception {
    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
      assumeBlobServiceType();
      AbfsBlobClient blobClient = (AbfsBlobClient) addSpyHooksOnClient(fs);
      fs.getAbfsStore().setClient(blobClient);

      final Path source = new Path("/hbase/A1/A2");
      final Path destination = new Path("/hbase/A1/A3");

      // Source gets its sample files; the destination is created up front.
      createFiles(fs, source, TOTAL_FILES);
      fs.mkdirs(destination);

      // Pre-condition: both directories present, no pending JSON file yet.
      validateRename(fs, source, destination, true, true, false);

      // A fresh counter records how many copy calls the hook has let through.
      renameCrashInBetween(fs, source, destination, blobClient,
          new AtomicInteger(0));
    }
  }

  /**
   * Runs the copy-failure crash scenario through
   * {@code renameCrashInBetween}, whose copy hook starts throwing once
   * {@code FAILED_CALL} copies have gone through.
   *
   * NOTE(review): despite its name, this test does not create anything at the
   * destination before the rename; its body currently matches
   * {@code testRenameCopyFailureInBetween}. Confirm whether a destination
   * marker was meant to be set up here.
   *
   * @throws Exception if any error occurs during the test execution
   */
  @Test
  public void testRenameRecoveryWithMarkerPresentInDest() throws Exception {
    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
      assumeBlobServiceType();
      AbfsBlobClient blobClient = (AbfsBlobClient) addSpyHooksOnClient(fs);
      fs.getAbfsStore().setClient(blobClient);

      final Path source = new Path("/hbase/A1/A2");
      final Path destination = new Path("/hbase/A1/A3");

      // Populate the source directory before the rename is attempted.
      createFiles(fs, source, TOTAL_FILES);

      // Counter for the copy calls that the failure hook lets through.
      final AtomicInteger copiesDone = new AtomicInteger(0);
      renameCrashInBetween(fs, source, destination, blobClient, copiesDone);
    }
  }

  /**
   * Runs the copy-failure crash scenario on a directory below {@code /hbase}
   * through {@code renameCrashInBetween}, whose copy hook starts throwing once
   * {@code FAILED_CALL} copies have gone through.
   *
   * NOTE(review): no rename pending JSON file is prepared before the rename
   * in this test; its body currently matches
   * {@code testRenameCopyFailureInBetween}. Confirm whether a pre-existing
   * JSON file was meant to be created here.
   *
   * @throws Exception in case of failure
   */
  @Test
  public void testRenameWhenAlreadyRenamePendingJsonFilePresent() throws Exception {
    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
      assumeBlobServiceType();
      AbfsBlobClient blobClient = (AbfsBlobClient) addSpyHooksOnClient(fs);
      fs.getAbfsStore().setClient(blobClient);

      final Path source = new Path("/hbase/A1/A2");
      final Path destination = new Path("/hbase/A1/A3");

      // Populate the source directory before the rename is attempted.
      createFiles(fs, source, TOTAL_FILES);

      // Counter for the copy calls that the failure hook lets through.
      final AtomicInteger copiesDone = new AtomicInteger(0);
      renameCrashInBetween(fs, source, destination, blobClient, copiesDone);
    }
  }

  /**
   * Verifies list-triggered crash recovery when the directory pending rename
   * is the only child of its parent.
   *
   * {@code createJsonFile} prepares a pending rename JSON file for
   * {@code /hbase/A1/A2}. Listing {@code /hbase/A1} is then expected to
   * return no entries at all, with the pending JSON file gone and the source
   * directory absent from the listing.
   *
   * @throws Exception if any error occurs during the test execution
   */
  @Test
  public void testListCrashRecoveryWithSingleChildFolder() throws Exception {
    final Path srcDir = new Path("/hbase/A1/A2");
    final Path parentDir = srcDir.getParent();
    final Path pendingJson = new Path(parentDir, srcDir.getName() + SUFFIX);
    AzureBlobFileSystem fs = createJsonFile(srcDir, pendingJson);

    FileStatus[] listing = fs.listStatus(parentDir);

    int entryCount = listing.length;
    Assertions.assertThat(entryCount)
        .describedAs("List should return 0 file")
        .isEqualTo(0);
    assertPendingJsonFile(fs, pendingJson, listing, srcDir, false);
  }

  /**
   * Verifies list-triggered crash recovery when the parent directory also
   * holds plain files.
   *
   * A pending rename JSON file is prepared for {@code /hbase/A1/A2} and two
   * files are created directly under {@code /hbase/A1}. Listing the parent is
   * expected to return exactly two entries, with neither the pending JSON
   * file nor the source directory among them.
   *
   * @throws Exception if any error occurs during the test execution
   */
  @Test
  public void testListCrashRecoveryWithMultipleChildFolder() throws Exception {
    final Path srcDir = new Path("/hbase/A1/A2");
    final Path parentDir = srcDir.getParent();
    final Path pendingJson = new Path(parentDir, srcDir.getName() + SUFFIX);
    AzureBlobFileSystem fs = createJsonFile(srcDir, pendingJson);

    // Siblings of the directory that is pending rename.
    fs.create(new Path("/hbase/A1/file1.txt"));
    fs.create(new Path("/hbase/A1/file2.txt"));

    FileStatus[] listing = fs.listStatus(parentDir);

    int entryCount = listing.length;
    Assertions.assertThat(entryCount)
        .describedAs("List should return 2 files")
        .isEqualTo(2);
    assertPendingJsonFile(fs, pendingJson, listing, srcDir, false);
  }

  /**
   * Verifies list-triggered crash recovery when the source directory named by
   * the pending rename JSON file has already been deleted.
   *
   * After the JSON file is prepared, the source directory is deleted
   * recursively and two files are created in the parent. Listing the parent
   * is expected to return exactly two entries, with neither the pending JSON
   * file nor the source directory among them.
   *
   * @throws Exception if any error occurs during the test execution
   */
  @Test
  public void testListCrashRecoveryWithPendingJsonFile() throws Exception {
    final Path srcDir = new Path("/hbase/A1/A2");
    final Path parentDir = srcDir.getParent();
    final Path pendingJson = new Path(parentDir, srcDir.getName() + SUFFIX);
    AzureBlobFileSystem fs = createJsonFile(srcDir, pendingJson);

    // Remove the rename source while its pending JSON file is still present.
    fs.delete(srcDir, true);
    fs.create(new Path("/hbase/A1/file1.txt"));
    fs.create(new Path("/hbase/A1/file2.txt"));

    FileStatus[] listing = fs.listStatus(parentDir);

    int entryCount = listing.length;
    Assertions.assertThat(entryCount)
        .describedAs("List should return 2 files")
        .isEqualTo(2);
    assertPendingJsonFile(fs, pendingJson, listing, srcDir, false);
  }

  /**
   * Verifies listing when the pending rename JSON file has been removed
   * before the list call.
   *
   * The JSON file created by {@code createJsonFile} is deleted explicitly and
   * two files are added to the parent. Listing the parent is expected to
   * return three entries, and the source directory is expected to be one of
   * them.
   *
   * @throws Exception if any error occurs during the test execution
   */
  @Test
  public void testListCrashRecoveryWithoutAnyPendingJsonFile() throws Exception {
    final Path srcDir = new Path("/hbase/A1/A2");
    final Path parentDir = srcDir.getParent();
    final Path pendingJson = new Path(parentDir, srcDir.getName() + SUFFIX);
    AzureBlobFileSystem fs = createJsonFile(srcDir, pendingJson);

    // Drop the pending JSON file so that nothing is left to recover.
    fs.delete(pendingJson, true);
    fs.create(new Path("/hbase/A1/file1.txt"));
    fs.create(new Path("/hbase/A1/file2.txt"));

    FileStatus[] listing = fs.listStatus(parentDir);

    int entryCount = listing.length;
    Assertions.assertThat(entryCount)
        .describedAs("List should return 3 files")
        .isEqualTo(3);
    // With no pending JSON file there is no recovery, so the source directory
    // must still show up in the listing.
    assertPendingJsonFile(fs, pendingJson, listing, srcDir, true);
  }

  /**
   * Verifies listing when a directory, rather than a file, carries the
   * rename pending JSON suffix.
   *
   * A directory named after the source plus {@code SUFFIX} is created, along
   * with one file in the parent and one file in the source directory. Listing
   * the parent is expected to return three entries, to make no call to
   * {@code getRedoRenameAtomicity}, and to include the suffixed directory.
   *
   * @throws Exception if any error occurs during the test execution
   */
  @Test
  public void testListCrashRecoveryWithPendingJsonDir() throws Exception {
    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
      assumeBlobServiceType();
      AbfsBlobClient blobClient = (AbfsBlobClient) addSpyHooksOnClient(fs);

      final Path srcDir = new Path("/hbase/A1/A2");
      final Path parentDir = srcDir.getParent();
      final Path pendingJsonDir = new Path(parentDir, srcDir.getName() + SUFFIX);
      fs.mkdirs(pendingJsonDir);

      fs.create(new Path(parentDir, "file1.txt"));
      fs.create(new Path(srcDir, "file2.txt"));

      // Count every request for a redo-rename handler made from here on.
      final AtomicInteger redoRenameCalls = new AtomicInteger(0);
      Mockito.doAnswer(invocation -> {
        redoRenameCalls.incrementAndGet();
        return invocation.callRealMethod();
      }).when(blobClient).getRedoRenameAtomicity(Mockito.any(Path.class),
          Mockito.anyInt(), Mockito.any(TracingContext.class));

      FileStatus[] listing = fs.listStatus(parentDir);

      Assertions.assertThat(listing.length)
          .describedAs("List should return 3 files")
          .isEqualTo(3);

      Assertions.assertThat(redoRenameCalls.get())
          .describedAs("No redo rename call should be made")
          .isEqualTo(0);

      final String pendingJsonDirPath = pendingJsonDir.toUri().getPath();
      boolean jsonDirListed = Arrays.stream(listing)
          .map(status -> status.getPath().toUri().getPath())
          .anyMatch(pendingJsonDirPath::equals);
      Assertions.assertThat(jsonDirListed)
          .describedAs("Directory with suffix -RenamePending.json should exist.")
          .isTrue();
    }
  }

  /**
   * Test case to verify crash recovery during listing with multiple pending rename JSON files.
   *
   * Two pending rename JSON files are prepared under {@code /hbase/A1}: one for
   * {@code /hbase/A1/A2} (via {@code createJsonFile}) and one for {@code /hbase/A1/A3}
   * (built inline). Both name {@code /hbase/test4} as the rename destination. Listing the
   * parent is expected to trigger two redo rename calls, return no entries, and leave
   * neither source directory nor either JSON file behind.
   *
   * @throws Exception if any error occurs during the test execution
   */
  @Test
  public void testListCrashRecoveryWithMultipleJsonFile() throws Exception {
    Path path = new Path("/hbase/A1/A2");

    // 1st Json file: created for /hbase/A1/A2 by the helper, which also puts
    // file.txt into that directory.
    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
    AzureBlobFileSystem fs = createJsonFile(path, renameJson);
    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);

    // 2nd Json file: a second source directory with one file of its own.
    Path path2 = new Path("/hbase/A1/A3");
    fs.create(new Path(path2, "file3.txt"));

    Path renameJson2 = new Path(path2.getParent(), path2.getName() + SUFFIX);
    VersionedFileStatus fileStatus
        = (VersionedFileStatus) fs.getFileStatus(path2);

    // preRename() is called only for the second source directory here; the
    // rename itself is never started by this test.
    new RenameAtomicity(path2, new Path("/hbase/test4"),
        renameJson2, getTestTracingContext(fs, true),
        fileStatus.getEtag(), client).preRename();

    // Added to the first source directory after its JSON file was created.
    fs.create(new Path(path, "file2.txt"));

    // Count calls to getRedoRenameAtomicity while still running the real method.
    AtomicInteger redoRenameCall = new AtomicInteger(0);
    Mockito.doAnswer(answer -> {
      redoRenameCall.incrementAndGet();
      return answer.callRealMethod();
    }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
        Mockito.anyInt(), Mockito.any(TracingContext.class));

    FileStatus[] fileStatuses = fs.listStatus(path.getParent());

    Assertions.assertThat(fileStatuses.length)
        .describedAs("List should return 0 paths")
        .isEqualTo(0);

    Assertions.assertThat(redoRenameCall.get())
        .describedAs("2 redo rename calls should be made")
        .isEqualTo(2);
    // Both sources and both JSON files must be gone; file.txt and file2.txt
    // (both created under the first source) must exist under /hbase/test4.
    assertPathStatus(fs, path, false,
        "Source directory should not exist.");
    assertPathStatus(fs, new Path("/hbase/test4/file.txt"), true,
        "File in destination directory should exist.");
    assertPathStatus(fs, path2, false,
        "Source directory should not exist");
    assertPathStatus(fs, new Path("/hbase/test4/file2.txt"), true,
        "File in destination directory should exist.");
    assertPathStatus(fs, renameJson, false,
        "Rename Pending Json file should not exist.");
    assertPathStatus(fs, renameJson2, false,
        "Rename Pending Json file should not exist.");
  }

  /**
   * Verifies that {@code getPathStatus} on a directory with a pending rename
   * JSON file goes through rename recovery.
   *
   * A pending JSON file is prepared for {@code /hbase/A1/A2}. The
   * {@code getPathStatus} call on that path is expected to throw an
   * {@link AbfsRestOperationException} carrying the {@code PATH_NOT_FOUND}
   * error code, to request exactly one redo rename, and to leave no pending
   * JSON file behind.
   *
   * @throws Exception if any error occurs during the test execution
   */
  @Test
  public void testGetPathStatusWithPendingJsonFile() throws Exception {
    final Path srcDir = new Path("/hbase/A1/A2");
    final Path pendingJson = new Path(srcDir.getParent(),
        srcDir.getName() + SUFFIX);
    AzureBlobFileSystem fs = createJsonFile(srcDir, pendingJson);

    AbfsBlobClient blobClient = (AbfsBlobClient) addSpyHooksOnClient(fs);

    fs.create(new Path("/hbase/A1/file1.txt"));
    fs.create(new Path("/hbase/A1/file2.txt"));

    AbfsConfiguration abfsConf = fs.getAbfsStore().getAbfsConfiguration();

    // Count calls to getRedoRenameAtomicity while still running the real method.
    final AtomicInteger redoRenameCalls = new AtomicInteger(0);
    Mockito.doAnswer(invocation -> {
      redoRenameCalls.incrementAndGet();
      return invocation.callRealMethod();
    }).when(blobClient).getRedoRenameAtomicity(Mockito.any(Path.class),
        Mockito.anyInt(), Mockito.any(TracingContext.class));

    final TracingContext tracingContext = new TracingContext(
        abfsConf.getClientCorrelationId(), fs.getFileSystemId(),
        FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null);

    final String srcPath = srcDir.toUri().getPath();
    AbfsRestOperationException thrown = intercept(
        AbfsRestOperationException.class,
        () -> blobClient.getPathStatus(srcPath, true, tracingContext, null));
    AzureServiceErrorCode azureServiceErrorCode = thrown.getErrorCode();

    Assertions.assertThat(azureServiceErrorCode.getErrorCode())
        .describedAs("Path had to be recovered from atomic rename operation.")
        .isEqualTo(PATH_NOT_FOUND.getErrorCode());

    Assertions.assertThat(redoRenameCalls.get())
        .describedAs("There should be one redo rename call")
        .isEqualTo(1);

    boolean jsonStillThere = fs.exists(pendingJson);
    Assertions.assertThat(jsonStillThere)
        .describedAs("Rename Pending Json file should not exist.")
        .isFalse();
  }

  /**
   * Test case to verify the behavior when the source directory is replaced after its
   * rename pending JSON file was written.
   *
   * The steps are:
   * - Creating a rename pending JSON file for the source directory (it records the ETag
   *   read at that time).
   * - Deleting the source directory recursively and creating new files under the same path.
   * - Listing the parent directory.
   * - Verifying that getRedoRenameAtomicity is called exactly once.
   * - Verifying that copyBlob is never called.
   * - Verifying that the rename pending JSON file no longer exists.
   *
   * @throws Exception if any error occurs during the test execution
   */
  @Test
  public void testETagChangedDuringRename() throws Exception {
    assumeBlobServiceType();
    Path path = new Path("/hbase/A1/A2");
    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
    // Create rename pending json file with old etag
    AzureBlobFileSystem fs = createJsonFile(path, renameJson);
    AbfsBlobClient abfsBlobClient = (AbfsBlobClient) addSpyHooksOnClient(fs);
    fs.getAbfsStore().setClient(abfsBlobClient);

    // Delete the directory to change etag
    // NOTE(review): presumably the path created again below gets a different
    // ETag from the one stored in the JSON file - confirm.
    fs.delete(path, true);

    fs.create(new Path(path, "file1.txt"));
    fs.create(new Path(path, "file2.txt"));
    // Count copyBlob calls while still running the real method.
    AtomicInteger numberOfCopyBlobCalls = new AtomicInteger(0);
    Mockito.doAnswer(copyBlob -> {
          numberOfCopyBlobCalls.incrementAndGet();
          return copyBlob.callRealMethod();
        })
        .when(abfsBlobClient)
        .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
            Mockito.nullable(String.class),
            Mockito.any(TracingContext.class));

    // Count getRedoRenameAtomicity calls while still running the real method.
    AtomicInteger numberOfRedoRenameAtomicityCalls = new AtomicInteger(0);
    Mockito.doAnswer(redoRenameAtomicity -> {
          numberOfRedoRenameAtomicityCalls.incrementAndGet();
          return redoRenameAtomicity.callRealMethod();
        })
        .when(abfsBlobClient)
        .getRedoRenameAtomicity(Mockito.any(Path.class), Mockito.anyInt(),
            Mockito.any(TracingContext.class));
    // Call list status to trigger rename redo
    fs.listStatus(path.getParent());
    Assertions.assertThat(numberOfRedoRenameAtomicityCalls.get())
        .describedAs("There should be one call to getRedoRenameAtomicity")
        .isEqualTo(1);
    Assertions.assertThat(numberOfCopyBlobCalls.get())
        .describedAs("There should be no copy blob call")
        .isEqualTo(0);
    Assertions.assertThat(fs.exists(renameJson))
        .describedAs("Rename Pending Json file should not exist.")
        .isFalse();
  }

  /**
   * Calls {@code getPathStatus} on the source path through the store's client
   * and expects the call to fail with {@code PATH_NOT_FOUND}.
   *
   * Callers use this after an interrupted rename, where the status lookup is
   * what sets rename recovery in motion.
   *
   * @param fs The AzureBlobFileSystem instance.
   * @param src The source path to trigger recovery on.
   * @throws Exception If the lookup does not fail as expected, or on any other error.
   */
  private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws Exception {
    final String srcPath = src.toUri().getPath();
    final TracingContext tracingContext = new TracingContext(
        getConfiguration().getClientCorrelationId(), fs.getFileSystemId(),
        FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null);

    // The status call itself is the trigger; it must end in an exception.
    AbfsRestOperationException thrown = LambdaTestUtils.intercept(
        AbfsRestOperationException.class, () -> {
          fs.getAbfsStore().getClient().getPathStatus(srcPath, true,
              tracingContext, null);
        });

    AzureServiceErrorCode errorCode = thrown.getErrorCode();
    Assertions.assertThat(errorCode)
        .describedAs("Path had to be recovered from atomic rename operation.")
        .isEqualTo(PATH_NOT_FOUND);
  }

  /**
   * Installs a failure hook on {@code copyBlob} and then runs the rename with
   * recovery.
   *
   * The hook passes the first {@code FAILED_CALL} copy calls on to the real
   * client and counts them in {@code copyCall}; once the counter has reached
   * {@code FAILED_CALL}, every further call throws an
   * {@link AbfsRestOperationException} built from {@code BLOB_ALREADY_EXISTS}
   * until the counter is reset.
   *
   * @param fs The AzureBlobFileSystem instance.
   * @param src The source path to rename.
   * @param dst The destination path for the rename operation.
   * @param client The spied AbfsBlobClient on which the hook is installed.
   * @param copyCall The AtomicInteger to track the number of copy calls.
   * @throws Exception If an error occurs during the operation or a check fails.
   */
  private void renameCrashInBetween(AzureBlobFileSystem fs, Path src, Path dst,
      AbfsBlobClient client, AtomicInteger copyCall)
      throws Exception {
    Mockito.doAnswer(copyRequest -> {
      // Test-and-increment as ONE atomic step. A separate get() followed by
      // incrementAndGet() lets two concurrent callers both step past
      // FAILED_CALL, in which case the failure would never be injected.
      int callsSoFar = copyCall.getAndUpdate(
          count -> count == FAILED_CALL ? count : count + 1);
      if (callsSoFar == FAILED_CALL) {
        throw new AbfsRestOperationException(
            BLOB_ALREADY_EXISTS.getStatusCode(),
            BLOB_ALREADY_EXISTS.getErrorCode(),
            BLOB_ALREADY_EXISTS.getErrorMessage(),
            new Exception());
      }
      return copyRequest.callRealMethod();
    }).when(client).copyBlob(Mockito.any(Path.class),
        Mockito.any(Path.class), Mockito.nullable(String.class),
        Mockito.any(TracingContext.class));
    renameOperationWithRecovery(fs, src, dst, copyCall);
  }

  /**
   * Builds a copy of the raw test configuration with three values overridden:
   * the producer queue max size (5), the consumer max lag (3) and the blob
   * directory rename max thread count (2).
   *
   * @return The configuration object.
   */
  private Configuration getConfig() {
    final Configuration renameConf = new Configuration(this.getRawConfiguration());
    renameConf.setInt(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, 5);
    renameConf.setInt(FS_AZURE_CONSUMER_MAX_LAG, 3);
    renameConf.setInt(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, 2);
    return renameConf;
  }

  /**
   * Wraps the file system's store and that store's client in Mockito spies and
   * stubs the getters so that {@code fs.getAbfsStore()} returns the spied store
   * and the spied store's {@code getClient()} returns the spied client.
   *
   * The given file system is stubbed with {@code Mockito.doReturn}, so it has
   * to be a Mockito spy or mock itself.
   *
   * @param fs the AzureBlobFileSystem instance
   * @return the spied AbfsClient for interaction verification
   */
  private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) {
    final AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore());
    Mockito.doReturn(spiedStore).when(fs).getAbfsStore();

    final AbfsClient spiedClient = Mockito.spy(spiedStore.getClient());
    Mockito.doReturn(spiedClient).when(spiedStore).getClient();

    return spiedClient;
  }

  /**
   * Asserts the expected presence or absence of the rename pending JSON file,
   * the source directory and the destination directory, in that order.
   *
   * The JSON file path is derived from the source: same parent, source name
   * plus {@code SUFFIX}.
   *
   * @param fs The AzureBlobFileSystem instance to check the existence on.
   * @param src The source path.
   * @param dst The destination path.
   * @param isSrcExist Whether the source directory is expected to exist.
   * @param isDstExist Whether the destination directory is expected to exist.
   * @param isJsonExist Whether the pending JSON file is expected to exist.
   * @throws Exception If a status lookup fails unexpectedly.
   */
  private void validateRename(AzureBlobFileSystem fs, Path src, Path dst,
      boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws Exception {
    final Path pendingJson = new Path(src.getParent(), src.getName() + SUFFIX);

    // Order is kept fixed: pending JSON file, then source, then destination.
    assertPathStatus(fs, pendingJson, isJsonExist, "Pending JSON file");
    assertPathStatus(fs, src, isSrcExist, "Source directory");
    assertPathStatus(fs, dst, isDstExist, "Destination directory");
  }

  /**
   * Asserts that a path does or does not exist, using the blob client's
   * {@code getPathStatus} directly.
   *
   * For an expected-present path the call must return HTTP 200. For an
   * expected-absent path the call must throw an
   * {@link AbfsRestOperationException} whose error code is
   * {@code BLOB_PATH_NOT_FOUND}.
   *
   * @param fs The AzureBlobFileSystem instance to check the existence on.
   * @param path The path to check.
   * @param shouldExist Whether the path should exist or not.
   * @param description A description for the assertion.
   * @throws Exception If an error occurs during the assertion.
   */
  private void assertPathStatus(AzureBlobFileSystem fs, Path path,
      boolean shouldExist, String description) throws Exception {
    final TracingContext tracingContext = getTestTracingContext(fs, true);
    final AbfsBlobClient blobClient = (AbfsBlobClient) fs.getAbfsClient();
    final String rawPath = path.toUri().getPath();

    if (!shouldExist) {
      // Absent path: the lookup has to fail with the blob not-found code.
      AbfsRestOperationException thrown = LambdaTestUtils.intercept(
          AbfsRestOperationException.class, () -> {
            blobClient.getPathStatus(rawPath, true,
                tracingContext, null);
          });
      Assertions.assertThat(thrown.getErrorCode())
          .describedAs("%s should not exists", description)
          .isEqualTo(BLOB_PATH_NOT_FOUND);
      return;
    }

    // Present path: the lookup has to come back with HTTP 200.
    int actualStatus = blobClient.getPathStatus(rawPath, tracingContext,
            null, true)
        .getResult().getStatusCode();
    Assertions.assertThat(actualStatus)
        .describedAs("%s should exists", description)
        .isEqualTo(HTTP_OK);
  }

  /**
   * Prepares a spied file system that holds a source directory with one file
   * ({@code file.txt}) and a rename pending JSON file for renaming that
   * directory to {@code /hbase/test4}.
   *
   * Only {@code preRename()} is invoked on the {@link RenameAtomicity}; the
   * rename itself is not started. The method asserts that the JSON file exists
   * before it returns.
   *
   * @param path source directory of the pending rename
   * @param renameJson rename json path
   * @return file system
   * @throws IOException in case of failure
   */
  private AzureBlobFileSystem createJsonFile(Path path, Path renameJson)
      throws IOException {
    final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
    assumeBlobServiceType();
    // Same store/client spy wiring that the tests use.
    final AbfsClient spiedClient = addSpyHooksOnClient(fs);

    fs.setWorkingDirectory(new Path(ROOT_PATH));
    fs.create(new Path(path, "file.txt"));

    final VersionedFileStatus srcStatus
        = (VersionedFileStatus) fs.getFileStatus(path);
    final Path renameDestination = new Path("/hbase/test4");

    RenameAtomicity pendingRename = new RenameAtomicity(path, renameDestination,
        renameJson, getTestTracingContext(fs, true),
        srcStatus.getEtag(), spiedClient);
    pendingRename.preRename();

    boolean jsonCreated = fs.exists(renameJson);
    Assertions.assertThat(jsonCreated)
        .describedAs("Rename Pending Json file should exist.")
        .isTrue();

    return fs;
  }

  /**
   * Runs a rename that is expected to fail part-way, checks the crashed state,
   * triggers recovery and checks the final state.
   *
   * Steps:
   * - {@code fs.rename} must return false.
   * - The operation counter must be below {@code TOTAL_FILES}.
   * - Source, destination and the pending JSON file must all exist.
   * - The counter is reset and recovery is triggered; the counter must then be
   *   above zero.
   * - The source and the pending JSON file must be gone, the destination must
   *   exist.
   *
   * @param fs The AzureBlobFileSystem instance to use for the rename operation.
   * @param src The source path (directory).
   * @param dst The destination path (directory).
   * @param countCall The AtomicInteger to track the number of operations.
   * @throws Exception If an error occurs during the rename operation.
   */
  private void renameOperationWithRecovery(AzureBlobFileSystem fs, Path src,
      Path dst, AtomicInteger countCall) throws Exception {
    Assertions.assertThat(fs.rename(src, dst))
        .describedAs("Rename should crash in between.")
        .isFalse();

    // Validate operation count; the description is derived from the bound
    // that is actually checked.
    Assertions.assertThat(countCall.get())
        .describedAs("Operation count should be less than %d.", TOTAL_FILES)
        .isLessThan(TOTAL_FILES);

    // Assertions to validate renamed destination and source
    validateRename(fs, src, dst, true, true, true);

    // Reset the counter so that anything counted from here on comes from the
    // recovery, then trigger it.
    countCall.set(0);
    triggerRenameRecovery(fs, src);

    Assertions.assertThat(countCall.get())
        .describedAs("Operation count should be greater than 0.")
        .isGreaterThan(0);

    // Validate final state of destination and source
    validateRename(fs, src, dst, false, true, false);
  }

  /**
   * Asserts that the rename pending JSON file is gone and checks the given
   * listing.
   *
   * Three checks are made:
   * - The pending JSON file does not exist on the file system.
   * - No listed entry has the JSON file's path.
   * - The source path appears in the listing exactly when
   *   {@code isSrcPathExist} is true.
   *
   * @param fs The AzureBlobFileSystem instance.
   * @param renameJson The path of the rename pending JSON file.
   * @param fileStatuses The array of FileStatus objects to check.
   * @param srcPath The source path to check.
   * @param isSrcPathExist Whether the source path is expected in the listing.
   * @throws Exception If an error occurs during the assertion.
   */
  private void assertPendingJsonFile(AzureBlobFileSystem fs,
      Path renameJson, FileStatus[] fileStatuses,
      Path srcPath, boolean isSrcPathExist) throws Exception {
    Assertions.assertThat(fs.exists(renameJson))
        .describedAs("Rename Pending Json file should not exist.")
        .isFalse();

    Assertions.assertThat(
            Arrays.stream(fileStatuses)
                .anyMatch(status ->
                    renameJson.toUri().getPath()
                        .equals(status.getPath().toUri().getPath())))
        .describedAs(
            "List status should not contains any file with suffix -RenamePending.json.")
        .isFalse();

    // The description follows the expectation, so a failure message is not
    // misleading when the source path IS expected in the listing.
    Assertions.assertThat(
            Arrays.stream(fileStatuses)
                .anyMatch(status ->
                    srcPath.toUri().getPath()
                        .equals(status.getPath().toUri().getPath())))
        .describedAs(isSrcPathExist
            ? "List status should contain source path."
            : "List status should not contain source path.")
        .isEqualTo(isSrcPathExist);
  }
}