ListBlobQueue.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG;

/**
 * Data-structure to hold the list of paths to be processed. The paths are
 * enqueued by the producer and dequeued by the consumer. The producer can
 * enqueue the paths until the queue is full. The consumer can consume the paths
 * until the queue is empty. The producer can mark the queue as completed once
 * all the paths are enqueued and there are no more paths that can be returned from
 * server. The consumer can mark the queue as failed if it encounters any exception
 * while consuming the paths.
 */
class ListBlobQueue {

  /** Backing FIFO of paths; every access goes through a synchronized method of this object. */
  private final Queue<Path> pathQueue = new ArrayDeque<>();

  /** Upper bound used to compute how many more entries the producer may add. */
  private final int maxSize;

  /** Maximum number of paths handed out by a single {@link #consume()} call. */
  private final int consumeSetSize;

  /** Set once the producer has signalled that no further paths will be enqueued. */
  private volatile boolean isCompleted = false;

  /** Set once the consumer has reported a failure; unblocks a waiting producer. */
  private volatile boolean isConsumptionFailed = false;

  /** Failure reported by the producer; rethrown to the consumer from {@link #consume()}. */
  private volatile AzureBlobFileSystemException failureFromProducer;

  /**
   * Queue size at or above which the producer is made to wait. While the
   * current size of the queue is greater than or equal to maxConsumptionLag,
   * {@link #availableSizeForProduction()} blocks; once the size drops below it,
   * the producer is told it may add (maxSize - currentSize) entries. This
   * prevents the producer from repeatedly producing very small batches.
   * <p>
   * For example, with maxSize 10000 and maxConsumptionLag 5000, the producer
   * stops producing while the queue holds 5000 or more entries. As soon as the
   * size drops below 5000, the producer may add (maxSize - currentSize) entries,
   * filling the queue back up to 10000, and then waits again until enough
   * entries have been consumed.
   * <p>
   * Without this threshold the producer would produce as soon as any room
   * became available: if only 5 entries had been consumed, it would make a
   * server call for just 5 entries.
   */
  private final int maxConsumptionLag;

  /**
   * Creates an empty queue.
   *
   * @param maxSize upper bound used when computing the room available for production
   * @param consumeSetSize maximum number of paths returned by one {@link #consume()} call
   * @param maxConsumptionLag queue size at or above which the producer waits;
   *                          must be strictly less than {@code maxSize}
   * @throws InvalidConfigurationValueException if {@code maxConsumptionLag >= maxSize}
   */
  ListBlobQueue(int maxSize, int consumeSetSize, int maxConsumptionLag)
      throws InvalidConfigurationValueException {
    // Validate first so that an invalid configuration fails before any state is set.
    if (maxConsumptionLag >= maxSize) {
      throw new InvalidConfigurationValueException(FS_AZURE_CONSUMER_MAX_LAG,
          "maxConsumptionLag should be lesser than maxSize");
    }
    this.maxSize = maxSize;
    this.maxConsumptionLag = maxConsumptionLag;
    this.consumeSetSize = consumeSetSize;
  }

  /**
   * Record a failure encountered by the producer. The stored exception is
   * thrown by every subsequent {@link #consume()} call.
   *
   * @param failure exception raised on the producer side
   */
  void markProducerFailure(AzureBlobFileSystemException failure) {
    failureFromProducer = failure;
  }

  /** Mark the production as completed; later {@link #enqueue(List)} calls are rejected. */
  void complete() {
    isCompleted = true;
  }

  /**
   * Mark the consumption as failed and wake any thread blocked in
   * {@link #availableSizeForProduction()} so that it can return.
   */
  synchronized void markConsumptionFailed() {
    isConsumptionFailed = true;
    // notifyAll rather than notify: every waiter must observe the failure flag.
    notifyAll();
  }

  /** Check if the consumption has failed.
   *
   * @return true if the consumption has failed
   */
  boolean getConsumptionFailed() {
    return isConsumptionFailed;
  }

  /** Check if the queue is completed, i.e. the producer has called
   * {@link #complete()} and every enqueued path has been dequeued.
   *
   * @return true if production is complete and the queue is empty
   */
  boolean getIsCompleted() {
    return isCompleted && size() == 0;
  }

  /** Get the exception from producer.
   *
   * @return exception from producer, or null if none has been recorded
   */
  private AzureBlobFileSystemException getException() {
    return failureFromProducer;
  }

  /** Enqueue the paths. This method does not itself check the queue size
   * against maxSize.
   *
   * @param pathList list of paths to be enqueued
   * @throws IllegalStateException if the queue is already marked as completed
   */
  synchronized void enqueue(List<Path> pathList) {
    if (isCompleted) {
      throw new IllegalStateException(
          "Cannot enqueue paths as the queue is already marked as completed");
    }
    pathQueue.addAll(pathList);
  }

  /** Consume the paths. Returns at most consumeSetSize paths; the returned
   * list is empty when the queue currently holds no paths.
   *
   * @return list of paths to be consumed
   * @throws AzureBlobFileSystemException if the producer has reported a failure
   */
  synchronized List<Path> consume() throws AzureBlobFileSystemException {
    AzureBlobFileSystemException exception = getException();
    if (exception != null) {
      throw exception;
    }
    return dequeue();
  }

  /** Dequeue up to consumeSetSize paths. Must be called while holding this
   * object's monitor (it is only invoked from {@link #consume()}).
   *
   * @return list of paths to be consumed
   */
  private List<Path> dequeue() {
    List<Path> pathListForConsumption = new ArrayList<>();
    int counter = 0;
    while (counter < consumeSetSize && !pathQueue.isEmpty()) {
      pathListForConsumption.add(pathQueue.poll());
      counter++;
    }
    if (counter > 0) {
      // Room was freed: wake threads waiting in availableSizeForProduction().
      notifyAll();
    }
    return pathListForConsumption;
  }

  /**
   * Get the number of paths currently held in the queue.
   *
   * @return current size of the queue
   */
  synchronized int size() {
    return pathQueue.size();
  }

  /**
   * Returns the available size of the queue for production. This is calculated by subtracting
   * the current size of the queue from its maximum size. This method waits until
   * the current size of the queue becomes lesser than the maxConsumptionLag, or
   * until the consumption is marked as failed. This method is synchronized to
   * prevent concurrent modifications of the queue.
   * <p>
   * If the calling thread is interrupted while waiting, the wait continues,
   * and the thread's interrupt status is restored before this method returns
   * so that the caller can still observe the interruption.
   *
   * @return the available size of the queue, or 0 if the consumption has
   * failed while the queue size is at or above maxConsumptionLag.
   */
  synchronized int availableSizeForProduction() {
    boolean interrupted = false;
    try {
      while (size() >= maxConsumptionLag) {
        if (isConsumptionFailed) {
          return 0;
        }
        try {
          wait();
        } catch (InterruptedException e) {
          // Do not re-interrupt here: wait() would then throw immediately and
          // the loop would spin. Remember it and restore the flag on exit.
          interrupted = true;
        }
      }
      return maxSize - size();
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
}