WriteThreadPoolSizeManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

import com.sun.management.OperatingSystemMXBean;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LOW_HEAP_SPACE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_MEDIUM_HEAP_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HEAP_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_REDUCTION_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;

/**
 * Manages a thread pool for writing operations, adjusting the pool size based on CPU utilization.
 */
public final class WriteThreadPoolSizeManager implements Closeable {

  /* Maximum allowed size for the thread pool. */
  private final int maxThreadPoolSize;
  /* Executor for periodically monitoring CPU usage. */
  private final ScheduledExecutorService cpuMonitorExecutor;
  /* Thread pool whose size is dynamically managed. */
  private volatile ExecutorService boundedThreadPool;
  /* Lock to ensure thread-safe updates to the thread pool. */
  private final Lock lock = new ReentrantLock();
  /* New computed max size for the thread pool after adjustment. */
  private volatile int newMaxPoolSize;
  /* Logger instance for logging events from WriteThreadPoolSizeManager. */
  private static final Logger LOG = LoggerFactory.getLogger(
      WriteThreadPoolSizeManager.class);
  /* Map to maintain a WriteThreadPoolSizeManager instance per filesystem. */
  private static final ConcurrentHashMap<String, WriteThreadPoolSizeManager>
      POOL_SIZE_MANAGER_MAP = new ConcurrentHashMap<>();
  /* Name of the filesystem associated with this manager. */
  private final String filesystemName;
  /* Initial size for the thread pool when created. */
  private final int initialPoolSize;
  /* Initially available heap memory. */
  private final long initialAvailableHeapMemory;
  /* The configuration instance. */
  private final AbfsConfiguration abfsConfiguration;

  /**
   * Private constructor to initialize the write thread pool and CPU monitor executor
   * based on system resources and ABFS configuration.
   *
   * @param filesystemName       Name of the ABFS filesystem.
   * @param abfsConfiguration    Configuration containing pool size parameters.
   */
  private WriteThreadPoolSizeManager(String filesystemName,
      AbfsConfiguration abfsConfiguration) {
    this.filesystemName = filesystemName;
    this.abfsConfiguration = abfsConfiguration;
    int availableProcessors = Runtime.getRuntime().availableProcessors();
    /* Get the heap space available when the instance is created */
    this.initialAvailableHeapMemory = getAvailableHeapMemory();
    /* Compute the max pool size */
    int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors, initialAvailableHeapMemory);

    /* Get the initial pool size from config, fallback to at least 1 */
    this.initialPoolSize = Math.max(1,
        abfsConfiguration.getWriteConcurrentRequestCount());

    /* Set the upper bound for the thread pool size */
    this.maxThreadPoolSize = Math.max(computedMaxPoolSize, initialPoolSize);
    AtomicInteger threadCount = new AtomicInteger(1);
    this.boundedThreadPool = Executors.newFixedThreadPool(
        initialPoolSize,
        r -> {
          Thread t = new Thread(r);
          t.setName("abfs-boundedwrite-" + threadCount.getAndIncrement());
          return t;
        }
    );
    ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
    executor.setKeepAliveTime(
        abfsConfiguration.getWriteThreadPoolKeepAliveTime(), TimeUnit.SECONDS);
    executor.allowCoreThreadTimeOut(true);
    /* Create a scheduled executor for CPU monitoring and pool adjustment */
    this.cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
  }

  /** Returns the internal {@link AbfsConfiguration}. */
  private AbfsConfiguration getAbfsConfiguration() {
    return abfsConfiguration;
  }

  /**
   * Computes the maximum thread pool size based on the available processors
   * and the initial available heap memory. The calculation uses a tiered
   * multiplier derived from the memory-to-core ratio ��� systems with higher
   * memory per core allow for a larger thread pool.
   *
   * @param availableProcessors the number of available CPU cores.
   * @param initialAvailableHeapMemory the initial available heap memory, in bytes or GB (depending on implementation).
   * @return the computed maximum thread pool size.
   */
  private int getComputedMaxPoolSize(final int availableProcessors, long initialAvailableHeapMemory) {
    int maxpoolSize = getMemoryTierMaxThreads(initialAvailableHeapMemory, availableProcessors);
    LOG.debug("Computed max thread pool size: {} | Available processors: {} | Heap memory (GB): {}",
        maxpoolSize, availableProcessors, initialAvailableHeapMemory);
    return maxpoolSize;
  }

  /**
   * Calculates the available heap memory in gigabytes.
   * This method uses {@link Runtime#getRuntime()} to obtain the maximum heap memory
   * allowed for the JVM and subtracts the currently used memory (total - free)
   * to determine how much heap memory is still available.
   * The result is rounded up to the nearest gigabyte.
   *
   * @return the available heap memory in gigabytes
   */
  private long getAvailableHeapMemory() {
    MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
    MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
    long availableHeapBytes = memoryUsage.getMax() - memoryUsage.getUsed();
    return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
  }

  /**
   * Returns aggressive thread count = CPU cores �� multiplier based on heap tier.
   */
  private int getMemoryTierMaxThreads(long availableHeapGB, int availableProcessors) {
    int multiplier;
    if (availableHeapGB <= LOW_HEAP_SPACE_FACTOR) {
      multiplier = abfsConfiguration.getLowTierMemoryMultiplier();
    } else if (availableHeapGB <= MEDIUM_HEAP_SPACE_FACTOR) {
      multiplier = abfsConfiguration.getMediumTierMemoryMultiplier();
    } else {
      multiplier = abfsConfiguration.getHighTierMemoryMultiplier();
    }
    return availableProcessors * multiplier;
  }

  /**
   * Returns the singleton instance of WriteThreadPoolSizeManager for the given filesystem.
   *
   * @param filesystemName the name of the filesystem.
   * @param abfsConfiguration the configuration for the ABFS.
   *
   * @return the singleton instance.
   */
  public static synchronized WriteThreadPoolSizeManager getInstance(
      String filesystemName, AbfsConfiguration abfsConfiguration) {
    /* Check if an instance already exists in the map for the given filesystem */
    WriteThreadPoolSizeManager existingInstance = POOL_SIZE_MANAGER_MAP.get(
        filesystemName);

    /* If an existing instance is found, return it */
    if (existingInstance != null && existingInstance.boundedThreadPool != null
        && !existingInstance.boundedThreadPool.isShutdown()) {
      return existingInstance;
    }

    /* Otherwise, create a new instance, put it in the map, and return it */
    LOG.debug(
        "Creating new WriteThreadPoolSizeManager instance for filesystem: {}",
        filesystemName);
    WriteThreadPoolSizeManager newInstance = new WriteThreadPoolSizeManager(
        filesystemName, abfsConfiguration);
    POOL_SIZE_MANAGER_MAP.put(filesystemName, newInstance);
    return newInstance;
  }

  /**
   * Adjusts the thread pool size to the specified maximum pool size.
   *
   * @param newMaxPoolSize the new maximum pool size.
   */
  private void adjustThreadPoolSize(int newMaxPoolSize) {
    synchronized (this) {
      ThreadPoolExecutor threadPoolExecutor
          = ((ThreadPoolExecutor) boundedThreadPool);
      int currentCorePoolSize = threadPoolExecutor.getCorePoolSize();

      if (newMaxPoolSize >= currentCorePoolSize) {
        threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
        threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
      } else {
        threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
        threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
      }
      LOG.debug("ThreadPool Info - New max pool size: {}, Current pool size: {}, Active threads: {}",
          newMaxPoolSize, threadPoolExecutor.getPoolSize(), threadPoolExecutor.getActiveCount());
    }
  }

  /**
   * Starts monitoring the CPU utilization and adjusts the thread pool size accordingly.
   */
  synchronized void startCPUMonitoring() {
    cpuMonitorExecutor.scheduleAtFixedRate(() -> {
      double cpuUtilization = getCpuUtilization();
      LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
      try {
        adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
      } catch (InterruptedException e) {
        throw new RuntimeException(String.format(
            "Thread pool size adjustment interrupted for filesystem %s",
            filesystemName), e);
      }
    }, 0, getAbfsConfiguration().getWriteCpuMonitoringInterval(), TimeUnit.MILLISECONDS);
  }

  /**
   * Gets the current system CPU utilization.
   *
   * @return the CPU utilization as a fraction (0.0 to 1.0), or 0.0 if unavailable.
   */
  private double getCpuUtilization() {
    OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
        OperatingSystemMXBean.class);
    double cpuLoad = osBean.getSystemCpuLoad();
    if (cpuLoad < 0) {
      LOG.warn("System CPU load value unavailable (returned -1.0). Defaulting to 0.0.");
      return 0.0;
    }
    return cpuLoad;
  }

  /**
   * Dynamically adjusts the thread pool size based on current CPU utilization
   * and available heap memory relative to the initially available heap.
   *
   * @param cpuUtilization Current system CPU utilization (0.0 to 1.0)
   * @throws InterruptedException if thread locking is interrupted
   */
  public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws InterruptedException {
    lock.lock();
    try {
      ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
      int currentPoolSize = executor.getMaximumPoolSize();
      long currentHeap = getAvailableHeapMemory();
      long initialHeap = initialAvailableHeapMemory;
      LOG.debug("Available heap memory: {} GB, Initial heap memory: {} GB", currentHeap, initialHeap);
      LOG.debug("Current CPU Utilization: {}", cpuUtilization);

      if (cpuUtilization > (abfsConfiguration.getWriteHighCpuThreshold()/HUNDRED_D)) {
        newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize, currentHeap, initialHeap);
      } else if (cpuUtilization > (abfsConfiguration.getWriteMediumCpuThreshold()/HUNDRED_D)) {
        newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize, currentHeap, initialHeap);
      } else if (cpuUtilization < (abfsConfiguration.getWriteLowCpuThreshold()/HUNDRED_D)) {
        newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize, currentHeap, initialHeap);
      } else {
        newMaxPoolSize = currentPoolSize;
        LOG.debug("CPU load normal ({}). No change: current={}", cpuUtilization, currentPoolSize);
      }
      if (newMaxPoolSize != currentPoolSize) {
        LOG.debug("Resizing thread pool from {} to {}", currentPoolSize, newMaxPoolSize);
        adjustThreadPoolSize(newMaxPoolSize);
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Calculates reduced pool size under high CPU utilization.
   */
  private int calculateReducedPoolSizeHighCPU(int currentPoolSize, long currentHeap, long initialHeap) {
    if (currentHeap <= initialHeap / HIGH_MEDIUM_HEAP_FACTOR) {
      LOG.debug("High CPU & low heap. Aggressively reducing: current={}, new={}",
          currentPoolSize, currentPoolSize / HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
      return Math.max(initialPoolSize, currentPoolSize / HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
    }
    int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / HIGH_CPU_REDUCTION_FACTOR);
    LOG.debug("High CPU ({}). Reducing pool size moderately: current={}, new={}",
        abfsConfiguration.getWriteHighCpuThreshold(), currentPoolSize, reduced);
    return reduced;
  }

  /**
   * Calculates reduced pool size under medium CPU utilization.
   */
  private int calculateReducedPoolSizeMediumCPU(int currentPoolSize, long currentHeap, long initialHeap) {
    if (currentHeap <= initialHeap / HIGH_MEDIUM_HEAP_FACTOR) {
      int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR);
      LOG.debug("Medium CPU & low heap. Reducing: current={}, new={}", currentPoolSize, reduced);
      return reduced;
    }
    int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / MEDIUM_CPU_REDUCTION_FACTOR);
    LOG.debug("Medium CPU ({}). Moderate reduction: current={}, new={}",
        abfsConfiguration.getWriteMediumCpuThreshold(), currentPoolSize, reduced);
    return reduced;
  }

  /**
   * Calculates increased pool size under low CPU utilization.
   */
  private int calculateIncreasedPoolSizeLowCPU(int currentPoolSize, long currentHeap, long initialHeap) {
    if (currentHeap >= initialHeap * LOW_CPU_HEAP_FACTOR) {
      int increased = Math.min(maxThreadPoolSize, (int) (currentPoolSize * LOW_CPU_POOL_SIZE_INCREASE_FACTOR));
      LOG.debug("Low CPU & healthy heap. Increasing: current={}, new={}", currentPoolSize, increased);
      return increased;
    } else {
      // Decrease by 10%
      int decreased = Math.max(1, (int) (currentPoolSize * LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR));
      LOG.debug("Low CPU but insufficient heap ({} GB). Decreasing: current={}, new={}", currentHeap, currentPoolSize, decreased);
      return decreased;
    }
  }


  /**
   * Returns the executor service for the thread pool.
   *
   * @return the executor service.
   */
  public ExecutorService getExecutorService() {
    return boundedThreadPool;
  }

  /**
   * Returns the scheduled executor responsible for CPU monitoring and dynamic pool adjustment.
   *
   * @return the {@link ScheduledExecutorService} used for CPU monitoring.
   */
  public ScheduledExecutorService getCpuMonitorExecutor() {
    return cpuMonitorExecutor;
  }

  /**
   * Closes this manager by shutting down executors and cleaning up resources.
   * Removes the instance from the active manager map.
   *
   * @throws IOException if an error occurs during shutdown.
   */
  @Override
  public void close() throws IOException {
    synchronized (this) {
      try {
        // Shutdown CPU monitor
        if (cpuMonitorExecutor != null && !cpuMonitorExecutor.isShutdown()) {
          cpuMonitorExecutor.shutdown();
        }
        // Gracefully shutdown the bounded thread pool
        if (boundedThreadPool != null && !boundedThreadPool.isShutdown()) {
          boundedThreadPool.shutdown();
          if (!boundedThreadPool.awaitTermination(THIRTY_SECONDS, TimeUnit.SECONDS)) {
            LOG.warn("Bounded thread pool did not terminate in time, forcing shutdownNow for filesystem: {}", filesystemName);
            boundedThreadPool.shutdownNow();
          }
          boundedThreadPool = null;
        }
        // Remove from the map
        POOL_SIZE_MANAGER_MAP.remove(filesystemName);
        LOG.debug("Closed and removed instance for filesystem: {}", filesystemName);
      } catch (Exception e) {
        LOG.warn("Failed to properly close instance for filesystem: {}", filesystemName, e);
      }
    }
  }
}