AbfsClientThrottlingAnalyzer.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.util.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.util.Time.now;

class AbfsClientThrottlingAnalyzer {
  private static final Logger LOG = LoggerFactory.getLogger(
      AbfsClientThrottlingAnalyzer.class);
  private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
  private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
  private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
  private static final double MAX_EQUILIBRIUM_ERROR_PERCENTAGE = 1;
  private static final double RAPID_SLEEP_DECREASE_FACTOR = .75;
  private static final double RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS = 150
      * 1000;
  private static final double SLEEP_DECREASE_FACTOR = .975;
  private static final double SLEEP_INCREASE_FACTOR = 1.05;
  private int analysisPeriodMs;

  private volatile int sleepDuration = 0;
  private long consecutiveNoErrorCount = 0;
  private String name = null;
  private Timer timer = null;
  private AtomicReference<AbfsOperationMetrics> blobMetrics = null;
  private AtomicLong lastExecutionTime = null;
  private final AtomicBoolean isOperationOnAccountIdle = new AtomicBoolean(false);
  private AbfsConfiguration abfsConfiguration = null;
  private boolean accountLevelThrottlingEnabled = true;

  private AbfsClientThrottlingAnalyzer() {
    // hide default constructor
  }

  /**
   * Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
   * the specified name and period.
   *
   * @param name   A name used to identify this instance.
   * @param abfsConfiguration The configuration set.
   * @throws IllegalArgumentException If name is null or empty.
   *                                  If period is less than 1000 or greater than 30000 milliseconds.
   */
  AbfsClientThrottlingAnalyzer(String name, AbfsConfiguration abfsConfiguration)
      throws IllegalArgumentException {
    Preconditions.checkArgument(
        StringUtils.isNotEmpty(name),
        "The argument 'name' cannot be null or empty.");
    int period = abfsConfiguration.getAnalysisPeriod();
    Preconditions.checkArgument(
        period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
        "The argument 'period' must be between 1000 and 30000.");
    this.name = name;
    this.abfsConfiguration = abfsConfiguration;
    this.accountLevelThrottlingEnabled = abfsConfiguration.accountThrottlingEnabled();
    this.analysisPeriodMs = abfsConfiguration.getAnalysisPeriod();
    this.lastExecutionTime = new AtomicLong(now());
    this.blobMetrics = new AtomicReference<AbfsOperationMetrics>(
        new AbfsOperationMetrics(System.currentTimeMillis()));
    this.timer = new Timer(
        String.format("abfs-timer-client-throttling-analyzer-%s", name), true);
    this.timer.schedule(new TimerTaskImpl(),
        analysisPeriodMs,
        analysisPeriodMs);
  }

  /**
   * Resumes the timer if it was stopped.
   */
  private void resumeTimer() {
    blobMetrics = new AtomicReference<AbfsOperationMetrics>(
            new AbfsOperationMetrics(System.currentTimeMillis()));
    timer.schedule(new TimerTaskImpl(),
            analysisPeriodMs,
            analysisPeriodMs);
    isOperationOnAccountIdle.set(false);
  }

  /**
   * Synchronized method to suspend or resume timer.
   * @param timerFunctionality resume or suspend.
   * @param timerTask The timertask object.
   * @return true or false.
   */
  private synchronized boolean timerOrchestrator(TimerFunctionality timerFunctionality,
      TimerTask timerTask) {
    switch (timerFunctionality) {
    case RESUME:
      if (isOperationOnAccountIdle.get()) {
        resumeTimer();
      }
      break;
    case SUSPEND:
      if (accountLevelThrottlingEnabled && (System.currentTimeMillis()
          - lastExecutionTime.get() >= getOperationIdleTimeout())) {
        isOperationOnAccountIdle.set(true);
        timerTask.cancel();
        timer.purge();
        return true;
      }
      break;
    default:
      break;
    }
    return false;
  }

  /**
   * Updates metrics with results from the current storage operation.
   *
   * @param count             The count of bytes transferred.
   * @param isFailedOperation True if the operation failed; otherwise false.
   */
  public void addBytesTransferred(long count, boolean isFailedOperation) {
    AbfsOperationMetrics metrics = blobMetrics.get();
    if (isFailedOperation) {
      metrics.addBytesFailed(count);
      metrics.incrementOperationsFailed();
    } else {
      metrics.addBytesSuccessful(count);
      metrics.incrementOperationsSuccessful();
    }
    blobMetrics.set(metrics);
  }

  /**
   * Suspends the current storage operation, as necessary, to reduce throughput.
   * @return true if Thread sleeps(Throttling occurs) else false.
   */
  public boolean suspendIfNecessary() {
    lastExecutionTime.set(now());
    timerOrchestrator(TimerFunctionality.RESUME, null);
    int duration = sleepDuration;
    if (duration > 0) {
      try {
        Thread.sleep(duration);
        return true;
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    }
    return false;
  }

  @VisibleForTesting
  int getSleepDuration() {
    return sleepDuration;
  }

  int getOperationIdleTimeout() {
    return abfsConfiguration.getAccountOperationIdleTimeout();
  }

  AtomicBoolean getIsOperationOnAccountIdle() {
    return isOperationOnAccountIdle;
  }

  private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
                                                   int sleepDuration) {
    final double percentageConversionFactor = 100;
    double bytesFailed = metrics.getBytesFailed().get();
    double bytesSuccessful = metrics.getBytesSuccessful().get();
    double operationsFailed = metrics.getOperationsFailed().get();
    double operationsSuccessful = metrics.getOperationsSuccessful().get();
    double errorPercentage = (bytesFailed <= 0)
        ? 0
        : (percentageConversionFactor
        * bytesFailed
        / (bytesFailed + bytesSuccessful));
    long periodMs = metrics.getEndTime() - metrics.getStartTime();

    double newSleepDuration;

    if (errorPercentage < MIN_ACCEPTABLE_ERROR_PERCENTAGE) {
      ++consecutiveNoErrorCount;
      // Decrease sleepDuration in order to increase throughput.
      double reductionFactor =
          (consecutiveNoErrorCount * analysisPeriodMs
              >= RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS)
              ? RAPID_SLEEP_DECREASE_FACTOR
              : SLEEP_DECREASE_FACTOR;

      newSleepDuration = sleepDuration * reductionFactor;
    } else if (errorPercentage < MAX_EQUILIBRIUM_ERROR_PERCENTAGE) {
      // Do not modify sleepDuration in order to stabilize throughput.
      newSleepDuration = sleepDuration;
    } else {
      // Increase sleepDuration in order to minimize error rate.
      consecutiveNoErrorCount = 0;

      // Increase sleep duration in order to reduce throughput and error rate.
      // First, calculate target throughput: bytesSuccessful / periodMs.
      // Next, calculate time required to send *all* data (assuming next period
      // is similar to previous) at the target throughput: (bytesSuccessful
      // + bytesFailed) * periodMs / bytesSuccessful. Next, subtract periodMs to
      // get the total additional delay needed.
      double additionalDelayNeeded = 5 * analysisPeriodMs;
      if (bytesSuccessful > 0) {
        additionalDelayNeeded = (bytesSuccessful + bytesFailed)
            * periodMs
            / bytesSuccessful
            - periodMs;
      }

      // amortize the additional delay needed across the estimated number of
      // requests during the next period
      newSleepDuration = additionalDelayNeeded
          / (operationsFailed + operationsSuccessful);

      final double maxSleepDuration = analysisPeriodMs;
      final double minSleepDuration = sleepDuration * SLEEP_INCREASE_FACTOR;

      // Add 1 ms to avoid rounding down and to decrease proximity to the server
      // side ingress/egress limit.  Ensure that the new sleep duration is
      // larger than the current one to more quickly reduce the number of
      // errors.  Don't allow the sleep duration to grow unbounded, after a
      // certain point throttling won't help, for example, if there are far too
      // many tasks/containers/nodes no amount of throttling will help.
      newSleepDuration = Math.max(newSleepDuration, minSleepDuration) + 1;
      newSleepDuration = Math.min(newSleepDuration, maxSleepDuration);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(String.format(
          "%5.5s, %10d, %10d, %10d, %10d, %6.2f, %5d, %5d, %5d",
          name,
          (int) bytesFailed,
          (int) bytesSuccessful,
          (int) operationsFailed,
          (int) operationsSuccessful,
          errorPercentage,
          periodMs,
          (int) sleepDuration,
          (int) newSleepDuration));
    }

    return (int) newSleepDuration;
  }

  /**
   * Timer callback implementation for periodically analyzing metrics.
   */
  class TimerTaskImpl extends TimerTask {
    private AtomicInteger doingWork = new AtomicInteger(0);

    /**
     * Periodically analyzes a snapshot of the blob storage metrics and updates
     * the sleepDuration in order to appropriately throttle storage operations.
     */
    @Override
    public void run() {
      boolean doWork = false;
      try {
        doWork = doingWork.compareAndSet(0, 1);

        // prevent concurrent execution of this task
        if (!doWork) {
          return;
        }

        long now = System.currentTimeMillis();
        if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) {
          return;
        }
        if (now - blobMetrics.get().getStartTime() >= analysisPeriodMs) {
          AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet(
              new AbfsOperationMetrics(now));
          oldMetrics.setEndTime(now);
          sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
              sleepDuration);
        }
      } finally {
        if (doWork) {
          doingWork.set(0);
        }
      }
    }
  }
}