/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

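/**
 * Tests for {@link WriteThreadPoolSizeManager}, which hands out a per-filesystem
 * singleton write thread pool and adjusts its maximum size in response to
 * observed CPU and memory pressure.
 */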
class TestWriteThreadPoolSizeManager extends AbstractAbfsIntegrationTest {

  private AbfsConfiguration mockConfig;
  private static final double HIGH_CPU_UTILIZATION_THRESHOLD = 0.95;
  private static final double LOW_CPU_UTILIZATION_THRESHOLD = 0.05;
  private static final int THREAD_SLEEP_DURATION_MS = 200;
  private static final String TEST_FILE_PATH = "testFilePath";
  private static final String TEST_DIR_PATH = "testDirPath";
  private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
  private static final int CONCURRENT_REQUEST_COUNT = 15;
  private static final int THREAD_POOL_KEEP_ALIVE_TIME = 10;
  private static final int LOW_TIER_MEMORY_MULTIPLIER = 4;
  private static final int MEDIUM_TIER_MEMORY_MULTIPLIER = 6;
  private static final int HIGH_TIER_MEMORY_MULTIPLIER = 8;
  private static final int HIGH_CPU_THRESHOLD = 15;
  private static final int MEDIUM_CPU_THRESHOLD = 10;
  private static final int LOW_CPU_THRESHOLD = 5;
  private static final int CPU_MONITORING_INTERVAL = 15;
  private static final int WAIT_DURATION_MS = 3000;
  private static final int LATCH_TIMEOUT_SECONDS = 60;
  private static final int RESIZE_WAIT_TIME_MS = 6_000;
  private static final double HIGH_CPU_USAGE_RATIO = 0.95;
  private static final double LOW_CPU_USAGE_RATIO = 0.05;
  private static final int SLEEP_DURATION_MS = 150;
  private static final int AWAIT_TIMEOUT_SECONDS = 45;
  private static final int RESIZER_JOIN_TIMEOUT_MS = 2_000;
  private static final int WAIT_TIMEOUT_MS = 5000;
  private static final int SLEEP_DURATION_30S_MS = 30000;
  private static final int SMALL_PAUSE_MS = 50;
  private static final int BURST_LOAD = 50;
  private static final long LOAD_SLEEP_DURATION_MS = 2000;

  TestWriteThreadPoolSizeManager() throws Exception {
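    // Run the base integration-test setup eagerly at construction time; this
    // prepares the test account, configuration and filesystem used below.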
    super.setup();
  }

  /**
   * Common setup to prepare a mock configuration for each test.
   */
  @BeforeEach
  public void setUp() {
    mockConfig = mock(AbfsConfiguration.class);
    when(mockConfig.getWriteConcurrentRequestCount()).thenReturn(CONCURRENT_REQUEST_COUNT);
    when(mockConfig.getWriteThreadPoolKeepAliveTime()).thenReturn(THREAD_POOL_KEEP_ALIVE_TIME);
    when(mockConfig.getLowTierMemoryMultiplier()).thenReturn(LOW_TIER_MEMORY_MULTIPLIER);
    when(mockConfig.getMediumTierMemoryMultiplier()).thenReturn(MEDIUM_TIER_MEMORY_MULTIPLIER);
    when(mockConfig.getHighTierMemoryMultiplier()).thenReturn(HIGH_TIER_MEMORY_MULTIPLIER);
    when(mockConfig.getWriteHighCpuThreshold()).thenReturn(HIGH_CPU_THRESHOLD);
    when(mockConfig.getWriteMediumCpuThreshold()).thenReturn(MEDIUM_CPU_THRESHOLD);
    when(mockConfig.getWriteLowCpuThreshold()).thenReturn(LOW_CPU_THRESHOLD);
    when(mockConfig.getWriteCpuMonitoringInterval()).thenReturn(CPU_MONITORING_INTERVAL);
  }

  /**
   * Ensures that {@link WriteThreadPoolSizeManager#getInstance(String, AbfsConfiguration)} returns a singleton per key.
   */
  @Test
  void testGetInstanceReturnsSingleton() {
    WriteThreadPoolSizeManager instance1
        = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig);
    WriteThreadPoolSizeManager instance2
        = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig);
    WriteThreadPoolSizeManager instance3 =
        WriteThreadPoolSizeManager.getInstance("newFs", mockConfig);
    Assertions.assertThat(instance1)
        .as("Expected the same singleton instance for the same key")
        .isSameAs(instance2);
    Assertions.assertThat(instance1)
        .as("Expected a different instance for a different key")
        .isNotSameAs(instance3);
  }

  /**
   * Tests that high CPU usage results in the thread pool scaling down or keeping its current size.
   */
  @Test
  void testAdjustThreadPoolSizeBasedOnHighCPU() throws InterruptedException, IOException {
    // Get the executor service (ThreadPoolExecutor)
    WriteThreadPoolSizeManager instance
        = WriteThreadPoolSizeManager.getInstance("testfsHigh",
        getAbfsStore(getFileSystem()).getAbfsConfiguration());
    ExecutorService executor = instance.getExecutorService();
    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;

    // Simulate high CPU usage (e.g., 95% CPU utilization)
    int initialMaxSize = threadPoolExecutor.getMaximumPoolSize();
    instance.adjustThreadPoolSizeBasedOnCPU(HIGH_CPU_UTILIZATION_THRESHOLD);  // High CPU

    // Get the new maximum pool size after adjustment
    int newMaxSize = threadPoolExecutor.getMaximumPoolSize();

    // Assert that the pool size decreased or stayed the same under high CPU usage
    Assertions.assertThat(newMaxSize)
        .as("Expected pool size to decrease or stay the same under high CPU usage")
        .isLessThanOrEqualTo(initialMaxSize);
    instance.close();
  }

  /**
   * Tests that under low CPU usage the thread pool scales up or keeps its current size.
   */
  @Test
  void testAdjustThreadPoolSizeBasedOnLowCPU()
      throws InterruptedException, IOException {
    WriteThreadPoolSizeManager instance
        = WriteThreadPoolSizeManager.getInstance("testfsLow",
        getAbfsStore(getFileSystem()).getAbfsConfiguration());
    ExecutorService executor = instance.getExecutorService();
    int initialSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
    instance.adjustThreadPoolSizeBasedOnCPU(LOW_CPU_UTILIZATION_THRESHOLD); // Low CPU
    int newSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
    Assertions.assertThat(newSize)
        .as("Expected pool size to increase or stay the same under low CPU usage")
        .isGreaterThanOrEqualTo(initialSize);
    instance.close();
  }


  /**
   * Confirms that the thread pool executor is initialized and not shut down.
   */
  @Test
  void testExecutorServiceIsNotNull() throws IOException {
    WriteThreadPoolSizeManager instance
        = WriteThreadPoolSizeManager.getInstance("testfsExec", mockConfig);
    ExecutorService executor = instance.getExecutorService();
    Assertions.assertThat(executor).as("Executor service should be initialized")
        .isNotNull();
    Assertions.assertThat(executor.isShutdown())
        .as("Executor service should not be shut down")
        .isFalse();
    instance.close();
  }


  /**
   * Ensures that calling {@link WriteThreadPoolSizeManager#close()} cleans up resources.
   */
  @Test
  void testCloseCleansUp() throws Exception {
    WriteThreadPoolSizeManager instance
        = WriteThreadPoolSizeManager.getInstance("testfsClose", mockConfig);
    ExecutorService executor = instance.getExecutorService();
    instance.close();
    Assertions.assertThat(executor.isShutdown() || executor.isTerminated())
        .as("Executor service should be shut down or terminated after close()")
        .isTrue();
  }

  /**
   * Tests that the CPU monitoring task is scheduled when startCPUMonitoring() is called.
   * The test verifies that:
   * 1. the CPU monitor executor is not null, i.e. the monitoring task was scheduled, and
   * 2. the monitor's pool has at least one active thread, confirming the task is running.
   * @throws InterruptedException if the test is interrupted while sleeping
   */
  @Test
  void testStartCPUMonitoringSchedulesTask()
      throws InterruptedException, IOException {
    // Create a new instance of WriteThreadPoolSizeManager using a mock configuration
    WriteThreadPoolSizeManager instance
        = WriteThreadPoolSizeManager.getInstance("testScheduler", mockConfig);

    // Call startCPUMonitoring to schedule the monitoring task
    instance.startCPUMonitoring();

    // Wait for a short period to allow the task to run and be scheduled
    Thread.sleep(THREAD_SLEEP_DURATION_MS);

    // Retrieve the CPU monitor executor (ScheduledThreadPoolExecutor) from the instance
    ScheduledThreadPoolExecutor monitor
        = (ScheduledThreadPoolExecutor) instance.getCpuMonitorExecutor();

    // Assert that the monitor executor is not null, ensuring that it was properly initialized
    Assertions.assertThat(monitor)
        .as("CPU Monitor Executor should not be null")
        .isNotNull();

    // Assert that the thread pool size is greater than 0, confirming that the task has been scheduled and threads are active
    Assertions.assertThat(monitor.getPoolSize())
        .as("Thread pool size should be greater than 0, indicating that the task is running")
        .isGreaterThan(ZERO);
    instance.close();
  }

  /**
   * Verifies that ABFS write tasks can complete successfully even when the system
   * is under artificial CPU stress. The test also ensures that the write thread
   * pool resizes dynamically under load without leading to starvation, overload,
   * or leftover work in the queue.
   */
  @Test
  void testABFSWritesUnderCPUStress() throws Exception {
    // Initialize the filesystem and thread pool manager
    AzureBlobFileSystem fs = getFileSystem();
    WriteThreadPoolSizeManager instance =
        WriteThreadPoolSizeManager.getInstance(getFileSystemName(), getConfiguration());
    ThreadPoolExecutor executor =
        (ThreadPoolExecutor) instance.getExecutorService();

    // Start CPU monitoring so pool size adjustments happen in response to load
    instance.startCPUMonitoring();

    // Launch a background thread that generates CPU stress for ~3 seconds.
    // This simulates contention on the system and should cause the pool to adjust.
    Thread stressThread = new Thread(() -> {
      long end = System.currentTimeMillis() + WAIT_DURATION_MS;
      while (System.currentTimeMillis() < end) {
        // Busy-work loop: repeatedly compute random powers to waste CPU cycles
        double waste = Math.pow(Math.random(), Math.random());
      }
    });
    stressThread.start();

    // Prepare the ABFS write workload with multiple concurrent tasks
    int taskCount = 10;
    CountDownLatch latch = new CountDownLatch(taskCount);
    Path testFile = new Path(TEST_FILE_PATH);
    final byte[] b = new byte[TEST_FILE_LENGTH];
    new Random().nextBytes(b);

    // Submit 10 tasks, each writing to its own file to simulate parallel load
    for (int i = 0; i < taskCount; i++) {
      int finalI = i;
      executor.submit(() -> {
        try (FSDataOutputStream out = fs.create(
            new Path(testFile + "_" + finalI), true)) {
          for (int j = 0; j < 5; j++) {
            out.write(b); // perform multiple writes to add sustained pressure
          }
          out.hflush();   // flush to force actual I/O
        } catch (IOException e) {
          // Any failure here indicates pool misbehavior or I/O issues
          Assertions.fail("Write task failed with exception", e);
        } finally {
          // Mark this task as complete
          latch.countDown();
        }
      });
    }

    // Wait for all tasks to finish (up to 60s timeout to guard against deadlock/starvation)
    boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);

    // Record the pool size after CPU stress to confirm resizing took place
    int resizedPoolSize = executor.getMaximumPoolSize();

    // 1. All tasks must finish within the timeout, proving no starvation or deadlock
    Assertions.assertThat(finished)
        .as("All ABFS write tasks should complete without starvation")
        .isTrue();

    // 2. Pool size must stay within configured bounds while resizing under stress
    Assertions.assertThat(resizedPoolSize)
        .as("Thread pool size should stay within configured bounds under CPU stress")
        .isBetween(1, getAbfsStore(fs).getAbfsConfiguration().getWriteConcurrentRequestCount());

    // 3. Task queue must be empty, proving no backlog remains after the workload
    Assertions.assertThat(executor.getQueue().size())
        .as("No backlog should remain in task queue after completion")
        .isEqualTo(0);
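
    // Best-effort cleanup of the per-task files created above, mirroring the
    // cleanup pattern used by the other tests in this class.
    for (int i = 0; i < taskCount; i++) {
      try {
        fs.delete(new Path(testFile + "_" + i), false);
      } catch (IOException ignore) {
        // Ignored: delete failures are non-fatal for test cleanup
      }
    }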

    // Cleanup resources
    instance.close();
  }


  /**
   * Ensures that dynamic thread pool resizing during an active ABFS write workload
   * does not cause deadlocks, task loss, or task duplication. The test also verifies
   * that the pool resizes while work is in progress and that the executor queue
   * eventually drains cleanly.
   */
  @Test
  void testDynamicResizeNoDeadlocksNoTaskLoss() throws Exception {
    // Initialize filesystem and thread pool manager
    AzureBlobFileSystem fs = getFileSystem();
    WriteThreadPoolSizeManager mgr =
        WriteThreadPoolSizeManager.getInstance(getFileSystemName(), mockConfig);
    ThreadPoolExecutor executor = (ThreadPoolExecutor) mgr.getExecutorService();

    // Enable monitoring (may not be required if adjust() is triggered internally)
    mgr.startCPUMonitoring();

    // Test configuration: enough tasks and writes to stress the pool
    final int taskCount = 10;
    final int writesPerTask = 5;
    final byte[] b = new byte[TEST_FILE_LENGTH];
    new Random().nextBytes(b);
    final Path base = new Path(TEST_DIR_PATH);
    fs.mkdirs(base);

    // Barrier ensures all tasks start together, so resizing happens mid-flight
    final CyclicBarrier startBarrier = new CyclicBarrier(taskCount + 1);
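    // The extra party (taskCount + 1) is the resizer thread started below, which
    // trips the barrier to release the workers once all tasks have been submitted.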
    final CountDownLatch done = new CountDownLatch(taskCount);

    // Track execution results
    final AtomicIntegerArray completed = new AtomicIntegerArray(taskCount); // mark tasks once
    final AtomicInteger duplicates = new AtomicInteger(0);                  // guard against double-completion
    final AtomicInteger rejected = new AtomicInteger(0);                    // count unexpected rejections

    // Submit ABFS write tasks
    for (int i = 0; i < taskCount; i++) {
      final int id = i;
      try {
        executor.submit(() -> {
          try {
            // Hold until all tasks are enqueued, then start together
            startBarrier.await(10, TimeUnit.SECONDS);

            // Each task writes to its own file, flushing intermittently
            Path subPath = new Path(base, "part-" + id);
            try (FSDataOutputStream out = fs.create(subPath)) {
              for (int w = 0; w < writesPerTask; w++) {
                out.write(b);
                if ((w & 1) == 1) {
                  out.hflush(); // force some syncs to increase contention
                }
              }
              out.hflush();
            }

            // Mark task as completed once; duplicates flag if it happens again
            if (!completed.compareAndSet(id, 0, 1)) {
              duplicates.incrementAndGet();
            }
          } catch (Exception e) {
            Assertions.fail("ABFS write task " + id + " failed", e);
          } finally {
            done.countDown();
          }
        });
      } catch (RejectedExecutionException rex) {
        rejected.incrementAndGet();
      }
    }

    // Track the smallest and largest maximum pool sizes observed while the
    // resizer thread below alternates between simulated high and low CPU load.
    final AtomicInteger observedMinMax = new AtomicInteger(executor.getMaximumPoolSize());
    final AtomicInteger observedMaxMax = new AtomicInteger(executor.getMaximumPoolSize());

    Thread resizer = new Thread(() -> {
      try {
        // Release worker tasks
        startBarrier.await(10, TimeUnit.SECONDS);

        long end = System.currentTimeMillis() + RESIZE_WAIT_TIME_MS; // keep resizing for ~6s
        boolean high = true;
        while (System.currentTimeMillis() < end) {
          // Alternate between high load (shrink) and low load (expand)
          if (high) {
            mgr.adjustThreadPoolSizeBasedOnCPU(HIGH_CPU_USAGE_RATIO);
          } else {
            mgr.adjustThreadPoolSizeBasedOnCPU(LOW_CPU_USAGE_RATIO);
          }
          high = !high;

          // Track observed pool size bounds to prove resizing occurred
          int cur = executor.getMaximumPoolSize();
          observedMinMax.updateAndGet(prev -> Math.min(prev, cur));
          observedMaxMax.updateAndGet(prev -> Math.max(prev, cur));

          Thread.sleep(SLEEP_DURATION_MS);
        }
      } catch (Exception ignore) {
        // No-op: this is best-effort simulation
      }
    }, "resizer-thread");

    resizer.start();

    // Wait for all tasks to finish (ensures no deadlock)
    boolean finished = done.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);

    // Join resizer thread
    resizer.join(RESIZER_JOIN_TIMEOUT_MS);

    // 1. All tasks must complete in time, proving there are no deadlocks
    Assertions.assertThat(finished)
        .as("All tasks must complete within timeout (no deadlock)")
        .isTrue();

    // 2. Every task should complete exactly once, proving no task loss
    int completedCount = 0;
    for (int i = 0; i < taskCount; i++) {
      completedCount += completed.get(i);
    }
    Assertions.assertThat(completedCount)
        .as("Every task should complete exactly once (no task loss)")
        .isEqualTo(taskCount);

    // 3. No task should mark itself as done more than once, proving no duplication
    Assertions.assertThat(duplicates.get())
        .as("No task should report completion more than once (no duplication)")
        .isZero();

    // 4. The executor should not reject tasks while resizing is happening
    Assertions.assertThat(rejected.get())
        .as("Tasks should not be rejected during active resizing")
        .isZero();

    // 5. Executor queue should eventually empty once all tasks finish
    Assertions.assertThat(executor.getQueue().size())
        .as("Executor queue should drain after workload")
        .isEqualTo(0);

    // 6. Executor should still be running after workload until explicitly closed
    Assertions.assertThat(executor.isShutdown())
        .as("Executor should remain running until manager.close()")
        .isFalse();

    // 7. Sanity-check the pool size bounds recorded while the resizer was running
    int minObserved = observedMinMax.get();
    int maxObserved = observedMaxMax.get();

    Assertions.assertThat(maxObserved)
        .as("Observed maximum pool size should stay positive while resizing")
        .isGreaterThan(0);

    Assertions.assertThat(minObserved)
        .as("Observed minimum pool size should not exceed the observed maximum")
        .isLessThanOrEqualTo(maxObserved);

    // Cleanup
    for (int i = 0; i < taskCount; i++) {
      Path p = new Path(base, "part-" + i);
      try {
        fs.delete(p, false);
      } catch (IOException ignore) {
        // Ignored: delete failures are non-fatal for test cleanup
      }
    }
    try {
      fs.delete(base, true);
    } catch (IOException ignore) {
      // Ignored: cleanup failures are non-fatal in tests
    }
    mgr.close();
  }



  /**
   * Verifies that when the system experiences high CPU usage,
   * the WriteThreadPoolSizeManager detects the load and reduces
   * the maximum thread pool size accordingly.
   */
  @Test
  void testThreadPoolScalesDownOnHighCpuLoad() throws Exception {
    // Initialize filesystem and thread pool manager
    try (FileSystem fileSystem = FileSystem.newInstance(getRawConfiguration())) {
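      // FileSystem.newInstance(...) returns a fresh client rather than a cached one,
      // so the pool size manager obtained via its filesystem id below should be
      // isolated from managers used by other tests.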
      AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
      WriteThreadPoolSizeManager instance =
          WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(), getConfiguration());
      ThreadPoolExecutor executor =
          (ThreadPoolExecutor) instance.getExecutorService();

      // Start monitoring CPU load
      instance.startCPUMonitoring();

      // Capture baseline pool size for comparison later
      int initialMax = executor.getMaximumPoolSize();

      // Define a CPU-bound task: tight loop of math ops for ~5s
      Runnable cpuBurn = () -> {
        long end = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
        while (System.currentTimeMillis() < end) {
          double waste = Math.sin(Math.random()) * Math.cos(Math.random());
        }
      };

      // Launch two CPU hogs in parallel
      Thread cpuHog1 = new Thread(cpuBurn, "cpu-hog-thread-1");
      Thread cpuHog2 = new Thread(cpuBurn, "cpu-hog-thread-2");
      cpuHog1.start();
      cpuHog2.start();

      // Submit multiple write tasks while CPU is under stress
      int taskCount = 10;
      CountDownLatch latch = new CountDownLatch(taskCount);
      Path base = new Path(TEST_DIR_PATH);
      abfs.mkdirs(base);
      final byte[] buffer = new byte[TEST_FILE_LENGTH];
      new Random().nextBytes(buffer);

      for (int i = 0; i < taskCount; i++) {
        final Path part = new Path(base, "part-" + i);
        executor.submit(() -> {
          try (FSDataOutputStream out = abfs.create(part, true)) {
            for (int j = 0; j < 5; j++) {
              out.write(buffer);
              out.hflush();
            }
          } catch (IOException e) {
            Assertions.fail("Write task failed under CPU stress", e);
          } finally {
            latch.countDown();
          }
        });
      }

      // Ensure all tasks complete (avoid deadlock/starvation)
      boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);

      // Wait for CPU hogs to end and give monitor time to react
      cpuHog1.join();
      cpuHog2.join();
      Thread.sleep(SLEEP_DURATION_30S_MS);

      int resizedMax = executor.getMaximumPoolSize();

      // Verify outcomes:
      // 1. All write tasks succeeded despite CPU pressure
      Assertions.assertThat(finished)
          .as("All ABFS write tasks must complete despite CPU stress")
          .isTrue();

      // 2. Thread pool scaled down, or at least did not grow
      Assertions.assertThat(resizedMax)
          .as("Thread pool should scale down, or at least not grow, under high CPU load")
          .isLessThanOrEqualTo(initialMax);

      // 3. No leftover tasks in the queue
      Assertions.assertThat(executor.getQueue().size())
          .as("No backlog should remain in the queue after workload")
          .isEqualTo(0);

      // Cleanup test data
      for (int i = 0; i < taskCount; i++) {
        try {
          abfs.delete(new Path(base, "part-" + i), false);
        } catch (IOException ignore) {
          // Ignored: cleanup failures are non-fatal in tests
        }
      }
      try {
        abfs.delete(base, true);
      } catch (IOException ignore) {
        // Ignored: cleanup failures are non-fatal in tests
      }
      instance.close();
    }
  }


  /**
   * Verifies that when two parallel high memory-consuming workloads run,
   * the WriteThreadPoolSizeManager detects the memory pressure and
   * scales down the maximum thread pool size.
   */
  @Test
  void testScalesDownOnParallelHighMemoryLoad() throws Exception {
    // Initialize filesystem and thread pool manager
    try (FileSystem fileSystem = FileSystem.newInstance(getRawConfiguration())) {
      AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
      WriteThreadPoolSizeManager instance =
          WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
              getConfiguration());
      ThreadPoolExecutor executor =
          (ThreadPoolExecutor) instance.getExecutorService();

      // Begin monitoring resource usage (CPU + memory)
      instance.startCPUMonitoring();

      // Capture the initial thread pool size for later comparison
      int initialMax = executor.getMaximumPoolSize();

      // Define a workload that continuously allocates memory (~5 MB chunks)
      // for ~5 seconds to simulate memory pressure in the JVM.
      Runnable memoryBurn = () -> {
        List<byte[]> allocations = new ArrayList<>();
        long end = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
        while (System.currentTimeMillis() < end) {
          try {
            allocations.add(new byte[5 * 1024 * 1024]); // allocate 5 MB
            Thread.sleep(SMALL_PAUSE_MS); // small pause to avoid instant OOM
          } catch (OutOfMemoryError oom) {
            // Clear allocations if JVM runs out of memory and continue
            allocations.clear();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      };

      // Start two threads running the memory hog workload in parallel
      Thread memHog1 = new Thread(memoryBurn, "mem-hog-thread-1");
      Thread memHog2 = new Thread(memoryBurn, "mem-hog-thread-2");
      memHog1.start();
      memHog2.start();

      // Submit several write tasks to ABFS while memory is under stress
      int taskCount = 10;
      CountDownLatch latch = new CountDownLatch(taskCount);
      Path base = new Path(TEST_DIR_PATH);
      abfs.mkdirs(base);
      final byte[] buffer = new byte[TEST_FILE_LENGTH];
      new Random().nextBytes(buffer);

      for (int i = 0; i < taskCount; i++) {
        final Path part = new Path(base, "part-" + i);
        executor.submit(() -> {
          try (FSDataOutputStream out = abfs.create(part, true)) {
            for (int j = 0; j < 5; j++) {
              out.write(buffer);
              out.hflush();
            }
          } catch (IOException e) {
            Assertions.fail("Write task failed under memory stress", e);
          } finally {
            latch.countDown();
          }
        });
      }

      // Ensure all tasks finish within a timeout
      boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);

      // Wait for memory hog threads to finish
      memHog1.join();
      memHog2.join();

      // Give monitoring thread time to detect memory pressure and react
      Thread.sleep(SLEEP_DURATION_30S_MS);

      int resizedMax = executor.getMaximumPoolSize();

      // Validate that:
      // 1. All ABFS writes succeeded despite memory stress
      Assertions.assertThat(finished)
          .as("All ABFS write tasks must complete despite parallel memory stress")
          .isTrue();

      // 2. The thread pool scaled down, or at least did not grow, under memory pressure
      Assertions.assertThat(resizedMax)
          .as("Thread pool should scale down, or at least not grow, under parallel high memory load")
          .isLessThanOrEqualTo(initialMax);

      // 3. No tasks remain queued after workload completion
      Assertions.assertThat(executor.getQueue().size())
          .as("No backlog should remain in the queue after workload")
          .isEqualTo(0);

      // Clean up temporary test files
      for (int i = 0; i < taskCount; i++) {
        try {
          abfs.delete(new Path(base, "part-" + i), false);
        } catch (IOException ignore) {
          // Ignored: cleanup failures are non-fatal in tests
        }
      }
      try {
        abfs.delete(base, true);
      } catch (IOException ignore) {
        // Ignored: cleanup failures are non-fatal in tests
      }
      instance.close();
    }
  }

  /**
   * Test that after a long idle period, the thread pool
   * can quickly scale up in response to a sudden burst of load
   * without performance degradation.
   */
  @Test
  void testThreadPoolScalesUpAfterIdleBurstLoad() throws Exception {
    // Initialize filesystem and thread pool manager
    try (FileSystem fileSystem = FileSystem.newInstance(
        getRawConfiguration())) {
      AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
      WriteThreadPoolSizeManager instance = WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
              abfs.getAbfsStore().getAbfsConfiguration());
      ThreadPoolExecutor executor =
          (ThreadPoolExecutor) instance.getExecutorService();

      // --- Step 1: Simulate idle period ---
      // Let the executor sit idle with no work for a few seconds
      Thread.sleep(WAIT_TIMEOUT_MS);
      int poolSizeAfterIdle = executor.getPoolSize();

      // Verify that after idling, the pool is at or close to its minimum size
      Assertions.assertThat(poolSizeAfterIdle)
          .as("Pool size should remain minimal after idle")
          .isLessThanOrEqualTo(executor.getCorePoolSize());

      // --- Step 2: Submit a sudden burst of tasks ---
      // Launch many short, CPU-heavy tasks at once to simulate burst load
      int burstLoad = BURST_LOAD;
      CountDownLatch latch = new CountDownLatch(burstLoad);
      for (int i = 0; i < burstLoad; i++) {
        executor.submit(() -> {
          // Busy loop for ~200ms to simulate CPU work
          long end = System.currentTimeMillis() + THREAD_SLEEP_DURATION_MS;
          while (System.currentTimeMillis() < end) {
            Math.sqrt(Math.random()); // burn CPU cycles
          }
          latch.countDown();
        });
      }

      // --- Step 3: Give pool time to react ---
      // Wait briefly so the pool's scaling logic has a chance to expand
      Thread.sleep(LOAD_SLEEP_DURATION_MS);
      int poolSizeDuringBurst = executor.getPoolSize();

      // Verify that the pool did not shrink compared to the idle baseline
      Assertions.assertThat(poolSizeDuringBurst)
          .as("Pool size should grow, or at least not shrink, under burst load")
          .isGreaterThanOrEqualTo(poolSizeAfterIdle);

      // --- Step 4: Verify completion ---
      // Ensure all tasks complete successfully in a reasonable time,
      // proving there was no degradation or deadlock under burst load
      Assertions.assertThat(
              latch.await(LATCH_TIMEOUT_SECONDS / 2, TimeUnit.SECONDS))
          .as("All burst tasks should finish in reasonable time")
          .isTrue();
      instance.close();
    }
  }
}