ITestFileSystemOperationsWithThreads.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azure;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests the Native Azure file system (WASB) using parallel threads for rename and delete operations.
 */
public class ITestFileSystemOperationsWithThreads extends AbstractWasbTestBase {

  // Thread count that setUp() writes to the rename-threads configuration key.
  private final int renameThreads = 10;
  // Thread count that setUp() writes to the delete-threads configuration key.
  private final int deleteThreads = 20;
  // Number of sub directory/file sets createFolder() generates; the "large folder" tests raise it to 10.
  private int iterations = 1;
  // Captured root-logger output that tests inspect via logs.getOutput(); assigned in setUp().
  private LogCapturer logs = null;

  // JUnit expected-exception rule. None of the tests visible in this file reference it;
  // several of them declare a local boolean named "exception" that shadows this field.
  @Rule
  public ExpectedException exception = ExpectedException.none();

  /**
   * Re-initializes the file system under test with parallel rename/delete
   * threads and flat blob listing enabled, then starts capturing the root
   * logger so that tests can assert on the emitted log messages.
   *
   * @throws Exception if the base set up or the re-initialization fails
   */
  @Before
  public void setUp() throws Exception {
    super.setUp();

    // Default for every test: threaded rename/delete plus flat listing.
    final Configuration testConf = fs.getConf();
    testConf.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, renameThreads);
    testConf.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, deleteThreads);
    testConf.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, true);

    // Re-initialize so the file system picks up the updated settings.
    fs.initialize(fs.getUri(), testConf);

    // Everything logged from here on is available through logs.getOutput().
    logs = LogCapturer.captureLogs(
        LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
  }

  /*
   * Helper that creates the root folder and, once per iteration, a numbered
   * sub directory holding three differently named files plus one numbered
   * file directly under the root.
   */
  private void createFolder(FileSystem fs, String root) throws Exception {
    fs.mkdirs(new Path(root));

    int index = 0;
    while (index < this.iterations) {
      final String subDir = root + "/" + index;
      fs.mkdirs(new Path(subDir));
      // Plain name, nested path and a name with special characters.
      fs.createNewFile(new Path(subDir + "/fileToRename"));
      fs.createNewFile(new Path(subDir + "/file/to/rename"));
      fs.createNewFile(new Path(subDir + "/file+to%rename"));
      fs.createNewFile(new Path(root + "/fileToRename" + index));
      index++;
    }
  }

  /*
   * Helper that populates the source folder, renames it to dest and then
   * checks that every generated entry exists under dest and no longer
   * exists under source.
   */
  private void validateRenameFolder(FileSystem fs, String source, String dest) throws Exception {
    // Populate the folder that is about to be renamed.
    createFolder(fs, source);

    final Path renamedRoot = new Path(dest);
    assertTrue(fs.rename(new Path(source), renamedRoot));
    assertTrue(fs.exists(renamedRoot));

    for (int index = 0; index < this.iterations; index++) {
      final String renamedDir = dest + "/" + index;
      final String originalDir = source + "/" + index;

      // Everything must have arrived at the destination ...
      assertTrue(fs.exists(new Path(renamedDir)));
      assertTrue(fs.exists(new Path(renamedDir + "/fileToRename")));
      assertTrue(fs.exists(new Path(renamedDir + "/file/to/rename")));
      assertTrue(fs.exists(new Path(renamedDir + "/file+to%rename")));
      assertTrue(fs.exists(new Path(dest + "/fileToRename" + index)));

      // ... and nothing may be left behind at the source.
      assertFalse(fs.exists(new Path(originalDir)));
      assertFalse(fs.exists(new Path(originalDir + "/fileToRename")));
      assertFalse(fs.exists(new Path(originalDir + "/file/to/rename")));
      assertFalse(fs.exists(new Path(originalDir + "/file+to%rename")));
      assertFalse(fs.exists(new Path(source + "/fileToRename" + index)));
    }
  }

  /*
   * Renames a single-iteration folder with threads and flat listing enabled
   * and checks from the logs that exactly the expected rename threads ran.
   */
  @Test
  public void testRenameSmallFolderWithThreads() throws Exception {
    validateRenameFolder(fs, "root", "rootnew");

    // With single iteration, we would have created 7 blobs.
    final int expectedThreadsCreated = Math.min(7, renameThreads);

    final String output = logs.getOutput();
    assertInLog(output, "ms with threads: " + expectedThreadsCreated);

    // Thread ids below the expected count must show up in the logs,
    // ids from there up to the configured maximum must not.
    final String threadPrefix =
        "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-";
    for (int id = 0; id < renameThreads; id++) {
      if (id < expectedThreadsCreated) {
        assertInLog(output, threadPrefix + id);
      } else {
        assertNotInLog(output, threadPrefix + id);
      }
    }
  }

  /*
   * Renames a folder populated over 10 iterations with threads and flat
   * listing enabled and checks from the logs that every configured rename
   * thread ran.
   */
  @Test
  public void testRenameLargeFolderWithThreads() throws Exception {
    // Populate source folder with large number of files and directories.
    this.iterations = 10;
    validateRenameFolder(fs, "root", "rootnew");

    final String output = logs.getOutput();
    assertInLog(output, "ms with threads: " + renameThreads);

    // Each configured thread id is expected in the logs.
    final String threadPrefix =
        "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-";
    for (int id = 0; id < renameThreads; id++) {
      assertInLog(output, threadPrefix + id);
    }
  }

  /*
   * Renames a folder populated over 10 iterations with the rename thread
   * count set to 0 (flat listing stays enabled) and checks from the logs
   * that no rename thread ran.
   */
  @Test
  public void testRenameLargeFolderDisableThreads() throws Exception {
    // Number of threads set to 0 or 1 disables threads.
    final Configuration noThreadsConf = fs.getConf();
    noThreadsConf.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, 0);
    fs.initialize(fs.getUri(), noThreadsConf);

    // Populate source folder with large number of files and directories.
    this.iterations = 10;
    validateRenameFolder(fs, "root", "rootnew");

    final String output = logs.getOutput();
    assertInLog(output,
        "Disabling threads for Rename operation as thread count 0");

    // None of the thread names may appear in the logs.
    final String threadPrefix =
        "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-";
    for (int id = 0; id < renameThreads; id++) {
      assertNotInLog(output, threadPrefix + id);
    }
  }

  /**
   * Fails the test unless the (non-empty) log output contains the term.
   * On failure the message is logged at error level and the whole log
   * output is dumped to standard error.
   * @param content log output
   * @param term search term
   */
  protected void assertInLog(String content, String term) {
    assertFalse("Empty log", content.isEmpty());
    if (content.contains(term)) {
      return;
    }
    final String failure = "No " + term + " found in logs";
    LOG.error(failure);
    System.err.println(content);
    fail(failure);
  }

  /**
   * Fails the test if the (non-empty) log output contains the term.
   * On failure the message is logged at error level and the whole log
   * output is dumped to standard error.
   * @param content log output
   * @param term search term
   */
  protected void assertNotInLog(String content, String term) {
    assertFalse("Empty log", content.isEmpty());
    if (!content.contains(term)) {
      return;
    }
    final String failure = term + " found in logs";
    LOG.error(failure);
    System.err.println(content);
    fail(failure);
  }

  /*
   * Test case for rename operation with threads and flat listing disabled:
   * the rename thread count is set to 1 and flat listing is switched off,
   * then the logs are checked to confirm that no rename thread ran.
   */
  @Test
  public void testRenameSmallFolderDisableThreadsDisableFlatListing() throws Exception {
    Configuration conf = fs.getConf();

    // Number of threads set to 0 or 1 disables threads.
    conf.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, 1);
    conf.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, false);
    URI uri = fs.getUri();
    fs.initialize(uri, conf);

    validateRenameFolder(fs, "root", "rootnew");

    // Validate from logs that threads are disabled.
    String content = logs.getOutput();
    assertInLog(content,
        "Disabling threads for Rename operation as thread count 1");

    // Validate no thread executions
    for (int i = 0; i < renameThreads; i++) {
      assertNotInLog(content,
          "AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i);
    }
  }

  /*
   * Helper method that populates the source folder, deletes it recursively
   * and validates that neither the folder nor any of the generated entries
   * exists afterwards.
   */
  private void validateDeleteFolder(FileSystem fs, String source)  throws Exception {
    // Create the folder that is about to be deleted. This must be the
    // folder named by the caller, not a hard-coded name, otherwise the
    // delete below would target a folder that was never populated.
    createFolder(fs, source);
    Path sourceFolder = new Path(source);

    // Delete operation
    assertTrue(fs.delete(sourceFolder, true));
    assertFalse(fs.exists(sourceFolder));

    for (int i = 0; i < this.iterations; i++) {
      // check that source folder and files doesn't exists
      assertFalse(fs.exists(new Path(source + "/" + i)));
      assertFalse(fs.exists(new Path(source + "/" + i + "/fileToRename")));
      assertFalse(fs.exists(new Path(source + "/" + i + "/file/to/rename")));
      assertFalse(fs.exists(new Path(source + "/" + i + "/file+to%rename")));
      assertFalse(fs.exists(new Path(source + "/fileToRename" + i)));
    }
  }

  /*
   * Deletes a single-iteration folder with threads and flat listing enabled
   * and checks from the logs that exactly the expected delete threads ran.
   */
  @Test
  public void testDeleteSmallFolderWithThreads() throws Exception {
    validateDeleteFolder(fs, "root");

    // With single iteration, we would have created 7 blobs.
    final int expectedThreadsCreated = Math.min(7, deleteThreads);

    final String output = logs.getOutput();
    assertInLog(output, "ms with threads: " + expectedThreadsCreated);

    // Thread ids below the expected count must show up in the logs,
    // ids from there up to the configured maximum must not.
    final String threadPrefix =
        "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-";
    for (int id = 0; id < deleteThreads; id++) {
      if (id < expectedThreadsCreated) {
        assertInLog(output, threadPrefix + id);
      } else {
        assertNotInLog(output, threadPrefix + id);
      }
    }
  }

  /*
   * Deletes a folder populated over 10 iterations with threads and flat
   * listing enabled and checks from the logs that every configured delete
   * thread ran.
   */
  @Test
  public void testDeleteLargeFolderWithThreads() throws Exception {
    // Populate source folder with large number of files and directories.
    this.iterations = 10;
    validateDeleteFolder(fs, "root");

    final String output = logs.getOutput();
    assertInLog(output, "ms with threads: " + deleteThreads);

    // Each configured thread id is expected in the logs.
    final String threadPrefix =
        "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-";
    for (int id = 0; id < deleteThreads; id++) {
      assertInLog(output, threadPrefix + id);
    }
  }

  /*
   * Deletes a folder populated over 10 iterations with the delete thread
   * count set to 0 (flat listing stays enabled) and checks from the logs
   * that no delete thread ran.
   */
  @Test
  public void testDeleteLargeFolderDisableThreads() throws Exception {
    final Configuration noThreadsConf = fs.getConf();
    noThreadsConf.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, 0);
    fs.initialize(fs.getUri(), noThreadsConf);

    // Populate source folder with large number of files and directories.
    this.iterations = 10;
    validateDeleteFolder(fs, "root");

    final String output = logs.getOutput();
    assertInLog(output,
        "Disabling threads for Delete operation as thread count 0");

    // None of the thread names may appear in the logs.
    final String threadPrefix =
        "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-";
    for (int id = 0; id < deleteThreads; id++) {
      assertNotInLog(output, threadPrefix + id);
    }
  }

  /*
   * Deletes a single-iteration folder with the delete thread count set to 1
   * and flat listing switched off, then checks from the logs that no delete
   * thread ran.
   */
  @Test
  public void testDeleteSmallFolderDisableThreadsDisableFlatListing() throws Exception {
    // Number of threads set to 0 or 1 disables threads.
    final Configuration serialConf = fs.getConf();
    serialConf.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, 1);
    serialConf.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, false);
    fs.initialize(fs.getUri(), serialConf);

    validateDeleteFolder(fs, "root");

    final String output = logs.getOutput();
    assertInLog(output,
        "Disabling threads for Delete operation as thread count 1");

    // None of the thread names may appear in the logs.
    final String threadPrefix =
        "AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-";
    for (int id = 0; id < deleteThreads; id++) {
      assertNotInLog(output, threadPrefix + id);
    }
  }

  /*
   * Test case for delete operation when creating the thread pool fails:
   * getThreadPool(7) is stubbed to throw, and the logs must show that the
   * pool could not be created and that the delete was serialized instead.
   */
  @Test
  public void testDeleteThreadPoolExceptionFailure() throws Exception {

    // Spy azure file system object and raise exception for new thread pool
    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);

    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));

    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
        mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
            path, NativeAzureFileSystem.AZURE_DELETE_THREADS));

    // Spies are stubbed with the doXxx().when(spy) form so that the real
    // method is not invoked while the stub is being set up.
    // With single iteration, we would have created 7 blobs resulting 7 threads.
    Mockito.doThrow(new Exception()).when(mockThreadPoolExecutor).getThreadPool(7);

    Mockito.doReturn(mockThreadPoolExecutor).when(mockFs).getThreadPoolExecutor(
        deleteThreads, "AzureBlobDeleteThread", "Delete",
        path, NativeAzureFileSystem.AZURE_DELETE_THREADS);

    validateDeleteFolder(mockFs, "root");

    // Validate from logs that threads are disabled.
    String content = logs.getOutput();
    assertInLog(content, "Failed to create thread pool with threads");
    assertInLog(content, "Serializing the Delete operation");
  }

  /*
   * Test case for delete operation when the thread pool rejects every
   * task: execute() is mocked to throw RejectedExecutionException, and the
   * logs must show the rejection and that the delete was serialized.
   */
  @Test
  public void testDeleteThreadPoolExecuteFailure() throws Exception {

    // Mock thread pool executor to throw exception for all requests.
    ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
    Mockito.doThrow(new RejectedExecutionException())
        .when(mockThreadExecutor).execute(Mockito.any(Runnable.class));

    // Spy azure file system object and return mocked thread pool
    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);

    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));

    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
        mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
            path, NativeAzureFileSystem.AZURE_DELETE_THREADS));

    // Spies are stubbed with the doXxx().when(spy) form so that the real
    // method is not invoked while the stub is being set up.
    // With single iteration, we would have created 7 blobs resulting 7 threads.
    Mockito.doReturn(mockThreadExecutor).when(mockThreadPoolExecutor).getThreadPool(7);

    Mockito.doReturn(mockThreadPoolExecutor).when(mockFs).getThreadPoolExecutor(
        deleteThreads, "AzureBlobDeleteThread", "Delete",
        path, NativeAzureFileSystem.AZURE_DELETE_THREADS);

    validateDeleteFolder(mockFs, "root");

    // Validate from logs that threads are disabled.
    String content = logs.getOutput();
    assertInLog(content,
        "Rejected execution of thread for Delete operation on blob");
    assertInLog(content, "Serializing the Delete operation");
  }

  /*
   * Test case for delete operation when only the first task is accepted by
   * the thread pool: the first execute() call runs for real and every later
   * call is rejected. The logs must show that the pool was used with 7
   * threads and that 6 of them were not used.
   */
  @Test
  public void testDeleteThreadPoolExecuteSingleThreadFailure() throws Exception {

    // Spy azure file system object and return mocked thread pool
    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);

    // Spy a thread pool executor and link it to azure file system object.
    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
        mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
            path, NativeAzureFileSystem.AZURE_DELETE_THREADS));

    // Spies are stubbed with the doXxx().when(spy) form so that the real
    // method is not invoked while the stub is being set up.
    Mockito.doReturn(mockThreadPoolExecutor).when(mockFs).getThreadPoolExecutor(
        deleteThreads, "AzureBlobDeleteThread", "Delete",
        path, NativeAzureFileSystem.AZURE_DELETE_THREADS);

    // Create a real thread executor, spy it and link it to the spied thread pool executor.
    // With single iteration, we would have created 7 blobs resulting 7 threads.
    ThreadPoolExecutor mockThreadExecutor = Mockito.spy(mockThreadPoolExecutor.getThreadPool(7));
    Mockito.doReturn(mockThreadExecutor).when(mockThreadPoolExecutor).getThreadPool(7);

    // Let the first execute() call through and reject all subsequent ones.
    Mockito.doCallRealMethod().doThrow(new RejectedExecutionException())
        .when(mockThreadExecutor).execute(Mockito.any(Runnable.class));

    validateDeleteFolder(mockFs, "root");

    // Validate from logs that threads are enabled and unused threads.
    String content = logs.getOutput();
    assertInLog(content,
        "Using thread pool for Delete operation with threads 7");
    assertInLog(content,
        "6 threads not used for Delete operation on blob");
  }

  /*
   * Test case for delete operation when waiting for the thread pool to
   * terminate is interrupted: execute() is a no-op and awaitTermination()
   * throws InterruptedException. The delete must fail with an IOException,
   * leave the folder in place and log the interruption.
   */
  @Test
  public void testDeleteThreadPoolTerminationFailure() throws Exception {

    // Spy azure file system object and return mocked thread pool
    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);

    // Spy a thread pool executor and link it to azure file system object.
    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
        ((NativeAzureFileSystem) fs).getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
            path, NativeAzureFileSystem.AZURE_DELETE_THREADS));

    // Create a thread executor and link it to mocked thread pool executor object.
    // Mock thread executor to throw exception for terminating threads.
    ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
    Mockito.doNothing().when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
    Mockito.when(mockThreadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS))
        .thenThrow(new InterruptedException());

    // Spies are stubbed with the doXxx().when(spy) form so that the real
    // method is not invoked while the stub is being set up.
    // With single iteration, we would have created 7 blobs resulting 7 threads.
    Mockito.doReturn(mockThreadExecutor).when(mockThreadPoolExecutor).getThreadPool(7);

    Mockito.doReturn(mockThreadPoolExecutor).when(mockFs).getThreadPoolExecutor(
        deleteThreads, "AzureBlobDeleteThread", "Delete",
        path, NativeAzureFileSystem.AZURE_DELETE_THREADS);

    createFolder(mockFs, "root");
    Path sourceFolder = new Path("root");
    try {
      mockFs.delete(sourceFolder, true);
      fail("Expected an IOException from delete when thread pool termination is interrupted");
    } catch (IOException expected) {
      // Expected: the interrupted delete is reported as an IOException.
    }

    assertTrue(mockFs.exists(sourceFolder));

    // Validate from logs that threads are enabled and delete operation is failed.
    String content = logs.getOutput();
    assertInLog(content,
        "Using thread pool for Delete operation with threads");
    assertInLog(content, "Threads got interrupted Delete blob operation");
    assertInLog(content,
        "Delete failed as operation on subfolders and files failed.");
  }

  /*
   * Validate that when a directory is deleted recursively, the operation succeeds
   * even if a child directory delete fails because the directory does not exist.
   * This can happen if a child directory is deleted by an external agent while
   * the parent is in progress of being deleted recursively.
   */
  @Test
  public void testRecursiveDirectoryDeleteWhenChildDirectoryDeleted()
      throws Exception {
    // true: the child that reports a failed delete is the directory "root/0".
    testRecusiveDirectoryDelete(true);
  }

  /*
   * Validate that when a directory is deleted recursively, the operation succeeds
   * even if a file delete fails because it does not exist.
   * This can happen if a file is deleted by an external agent while
   * the parent directory is in progress of being deleted.
   */
  @Test
  public void testRecursiveDirectoryDeleteWhenDeletingChildFileReturnsFalse()
      throws Exception {
    // false: the child that reports a failed delete is the file "root/0/fileToRename".
    testRecusiveDirectoryDelete(false);
  }

  /*
   * Shared body of the recursive delete tests. The delete of one child is
   * intercepted: the child is really deleted through the unspied file
   * system, but the spy reports false, which is what the caller would see
   * if an external agent had removed the child first. The recursive delete
   * of the parent must still succeed and log the non-existent child.
   *
   * useDir selects the intercepted child: true for the directory "root/0",
   * false for the file "root/0/fileToRename".
   */
  private void testRecusiveDirectoryDelete(boolean useDir) throws Exception {
    String childPathToBeDeletedByExternalAgent = (useDir)
        ? "root/0"
        : "root/0/fileToRename";
    // Spy azure file system object and return false for deleting one file
    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path(
        childPathToBeDeletedByExternalAgent)));

    Answer<Boolean> answer = new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        Object[] arguments = invocation.getArguments();
        String key = (String) arguments[0];
        boolean isDir = (boolean) arguments[1];
        // Perform the real delete, then pretend that it did not happen.
        assertTrue(fs.deleteFile(key, isDir));
        return false;
      }
    };

    // Stub with doAnswer().when(spy): the when(spy.deleteFile(...)) form
    // would run the real deleteFile against the store while stubbing.
    Mockito.doAnswer(answer).when(mockFs).deleteFile(path, useDir);

    createFolder(mockFs, "root");
    Path sourceFolder = new Path("root");

    assertTrue(mockFs.delete(sourceFolder, true));
    assertFalse(mockFs.exists(sourceFolder));

    // Validate from logs that threads are enabled, that a child directory was
    // deleted by an external caller, and the parent delete operation still
    // succeeds.
    String content = logs.getOutput();
    assertInLog(content,
        "Using thread pool for Delete operation with threads");
    assertInLog(content, String.format("Attempt to delete non-existent %s %s",
        useDir ? "directory" : "file", path));
  }

  /*
   * Deletes a folder while the delete of the child directory "root/0" is
   * stubbed to throw an IOException. The recursive delete must fail, leave
   * the folder in place and log the failure.
   */
  @Test
  public void testDeleteSingleDeleteException() throws Exception {

    // Spy on the file system and make the delete of one directory blow up.
    final NativeAzureFileSystem spiedFs = Mockito.spy((NativeAzureFileSystem) fs);
    final String failingKey =
        spiedFs.pathToKey(spiedFs.makeAbsolute(new Path("root/0")));
    Mockito.doThrow(new IOException()).when(spiedFs).deleteFile(failingKey, true);

    createFolder(spiedFs, "root");
    final Path folder = new Path("root");

    boolean deleteFailed;
    try {
      spiedFs.delete(folder, true);
      deleteFailed = false;
    } catch (IOException e){
      deleteFailed = true;
    }

    assertTrue(deleteFailed);
    assertTrue(spiedFs.exists(folder));

    // The logs must show threaded execution, the failure and the early termination.
    final String output = logs.getOutput();
    assertInLog(output,
        "Using thread pool for Delete operation with threads");
    assertInLog(output,
        "Encountered Exception for Delete operation for file " + failingKey);
    assertInLog(output,
        "Terminating execution of Delete operation now as some other thread already got exception or operation failed");
  }

  /*
   * Test case for rename operation when creating the thread pool fails:
   * getThreadPool(7) is stubbed to throw, and the logs must show that the
   * pool could not be created and that the rename was serialized instead.
   */
  @Test
  public void testRenameThreadPoolExceptionFailure() throws Exception {

    // Spy azure file system object and raise exception for new thread pool
    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);

    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
        ((NativeAzureFileSystem) fs).getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
            path, NativeAzureFileSystem.AZURE_RENAME_THREADS));

    // Spies are stubbed with the doXxx().when(spy) form so that the real
    // method is not invoked while the stub is being set up.
    // With single iteration, we would have created 7 blobs resulting 7 threads.
    Mockito.doThrow(new Exception()).when(mockThreadPoolExecutor).getThreadPool(7);

    Mockito.doReturn(mockThreadPoolExecutor).when(mockFs).getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
        path, NativeAzureFileSystem.AZURE_RENAME_THREADS);

    validateRenameFolder(mockFs, "root", "rootnew");

    // Validate from logs that threads are disabled.
    String content = logs.getOutput();
    assertInLog(content, "Failed to create thread pool with threads");
    assertInLog(content, "Serializing the Rename operation");
  }

  /*
   * Test case for rename operation when the thread pool rejects every
   * task: execute() is mocked to throw RejectedExecutionException, and the
   * logs must show the rejection and that the rename was serialized.
   */
  @Test
  public void testRenameThreadPoolExecuteFailure() throws Exception {

    // Mock thread pool executor to throw exception for all requests.
    ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
    Mockito.doThrow(new RejectedExecutionException())
        .when(mockThreadExecutor).execute(Mockito.any(Runnable.class));

    // Spy azure file system object and return mocked thread pool
    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);

    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
        mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
            path, NativeAzureFileSystem.AZURE_RENAME_THREADS));

    // Spies are stubbed with the doXxx().when(spy) form so that the real
    // method is not invoked while the stub is being set up.
    // With single iteration, we would have created 7 blobs resulting 7 threads.
    Mockito.doReturn(mockThreadExecutor).when(mockThreadPoolExecutor).getThreadPool(7);

    Mockito.doReturn(mockThreadPoolExecutor).when(mockFs).getThreadPoolExecutor(
        renameThreads, "AzureBlobRenameThread", "Rename",
        path, NativeAzureFileSystem.AZURE_RENAME_THREADS);

    validateRenameFolder(mockFs, "root", "rootnew");

    // Validate from logs that threads are disabled.
    String content = logs.getOutput();
    assertInLog(content,
        "Rejected execution of thread for Rename operation on blob");
    assertInLog(content, "Serializing the Rename operation");
  }

  /*
   * Test case for rename operation when only the first task is accepted by
   * the thread pool: the first execute() call runs for real and every later
   * call is rejected. The logs must show that the pool was used with 7
   * threads and that 6 of them were not used.
   */
  @Test
  public void testRenameThreadPoolExecuteSingleThreadFailure() throws Exception {

    // Spy azure file system object and return mocked thread pool
    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);

    // Spy a thread pool executor and link it to azure file system object.
    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
        mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
            path, NativeAzureFileSystem.AZURE_RENAME_THREADS));

    // Spies are stubbed with the doXxx().when(spy) form so that the real
    // method is not invoked while the stub is being set up.
    Mockito.doReturn(mockThreadPoolExecutor).when(mockFs).getThreadPoolExecutor(
        renameThreads, "AzureBlobRenameThread", "Rename",
        path, NativeAzureFileSystem.AZURE_RENAME_THREADS);

    // Create a real thread executor, spy it and link it to the spied thread pool executor.
    // With single iteration, we would have created 7 blobs resulting 7 threads.
    ThreadPoolExecutor mockThreadExecutor = Mockito.spy(mockThreadPoolExecutor.getThreadPool(7));
    Mockito.doReturn(mockThreadExecutor).when(mockThreadPoolExecutor).getThreadPool(7);

    // Let the first execute() call through and reject all subsequent ones.
    Mockito.doCallRealMethod().doThrow(new RejectedExecutionException())
        .when(mockThreadExecutor).execute(Mockito.any(Runnable.class));

    validateRenameFolder(mockFs, "root", "rootnew");

    // Validate from logs that threads are enabled and unused threads exists.
    String content = logs.getOutput();
    assertInLog(content,
        "Using thread pool for Rename operation with threads 7");
    assertInLog(content,
        "6 threads not used for Rename operation on blob");
  }

  /*
   * Test case for rename operation when waiting for the thread pool to
   * terminate is interrupted: execute() is a no-op and awaitTermination()
   * throws InterruptedException. The rename must fail with an IOException,
   * leave the source folder in place and log the interruption.
   */
  @Test
  public void testRenameThreadPoolTerminationFailure() throws Exception {

    // Spy azure file system object and return mocked thread pool
    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);

    // Spy a thread pool executor and link it to azure file system object.
    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
        mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
            path, NativeAzureFileSystem.AZURE_RENAME_THREADS));

    // Spies are stubbed with the doXxx().when(spy) form so that the real
    // method is not invoked while the stub is being set up.
    Mockito.doReturn(mockThreadPoolExecutor).when(mockFs).getThreadPoolExecutor(
        renameThreads, "AzureBlobRenameThread", "Rename",
        path, NativeAzureFileSystem.AZURE_RENAME_THREADS);

    // Mock thread executor that accepts tasks without running them and is
    // interrupted while waiting for termination.
    ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
    Mockito.doNothing().when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
    Mockito.when(mockThreadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS))
        .thenThrow(new InterruptedException());

    // With single iteration, we would have created 7 blobs resulting 7 threads.
    Mockito.doReturn(mockThreadExecutor).when(mockThreadPoolExecutor).getThreadPool(7);

    createFolder(mockFs, "root");
    Path sourceFolder = new Path("root");
    Path destFolder = new Path("rootnew");
    try {
      mockFs.rename(sourceFolder, destFolder);
      fail("Expected an IOException from rename when thread pool termination is interrupted");
    } catch (IOException expected) {
      // Expected: the interrupted rename is reported as an IOException.
    }

    assertTrue(mockFs.exists(sourceFolder));

    // Validate from logs that threads are enabled and rename operation is failed.
    String content = logs.getOutput();
    assertInLog(content,
        "Using thread pool for Rename operation with threads");
    assertInLog(content, "Threads got interrupted Rename blob operation");
    assertInLog(content,
        "Rename failed as operation on subfolders and files failed.");
  }

  /*
   * Test case for rename operation when renaming an individual file fails:
   * the pending-rename object is spied so that renameFile() throws an
   * IOException. The folder rename must fail, leave the source folder in
   * place and log the failure.
   */
  @Test
  public void testRenameSingleRenameException() throws Exception {

    Path sourceFolder = new Path("root");
    Path destFolder = new Path("rootnew");

    // Spy azure file system object and populate rename pending spy object.
    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);

    // Populate data now only such that rename pending spy object would see this data.
    createFolder(mockFs, "root");

    String srcKey = mockFs.pathToKey(mockFs.makeAbsolute(sourceFolder));
    String dstKey = mockFs.pathToKey(mockFs.makeAbsolute(destFolder));

    // Spies are stubbed with the doXxx().when(spy) form so that the real
    // method is not invoked a second time while the stub is being set up.
    FolderRenamePending mockRenameFs = Mockito.spy(mockFs.prepareAtomicFolderRename(srcKey, dstKey));
    Mockito.doReturn(mockRenameFs).when(mockFs).prepareAtomicFolderRename(srcKey, dstKey);

    // Raise an exception for every individual file rename.
    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root/0")));
    Mockito.doThrow(new IOException()).when(mockRenameFs).renameFile(Mockito.any(FileMetadata.class));

    try {
      mockFs.rename(sourceFolder, destFolder);
      fail("Expected an IOException from rename when renaming a file fails");
    } catch (IOException expected) {
      // Expected: the failed file rename surfaces as an IOException.
    }

    assertTrue(mockFs.exists(sourceFolder));

    // Validate from logs that threads are enabled and rename operation failed.
    String content = logs.getOutput();
    assertInLog(content,
        "Using thread pool for Rename operation with threads");
    assertInLog(content,
        "Encountered Exception for Rename operation for file " + path);
    assertInLog(content,
        "Terminating execution of Rename operation now as some other thread already got exception or operation failed");
  }

  /**
   * Returns the account produced by {@code AzureBlobStorageTestAccount.create()}.
   * NOTE(review): presumably consumed by the base class while setting up
   * the file system under test; confirm against AbstractWasbTestBase.
   *
   * @return the test account created by {@code AzureBlobStorageTestAccount.create()}
   * @throws Exception if the account cannot be created
   */
  @Override
  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
    return AzureBlobStorageTestAccount.create();
  }

}