ReadBufferManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_BLOCK_SIZE;

/**
 * Abstract class for managing read buffers for Azure Blob File System input streams.
 */
public abstract class ReadBufferManager {
  protected static final Logger LOGGER = LoggerFactory.getLogger(
      ReadBufferManager.class);
  protected static final ReentrantLock LOCK = new ReentrantLock();
  private static int thresholdAgeMilliseconds;
  private static int blockSize = DEFAULT_READ_AHEAD_BLOCK_SIZE; // default block size for read-ahead in bytes

  private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] array that are available
  private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
  private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
  private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading

  /**
   * Initializes the ReadBufferManager singleton instance. Creates the read buffers and threads.
   * This method should be called once to set up the read buffer manager.
   */
  abstract void init();

  /**
   * Queues a read-ahead request from {@link AbfsInputStream}
   * for a given offset in file and given length.
   * @param stream the input stream requesting the read-ahead
   * @param requestedOffset the offset in the remote file to start reading
   * @param requestedLength the number of bytes to read from file
   * @param tracingContext the tracing context for diagnostics
   */
  abstract void queueReadAhead(AbfsInputStream stream,
      long requestedOffset,
      int requestedLength,
      TracingContext tracingContext);

  /**
   * Gets a block of data from the prefetched data by ReadBufferManager.
   * {@link AbfsInputStream} calls this method read any bytes already available in a buffer (thereby saving a
   * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
   * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
   * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because
   * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
   * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time).
   *
   * @param stream the input stream requesting the block
   * @param position the position in the file to read from
   * @param length the number of bytes to read
   * @param buffer the buffer to store the read data
   * @return the number of bytes actually read
   * @throws IOException if an I/O error occurs
   */
  abstract int getBlock(AbfsInputStream stream,
      long position,
      int length,
      byte[] buffer) throws IOException;

  /**
   * {@link ReadBufferWorker} calls this to get the next buffer to read from read-ahead queue.
   * Requested read will be performed by background thread.
   *
   * @return the next {@link ReadBuffer} to read
   * @throws InterruptedException if interrupted while waiting
   */
  abstract ReadBuffer getNextBlockToRead() throws InterruptedException;

  /**
   * Marks the specified buffer as done reading and updates its status.
   * Called by {@link ReadBufferWorker} after reading is complete.
   *
   * @param buffer the buffer that was read by worker thread
   * @param result the status of the read operation
   * @param bytesActuallyRead the number of bytes actually read by worker thread.
   */
  abstract void doneReading(ReadBuffer buffer,
      ReadBufferStatus result,
      int bytesActuallyRead);

  /**
   * Purging the buffers associated with an {@link AbfsInputStream}
   * from {@link ReadBufferManager} when stream is closed.
   *
   * @param stream the input stream whose buffers should be purged.
   */
  abstract void purgeBuffersForStream(AbfsInputStream stream);


  // Following Methods are for testing purposes only and should not be used in production code.

  /**
   * Gets the number of buffers currently managed by the read buffer manager.
   *
   * @return the number of buffers
   */
  @VisibleForTesting
  abstract int getNumBuffers();

  /**
   * Attempts to evict buffers based on the eviction policy.
   */
  @VisibleForTesting
  abstract void callTryEvict();

  /**
   * Resets the read buffer manager for testing purposes. Clean up the current
   * state of readAhead buffers and the lists. Will also trigger a fresh init.
   */
  @VisibleForTesting
  abstract void testResetReadBufferManager();

  /**
   * Resets the read buffer manager for testing with the specified block size and threshold age.
   *
   * @param readAheadBlockSize the block size for read-ahead
   * @param thresholdAgeMilliseconds the threshold age in milliseconds
   */
  @VisibleForTesting
  abstract void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMilliseconds);

  /**
   * Resets the buffer manager instance to null for testing purposes.
   * This allows for reinitialization in tests.
   */
  abstract void resetBufferManager();

  /**
   * Gets the threshold age in milliseconds for buffer eviction.
   *
   * @return the threshold age in milliseconds
   */
  @VisibleForTesting
  protected static int getThresholdAgeMilliseconds() {
    return thresholdAgeMilliseconds;
  }

  /**
   * Sets the threshold age in milliseconds for buffer eviction.
   *
   * @param thresholdAgeMs the threshold age in milliseconds
   */
  @VisibleForTesting
  protected static void setThresholdAgeMilliseconds(int thresholdAgeMs) {
    thresholdAgeMilliseconds = thresholdAgeMs;
  }

  /**
   * Gets the block size used for read-ahead operations.
   *
   * @return the read-ahead block size in bytes
   */
  @VisibleForTesting
  protected static int getReadAheadBlockSize() {
    return blockSize;
  }

  /**
   * Sets the block size used for read-ahead operations.
   *
   * @param readAheadBlockSize the read-ahead block size in bytes
   */
  @VisibleForTesting
  protected static void setReadAheadBlockSize(int readAheadBlockSize) {
    if (readAheadBlockSize <= 0) {
      throw new IllegalArgumentException("Read-ahead block size must be positive");
    }
    blockSize = readAheadBlockSize;
  }

  /**
   * Gets the stack of free buffer indices.
   *
   * @return the stack of free buffer indices
   */
  public Stack<Integer> getFreeList() {
    return freeList;
  }

  /**
   * Gets the queue of read-ahead requests.
   *
   * @return the queue of {@link ReadBuffer} objects in the read-ahead queue
   */
  public Queue<ReadBuffer> getReadAheadQueue() {
    return readAheadQueue;
  }

  /**
   * Gets the list of in-progress read buffers.
   *
   * @return the list of {@link ReadBuffer} objects that are currently being processed
   */
  public LinkedList<ReadBuffer> getInProgressList() {
    return inProgressList;
  }

  /**
   * Gets the list of completed read buffers.
   *
   * @return the list of {@link ReadBuffer} objects that have been read and are available for use
   */
  public LinkedList<ReadBuffer> getCompletedReadList() {
    return completedReadList;
  }


  /**
   * Gets a copy of the list of free buffer indices.
   *
   * @return a list of free buffer indices
   */
  @VisibleForTesting
  protected synchronized List<Integer> getFreeListCopy() {
    return new ArrayList<>(freeList);
  }

  /**
   * Gets a copy of the read-ahead queue.
   *
   * @return a list of {@link ReadBuffer} objects in the read-ahead queue
   */
  @VisibleForTesting
  protected synchronized List<ReadBuffer> getReadAheadQueueCopy() {
    return new ArrayList<>(readAheadQueue);
  }

  /**
   * Gets a copy of the list of in-progress read buffers.
   *
   * @return a list of in-progress {@link ReadBuffer} objects
   */
  @VisibleForTesting
  protected synchronized List<ReadBuffer> getInProgressCopiedList() {
    return new ArrayList<>(inProgressList);
  }

  /**
   * Gets a copy of the list of completed read buffers.
   *
   * @return a list of completed {@link ReadBuffer} objects
   */
  @VisibleForTesting
  protected synchronized List<ReadBuffer> getCompletedReadListCopy() {
    return new ArrayList<>(completedReadList);
  }

  /**
   * Gets the size of the completed read list.
   *
   * @return the number of completed read buffers
   */
  @VisibleForTesting
  protected int getCompletedReadListSize() {
    return completedReadList.size();
  }

  /**
   * Simulates full buffer usage and adds a failed buffer for testing.
   *
   * @param buf the buffer to add as failed
   */
  @VisibleForTesting
  protected void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
    freeList.clear();
    completedReadList.add(buf);
  }
}