/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import com.sun.management.OperatingSystemMXBean;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
/**
* The Improved Read Buffer Manager for Rest AbfsClient.
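* <p>
* Compared to {@link ReadBufferManager}, this version can dynamically scale its
* worker thread pool and buffer pool based on the observed CPU and memory load.
* <p>
* A sketch of the expected call sequence (the real call sites live in
* {@link AbfsInputStream}; the variable names below are illustrative only):
* <pre>{@code
*   // Configure once, before the singleton is created.
*   ReadBufferManagerV2.setReadBufferManagerConfigs(readAheadBlockSize, abfsConfiguration);
*   ReadBufferManagerV2 manager = ReadBufferManagerV2.getBufferManager();
*
*   // Queue a prefetch, then try to serve a later read from the cache.
*   manager.queueReadAhead(inputStream, offset, length, tracingContext);
*   int bytesRead = manager.getBlock(inputStream, position, length, destinationBuffer);
*   if (bytesRead == 0) {
*     // Cache miss: the caller performs its own remote read.
*   }
* }</pre>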
*/
public final class ReadBufferManagerV2 extends ReadBufferManager {
// Internal constants
private static final ReentrantLock LOCK = new ReentrantLock();
// Thread Pool Configurations
private static int minThreadPoolSize;
private static int maxThreadPoolSize;
private static int cpuMonitoringIntervalInMilliSec;
private static double cpuThreshold;
private static int threadPoolUpscalePercentage;
private static int threadPoolDownscalePercentage;
private static int executorServiceKeepAliveTimeInMilliSec;
private static final double THREAD_POOL_REQUIREMENT_BUFFER = 1.2;
// 20% more threads than the queue size
private static boolean isDynamicScalingEnabled;
private ScheduledExecutorService cpuMonitorThread;
private ThreadPoolExecutor workerPool;
private final List<ReadBufferWorker> workerRefs = new ArrayList<>();
// Buffer Pool Configurations
private static int minBufferPoolSize;
private static int maxBufferPoolSize;
private static int memoryMonitoringIntervalInMilliSec;
private static double memoryThreshold;
private final AtomicInteger numberOfActiveBuffers = new AtomicInteger(0);
private byte[][] bufferPool;
private final Stack<Integer> removedBufferList = new Stack<>();
private ScheduledExecutorService memoryMonitorThread;
// Buffer Manager Structures
private static volatile ReadBufferManagerV2 bufferManager;
private static final AtomicBoolean isConfigured = new AtomicBoolean(false);
/**
* Private constructor to prevent instantiation as this needs to be singleton.
*/
private ReadBufferManagerV2() {
printTraceLog("Creating Read Buffer Manager V2 with HADOOP-18546 patch");
}
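/**
* Get the singleton instance, creating and initializing it on first use with
* double-checked locking. {@link #setReadBufferManagerConfigs(int, AbfsConfiguration)}
* must be called before this method, otherwise an {@link IllegalStateException} is thrown.
* @return the singleton {@link ReadBufferManagerV2} instance
*/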
static ReadBufferManagerV2 getBufferManager() {
if (!isConfigured.get()) {
throw new IllegalStateException("ReadBufferManagerV2 is not configured. "
+ "Please call setReadBufferManagerConfigs() before calling getBufferManager().");
}
if (bufferManager == null) {
LOCK.lock();
try {
if (bufferManager == null) {
bufferManager = new ReadBufferManagerV2();
bufferManager.init();
LOGGER.trace("ReadBufferManagerV2 singleton initialized");
}
} finally {
LOCK.unlock();
}
}
return bufferManager;
}
/**
* Set the ReadBufferManagerV2 configurations based on the provided
* {@link AbfsConfiguration} before singleton initialization.
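* Once configured, subsequent calls are ignored.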
* @param readAheadBlockSize the read-ahead block size to set for the ReadBufferManagerV2.
* @param abfsConfiguration the configuration to set for the ReadBufferManagerV2.
*/
public static void setReadBufferManagerConfigs(final int readAheadBlockSize,
final AbfsConfiguration abfsConfiguration) {
// Set Configs only before initializations.
if (bufferManager == null && !isConfigured.get()) {
LOCK.lock();
try {
if (bufferManager == null && !isConfigured.get()) {
minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
cpuMonitoringIntervalInMilliSec
= abfsConfiguration.getReadAheadV2CpuMonitoringIntervalMillis();
cpuThreshold = abfsConfiguration.getReadAheadV2CpuUsageThresholdPercent()
/ HUNDRED_D;
threadPoolUpscalePercentage
= abfsConfiguration.getReadAheadV2ThreadPoolUpscalePercentage();
threadPoolDownscalePercentage
= abfsConfiguration.getReadAheadV2ThreadPoolDownscalePercentage();
executorServiceKeepAliveTimeInMilliSec
= abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();
maxBufferPoolSize = abfsConfiguration.getMaxReadAheadV2BufferPoolSize();
memoryMonitoringIntervalInMilliSec
= abfsConfiguration.getReadAheadV2MemoryMonitoringIntervalMillis();
memoryThreshold =
abfsConfiguration.getReadAheadV2MemoryUsageThresholdPercent()
/ HUNDRED_D;
setThresholdAgeMilliseconds(
abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
isDynamicScalingEnabled
= abfsConfiguration.isReadAheadV2DynamicScalingEnabled();
setReadAheadBlockSize(readAheadBlockSize);
setIsConfigured(true);
}
} finally {
LOCK.unlock();
}
}
}
/**
* Initialize the singleton ReadBufferManagerV2.
*/
@Override
void init() {
// Initialize Buffer Pool. Size can never be more than max pool size
bufferPool = new byte[maxBufferPoolSize][];
for (int i = 0; i < minBufferPoolSize; i++) {
// Start with just minimum number of buffers.
bufferPool[i]
= new byte[getReadAheadBlockSize()]; // same buffers are reused. The byte array never goes back to GC
getFreeList().add(i);
numberOfActiveBuffers.getAndIncrement();
}
memoryMonitorThread = Executors.newSingleThreadScheduledExecutor(
runnable -> {
Thread t = new Thread(runnable, "ReadAheadV2-Memory-Monitor");
t.setDaemon(true);
return t;
});
memoryMonitorThread.scheduleAtFixedRate(this::scheduledEviction,
getMemoryMonitoringIntervalInMilliSec(),
getMemoryMonitoringIntervalInMilliSec(), TimeUnit.MILLISECONDS);
// Initialize a Fixed Size Thread Pool with minThreadPoolSize threads
workerPool = new ThreadPoolExecutor(
minThreadPoolSize,
maxThreadPoolSize,
executorServiceKeepAliveTimeInMilliSec,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
workerThreadFactory);
workerPool.allowCoreThreadTimeOut(true);
for (int i = 0; i < minThreadPoolSize; i++) {
ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
workerRefs.add(worker);
workerPool.submit(worker);
}
ReadBufferWorker.UNLEASH_WORKERS.countDown();
if (isDynamicScalingEnabled) {
cpuMonitorThread = Executors.newSingleThreadScheduledExecutor(
runnable -> {
Thread t = new Thread(runnable, "ReadAheadV2-CPU-Monitor");
t.setDaemon(true);
return t;
});
cpuMonitorThread.scheduleAtFixedRate(this::adjustThreadPool,
getCpuMonitoringIntervalInMilliSec(),
getCpuMonitoringIntervalInMilliSec(),
TimeUnit.MILLISECONDS);
}
printTraceLog(
"ReadBufferManagerV2 initialized with {} buffers and {} worker threads",
numberOfActiveBuffers.get(), workerRefs.size());
}
/**
* {@link AbfsInputStream} calls this method to queue a read-ahead.
* @param stream the stream for which the read-ahead is requested.
* @param requestedOffset The offset in the file which should be read.
* @param requestedLength The length to read.
* @param tracingContext the tracing context for the read-ahead request.
*/
@Override
public void queueReadAhead(final AbfsInputStream stream,
final long requestedOffset,
final int requestedLength,
TracingContext tracingContext) {
printTraceLog(
"Start Queueing readAhead for file: {}, with eTag: {}, "
+ "offset: {}, length: {}, triggered by stream: {}",
stream.getPath(), stream.getETag(), requestedOffset, requestedLength,
stream.hashCode());
ReadBuffer buffer;
synchronized (this) {
if (isAlreadyQueued(stream.getETag(), requestedOffset)) {
// Already queued for this offset, so skip queuing.
printTraceLog(
"Skipping queuing readAhead for file: {}, with eTag: {}, "
+ "offset: {}, triggered by stream: {} as it is already queued",
stream.getPath(), stream.getETag(), requestedOffset,
stream.hashCode());
return;
}
if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) {
// No buffers are available and more buffers cannot be created. Skip queuing.
printTraceLog(
"Skipping queuing readAhead for file: {}, with eTag: {}, offset: {}, triggered by stream: {} as no buffers are available",
stream.getPath(), stream.getETag(), requestedOffset,
stream.hashCode());
return;
}
// Create a new ReadBuffer to keep the prefetched data and queue.
buffer = new ReadBuffer();
buffer.setStream(stream); // To map buffer with stream that requested it
buffer.setETag(stream.getETag()); // To map buffer with file it belongs to
buffer.setPath(stream.getPath());
buffer.setOffset(requestedOffset);
buffer.setLength(0);
buffer.setRequestedLength(requestedLength);
buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
buffer.setLatch(new CountDownLatch(1));
buffer.setTracingContext(tracingContext);
if (isFreeListEmpty()) {
/*
* By now there should be at least one buffer available.
* This is to make doubly sure that, after upscaling or eviction,
* we still have a free buffer available. If not, we skip queueing.
*/
return;
}
Integer bufferIndex = popFromFreeList();
if (bufferIndex >= bufferPool.length) {
// This should never happen.
printTraceLog(
"Skipping queuing readAhead for file: {}, with eTag: {}, offset: {}, triggered by stream: {} as invalid buffer index popped from free list",
stream.getPath(), stream.getETag(), requestedOffset,
stream.hashCode());
return;
}
buffer.setBuffer(bufferPool[bufferIndex]);
buffer.setBufferindex(bufferIndex);
getReadAheadQueue().add(buffer);
notifyAll();
printTraceLog(
"Done q-ing readAhead for file: {}, with eTag:{}, offset: {}, "
+ "buffer idx: {}, triggered by stream: {}",
stream.getPath(), stream.getETag(), requestedOffset,
buffer.getBufferindex(), stream.hashCode());
}
}
/**
* {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
* remote read). This returns the bytes if the data already exists in a buffer. If there is a buffer that is reading
* the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
* but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because
* depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
* read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time).
*
* @param stream of the file to read bytes for
* @param position the offset in the file to do a read for
* @param length the length to read
* @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
* @return the number of bytes read
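* @throws IOException if a recent read-ahead covering the requested position
* failed; the original read exception is re-thrown.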
*/
@Override
public int getBlock(final AbfsInputStream stream,
final long position,
final int length,
final byte[] buffer)
throws IOException {
// not synchronized, so have to be careful with locking
printTraceLog(
"getBlock request for file: {}, with eTag: {}, for position: {} "
+ "for length: {} received from stream: {}",
stream.getPath(), stream.getETag(), position, length,
stream.hashCode());
String requestedETag = stream.getETag();
boolean isFirstRead = stream.isFirstRead();
// Wait for any in-progress read to complete.
waitForProcess(requestedETag, position, isFirstRead);
int bytesRead = 0;
synchronized (this) {
bytesRead = getBlockFromCompletedQueue(requestedETag, position, length,
buffer);
}
if (bytesRead > 0) {
printTraceLog(
"Done read from Cache for the file with eTag: {}, position: {}, length: {}, requested by stream: {}",
requestedETag, position, bytesRead, stream.hashCode());
return bytesRead;
}
// otherwise, just say we got nothing - calling thread can do its own read
return 0;
}
/**
* {@link ReadBufferWorker} thread calls this to get the next buffer that it should work on.
* @return the next {@link ReadBuffer} to work on; blocks until one is queued.
* @throws InterruptedException if thread is interrupted
*/
@Override
public ReadBuffer getNextBlockToRead() throws InterruptedException {
ReadBuffer buffer = null;
synchronized (this) {
// Blocking Call to wait for prefetch to be queued.
while (getReadAheadQueue().size() == 0) {
wait();
}
buffer = getReadAheadQueue().remove();
notifyAll();
if (buffer == null) {
return null;
}
buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
getInProgressList().add(buffer);
}
printTraceLog(
"ReadBufferWorker picked file: {}, with eTag: {}, for offset: {}, "
+ "queued by stream: {}",
buffer.getPath(), buffer.getETag(), buffer.getOffset(),
buffer.getStream().hashCode());
return buffer;
}
/**
* {@link ReadBufferWorker} thread calls this method to post completion.
* @param buffer the buffer whose read was completed
* @param result the {@link ReadBufferStatus} after the read operation in the worker thread
* @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
*/
@Override
public void doneReading(final ReadBuffer buffer,
final ReadBufferStatus result,
final int bytesActuallyRead) {
printTraceLog(
"ReadBufferWorker completed prefetch for file: {} with eTag: {}, for offset: {}, queued by stream: {}, with status: {} and bytes read: {}",
buffer.getPath(), buffer.getETag(), buffer.getOffset(),
buffer.getStream().hashCode(), result, bytesActuallyRead);
synchronized (this) {
// If this buffer has already been purged during
// close of InputStream then we don't update the lists.
if (getInProgressList().contains(buffer)) {
getInProgressList().remove(buffer);
if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
// Successful read, so update the buffer status and length
buffer.setStatus(ReadBufferStatus.AVAILABLE);
buffer.setLength(bytesActuallyRead);
} else {
// Failed read, reuse buffer for next read, this buffer will be
// evicted later based on eviction policy.
pushToFreeList(buffer.getBufferindex());
}
// completed list also contains FAILED read buffers
// for sending exception message to clients.
buffer.setStatus(result);
buffer.setTimeStamp(currentTimeMillis());
getCompletedReadList().add(buffer);
}
}
//outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
/**
* Purging the buffers associated with an {@link AbfsInputStream}
* from {@link ReadBufferManagerV2} when stream is closed.
* @param stream input stream.
*/
public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
printDebugLog("Purging stale buffers for AbfsInputStream {} ", stream);
getReadAheadQueue().removeIf(
readBuffer -> readBuffer.getStream() == stream);
purgeList(stream, getCompletedReadList());
}
/**
* Check if any buffer is already queued for the requested offset.
* @param eTag the eTag of the file
* @param requestedOffset the requested offset
* @return whether any buffer is already queued
*/
private boolean isAlreadyQueued(final String eTag,
final long requestedOffset) {
// returns true if any part of the buffer is already queued
return (isInList(getReadAheadQueue(), eTag, requestedOffset)
|| isInList(getInProgressList(), eTag, requestedOffset)
|| isInList(getCompletedReadList(), eTag, requestedOffset));
}
/**
* Check if any buffer in the list contains the requested offset.
* @param list the list to check
* @param eTag the eTag of the file
* @param requestedOffset the requested offset
* @return whether any buffer in the list contains the requested offset
*/
private boolean isInList(final Collection<ReadBuffer> list, final String eTag,
final long requestedOffset) {
return (getFromList(list, eTag, requestedOffset) != null);
}
/**
* Get the buffer from the list that contains the requested offset.
* @param list the list to check
* @param eTag the eTag of the file
* @param requestedOffset the requested offset
* @return the buffer if found, null otherwise
*/
private ReadBuffer getFromList(final Collection<ReadBuffer> list,
final String eTag,
final long requestedOffset) {
for (ReadBuffer buffer : list) {
if (eTag.equals(buffer.getETag())) {
if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
&& requestedOffset >= buffer.getOffset()
&& requestedOffset < buffer.getOffset() + buffer.getLength()) {
return buffer;
} else if (requestedOffset >= buffer.getOffset()
&& requestedOffset
< buffer.getOffset() + buffer.getRequestedLength()) {
return buffer;
}
}
}
return null;
}
/**
* If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to the free list.
* The objective is to find just one buffer - there is no advantage to evicting more than one.
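* Candidates are considered in order: fully consumed buffers first, then
* partially consumed buffers, then the oldest unconsumed buffer once it is
* older than the age threshold. Failed read buffers (buffer index -1) past the
* threshold are cleaned up along the way but are not reported as successful
* evictions, since they already released their byte array on failure.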
* @return whether the eviction succeeded - i.e., were we able to free up one buffer
*/
private synchronized boolean tryEvict() {
ReadBuffer nodeToEvict = null;
if (getCompletedReadList().size() <= 0) {
return false; // there are no evict-able buffers
}
long currentTimeInMs = currentTimeMillis();
// first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
for (ReadBuffer buf : getCompletedReadList()) {
if (buf.isFullyConsumed()) {
nodeToEvict = buf;
break;
}
}
if (nodeToEvict != null) {
return manualEviction(nodeToEvict);
}
// next, try buffers where any bytes have been consumed (maybe a bad idea? have to experiment and see)
for (ReadBuffer buf : getCompletedReadList()) {
if (buf.isAnyByteConsumed()) {
nodeToEvict = buf;
break;
}
}
if (nodeToEvict != null) {
return manualEviction(nodeToEvict);
}
// next, try any old nodes that have not been consumed
// Failed read buffers (with buffer index=-1) that are older than
// thresholdAge should be cleaned up, but at the same time should not
// report successful eviction.
// Queue logic expects that a buffer is freed up for read ahead when
// eviction is successful, whereas a failed ReadBuffer would have released
// its buffer when its status was set to READ_FAILED.
long earliestBirthday = Long.MAX_VALUE;
ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
for (ReadBuffer buf : getCompletedReadList()) {
if ((buf.getBufferindex() != -1)
&& (buf.getTimeStamp() < earliestBirthday)) {
nodeToEvict = buf;
earliestBirthday = buf.getTimeStamp();
} else if ((buf.getBufferindex() == -1)
&& (currentTimeInMs - buf.getTimeStamp())
> getThresholdAgeMilliseconds()) {
oldFailedBuffers.add(buf);
}
}
for (ReadBuffer buf : oldFailedBuffers) {
manualEviction(buf);
}
if ((currentTimeInMs - earliestBirthday > getThresholdAgeMilliseconds())
&& (nodeToEvict != null)) {
return manualEviction(nodeToEvict);
}
printTraceLog("No buffer eligible for eviction");
// nothing can be evicted
return false;
}
/**
* Evict the given buffer.
* @param buf the buffer to evict
* @return whether the eviction succeeded
*/
private boolean evict(final ReadBuffer buf) {
if (buf.getRefCount() > 0) {
// If the buffer is still being read, then we cannot evict it.
printTraceLog(
"Cannot evict buffer with index: {}, file: {}, with eTag: {}, offset: {} as it is still being read by some input stream",
buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset());
return false;
}
// As failed ReadBuffers (bufferIndex = -1) are saved in getCompletedReadList(),
// avoid adding them back to the free list.
if (buf.getBufferindex() != -1) {
pushToFreeList(buf.getBufferindex());
}
getCompletedReadList().remove(buf);
buf.setTracingContext(null);
printTraceLog(
"Eviction of Buffer Completed for BufferIndex: {}, file: {}, with eTag: {}, offset: {}, is fully consumed: {}, is partially consumed: {}",
buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
buf.isFullyConsumed(), buf.isAnyByteConsumed());
return true;
}
/**
* Wait for any in-progress read for the requested offset to complete.
* @param eTag the eTag of the file
* @param position the requested offset
* @param isFirstRead whether this is the first read of the stream
*/
private void waitForProcess(final String eTag,
final long position,
boolean isFirstRead) {
ReadBuffer readBuf;
synchronized (this) {
readBuf = clearFromReadAheadQueue(eTag, position, isFirstRead);
if (readBuf == null) {
readBuf = getFromList(getInProgressList(), eTag, position);
}
}
if (readBuf != null) { // if in in-progress queue, then block for it
try {
printTraceLog(
"A relevant read buffer for file: {}, with eTag: {}, offset: {}, "
+ "queued by stream: {}, having buffer idx: {} is being prefetched, waiting for latch",
readBuf.getPath(), readBuf.getETag(), readBuf.getOffset(),
readBuf.getStream().hashCode(), readBuf.getBufferindex());
readBuf.getLatch()
.await(); // blocking wait on the caller stream's thread
// Note on correctness: readBuf gets out of getInProgressList() only in 1 place: after worker thread
// is done processing it (in doneReading). There, the latch is set after removing the buffer from
// getInProgressList(). So this latch is safe to be outside the synchronized block.
// Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
// while waiting, so no one will be able to change any state. If this becomes more complex in the future,
// then the latch can be removed and replaced with wait/notify whenever getInProgressList() is touched.
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
printTraceLog("Latch done for file: {}, with eTag: {}, for offset: {}, "
+ "buffer index: {} queued by stream: {}", readBuf.getPath(),
readBuf.getETag(),
readBuf.getOffset(), readBuf.getBufferindex(),
readBuf.getStream().hashCode());
}
}
/**
* Clear the buffer from read-ahead queue if it exists.
* @param eTag the eTag of the file
* @param requestedOffset the requested offset
* @param isFirstRead whether this is the first read of the stream
* @return the buffer if found, null otherwise
*/
private ReadBuffer clearFromReadAheadQueue(final String eTag,
final long requestedOffset,
boolean isFirstRead) {
ReadBuffer buffer = getFromList(getReadAheadQueue(), eTag, requestedOffset);
/*
* If this prefetch was triggered by the first read of this input stream,
* we should not remove it from the queue; let the background threads complete it.
*/
if (buffer != null && isFirstRead) {
return buffer;
}
if (buffer != null) {
getReadAheadQueue().remove(buffer);
notifyAll(); // lock is held in calling method
pushToFreeList(buffer.getBufferindex());
}
return null;
}
/**
* Get the block from completed queue if it exists.
* @param eTag the eTag of the file
* @param position the requested offset
* @param length the length to read
* @param buffer the buffer to read data into
* @return the number of bytes read
* @throws IOException if an I/O error occurs
*/
private int getBlockFromCompletedQueue(final String eTag, final long position,
final int length, final byte[] buffer) throws IOException {
ReadBuffer buf = getBufferFromCompletedQueue(eTag, position);
if (buf == null) {
return 0;
}
buf.startReading(); // atomic increment of refCount.
if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
// To prevent new read requests from failing due to old read-ahead attempts,
// return the exception only from buffers that failed within the last getThresholdAgeMilliseconds()
if ((currentTimeMillis() - (buf.getTimeStamp())
< getThresholdAgeMilliseconds())) {
throw buf.getErrException();
} else {
return 0;
}
}
if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
|| (position >= buf.getOffset() + buf.getLength())) {
return 0;
}
int cursor = (int) (position - buf.getOffset());
int availableLengthInBuffer = buf.getLength() - cursor;
int lengthToCopy = Math.min(length, availableLengthInBuffer);
System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
if (cursor == 0) {
buf.setFirstByteConsumed(true);
}
if (cursor + lengthToCopy == buf.getLength()) {
buf.setLastByteConsumed(true);
}
buf.setAnyByteConsumed(true);
buf.endReading(); // atomic decrement of refCount
return lengthToCopy;
}
/**
* Get the buffer from completed queue that contains the requested offset.
* @param eTag the eTag of the file
* @param requestedOffset the requested offset
* @return the buffer if found, null otherwise
*/
private ReadBuffer getBufferFromCompletedQueue(final String eTag,
final long requestedOffset) {
for (ReadBuffer buffer : getCompletedReadList()) {
// Buffer is returned if the requestedOffset is at or above the buffer's
// offset but below the end of either its read length or its requested length.
if (eTag.equals(buffer.getETag())
&& (requestedOffset >= buffer.getOffset())
&& ((requestedOffset < buffer.getOffset() + buffer.getLength())
|| (requestedOffset
< buffer.getOffset() + buffer.getRequestedLength()))) {
return buffer;
}
}
return null;
}
/**
* Try to upscale memory by adding more buffers to the pool if memory usage is below threshold.
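* A slot previously freed by the memory monitor (tracked in removedBufferList)
* is re-populated first when one exists; otherwise a new byte array is
* allocated at the next unused index of the buffer pool.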
* @return whether the upscale succeeded
*/
private synchronized boolean tryMemoryUpscale() {
if (!isDynamicScalingEnabled) {
printTraceLog("Dynamic scaling is disabled, skipping memory upscale");
return false; // Dynamic scaling is disabled, so no upscaling.
}
double memoryLoad = getMemoryLoad();
if (memoryLoad < memoryThreshold && getNumBuffers() < maxBufferPoolSize) {
// Create and Add more buffers in getFreeList().
int nextIndx = getNumBuffers();
if (removedBufferList.isEmpty() && nextIndx < bufferPool.length) {
bufferPool[nextIndx] = new byte[getReadAheadBlockSize()];
pushToFreeList(nextIndx);
} else {
// Reuse a removed buffer index.
int freeIndex = removedBufferList.pop();
if (freeIndex >= bufferPool.length || bufferPool[freeIndex] != null) {
printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
freeIndex, bufferPool.length);
return false;
}
bufferPool[freeIndex] = new byte[getReadAheadBlockSize()];
pushToFreeList(freeIndex);
}
incrementActiveBufferCount();
printTraceLog(
"Current Memory Load: {}. Incrementing buffer pool size to {}",
memoryLoad, getNumBuffers());
return true;
}
printTraceLog("Could not Upscale memory. Total buffers: {} Memory Load: {}",
getNumBuffers(), memoryLoad);
return false;
}
/**
* Scheduled Eviction task that runs periodically to evict old buffers.
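* It first evicts completed buffers older than the age threshold and then,
* when dynamic scaling is enabled and memory load is above the threshold,
* releases one free buffer from the pool so that it can be garbage collected.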
*/
private void scheduledEviction() {
for (ReadBuffer buf : getCompletedReadList()) {
if (currentTimeMillis() - buf.getTimeStamp()
> getThresholdAgeMilliseconds()) {
// If the buffer is older than thresholdAge, evict it.
printTraceLog(
"Scheduled Eviction of Buffer Triggered for BufferIndex: {}, "
+ "file: {}, with eTag: {}, offset: {}, length: {}, queued by stream: {}",
buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
buf.getLength(), buf.getStream().hashCode());
evict(buf);
}
}
double memoryLoad = getMemoryLoad();
if (isDynamicScalingEnabled && memoryLoad > memoryThreshold) {
synchronized (this) {
if (isFreeListEmpty()) {
printTraceLog(
"No free buffers available. Skipping downscale of buffer pool");
return; // No free buffers available, so cannot downscale.
}
int freeIndex = popFromFreeList();
if (freeIndex >= bufferPool.length || bufferPool[freeIndex] == null) {
printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
freeIndex, bufferPool.length);
return;
}
bufferPool[freeIndex] = null;
removedBufferList.add(freeIndex);
decrementActiveBufferCount();
printTraceLog(
"Current Memory Load: {}. Decrementing buffer pool size to {}",
memoryLoad, getNumBuffers());
}
}
}
/**
* Manual Eviction of a buffer.
* @param buf the buffer to evict
* @return whether the eviction succeeded
*/
private boolean manualEviction(final ReadBuffer buf) {
printTraceLog(
"Manual Eviction of Buffer Triggered for BufferIndex: {}, file: {}, with eTag: {}, offset: {}, queued by stream: {}",
buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
buf.getStream().hashCode());
return evict(buf);
}
/**
* Adjust the thread pool size based on CPU load and queue size.
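* For example, with an upscale percentage of 20 (an illustrative value; the
* actual percentages are configurable), a pool of 10 workers grows to
* ceil(10 * (100 + 20) / 100) = 12 when more workers are needed and CPU load
* is below the threshold. Downscaling shrinks the pool in the same way and
* never goes below the configured minimum.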
*/
private void adjustThreadPool() {
int currentPoolSize = workerRefs.size();
double cpuLoad = getCpuLoad();
int requiredPoolSize = getRequiredThreadPoolSize();
int newThreadPoolSize;
printTraceLog(
"Current CPU load: {}, Current worker pool size: {}, Required worker pool size: {}",
cpuLoad, currentPoolSize, requiredPoolSize);
if (currentPoolSize < requiredPoolSize && cpuLoad < cpuThreshold) {
// Submit more background tasks.
newThreadPoolSize = Math.min(maxThreadPoolSize,
(int) Math.ceil(
(currentPoolSize * (HUNDRED_D + threadPoolUpscalePercentage))
/ HUNDRED_D));
// Create new Worker Threads
for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
workerRefs.add(worker);
workerPool.submit(worker);
}
printTraceLog("Increased worker pool size from {} to {}", currentPoolSize,
newThreadPoolSize);
} else if (cpuLoad > cpuThreshold || currentPoolSize > requiredPoolSize) {
newThreadPoolSize = Math.max(minThreadPoolSize,
(int) Math.ceil(
(currentPoolSize * (HUNDRED_D - threadPoolDownscalePercentage))
/ HUNDRED_D));
// Signal the extra workers to stop
while (workerRefs.size() > newThreadPoolSize) {
ReadBufferWorker worker = workerRefs.remove(workerRefs.size() - 1);
worker.stop();
}
printTraceLog("Decreased worker pool size from {} to {}", currentPoolSize,
newThreadPoolSize);
} else {
printTraceLog("No change in worker pool size. CPU load: {} Pool size: {}",
cpuLoad, currentPoolSize);
}
}
/**
* Similar to System.currentTimeMillis, except implemented with System.nanoTime().
* System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
* making it unsuitable for measuring time intervals. nanoTime is strictly monotonically increasing per CPU core.
* Note: it is not monotonic across sockets, and even within a CPU, it is only the
* more recent parts which share a clock across all cores.
*
* @return current time in milliseconds
*/
private long currentTimeMillis() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
/**
* Purge all buffers associated with the given stream from the given list.
* @param stream the stream whose buffers are to be purged
* @param list the list to purge from
*/
private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
ReadBuffer readBuffer = it.next();
if (readBuffer.getStream() == stream) {
it.remove();
// As failed ReadBuffers (bufferIndex = -1) are already pushed to free
// list in doneReading method, we will skip adding those here again.
if (readBuffer.getBufferindex() != -1) {
pushToFreeList(readBuffer.getBufferindex());
}
}
}
}
/**
* Test method that can clean up the current state of readAhead buffers and
* the lists. Will also trigger a fresh init.
*/
@VisibleForTesting
@Override
public void testResetReadBufferManager() {
synchronized (this) {
ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
for (ReadBuffer buf : getCompletedReadList()) {
if (buf != null) {
completedBuffers.add(buf);
}
}
for (ReadBuffer buf : completedBuffers) {
manualEviction(buf);
}
getReadAheadQueue().clear();
getInProgressList().clear();
getCompletedReadList().clear();
getFreeList().clear();
for (int i = 0; i < maxBufferPoolSize; i++) {
bufferPool[i] = null;
}
bufferPool = null;
if (cpuMonitorThread != null) {
cpuMonitorThread.shutdownNow();
}
if (memoryMonitorThread != null) {
memoryMonitorThread.shutdownNow();
}
if (workerPool != null) {
workerPool.shutdownNow();
}
resetBufferManager();
}
}
@VisibleForTesting
@Override
public void testResetReadBufferManager(int readAheadBlockSize,
int thresholdAgeMilliseconds) {
setReadAheadBlockSize(readAheadBlockSize);
setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
testResetReadBufferManager();
}
@VisibleForTesting
public void callTryEvict() {
tryEvict();
}
@VisibleForTesting
public int getNumBuffers() {
return numberOfActiveBuffers.get();
}
@Override
void resetBufferManager() {
setBufferManager(null); // reset the singleton instance
setIsConfigured(false);
}
private static void setBufferManager(ReadBufferManagerV2 manager) {
bufferManager = manager;
}
private static void setIsConfigured(boolean configured) {
isConfigured.set(configured);
}
private final ThreadFactory workerThreadFactory = new ThreadFactory() {
// AtomicInteger keeps thread names unique if the pool creates threads concurrently.
private final AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "ReadAheadV2-WorkerThread-" + count.getAndIncrement());
t.setDaemon(true);
return t;
}
};
private void printTraceLog(String message, Object... args) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(message, args);
}
}
private void printDebugLog(String message, Object... args) {
LOGGER.debug(message, args);
}
/**
* Get the current memory load of the JVM.
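* Computed as heap used divided by heap max, as reported by the JVM {@link MemoryMXBean}.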
* @return the memory load as a double value between 0.0 and 1.0
*/
@VisibleForTesting
double getMemoryLoad() {
MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
return (double) memoryUsage.getUsed() / memoryUsage.getMax();
}
/**
* Get the current CPU load of the system.
* @return the CPU load as a double value between 0.0 and 1.0
*/
@VisibleForTesting
public double getCpuLoad() {
OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
OperatingSystemMXBean.class);
double cpuLoad = osBean.getSystemCpuLoad();
if (cpuLoad < 0) {
// If the CPU load is not available, return 0.0
return 0.0;
}
return cpuLoad;
}
@VisibleForTesting
static synchronized ReadBufferManagerV2 getInstance() {
return bufferManager;
}
@VisibleForTesting
public int getMinBufferPoolSize() {
return minBufferPoolSize;
}
@VisibleForTesting
public int getMaxBufferPoolSize() {
return maxBufferPoolSize;
}
@VisibleForTesting
public int getCurrentThreadPoolSize() {
return workerRefs.size();
}
@VisibleForTesting
public int getCpuMonitoringIntervalInMilliSec() {
return cpuMonitoringIntervalInMilliSec;
}
@VisibleForTesting
public int getMemoryMonitoringIntervalInMilliSec() {
return memoryMonitoringIntervalInMilliSec;
}
@VisibleForTesting
public ScheduledExecutorService getCpuMonitoringThread() {
return cpuMonitorThread;
}
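/**
* Compute the number of worker threads needed to serve the current demand:
* {@link #THREAD_POOL_REQUIREMENT_BUFFER} (20%) more than the number of buffers
* queued plus in progress. For example, 10 queued and 5 in-progress reads give
* ceil(1.2 * 15) = 18 required threads.
* @return the required worker thread pool size
*/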
public int getRequiredThreadPoolSize() {
return (int) Math.ceil(THREAD_POOL_REQUIREMENT_BUFFER
* (getReadAheadQueue().size()
+ getInProgressList().size())); // 20% more for buffer
}
private boolean isFreeListEmpty() {
LOCK.lock();
try {
return getFreeList().isEmpty();
} finally {
LOCK.unlock();
}
}
private Integer popFromFreeList() {
LOCK.lock();
try {
return getFreeList().pop();
} finally {
LOCK.unlock();
}
}
private void pushToFreeList(int idx) {
LOCK.lock();
try {
getFreeList().push(idx);
} finally {
LOCK.unlock();
}
}
private void incrementActiveBufferCount() {
numberOfActiveBuffers.getAndIncrement();
}
private void decrementActiveBufferCount() {
numberOfActiveBuffers.getAndDecrement();
}
}