ITestExponentialRetryPolicy.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_AUTOTHROTTLING;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT1_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.fs.FSDataInputStream;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.mockito.Mockito;
import java.net.URI;
import java.util.Random;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
/**
* Unit test ITestExponentialRetryPolicy.
*/
public class ITestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
private final int maxRetryCount = 30;
private final int noRetryCount = 0;
private final int retryCount = new Random().nextInt(maxRetryCount);
private final int retryCountBeyondMax = maxRetryCount + 1;
private static final String TEST_PATH = "/testfile";
private static final double MULTIPLYING_FACTOR = 1.5;
private static final int ANALYSIS_PERIOD = 10000;
private static final String DUMMY_ACCOUNT_NAME = "dummy.dfs.core.windows.net";
private static final String DUMMY_ACCOUNT_NAME_1 = "dummy1.dfs.core.windows.net";
private static final String DUMMY_ACCOUNT_KEY = "dummyKey";
public ITestExponentialRetryPolicy() throws Exception {
super();
}
@Test
public void testDifferentMaxIORetryCount() throws Exception {
AbfsConfiguration abfsConfig = getAbfsConfig();
abfsConfig.setMaxIoRetries(noRetryCount);
testMaxIOConfig(abfsConfig);
abfsConfig.setMaxIoRetries(retryCount);
testMaxIOConfig(abfsConfig);
abfsConfig.setMaxIoRetries(retryCountBeyondMax);
testMaxIOConfig(abfsConfig);
}
@Test
public void testDefaultMaxIORetryCount() throws Exception {
AbfsConfiguration abfsConfig = getAbfsConfig();
Assertions.assertThat(abfsConfig.getMaxIoRetries())
.describedAs("Max retry count should be %s", maxRetryCount)
.isEqualTo(maxRetryCount);
testMaxIOConfig(abfsConfig);
}
@Test
public void testClientSideThrottlingConfigs() throws Exception {
final Configuration configuration = new Configuration();
configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, true);
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
DUMMY_ACCOUNT_NAME);
Assertions.assertThat(abfsConfiguration.isAutoThrottlingEnabled())
.describedAs("Client-side throttling enabled by configuration key")
.isTrue();
configuration.unset(FS_AZURE_ENABLE_AUTOTHROTTLING);
AbfsConfiguration abfsConfiguration2 = new AbfsConfiguration(configuration,
DUMMY_ACCOUNT_NAME);
Assertions.assertThat(abfsConfiguration2.isAutoThrottlingEnabled())
.describedAs("Client-side throttling should be disabled by default")
.isFalse();
}
@Test
public void testThrottlingIntercept() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
final Configuration configuration = new Configuration();
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, false);
configuration.set(FS_AZURE_ACCOUNT_KEY + DOT + DUMMY_ACCOUNT_NAME,
DUMMY_ACCOUNT_KEY);
configuration.set(FS_AZURE_ACCOUNT_KEY + DOT + DUMMY_ACCOUNT_NAME_1,
DUMMY_ACCOUNT_KEY);
// On disabling throttling AbfsNoOpThrottlingIntercept object is returned
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
DUMMY_ACCOUNT_NAME);
AbfsThrottlingIntercept intercept;
AbfsClient abfsClient = ITestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration);
intercept = abfsClient.getIntercept();
Assertions.assertThat(intercept)
.describedAs("AbfsNoOpThrottlingIntercept instance expected")
.isInstanceOf(AbfsNoOpThrottlingIntercept.class);
configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, true);
configuration.setBoolean(FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED, true);
// On enabling throttling AbfsClientThrottlingIntercept object is returned
AbfsConfiguration abfsConfiguration1 = new AbfsConfiguration(configuration,
DUMMY_ACCOUNT_NAME_1);
AbfsClient abfsClient1 = ITestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration1);
intercept = abfsClient1.getIntercept();
Assertions.assertThat(intercept)
.describedAs("AbfsClientThrottlingIntercept instance expected")
.isInstanceOf(AbfsClientThrottlingIntercept.class);
}
@Test
public void testCreateMultipleAccountThrottling() throws Exception {
Configuration config = new Configuration(getRawConfiguration());
String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
if (accountName == null) {
// check if accountName is set using different config key
accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
}
assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
accountName != null && !accountName.isEmpty());
Configuration rawConfig1 = new Configuration();
rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
AbfsRestOperation successOp = mock(AbfsRestOperation.class);
AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
when(successOp.getResult()).thenReturn(http500Op);
AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
when(configuration.getAnalysisPeriod()).thenReturn(ANALYSIS_PERIOD);
when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
when(configuration.accountThrottlingEnabled()).thenReturn(false);
AbfsThrottlingIntercept instance1 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
accountName1 != null && !accountName1.isEmpty());
AbfsThrottlingIntercept instance2 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration);
//if singleton is enabled, for different accounts both the instances should return same value
Assertions.assertThat(instance1)
.describedAs(
"if singleton is enabled, for different accounts both the instances should return same value")
.isEqualTo(instance2);
when(configuration.accountThrottlingEnabled()).thenReturn(true);
AbfsThrottlingIntercept instance3 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
AbfsThrottlingIntercept instance4 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration);
AbfsThrottlingIntercept instance5 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
//if singleton is not enabled, for different accounts instances should return different value
Assertions.assertThat(instance3)
.describedAs(
"iff singleton is not enabled, for different accounts instances should return different value")
.isNotEqualTo(instance4);
//if singleton is not enabled, for same accounts instances should return same value
Assertions.assertThat(instance3)
.describedAs(
"if singleton is not enabled, for same accounts instances should return same value")
.isEqualTo(instance5);
}
@Test
public void testOperationOnAccountIdle() throws Exception {
//Get the filesystem.
AzureBlobFileSystem fs = getFileSystem();
AbfsClient client = fs.getAbfsStore().getClient();
AbfsConfiguration configuration1 = client.getAbfsConfiguration();
Assume.assumeTrue(configuration1.isAutoThrottlingEnabled());
Assume.assumeTrue(configuration1.accountThrottlingEnabled());
AbfsClientThrottlingIntercept accountIntercept
= (AbfsClientThrottlingIntercept) client.getIntercept();
final byte[] b = new byte[2 * MIN_BUFFER_SIZE];
new Random().nextBytes(b);
Path testPath = path(TEST_PATH);
//Do an operation on the filesystem.
try (FSDataOutputStream stream = fs.create(testPath)) {
stream.write(b);
}
//Don't perform any operation on the account.
int sleepTime = (int) ((getAbfsConfig().getAccountOperationIdleTimeout()) * MULTIPLYING_FACTOR);
Thread.sleep(sleepTime);
try (FSDataInputStream streamRead = fs.open(testPath)) {
streamRead.read(b);
}
//Perform operations on another account.
AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
Configuration config = new Configuration(getRawConfiguration());
String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
accountName1 != null && !accountName1.isEmpty());
final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
URI defaultUri1 = null;
defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
fs1.initialize(defaultUri1, getRawConfiguration());
AbfsClient client1 = fs1.getAbfsStore().getClient();
AbfsClientThrottlingIntercept accountIntercept1
= (AbfsClientThrottlingIntercept) client1.getIntercept();
try (FSDataOutputStream stream1 = fs1.create(testPath)) {
stream1.write(b);
}
//Verify the write analyzer for first account is idle but the read analyzer is not idle.
Assertions.assertThat(accountIntercept.getWriteThrottler()
.getIsOperationOnAccountIdle()
.get())
.describedAs("Write analyzer for first account should be idle the first time")
.isTrue();
Assertions.assertThat(
accountIntercept.getReadThrottler()
.getIsOperationOnAccountIdle()
.get())
.describedAs("Read analyzer for first account should not be idle")
.isFalse();
//Verify the write analyzer for second account is not idle.
Assertions.assertThat(
accountIntercept1.getWriteThrottler()
.getIsOperationOnAccountIdle()
.get())
.describedAs("Write analyzer for second account should not be idle")
.isFalse();
//Again perform an operation on the first account.
try (FSDataOutputStream stream2 = fs.create(testPath)) {
stream2.write(b);
}
//Verify the write analyzer on first account is not idle.
Assertions.assertThat(
accountIntercept.getWriteThrottler()
.getIsOperationOnAccountIdle()
.get())
.describedAs(
"Write analyzer for first account should not be idle second time")
.isFalse();
}
@Test
public void testAbfsConfigConstructor() throws Exception {
// Ensure we choose expected values that are not defaults
ExponentialRetryPolicy template = new ExponentialRetryPolicy(
getAbfsConfig().getMaxIoRetries());
int testModifier = 1;
int expectedMaxRetries = template.getMaxRetryCount() + testModifier;
int expectedMinBackoff = template.getMinBackoff() + testModifier;
int expectedMaxBackoff = template.getMaxBackoff() + testModifier;
int expectedDeltaBackoff = template.getDeltaBackoff() + testModifier;
Configuration config = new Configuration(this.getRawConfiguration());
config.setInt(AZURE_MAX_IO_RETRIES, expectedMaxRetries);
config.setInt(AZURE_MIN_BACKOFF_INTERVAL, expectedMinBackoff);
config.setInt(AZURE_MAX_BACKOFF_INTERVAL, expectedMaxBackoff);
config.setInt(AZURE_BACKOFF_INTERVAL, expectedDeltaBackoff);
ExponentialRetryPolicy policy = new ExponentialRetryPolicy(
new AbfsConfiguration(config, "dummyAccountName"));
Assertions.assertThat(policy.getMaxRetryCount())
.describedAs("Max retry count was not set as expected.")
.isEqualTo(expectedMaxRetries);
Assertions.assertThat(policy.getMinBackoff())
.describedAs("Min backoff interval was not set as expected.")
.isEqualTo(expectedMinBackoff);
Assertions.assertThat(policy.getMaxBackoff())
.describedAs("Max backoff interval was not set as expected")
.isEqualTo(expectedMaxBackoff);
Assertions.assertThat(policy.getDeltaBackoff())
.describedAs("Delta backoff interval was not set as expected.")
.isEqualTo(expectedDeltaBackoff);
}
private AbfsConfiguration getAbfsConfig() throws Exception {
Configuration
config = new Configuration(this.getRawConfiguration());
return new AbfsConfiguration(config, "dummyAccountName");
}
private void testMaxIOConfig(AbfsConfiguration abfsConfig) {
ExponentialRetryPolicy retryPolicy = new ExponentialRetryPolicy(
abfsConfig.getMaxIoRetries());
int localRetryCount = 0;
while (localRetryCount < abfsConfig.getMaxIoRetries()) {
Assertions.assertThat(retryPolicy.shouldRetry(localRetryCount, -1))
.describedAs("Retry should be allowed when retryCount less than max count configured.")
.isTrue();
localRetryCount++;
}
Assertions.assertThat(localRetryCount)
.describedAs("When all retries are exhausted, the retryCount will be same as max configured.")
.isEqualTo(abfsConfig.getMaxIoRetries());
}
}