WriteThreadPoolSizeManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
import org.apache.hadoop.fs.azurebfs.services.ResourceUtilizationStats;
import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LOW_HEAP_SPACE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_REDUCTION_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_DOWN;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_DOWN_AT_MIN;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;

/**
 * Manages a thread pool for writing operations, adjusting the pool size based on CPU utilization.
 */
public final class WriteThreadPoolSizeManager implements Closeable {

  /* Maximum allowed size for the thread pool. */
  private final int maxThreadPoolSize;
  /* Executor for periodically monitoring CPU usage. */
  private final ScheduledExecutorService cpuMonitorExecutor;
  /* Thread pool whose size is dynamically managed. */
  private volatile ExecutorService boundedThreadPool;
  /* Lock to ensure thread-safe updates to the thread pool. */
  private final Lock lock = new ReentrantLock();
  /* New computed max size for the thread pool after adjustment. */
  private volatile int newMaxPoolSize;
  /* Logger instance for logging events from WriteThreadPoolSizeManager. */
  private static final Logger LOG = LoggerFactory.getLogger(
      WriteThreadPoolSizeManager.class);
  /* Map to maintain a WriteThreadPoolSizeManager instance per filesystem. */
  private static final ConcurrentHashMap<String, WriteThreadPoolSizeManager>
      POOL_SIZE_MANAGER_MAP = new ConcurrentHashMap<>();
  /* Name of the filesystem associated with this manager. */
  private final String filesystemName;
  /* Initial size for the thread pool when created. */
  private final int initialPoolSize;
  /* The configuration instance. */
  private final AbfsConfiguration abfsConfiguration;
  /* Metrics collector for monitoring the performance of the ABFS write thread pool.  */
  private final AbfsWriteResourceUtilizationMetrics writeThreadPoolMetrics;
  /* Flag indicating if CPU monitoring has started. */
  private volatile boolean isMonitoringStarted = false;
  /* Tracks the last scale direction applied, or empty if none. */
  private volatile String lastScaleDirection = EMPTY_STRING;
  /* Maximum CPU utilization observed during the monitoring interval. */
  private volatile long maxJvmCpuUtilization = 0L;
  /** High memory usage threshold used to trigger thread pool downscaling. */
  private final long highMemoryThreshold;
  /** Low memory usage threshold used to allow thread pool upscaling. */
  private final long lowMemoryThreshold;

  /**
   * Private constructor to initialize the write thread pool and CPU monitor executor
   * based on system resources and ABFS configuration.
   *
   * @param filesystemName       Name of the ABFS filesystem.
   * @param abfsConfiguration    Configuration containing pool size parameters.
   * @param abfsCounters                  ABFS counters instance used for metrics.
   */
  private WriteThreadPoolSizeManager(String filesystemName,
      AbfsConfiguration abfsConfiguration, AbfsCounters abfsCounters) {
    /* Retrieves and assigns the write thread pool metrics from the ABFS client counters. */
    this.writeThreadPoolMetrics = abfsCounters.getAbfsWriteResourceUtilizationMetrics();
    this.filesystemName = filesystemName;
    this.abfsConfiguration = abfsConfiguration;
    int availableProcessors = Runtime.getRuntime().availableProcessors();
    /* Compute the max pool size */
    int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors, ResourceUtilizationUtils.getAvailableMaxHeapMemory());

    /* Get the initial pool size from config, fallback to at least 1 */
    this.initialPoolSize = Math.max(1,
        abfsConfiguration.getWriteConcurrentRequestCount());

    /* Set the upper bound for the thread pool size */
    this.maxThreadPoolSize = Math.max(computedMaxPoolSize, initialPoolSize);
    AtomicInteger threadCount = new AtomicInteger(1);
    this.boundedThreadPool = Executors.newFixedThreadPool(
        initialPoolSize,
        r -> {
          Thread t = new Thread(r);
          t.setName("abfs-boundedwrite-" + threadCount.getAndIncrement());
          return t;
        }
    );
    ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
    int keepAlive = Math.max(1, abfsConfiguration.getWriteThreadPoolKeepAliveTime());
    executor.setKeepAliveTime(keepAlive, TimeUnit.SECONDS);
    executor.allowCoreThreadTimeOut(true);
    /* Create a scheduled executor for CPU monitoring and pool adjustment */
    this.cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
    highMemoryThreshold = abfsConfiguration.getWriteHighMemoryUsageThresholdPercent();
    lowMemoryThreshold = abfsConfiguration.getWriteLowMemoryUsageThresholdPercent();
  }

  /** Returns the internal {@link AbfsConfiguration}. */
  private AbfsConfiguration getAbfsConfiguration() {
    return abfsConfiguration;
  }

  /**
   * Computes the maximum thread pool size based on the available processors
   * and the initial available heap memory. The calculation uses a tiered
   * multiplier derived from the memory-to-core ratio ��� systems with higher
   * memory per core allow for a larger thread pool.
   *
   * @param availableProcessors the number of available CPU cores.
   * @param initialAvailableHeapMemory the initial available heap memory, in bytes or GB (depending on implementation).
   * @return the computed maximum thread pool size.
   */
  private int getComputedMaxPoolSize(final int availableProcessors, long initialAvailableHeapMemory) {
    int maxpoolSize = getMemoryTierMaxThreads(initialAvailableHeapMemory, availableProcessors);
    LOG.debug("Computed max thread pool size: {} | Available processors: {} | Heap memory (GB): {}",
        maxpoolSize, availableProcessors, initialAvailableHeapMemory);
    return maxpoolSize;
  }

  /**
   * Determines the maximum thread count based on available heap memory and CPU cores.
   * Calculates the thread count as {@code availableProcessors �� multiplier}, where the
   * multiplier is selected according to the heap memory tier (low, medium, or high).
   *
   * @param availableHeapGB       the available heap memory in gigabytes.
   * @param availableProcessors   the number of available CPU cores.
   * @return the maximum thread count based on memory tier and processor count.
   */
  private int getMemoryTierMaxThreads(long availableHeapGB, int availableProcessors) {
    int multiplier;
    if (availableHeapGB <= LOW_HEAP_SPACE_FACTOR) {
      multiplier = abfsConfiguration.getLowTierMemoryMultiplier();
    } else if (availableHeapGB <= MEDIUM_HEAP_SPACE_FACTOR) {
      multiplier = abfsConfiguration.getMediumTierMemoryMultiplier();
    } else {
      multiplier = abfsConfiguration.getHighTierMemoryMultiplier();
    }
    return availableProcessors * multiplier;
  }

  /**
   * Returns the singleton {@link WriteThreadPoolSizeManager} instance for the specified filesystem.
   * If an active instance already exists in the manager map for the given filesystem, it is returned.
   * Otherwise, a new instance is created, registered in the map, and returned.
   *
   * @param filesystemName     the name of the filesystem.
   * @param abfsConfiguration  the {@link AbfsConfiguration} associated with the filesystem.
   * @param abfsCounters                the {@link AbfsCounters} used to initialize the manager.
   * @return  the singleton {@link WriteThreadPoolSizeManager} instance for the given filesystem.
   */
  public static synchronized WriteThreadPoolSizeManager getInstance(
      String filesystemName, AbfsConfiguration abfsConfiguration, AbfsCounters abfsCounters) {
    /* Check if an instance already exists in the map for the given filesystem */
    WriteThreadPoolSizeManager existingInstance = POOL_SIZE_MANAGER_MAP.get(
        filesystemName);

    /* If an existing instance is found, return it */
    if (existingInstance != null && existingInstance.boundedThreadPool != null
        && !existingInstance.boundedThreadPool.isShutdown()) {
      return existingInstance;
    }

    /* Otherwise, create a new instance, put it in the map, and return it */
    LOG.debug(
        "Creating new WriteThreadPoolSizeManager instance for filesystem: {}",
        filesystemName);
    WriteThreadPoolSizeManager newInstance = new WriteThreadPoolSizeManager(
        filesystemName, abfsConfiguration, abfsCounters);
    POOL_SIZE_MANAGER_MAP.put(filesystemName, newInstance);
    return newInstance;
  }

  /**
   * Adjusts the thread pool size to the specified maximum pool size.
   *
   * @param newMaxPoolSize the new maximum pool size.
   */
  private void adjustThreadPoolSize(int newMaxPoolSize) {
    synchronized (this) {
      ThreadPoolExecutor threadPoolExecutor
          = ((ThreadPoolExecutor) boundedThreadPool);
      int currentCorePoolSize = threadPoolExecutor.getCorePoolSize();

      if (newMaxPoolSize >= currentCorePoolSize) {
        threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
        threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
      } else {
        threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
        threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
      }
      LOG.debug("ThreadPool Info - New max pool size: {}, Current pool size: {}, Active threads: {}",
          newMaxPoolSize, threadPoolExecutor.getPoolSize(), threadPoolExecutor.getActiveCount());
    }
  }

  /**
   * Starts monitoring the CPU utilization and adjusts the thread pool size accordingly.
   */
  public synchronized void startCPUMonitoring() {
    if (!isMonitoringStarted()) {
      isMonitoringStarted = true;
      cpuMonitorExecutor.scheduleAtFixedRate(() -> {
            long cpuUtilization = ResourceUtilizationUtils.getJvmCpuLoad();
            LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
            try {
              adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
            } catch (InterruptedException e) {
              throw new RuntimeException(String.format(
                  "Thread pool size adjustment interrupted for filesystem %s",
                  filesystemName), e);
            }
          }, 0, getAbfsConfiguration().getWriteCpuMonitoringInterval(),
          TimeUnit.MILLISECONDS);
    }
  }

  /**
   * Dynamically adjusts the thread pool size based on current CPU utilization
   * and available heap memory relative to the initially available heap.
   *
   * @param cpuUtilization Current system CPU utilization (0.0 to 1.0)
   *  @throws InterruptedException if the resizing operation is interrupted while acquiring the lock
   */
  public void adjustThreadPoolSizeBasedOnCPU(long cpuUtilization) throws InterruptedException {
    lock.lock();
    try {
      ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
      int currentPoolSize = executor.getMaximumPoolSize();
      long memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
      long usedHeapMemory = ResourceUtilizationUtils.getUsedHeapMemory();
      long availableMemory = ResourceUtilizationUtils.getAvailableHeapMemory();
      long committedMemory = ResourceUtilizationUtils.getCommittedHeapMemory();
      LOG.debug("The memory load is {} and CPU utilization is {}", memoryLoad, cpuUtilization);
      if (cpuUtilization > (abfsConfiguration.getWriteHighCpuThreshold())) {
        newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize, memoryLoad);
        if (currentPoolSize == initialPoolSize && newMaxPoolSize == initialPoolSize) {
          lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
        }
      } else if (cpuUtilization > (abfsConfiguration.getWriteMediumCpuThreshold())) {
        newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize, memoryLoad);
        if (currentPoolSize == initialPoolSize && newMaxPoolSize == initialPoolSize) {
          lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
        }
      } else if (cpuUtilization < (abfsConfiguration.getWriteLowCpuThreshold())) {
        newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize, memoryLoad);
        if (currentPoolSize == maxThreadPoolSize && newMaxPoolSize == maxThreadPoolSize) {
          lastScaleDirection = SCALE_DIRECTION_NO_UP_AT_MAX;
        }
      } else {
        newMaxPoolSize = currentPoolSize;
        LOG.debug("CPU load normal ({}). No change: current={}", cpuUtilization, currentPoolSize);
      }
      boolean willResize = newMaxPoolSize != currentPoolSize;
      if (!willResize && !lastScaleDirection.equals(EMPTY_STRING)) {
        WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad,
            usedHeapMemory, availableMemory, committedMemory);
        // Update the write thread pool metrics with the latest statistics snapshot.
        writeThreadPoolMetrics.update(stats);
      }
      // Case 1: CPU increased ��� push metrics ONLY if not resizing
      if (cpuUtilization > maxJvmCpuUtilization) {
        maxJvmCpuUtilization = cpuUtilization;
        if (!willResize) {
          try {
            // Capture the latest thread pool statistics (pool size, CPU, memory, etc.).
            WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad,
                usedHeapMemory, availableMemory, committedMemory);
            // Update the write thread pool metrics with the latest statistics snapshot.
            writeThreadPoolMetrics.update(stats);
          } catch (Exception e) {
            LOG.debug("Error updating write thread pool metrics", e);
          }
        }
      }
      // Case 2: Resize ��� always push metrics
      if (willResize) {
        LOG.debug("Resizing thread pool from {} to {}", currentPoolSize, newMaxPoolSize);
        // Record scale direction
        lastScaleDirection = (newMaxPoolSize > currentPoolSize) ? SCALE_DIRECTION_UP: SCALE_DIRECTION_DOWN;
        adjustThreadPoolSize(newMaxPoolSize);
        try {
          // Capture the latest thread pool statistics (pool size, CPU, memory, etc.).
          WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad,
              usedHeapMemory, availableMemory, committedMemory);
          // Update the write thread pool metrics with the latest statistics snapshot.
          writeThreadPoolMetrics.update(stats);
        } catch (Exception e) {
          LOG.debug("Error updating write thread pool metrics after resizing.", e);
        }
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Calculates a reduced thread pool size when high CPU utilization is detected.
   * The reduction strategy depends on available heap memory:
   * if heap usage is high (low free memory), the pool size is reduced aggressively;
   * otherwise, it is reduced moderately to prevent resource contention.
   *
   * @param currentPoolSize  the current size of the thread pool.
   *  @param memoryLoad      the current JVM heap load (0.0���1.0)
   * @return the adjusted (reduced) pool size based on CPU and memory conditions.
   */
  private int calculateReducedPoolSizeHighCPU(int currentPoolSize, double memoryLoad) {
    LOG.debug("The high cpu memory load is {}", memoryLoad);
    if (memoryLoad > highMemoryThreshold) {
      LOG.debug("High CPU & high memory load ({}). Aggressive reduction: current={}, new={}",
          memoryLoad, currentPoolSize, currentPoolSize / HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
      return Math.max(initialPoolSize, currentPoolSize / HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
    }
    int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / HIGH_CPU_REDUCTION_FACTOR);
    LOG.debug("High CPU ({}). Reducing pool size moderately: current={}, new={}",
        abfsConfiguration.getWriteHighCpuThreshold(), currentPoolSize, reduced);
    return reduced;
  }

  /**
   * Calculates a reduced thread pool size when medium CPU utilization is detected.
   * The reduction is based on available heap memory: if memory is low, the pool size
   * is reduced more aggressively; otherwise, a moderate reduction is applied to
   * maintain balanced performance.
   *
   * @param currentPoolSize  the current size of the thread pool.
   * @param memoryLoad      the current JVM heap load (0.0���1.0)
   * @return the adjusted (reduced) pool size based on medium CPU and memory conditions.
   */
  private int calculateReducedPoolSizeMediumCPU(int currentPoolSize, double memoryLoad) {
    LOG.debug("The medium cpu memory load is {}", memoryLoad);
    if (memoryLoad > highMemoryThreshold) {
      int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR);
      LOG.debug("Medium CPU & high memory load ({}). Reducing: current={}, new={}",
          memoryLoad, currentPoolSize, reduced);
      return reduced;
    }
    int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / MEDIUM_CPU_REDUCTION_FACTOR);
    LOG.debug("Medium CPU ({}). Moderate reduction: current={}, new={}",
        abfsConfiguration.getWriteMediumCpuThreshold(), currentPoolSize, reduced);
    return reduced;
  }

  /**
   * Calculates an adjusted thread pool size when low CPU utilization is detected.
   * If sufficient heap memory is available, the pool size is increased to improve throughput.
   * Otherwise, it is slightly decreased to conserve memory resources.
   *
   * @param currentPoolSize  the current size of the thread pool.
   * @param memoryLoad      the current JVM heap load (0.0���1.0)
   * @return the adjusted (increased or decreased) pool size based on CPU and memory conditions.
   */
  private int calculateIncreasedPoolSizeLowCPU(int currentPoolSize, double memoryLoad) {
    LOG.debug("The low cpu memory load is {}", memoryLoad);
    if (memoryLoad <= lowMemoryThreshold) {
      int increased = Math.min(maxThreadPoolSize, (int) (currentPoolSize * LOW_CPU_POOL_SIZE_INCREASE_FACTOR));
      LOG.debug("Low CPU & low memory load ({}). Increasing: current={}, new={}",
          memoryLoad, currentPoolSize, increased);
      return increased;
    } else {
      // Decrease by 10%
      int decreased = Math.max(1, (int) (currentPoolSize * LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR));
      LOG.debug("Low CPU but insufficient heap. Decreasing: current={}, new={}", currentPoolSize, decreased);
      return decreased;
    }
  }

  /**
   * Returns the executor service for the thread pool.
   *
   * @return the executor service.
   */
  public ExecutorService getExecutorService() {
    return boundedThreadPool;
  }

  /**
   * Returns the scheduled executor responsible for CPU monitoring and dynamic pool adjustment.
   *
   * @return the {@link ScheduledExecutorService} used for CPU monitoring.
   */
  public ScheduledExecutorService getCpuMonitorExecutor() {
    return cpuMonitorExecutor;
  }

  /**
   * Checks if monitoring has started.
   *
   * @return true if monitoring has started, false otherwise.
   */
  public synchronized boolean isMonitoringStarted() {
    return isMonitoringStarted;
  }

  /**
   * Returns the maximum JVM CPU utilization observed during the current
   * monitoring interval or since the last reset.
   *
   * @return the highest JVM CPU utilization percentage recorded
   */
  @VisibleForTesting
  public long getMaxJvmCpuUtilization() {
    return maxJvmCpuUtilization;
  }

  /**
   * Closes this manager by shutting down executors and cleaning up resources.
   * Removes the instance from the active manager map.
   *
   * @throws IOException if an error occurs during shutdown.
   */
  @Override
  public void close() throws IOException {
    synchronized (this) {
      try {
        // Shutdown CPU monitor
        if (cpuMonitorExecutor != null && !cpuMonitorExecutor.isShutdown()) {
          cpuMonitorExecutor.shutdown();
        }
        // Gracefully shutdown the bounded thread pool
        if (boundedThreadPool != null && !boundedThreadPool.isShutdown()) {
          boundedThreadPool.shutdown();
          if (!boundedThreadPool.awaitTermination(THIRTY_SECONDS, TimeUnit.SECONDS)) {
            LOG.warn("Bounded thread pool did not terminate in time, forcing shutdownNow for filesystem: {}", filesystemName);
            boundedThreadPool.shutdownNow();
          }
          boundedThreadPool = null;
        }
        // Remove from the map
        POOL_SIZE_MANAGER_MAP.remove(filesystemName);
        LOG.debug("Closed and removed instance for filesystem: {}", filesystemName);
      } catch (Exception e) {
        LOG.warn("Failed to properly close instance for filesystem: {}", filesystemName, e);
      }
    }
  }

  /**
   * Represents current statistics of the write thread pool and system.
   */
  public static class WriteThreadPoolStats extends ResourceUtilizationStats {

    /**
     * Constructs a {@link WriteThreadPoolStats} instance containing thread pool
     * metrics and JVM/system resource utilization details.
     *
     * @param currentPoolSize the current number of threads in the pool
     * @param maxPoolSize the maximum number of threads permitted in the pool
     * @param activeThreads the number of threads actively executing tasks
     * @param idleThreads the number of idle threads in the pool
     * @param jvmCpuLoad the current JVM CPU load (0.0���1.0)
     * @param systemCpuUtilization the current system-wide CPU utilization (0.0���1.0)
     * @param availableHeapGB the available heap memory in gigabytes
     * @param committedHeapGB the committed heap memory in gigabytes
     * @param usedHeapGB the available heap memory in gigabytes
     * @param maxHeapGB the committed heap memory in gigabytes
     * @param memoryLoad the JVM memory load (used / max)
     * @param lastScaleDirection the last scaling action performed: "I" (increase),
     * "D" (decrease), or empty if no scaling occurred
     * @param maxCpuUtilization the peak JVM CPU utilization observed during this interval
     * @param jvmProcessId the process ID of the JVM
     */
    public WriteThreadPoolStats(int currentPoolSize,
        int maxPoolSize, int activeThreads, int idleThreads,
        long jvmCpuLoad, long systemCpuUtilization, long availableHeapGB,
        long committedHeapGB, long usedHeapGB, long maxHeapGB, long memoryLoad, String lastScaleDirection,
        long maxCpuUtilization, long jvmProcessId) {
      super(currentPoolSize, maxPoolSize, activeThreads, idleThreads,
          jvmCpuLoad, systemCpuUtilization, availableHeapGB,
          committedHeapGB, usedHeapGB, maxHeapGB, memoryLoad, lastScaleDirection,
          maxCpuUtilization, jvmProcessId);
    }
  }

  /**
   * Returns a snapshot of the current write thread pool and JVM/system resource
   * statistics.
   *
   * <p>The snapshot includes thread pool size and activity, JVM and system CPU
   * utilization, and JVM heap memory metrics. These values are used for monitoring
   * and for making dynamic scaling decisions for the write thread pool.</p>
   *
   * @param jvmCpuUtilization current JVM CPU utilization
   * @param memoryLoad current JVM memory load ratio (used / max)
   * @param usedMemory current used JVM heap memory
   * @param availableMemory current available JVM heap memory
   * @param committedMemory current committed JVM heap memory
   *
   * @return a {@link WriteThreadPoolStats} instance containing the current metrics
   */
  synchronized WriteThreadPoolStats getCurrentStats(long jvmCpuUtilization,
      long memoryLoad, long usedMemory, long availableMemory, long committedMemory) {

    if (boundedThreadPool == null) {
      return new WriteThreadPoolStats(
          ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO,
          ZERO, ZERO, EMPTY_STRING, ZERO, ZERO);
    }

    ThreadPoolExecutor exec = (ThreadPoolExecutor) this.boundedThreadPool;

    String currentScaleDirection = lastScaleDirection;
    lastScaleDirection = EMPTY_STRING;

    int poolSize = exec.getPoolSize();
    int activeThreads = exec.getActiveCount();
    int idleThreads = poolSize - activeThreads;

    return new WriteThreadPoolStats(
        poolSize,                      // Current thread count
        exec.getMaximumPoolSize(),     // Max allowed threads
        activeThreads,                 // Busy threads
        idleThreads,                   // Idle threads
        jvmCpuUtilization,             // JVM CPU usage (ratio)
        ResourceUtilizationUtils.getSystemCpuLoad(), // System CPU usage (ratio)
        availableMemory, // Free heap (GB)
        committedMemory, // Committed heap (GB)
        usedMemory,   // Used heap (GB)
        ResourceUtilizationUtils.getMaxHeapMemory(),    // Max heap (GB)
        memoryLoad,                    // used/max
        currentScaleDirection,         // "I", "D", or ""
        getMaxJvmCpuUtilization(),              // Peak JVM CPU usage so far
        JvmUniqueIdProvider.getJvmId()            // JVM PID
    );
  }
}