ReadBufferManagerV2.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
/**
 * Second-generation read-ahead buffer manager, exposed as a lazily created
 * singleton via {@link #getBufferManager()}.
 * <p>
 * Static configuration (thread pool and buffer pool bounds) is captured by
 * {@link #setReadBufferManagerConfigs(int, AbfsConfiguration)} and consumed by
 * {@link #init()} when the singleton is first created. Most read-ahead
 * operations in this class are still placeholders (see the TODO markers).
 */
final class ReadBufferManagerV2 extends ReadBufferManager {

  // Thread Pool Configurations
  private static int minThreadPoolSize;
  private static int maxThreadPoolSize;
  private static int executorServiceKeepAliveTimeInMilliSec;
  private ThreadPoolExecutor workerPool;

  // Buffer Pool Configurations
  private static int minBufferPoolSize;
  private static int maxBufferPoolSize;
  private int numberOfActiveBuffers = 0;
  private byte[][] bufferPool;

  // volatile: read without holding LOCK on the fast path of getBufferManager().
  private static volatile ReadBufferManagerV2 bufferManager;

  /**
   * Creates the named threads used by {@link #workerPool}.
   */
  private final ThreadFactory namedThreadFactory = new ThreadFactory() {
    // Atomic because the executor may request new threads concurrently.
    private final AtomicInteger count = new AtomicInteger();

    @Override
    public Thread newThread(Runnable r) {
      Thread thread =
          new Thread(r, "ReadAheadV2-Thread-" + count.getAndIncrement());
      // Nothing in this class shuts the pool down, so the workers are marked
      // daemon to avoid keeping the JVM alive on their account.
      thread.setDaemon(true);
      return thread;
    }
  };

  // hide instance constructor
  private ReadBufferManagerV2() {
    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
  }

  /**
   * Sets the read buffer manager configurations.
   * <p>
   * The values are only applied while no singleton instance exists; once the
   * manager has been created the call is a no-op. The update is performed
   * under {@code LOCK} so it cannot interleave with a concurrent
   * initialization in {@link #getBufferManager()}.
   *
   * @param readAheadBlockSize the size of the read-ahead block in bytes
   * @param abfsConfiguration the AbfsConfiguration instance for other configurations
   */
  static void setReadBufferManagerConfigs(int readAheadBlockSize,
      AbfsConfiguration abfsConfiguration) {
    LOCK.lock();
    try {
      if (bufferManager != null) {
        return;
      }
      minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
      maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
      executorServiceKeepAliveTimeInMilliSec =
          abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
      minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();
      maxBufferPoolSize = abfsConfiguration.getMaxReadAheadV2BufferPoolSize();
      setThresholdAgeMilliseconds(
          abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
      setReadAheadBlockSize(readAheadBlockSize);
    } finally {
      LOCK.unlock();
    }
  }

  /**
   * Returns the singleton instance of ReadBufferManagerV2, creating and
   * initializing it on first use.
   * <p>
   * The instance is assigned to the static field only after {@link #init()}
   * has completed, so callers never observe a manager whose buffer pool or
   * worker pool has not been set up yet.
   *
   * @return the singleton instance of ReadBufferManagerV2
   */
  static ReadBufferManagerV2 getBufferManager() {
    ReadBufferManagerV2 manager = bufferManager;
    if (manager == null) {
      LOCK.lock();
      try {
        manager = bufferManager;
        if (manager == null) {
          manager = new ReadBufferManagerV2();
          manager.init();
          // Publish only once fully initialized.
          bufferManager = manager;
        }
      } finally {
        LOCK.unlock();
      }
    }
    return manager;
  }

  /**
   * {@inheritDoc}
   * <p>
   * Allocates {@code minBufferPoolSize} buffers up front (the pool array has
   * room for {@code maxBufferPoolSize}), registers them in the free list,
   * creates the worker thread pool and submits {@code minThreadPoolSize}
   * workers before releasing them via the {@code UNLEASH_WORKERS} latch.
   */
  @Override
  void init() {
    // Initialize Buffer Pool: only the first minBufferPoolSize slots are
    // allocated here; the remaining slots stay null.
    bufferPool = new byte[maxBufferPoolSize][];
    for (int i = 0; i < minBufferPoolSize; i++) {
      // same buffers are reused. These byte arrays are never garbage collected
      bufferPool[i] = new byte[getReadAheadBlockSize()];
      getFreeList().add(i);
      numberOfActiveBuffers++;
    }

    // Initialize a thread pool that holds minThreadPoolSize core threads and
    // may grow up to maxThreadPoolSize. The SynchronousQueue hands tasks
    // directly to threads instead of queueing them.
    workerPool = new ThreadPoolExecutor(
        minThreadPoolSize,
        maxThreadPoolSize,
        executorServiceKeepAliveTimeInMilliSec,
        TimeUnit.MILLISECONDS,
        new SynchronousQueue<>(),
        namedThreadFactory);
    // Idle core threads are also subject to the keep-alive timeout.
    workerPool.allowCoreThreadTimeOut(true);
    for (int i = 0; i < minThreadPoolSize; i++) {
      ReadBufferWorker worker = new ReadBufferWorker(i, this);
      workerPool.submit(worker);
    }
    ReadBufferWorker.UNLEASH_WORKERS.countDown();
  }

  /**
   * {@inheritDoc}
   * <p>
   * Currently a no-op.
   */
  @Override
  public void queueReadAhead(final AbfsInputStream stream,
      final long requestedOffset,
      final int requestedLength,
      final TracingContext tracingContext) {
    // TODO: To be implemented
  }

  /**
   * {@inheritDoc}
   * <p>
   * Currently always returns {@code 0}.
   */
  @Override
  public int getBlock(final AbfsInputStream stream,
      final long position,
      final int length,
      final byte[] buffer) throws IOException {
    // TODO: To be implemented
    return 0;
  }

  /**
   * {@inheritDoc}
   * <p>
   * Currently always returns {@code null}.
   */
  @Override
  public ReadBuffer getNextBlockToRead() throws InterruptedException {
    // TODO: To be implemented
    return null;
  }

  /**
   * {@inheritDoc}
   * <p>
   * Currently a no-op.
   */
  @Override
  public void doneReading(final ReadBuffer buffer,
      final ReadBufferStatus result,
      final int bytesActuallyRead) {
    // TODO: To be implemented
  }

  /**
   * {@inheritDoc}
   * <p>
   * Currently a no-op.
   */
  @Override
  public void purgeBuffersForStream(final AbfsInputStream stream) {
    // TODO: To be implemented
  }

  /**
   * {@inheritDoc}
   * <p>
   * Returns the number of buffers allocated so far by {@link #init()}.
   */
  @VisibleForTesting
  @Override
  public int getNumBuffers() {
    return numberOfActiveBuffers;
  }

  /**
   * {@inheritDoc}
   * <p>
   * Currently a no-op.
   */
  @VisibleForTesting
  @Override
  public void callTryEvict() {
    // TODO: To be implemented
  }

  /**
   * {@inheritDoc}
   * <p>
   * Currently a no-op.
   */
  @VisibleForTesting
  @Override
  public void testResetReadBufferManager() {
    // TODO: To be implemented
  }

  /**
   * {@inheritDoc}
   * <p>
   * Currently a no-op.
   */
  @VisibleForTesting
  @Override
  public void testResetReadBufferManager(final int readAheadBlockSize,
      final int thresholdAgeMilliseconds) {
    // TODO: To be implemented
  }

  /**
   * {@inheritDoc}
   * <p>
   * Currently a no-op.
   */
  @VisibleForTesting
  @Override
  public void testMimicFullUseAndAddFailedBuffer(final ReadBuffer buf) {
    // TODO: To be implemented
  }

  /**
   * {@inheritDoc}
   * <p>
   * Clears the singleton reference so that the next call to
   * {@link #getBufferManager()} creates a fresh instance. The worker pool of
   * the current instance is not shut down here.
   */
  @Override
  void resetBufferManager() {
    setBufferManager(null); // reset the singleton instance
  }

  /**
   * Replaces the singleton reference.
   * @param manager the new singleton instance, or {@code null} to clear it
   */
  private static void setBufferManager(ReadBufferManagerV2 manager) {
    bufferManager = manager;
  }
}