// TestReadBufferManagerV2.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Unit tests around different components of Read Buffer Manager V2.
 * <p>
 * ReadBufferManagerV2 is reached through static accessors, so every test
 * re-initializes it with its own configuration before asserting anything.
 */
public class TestReadBufferManagerV2 extends AbstractAbfsIntegrationTest {

  /** Minimum thread pool size set by {@link #getReadAheadV2Configuration()}. */
  private static final int MIN_THREAD_POOL_SIZE = 2;

  /** Maximum thread pool size set by {@link #getReadAheadV2Configuration()}. */
  private static final int MAX_THREAD_POOL_SIZE = 4;

  /**
   * Heap usage, as a fraction (0.0 - 1.0) of the JVM max memory, up to which
   * the memory load thread keeps allocating.
   */
  private static final double HIGH_MEMORY_USAGE_THRESHOLD_PERCENT = 0.8;

  /** Flag polled by the background load threads; set to false to stop them. */
  private volatile boolean running = true;

  /** Keeps the arrays allocated by the memory load thread reachable. */
  private final List<byte[]> allocations = new ArrayList<>();

  public TestReadBufferManagerV2() throws Exception {
    super();
  }

  /**
   * Test to verify init of ReadBufferManagerV2: after a reset the manager is
   * unconfigured, and once configured all accessors return the same instance.
   * @throws Exception if test fails
   */
  @Test
  public void testReadBufferManagerV2Init() throws Exception {
    ReadBufferManagerV2.setReadBufferManagerConfigs(
        getConfiguration().getReadAheadBlockSize(), getConfiguration());
    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
    assertThat(ReadBufferManagerV2.getInstance())
        .as("ReadBufferManager should be uninitialized").isNull();
    intercept(IllegalStateException.class, "ReadBufferManagerV2 is not configured.", () -> {
      ReadBufferManagerV2.getBufferManager();
    });

    // verify that multiple invocations of getBufferManager returns same instance.
    ReadBufferManagerV2.setReadBufferManagerConfigs(
        getConfiguration().getReadAheadBlockSize(), getConfiguration());
    ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager();
    ReadBufferManagerV2 bufferManager2 = ReadBufferManagerV2.getBufferManager();
    ReadBufferManagerV2 bufferManager3 = ReadBufferManagerV2.getInstance();
    assertThat(bufferManager).isNotNull();
    assertThat(bufferManager2).isNotNull();
    assertThat(bufferManager).isSameAs(bufferManager2);
    assertThat(bufferManager3).isNotNull();
    assertThat(bufferManager3).isSameAs(bufferManager);

    // Verify default values are not invalid.
    assertThat(bufferManager.getMinBufferPoolSize()).isGreaterThan(0);
    assertThat(bufferManager.getMaxBufferPoolSize()).isGreaterThan(0);
  }

  /**
   * Test to verify that the cpu monitor thread exists when dynamic scaling is
   * enabled and does not exist when it is disabled.
   * @throws Exception if test fails
   */
  @Test
  public void testDynamicScalingSwitchingOnAndOff() throws Exception {
    Configuration conf = new Configuration(getRawConfiguration());
    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
    try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
        getFileSystem().getUri(), conf)) {
      ReadBufferManagerV2 bufferManagerV2 =
          reinitializeBufferManager(fs.getAbfsStore().getAbfsConfiguration());
      try {
        assertThat(bufferManagerV2.getCpuMonitoringThread())
            .as("CPU Monitor thread should be initialized").isNotNull();
      } finally {
        // Reset even when the assertion fails so the second half of the test
        // (and later tests) do not start from this manager's state.
        bufferManagerV2.resetBufferManager();
      }
    }

    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, false);
    try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
        getFileSystem().getUri(), conf)) {
      ReadBufferManagerV2 bufferManagerV2 =
          reinitializeBufferManager(fs.getAbfsStore().getAbfsConfiguration());
      try {
        assertThat(bufferManagerV2.getCpuMonitoringThread())
            .as("CPU Monitor thread should not be initialized").isNull();
      } finally {
        bufferManagerV2.resetBufferManager();
      }
    }
  }

  /**
   * Test to verify that the thread pool grows above the configured minimum
   * while read-aheads are queued continuously, and drops below the configured
   * maximum once the load stops.
   * @throws Exception if test fails
   */
  @Test
  public void testThreadPoolDynamicScaling() throws Exception {
    AbfsInputStream inputStream = createMockedInputStream();
    AbfsConfiguration abfsConfig = new AbfsConfiguration(
        getReadAheadV2Configuration(), getAccountName());
    ReadBufferManagerV2 bufferManagerV2 = reinitializeBufferManager(abfsConfig);
    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(MIN_THREAD_POOL_SIZE);

    Thread loadThread = startReadAheadLoad(bufferManagerV2, inputStream);
    try {
      Thread.sleep(2L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
      assertThat(bufferManagerV2.getCurrentThreadPoolSize())
          .isGreaterThan(MIN_THREAD_POOL_SIZE);
    } finally {
      // Stop the load thread even if the assertion above fails.
      stopLoad(loadThread);
    }

    Thread.sleep(4L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isLessThan(MAX_THREAD_POOL_SIZE);
  }

  /**
   * Test to verify that the thread pool stays at its minimum size under
   * read-ahead load when the CPU usage threshold is configured as 0 percent.
   * @throws Exception if test fails
   */
  @Test
  public void testCpuUpscaleNotAllowedIfCpuAboveThreshold() throws Exception {
    AbfsInputStream inputStream = createMockedInputStream();
    Configuration configuration = getReadAheadV2Configuration();
    configuration.set(FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT, "0"); // set low threshold
    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration, getAccountName());
    ReadBufferManagerV2 bufferManagerV2 = reinitializeBufferManager(abfsConfig);
    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(MIN_THREAD_POOL_SIZE);

    Thread loadThread = startReadAheadLoad(bufferManagerV2, inputStream);
    try {
      Thread.sleep(2L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
      assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(MIN_THREAD_POOL_SIZE);
    } finally {
      // Stop the load thread even if the assertion above fails.
      stopLoad(loadThread);
    }
  }

  /**
   * Test to verify that failed buffers placed on the completed list are gone
   * after waiting two memory monitoring intervals.
   * @throws Exception if test fails
   */
  @Test
  public void testScheduledEviction() throws Exception {
    AbfsInputStream inputStream = createMockedInputStream();
    AbfsConfiguration abfsConfig = new AbfsConfiguration(
        getReadAheadV2Configuration(), getAccountName());
    // The helper configures the manager before resetting it, so this test
    // also works when it runs first or after a test that left it unconfigured.
    ReadBufferManagerV2 bufferManagerV2 = reinitializeBufferManager(abfsConfig);

    // Add a failed buffer to completed queue and set to no free buffers to read ahead.
    ReadBuffer buff = new ReadBuffer();
    buff.setStatus(ReadBufferStatus.READ_FAILED);
    buff.setStream(inputStream);
    bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
    bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
    assertThat(bufferManagerV2.getCompletedReadListSize()).isEqualTo(2);

    Thread.sleep(2L * bufferManagerV2.getMemoryMonitoringIntervalInMilliSec());
    assertThat(bufferManagerV2.getCompletedReadListSize()).isEqualTo(0);
  }

  /**
   * Test to verify that the number of buffers stays at the minimum pool size
   * after queueing a read-ahead when the memory usage threshold is configured
   * as 0 percent.
   * @throws Exception if test fails
   */
  @Test
  public void testMemoryUpscaleNotAllowedIfMemoryAboveThreshold() throws Exception {
    AbfsInputStream inputStream = createMockedInputStream();
    Configuration configuration = getReadAheadV2Configuration();
    configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "0"); // set low threshold
    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration, getAccountName());
    ReadBufferManagerV2 bufferManagerV2 = reinitializeBufferManager(abfsConfig);

    // Add a failed buffer to completed queue and set to no free buffers to read ahead.
    ReadBuffer buff = new ReadBuffer();
    buff.setStatus(ReadBufferStatus.READ_FAILED);
    buff.setStream(inputStream);
    bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
    assertThat(bufferManagerV2.getNumBuffers())
        .isEqualTo(bufferManagerV2.getMinBufferPoolSize());

    bufferManagerV2.queueReadAhead(inputStream, 0, ONE_KB,
        inputStream.getTracingContext());
    assertThat(bufferManagerV2.getNumBuffers())
        .isEqualTo(bufferManagerV2.getMinBufferPoolSize());
  }

  /**
   * Test to verify that the number of buffers grows beyond the minimum pool
   * size after queueing a read-ahead when the memory usage threshold is
   * configured as 100 percent.
   * @throws Exception if test fails
   */
  @Test
  public void testMemoryUpscaleIfMemoryBelowThreshold() throws Exception {
    AbfsInputStream inputStream = createMockedInputStream();
    Configuration configuration = getReadAheadV2Configuration();
    configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "100");
    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration, getAccountName());
    ReadBufferManagerV2 bufferManagerV2 = reinitializeBufferManager(abfsConfig);

    // Add a failed buffer to completed queue and set to no free buffers to read ahead.
    ReadBuffer buff = new ReadBuffer();
    buff.setStatus(ReadBufferStatus.READ_FAILED);
    buff.setStream(inputStream);
    bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
    assertThat(bufferManagerV2.getNumBuffers())
        .isEqualTo(bufferManagerV2.getMinBufferPoolSize());

    bufferManagerV2.queueReadAhead(inputStream, 0, ONE_KB,
        inputStream.getTracingContext());
    assertThat(bufferManagerV2.getNumBuffers())
        .isGreaterThan(bufferManagerV2.getMinBufferPoolSize());
  }

  /**
   * Test to verify that the number of buffers drops below the minimum pool
   * size while a background thread fills the heap, with the memory usage
   * threshold configured as 2 percent.
   * @throws Exception if test fails
   */
  @Test
  public void testMemoryDownscaleIfMemoryAboveThreshold() throws Exception {
    Configuration configuration = getReadAheadV2Configuration();
    configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "2");
    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration, getAccountName());
    ReadBufferManagerV2 bufferManagerV2 = reinitializeBufferManager(abfsConfig);
    int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
    assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);

    running = true;
    Thread memoryLoadThread = new Thread(() -> {
      while (running) {
        long maxMemory = Runtime.getRuntime().maxMemory();
        long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        double usage = (double) usedMemory / maxMemory;
        if (usage < HIGH_MEMORY_USAGE_THRESHOLD_PERCENT) {
          // Allocate more memory
          allocations.add(new byte[10 * 1024 * 1024]); // 10MB
        }
      }
    }, "MemoryLoadThread");
    memoryLoadThread.setDaemon(true);
    memoryLoadThread.start();
    try {
      Thread.sleep(2L * bufferManagerV2.getMemoryMonitoringIntervalInMilliSec());
      assertThat(bufferManagerV2.getNumBuffers()).isLessThan(initialBuffers);
    } finally {
      // Stop the load thread even if the assertion fails, then drop the
      // allocated arrays so later tests do not run with a nearly full heap.
      // The list is only touched here after join(), i.e. once the writer
      // thread has terminated.
      stopLoad(memoryLoadThread);
      allocations.clear();
    }
  }

  /**
   * Builds the base configuration shared by the tests: read-ahead V2 and
   * dynamic scaling enabled, thread pool size between
   * {@link #MIN_THREAD_POOL_SIZE} and {@link #MAX_THREAD_POOL_SIZE}, CPU usage
   * threshold of 100 percent, and 1 second for the CPU monitoring interval,
   * the memory monitoring interval and the cached buffer TTL.
   * @return a fresh copy of the raw test configuration with the above set
   */
  private Configuration getReadAheadV2Configuration() {
    Configuration conf = new Configuration(getRawConfiguration());
    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
    conf.setInt(FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE, MIN_THREAD_POOL_SIZE);
    conf.setInt(FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE, MAX_THREAD_POOL_SIZE);
    conf.setInt(FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT, HUNDRED);
    conf.setInt(FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS, 1_000);
    conf.setInt(FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS, 1_000);
    conf.setInt(FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS, 1_000);
    return conf;
  }

  /**
   * Discards whatever ReadBufferManagerV2 state a previous test left behind
   * and returns a manager initialized with the given configuration.
   * <p>
   * The manager is configured before the reset because
   * {@code getBufferManager()} throws {@link IllegalStateException} when it
   * is not configured, and configured again afterwards because the reset
   * leaves it in that unconfigured state.
   * @param abfsConfig configuration to initialize the manager with
   * @return the re-initialized buffer manager
   * @throws Exception if the manager cannot be reset or configured
   */
  private ReadBufferManagerV2 reinitializeBufferManager(AbfsConfiguration abfsConfig)
      throws Exception {
    ReadBufferManagerV2.setReadBufferManagerConfigs(
        abfsConfig.getReadAheadBlockSize(), abfsConfig);
    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
    ReadBufferManagerV2.setReadBufferManagerConfigs(
        abfsConfig.getReadAheadBlockSize(), abfsConfig);
    return ReadBufferManagerV2.getBufferManager();
  }

  /**
   * Creates an input stream backed by a mock client, for use as the stream
   * argument of read-ahead requests.
   * @return input stream obtained from {@link TestAbfsInputStream}
   * @throws Exception if the stream cannot be created
   */
  private AbfsInputStream createMockedInputStream() throws Exception {
    TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
    AbfsClient client = testAbfsInputStream.getMockAbfsClient();
    return testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
  }

  /**
   * Starts a background thread that keeps queueing 1-byte read-aheads at
   * increasing offsets until {@link #stopLoad(Thread)} is called.
   * @param bufferManagerV2 manager to queue the read-aheads on
   * @param inputStream stream the read-aheads are queued for
   * @return the started load thread
   */
  private Thread startReadAheadLoad(ReadBufferManagerV2 bufferManagerV2,
      AbfsInputStream inputStream) {
    running = true;
    int[] reqOffset = {0};
    int reqLength = 1;
    Thread loadThread = new Thread(() -> {
      while (running) {
        bufferManagerV2.queueReadAhead(inputStream, reqOffset[0], reqLength,
            inputStream.getTracingContext());
        reqOffset[0] += reqLength;
      }
    }, "ReadAheadLoadThread");
    // Daemon, like the memory load thread, so it can never keep the JVM alive.
    loadThread.setDaemon(true);
    loadThread.start();
    return loadThread;
  }

  /**
   * Signals a background load thread to stop and waits for it to finish.
   * @param loadThread thread started by one of the load helpers/tests
   * @throws InterruptedException if interrupted while waiting for the thread
   */
  private void stopLoad(Thread loadThread) throws InterruptedException {
    running = false;
    loadThread.join();
  }
}