TestAbfsRestOperationMockFailures.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.InterruptedIOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Stubber;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.OTHER_SERVER_THROTTLING;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToAbfsClient;
import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToRestOpAndHttpOp;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_JDK_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.IO_EXCEPTION_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OTHER_SERVER_THROTTLING_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_JDK_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.SOCKET_EXCEPTION_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.UNKNOWN_HOST_EXCEPTION_ABBREVIATION;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
public class TestAbfsRestOperationMockFailures {
// In these tests a request first fails with given exceptions and then succeed on retry.
// Client Side Throttling Metrics will be updated at least for retried request which succeeded.
// For original requests it will be updated only for EGR, IGR, OPR throttling.
@Test
public void testClientRequestIdForConnectTimeoutRetry() throws Exception {
Exception[] exceptions = new Exception[1];
String[] abbreviations = new String[1];
exceptions[0] = new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE);
abbreviations[0] = CONNECTION_TIMEOUT_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 1);
}
@Test
public void testClientRequestIdForConnectAndReadTimeoutRetry()
throws Exception {
Exception[] exceptions = new Exception[2];
String[] abbreviations = new String[2];
exceptions[0] = new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE);
abbreviations[0] = CONNECTION_TIMEOUT_ABBREVIATION;
exceptions[1] = new SocketTimeoutException(READ_TIMEOUT_JDK_MESSAGE);
abbreviations[1] = READ_TIMEOUT_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 1);
}
@Test
public void testClientRequestIdForReadTimeoutRetry() throws Exception {
Exception[] exceptions = new Exception[1];
String[] abbreviations = new String[1];
exceptions[0] = new SocketTimeoutException(READ_TIMEOUT_JDK_MESSAGE);
abbreviations[0] = READ_TIMEOUT_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
}
@Test
public void testClientRequestIdForUnknownHostRetry() throws Exception {
Exception[] exceptions = new Exception[1];
String[] abbreviations = new String[1];
exceptions[0] = new UnknownHostException();
abbreviations[0] = UNKNOWN_HOST_EXCEPTION_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
}
@Test
public void testClientRequestIdForConnectionResetRetry() throws Exception {
Exception[] exceptions = new Exception[1];
String[] abbreviations = new String[1];
exceptions[0] = new SocketTimeoutException(CONNECTION_RESET_MESSAGE + " by peer");
abbreviations[0] = CONNECTION_RESET_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
}
@Test
public void testClientRequestIdForUnknownSocketExRetry() throws Exception {
Exception[] exceptions = new Exception[1];
String[] abbreviations = new String[1];
exceptions[0] = new SocketException("unknown");
abbreviations[0] = SOCKET_EXCEPTION_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
}
@Test
public void testClientRequestIdForIOERetry() throws Exception {
Exception[] exceptions = new Exception[1];
String[] abbreviations = new String[1];
exceptions[0] = new InterruptedIOException();
abbreviations[0] = IO_EXCEPTION_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
}
@Test
public void testClientRequestIdFor400Retry() throws Exception {
testClientRequestIdForStatusRetry(HTTP_BAD_REQUEST, "", "400", 1);
}
@Test
public void testClientRequestIdFor500Retry() throws Exception {
testClientRequestIdForStatusRetry(HTTP_INTERNAL_ERROR, "", "500", 1);
}
@Test
public void testClientRequestIdFor503INGRetry() throws Exception {
testClientRequestIdForStatusRetry(
HTTP_UNAVAILABLE,
INGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
INGRESS_LIMIT_BREACH_ABBREVIATION,
2);
}
@Test
public void testClientRequestIdFor503EGRRetry() throws Exception {
testClientRequestIdForStatusRetry(
HTTP_UNAVAILABLE,
EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
EGRESS_LIMIT_BREACH_ABBREVIATION,
2);
}
@Test
public void testClientRequestIdFor503OPRRetry() throws Exception {
testClientRequestIdForStatusRetry(
HTTP_UNAVAILABLE,
TPS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
TPS_LIMIT_BREACH_ABBREVIATION,
2);
}
@Test
public void testClientRequestIdFor503OtherRetry() throws Exception {
testClientRequestIdForStatusRetry(
HTTP_UNAVAILABLE,
OTHER_SERVER_THROTTLING.getErrorMessage(),
OTHER_SERVER_THROTTLING_ABBREVIATION,
1);
}
/**
* Test for mocking the failure scenario with retry policy assertions.
* Here we will try to create a request with following life cycle:
* 1. Primary Request made fails with Connection Timeout and fall into retry loop
* 2. Retried request fails with 503 and again go for retry
* 3. Retried request fails with 503 and do not go for retry.
*
* We will try to assert that:
* 1. Correct retry policy is used to get the retry interval for each failed request
* 2. Tracing header construction takes place with proper arguments based on the failure reason and retry policy used
* @throws Exception
*/
@Test
public void testRetryPolicyWithDifferentFailureReasons() throws Exception {
AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
ExponentialRetryPolicy.class);
StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class);
AbfsThrottlingIntercept intercept = Mockito.mock(
AbfsThrottlingIntercept.class);
addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept);
AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
AbfsRestOperationType.ReadFile,
abfsClient,
"PUT",
null,
new ArrayList<>(),
Mockito.mock(AbfsConfiguration.class)
));
AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class);
addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation);
Stubber stubber = Mockito.doThrow(new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE));
stubber.doNothing().when(httpOperation).processResponse(
nullable(byte[].class), nullable(int.class), nullable(int.class));
when(httpOperation.getStatusCode()).thenReturn(-1).thenReturn(HTTP_UNAVAILABLE);
TracingContext tracingContext = Mockito.mock(TracingContext.class);
Mockito.doNothing().when(tracingContext).setRetryCount(nullable(int.class));
Mockito.doReturn("").when(httpOperation).getStorageErrorMessage();
Mockito.doReturn("").when(httpOperation).getStorageErrorCode();
Mockito.doReturn("HEAD").when(httpOperation).getMethod();
Mockito.doReturn("").when(httpOperation).getMaskedUrl();
Mockito.doReturn("").when(httpOperation).getRequestId();
Mockito.doReturn(EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage()).when(httpOperation).getStorageErrorMessage();
Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any());
try {
// Operation will fail with CT first and then 503 thereafter.
abfsRestOperation.execute(tracingContext);
} catch(AbfsRestOperationException ex) {
Assertions.assertThat(ex.getStatusCode())
.describedAs("Status Code must be HTTP_UNAVAILABLE(503)")
.isEqualTo(HTTP_UNAVAILABLE);
}
// Assert that httpOperation.processResponse was called 3 times.
// One for retry count 0
// One for retry count 1 after failing with CT
// One for retry count 2 after failing with 503
Mockito.verify(httpOperation, times(3)).processResponse(
nullable(byte[].class), nullable(int.class), nullable(int.class));
// Primary Request Failed with CT. Static Retry Policy should be used.
Mockito.verify(abfsClient, Mockito.times(1))
.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
Mockito.verify(staticRetryPolicy, Mockito.times(1))
.shouldRetry(0, -1);
Mockito.verify(staticRetryPolicy, Mockito.times(1))
.getRetryInterval(1);
Mockito.verify(tracingContext, Mockito.times(1))
.constructHeader(httpOperation, CONNECTION_TIMEOUT_ABBREVIATION, STATIC_RETRY_POLICY_ABBREVIATION);
// Assert that exponential Retry Policy was used during second and third Iteration.
// Iteration 2 and 3 failed with 503 and should retry was called with retry count 1 and 2
// Before iteration 3 sleep will be computed using exponential retry policy and retry count 2
// Should retry with retry count 2 will return false and no further requests will be made.
Mockito.verify(abfsClient, Mockito.times(2))
.getRetryPolicy(EGRESS_LIMIT_BREACH_ABBREVIATION);
Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
.shouldRetry(1, HTTP_UNAVAILABLE);
Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
.shouldRetry(2, HTTP_UNAVAILABLE);
Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
.getRetryInterval(2);
Mockito.verify(tracingContext, Mockito.times(1))
.constructHeader(httpOperation, EGRESS_LIMIT_BREACH_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
// Assert that intercept.updateMetrics was called 2 times. Both the retried request fails with EGR.
Mockito.verify(intercept, Mockito.times(2))
.updateMetrics(nullable(AbfsRestOperationType.class), nullable(
AbfsHttpOperation.class));
}
private void testClientRequestIdForStatusRetry(int status,
String serverErrorMessage,
String keyExpected,
int numOfTimesCSTMetricsUpdated) throws Exception {
AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
ExponentialRetryPolicy.class);
StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class);
AbfsThrottlingIntercept intercept = Mockito.mock(
AbfsThrottlingIntercept.class);
addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept);
// Create a readfile operation that will fail
AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
AbfsRestOperationType.ReadFile,
abfsClient,
"PUT",
null,
new ArrayList<>(),
Mockito.mock(AbfsConfiguration.class)
));
AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class);
addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation);
Mockito.doNothing()
.doNothing()
.when(httpOperation)
.processResponse(nullable(byte[].class), nullable(int.class),
nullable(int.class));
int[] statusCount = new int[1];
statusCount[0] = 0;
Mockito.doAnswer(answer -> {
if (statusCount[0] <= 10) {
statusCount[0]++;
return status;
}
return HTTP_OK;
}).when(httpOperation).getStatusCode();
Mockito.doReturn(serverErrorMessage)
.when(httpOperation)
.getStorageErrorMessage();
TracingContext tracingContext = Mockito.mock(TracingContext.class);
Mockito.doNothing().when(tracingContext).setRetryCount(nullable(int.class));
Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any());
int[] count = new int[1];
count[0] = 0;
Mockito.doAnswer(invocationOnMock -> {
if (count[0] == 1) {
Assertions.assertThat((String) invocationOnMock.getArgument(1))
.isEqualTo(keyExpected);
}
count[0]++;
return null;
}).when(tracingContext).constructHeader(any(), any(), any());
abfsRestOperation.execute(tracingContext);
Assertions.assertThat(count[0]).isEqualTo(2);
Mockito.verify(intercept, Mockito.times(numOfTimesCSTMetricsUpdated)).updateMetrics(any(), any());
}
private void testClientRequestIdForTimeoutRetry(Exception[] exceptions,
String[] abbreviationsExpected,
int len,
int numOfCTExceptions) throws Exception {
AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
ExponentialRetryPolicy.class);
StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class);
AbfsThrottlingIntercept intercept = Mockito.mock(
AbfsThrottlingIntercept.class);
addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept);
AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
AbfsRestOperationType.ReadFile,
abfsClient,
"PUT",
null,
new ArrayList<>(),
Mockito.mock(AbfsConfiguration.class)
));
AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class);
addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation);
Stubber stubber = Mockito.doThrow(exceptions[0]);
for (int iteration = 1; iteration < len; iteration++) {
stubber.doThrow(exceptions[iteration]);
}
stubber
.doNothing()
.when(httpOperation)
.processResponse(nullable(byte[].class), nullable(int.class),
nullable(int.class));
Mockito.doReturn(HTTP_OK).when(httpOperation).getStatusCode();
TracingContext tracingContext = Mockito.mock(TracingContext.class);
Mockito.doNothing().when(tracingContext).setRetryCount(nullable(int.class));
Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any());
int[] count = new int[1];
count[0] = 0;
Mockito.doAnswer(invocationOnMock -> {
if (count[0] > 0 && count[0] <= len) {
Assertions.assertThat((String) invocationOnMock.getArgument(1))
.isEqualTo(abbreviationsExpected[count[0] - 1]);
}
count[0]++;
return null;
}).when(tracingContext).constructHeader(any(), any(), any());
abfsRestOperation.execute(tracingContext);
Assertions.assertThat(count[0]).isEqualTo(len + 1);
/**
* Assert that getRetryPolicy was called with
* failureReason CT only for Connection Timeout Cases.
* For every failed request getRetryPolicy will be called three times
* It will be called with failureReason CT for every request failing with CT
*/
Mockito.verify(abfsClient, Mockito.times(
numOfCTExceptions))
.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
}
}