ITestAbfsRestOperation.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ProtocolException;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.http.HttpResponse;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.APACHE_HTTP_CLIENT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpOperationType.JDK_HTTP_URL_CONNECTION;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@RunWith(Parameterized.class)
public class ITestAbfsRestOperation extends AbstractAbfsIntegrationTest {
// Identifies which mocked step fails for a parameter set.
// OUTPUTSTREAM: the JDK operation's getConnOutputStream() throws a
// ProtocolException and the Apache-client operation's executeRequest()
// throws an AbfsApacheHttpExpect100Exception.
// WRITE: the JDK output stream's write() throws an IOException and the
// Apache-client operation's executeRequest() throws an IOException.
public enum ErrorType {OUTPUTSTREAM, WRITE};
// HTTP status 417, used as a mocked response code in params().
private static final int HTTP_EXPECTATION_FAILED = 417;
// Status code 0, used in params() together with the response message "Error".
private static final int HTTP_ERROR = 0;
// Expected value for counters that the assertions require to stay at zero.
private static final int ZERO = 0;
// Retry count passed to updateRetryConfigs() and expected by the retry assertions.
private static final int REDUCED_RETRY_COUNT = 2;
// Backoff interval passed to updateRetryConfigs().
// NOTE(review): unit is presumably milliseconds - confirm against updateRetryConfigs().
private static final int REDUCED_BACKOFF_INTERVAL = 100;
// Length passed to AppendRequestParameters and expected by the byte-count assertions.
private static final int BUFFER_LENGTH = 5;
// Passed as both the first and second argument of AppendRequestParameters.
// NOTE(review): presumably position and offset - confirm against the constructor.
private static final int BUFFER_OFFSET = 0;
// Path of the file that the append request targets.
private static final String TEST_PATH = "/testfile";
// Parameter 0: whether the Expect: 100-continue header is enabled for the request.
@Parameterized.Parameter
public boolean expectHeaderEnabled;
// Parameter 1: the HTTP status code the mocked operation reports.
@Parameterized.Parameter(1)
public int responseCode;
// Parameter 2: the HTTP response message the mocked operation reports.
@Parameterized.Parameter(2)
public String responseMessage;
// Parameter 3: which mocked step fails, see ErrorType.
@Parameterized.Parameter(3)
public ErrorType errorType;
// Parameter 4: the networking library configured on the file system in getFileSystem().
@Parameterized.Parameter(4)
public HttpOperationType httpOperationType;
// Mocked throttling intercept, created in getRestOperation() and returned by
// the spied client's getIntercept(); the test verifies its updateMetrics() calls.
private AbfsThrottlingIntercept intercept;
/*
 * Status codes referenced by the parameter sets:
 *   HTTP_OK = 200,
 *   HTTP_UNAVAILABLE = 503,
 *   HTTP_NOT_FOUND = 404,
 *   HTTP_EXPECTATION_FAILED = 417,
 *   HTTP_ERROR = 0.
 */
/**
 * Supplies the parameter sets for the runner. Each row holds, in order: the
 * expect-header flag, the response code, the response message, the error
 * type and the networking library; every scenario is listed once per library.
 *
 * @return the parameter rows.
 */
@Parameterized.Parameters(name = "expect={0}-code={1}-ErrorType={3}=NetLib={4}")
public static Iterable<Object[]> params() {
  final Object[][] scenarios = {
      {true, HTTP_OK, "OK", ErrorType.WRITE, JDK_HTTP_URL_CONNECTION},
      {true, HTTP_OK, "OK", ErrorType.WRITE, APACHE_HTTP_CLIENT},
      {false, HTTP_OK, "OK", ErrorType.WRITE, JDK_HTTP_URL_CONNECTION},
      {false, HTTP_OK, "OK", ErrorType.WRITE, APACHE_HTTP_CLIENT},
      {true, HTTP_UNAVAILABLE, "ServerBusy", ErrorType.OUTPUTSTREAM, JDK_HTTP_URL_CONNECTION},
      {true, HTTP_UNAVAILABLE, "ServerBusy", ErrorType.OUTPUTSTREAM, APACHE_HTTP_CLIENT},
      {true, HTTP_NOT_FOUND, "Resource Not Found", ErrorType.OUTPUTSTREAM, JDK_HTTP_URL_CONNECTION},
      {true, HTTP_NOT_FOUND, "Resource Not Found", ErrorType.OUTPUTSTREAM, APACHE_HTTP_CLIENT},
      {true, HTTP_EXPECTATION_FAILED, "Expectation Failed", ErrorType.OUTPUTSTREAM, JDK_HTTP_URL_CONNECTION},
      {true, HTTP_EXPECTATION_FAILED, "Expectation Failed", ErrorType.OUTPUTSTREAM, APACHE_HTTP_CLIENT},
      {true, HTTP_ERROR, "Error", ErrorType.OUTPUTSTREAM, JDK_HTTP_URL_CONNECTION},
      {true, HTTP_ERROR, "Error", ErrorType.OUTPUTSTREAM, APACHE_HTTP_CLIENT},
  };
  return Arrays.asList(scenarios);
}
public ITestAbfsRestOperation() throws Exception {
super();
}
/**
 * Produces a byte array of the requested size filled with random content.
 * @param length number of bytes to generate
 * @return the newly allocated, randomly filled array
 */
private byte[] getRandomBytesArray(int length) {
  final byte[] randomBytes = new byte[length];
  final Random generator = new Random();
  generator.nextBytes(randomBytes);
  return randomBytes;
}
/**
 * Creates a new file system instance from a copy of the given configuration
 * in which the networking library is set to the parameterized
 * {@code httpOperationType}.
 *
 * @param configuration base configuration; it is copied, not modified.
 * @return a new file system instance.
 * @throws Exception if the instance cannot be created.
 */
@Override
public AzureBlobFileSystem getFileSystem(final Configuration configuration)
    throws Exception {
  final Configuration networkLibConf = new Configuration(configuration);
  final String networkLib = httpOperationType.toString();
  networkLibConf.set(ConfigurationKeys.FS_AZURE_NETWORKING_LIBRARY, networkLib);
  final FileSystem freshInstance = FileSystem.newInstance(networkLibConf);
  return (AzureBlobFileSystem) freshInstance;
}
/**
 * Builds the append {@link AbfsRestOperation} under test.
 * <p>
 * A file system using the parameterized networking library is created, its
 * client is wrapped in a spy that uses reduced retry settings and a mocked
 * {@link AbfsThrottlingIntercept}, and an empty test file is created. The
 * returned operation is a spy whose {@code createHttpOperation()} hands out
 * spied HTTP operations stubbed by
 * {@link #mockHttpOperation(AppendRequestParameters, byte[], URL, AbfsHttpOperation)}.
 *
 * @return the spied append operation.
 * @throws Exception if the file system, the test file or the operation
 * cannot be set up.
 */
private AbfsRestOperation getRestOperation() throws Exception {
  // Get the filesystem.
  final AzureBlobFileSystem fs = getFileSystem(getRawConfiguration());

  final Configuration configuration = fs.getConf();
  configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
  AbfsClient abfsClient = fs.getAbfsStore().getClient();

  AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
      configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));

  // Update the configuration with reduced retry count and reduced backoff interval.
  AbfsConfiguration abfsConfig
      = TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
      abfsConfiguration,
      REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);

  // Mocked intercept so the test can count updateMetrics() invocations.
  intercept = Mockito.mock(AbfsThrottlingIntercept.class);
  Mockito.doNothing().when(intercept).updateMetrics(Mockito.any(), Mockito.any());

  // Gets the client.
  AbfsClient testClient = Mockito.spy(ITestAbfsClient.createTestClientFromCurrentContext(
      abfsClient,
      abfsConfig));

  Mockito.doReturn(intercept).when(testClient).getIntercept();

  // Expect header is enabled or not based on the parameter.
  AppendRequestParameters appendRequestParameters
      = new AppendRequestParameters(
      BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
      AppendRequestParameters.Mode.APPEND_MODE, false, null,
      expectHeaderEnabled);

  // The payload size must match the length declared in the request
  // parameters, so both use BUFFER_LENGTH.
  byte[] buffer = getRandomBytesArray(BUFFER_LENGTH);

  // Create the file the append targets. The returned stream is closed right
  // away because the test never writes through it; leaving it open leaks it.
  Path testPath = path(TEST_PATH);
  fs.create(testPath).close();
  String finalTestPath = testPath.toString().substring(testPath.toString().lastIndexOf("/"));

  // Creates a list of request headers.
  final List<AbfsHttpHeader> requestHeaders = ITestAbfsClient.getTestRequestHeaders(testClient);
  requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
  if (appendRequestParameters.isExpectHeaderEnabled()) {
    requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
  }

  // Updates the query parameters.
  final AbfsUriQueryBuilder abfsUriQueryBuilder = testClient.createDefaultUriQueryBuilder();
  abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
  abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(appendRequestParameters.getPosition()));

  // Creates the url for the specified path.
  URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());

  // Spy on the AbfsRestOperation so that every http operation it creates can
  // be replaced by a stubbed spy.
  final AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
      AbfsRestOperationType.Append,
      testClient,
      HTTP_METHOD_PUT,
      url,
      requestHeaders, buffer,
      appendRequestParameters.getoffset(),
      appendRequestParameters.getLength(), null, abfsConfig));

  Mockito.doAnswer(answer -> {
    AbfsHttpOperation httpOperation = Mockito.spy(
        (AbfsHttpOperation) answer.callRealMethod());
    mockHttpOperation(appendRequestParameters, buffer, url, httpOperation);
    // getResult() is stubbed to return the most recently created operation.
    Mockito.doReturn(httpOperation).when(op).getResult();
    return httpOperation;
  }).when(op).createHttpOperation();
  return op;
}
/**
 * Stubs the spied http operation so that it fails in the way selected by the
 * {@code errorType} parameter.
 *
 * @param appendRequestParameters parameters of the append request; offset and
 * length are used to stub the failing write.
 * @param buffer payload whose write is stubbed to fail for {@code WRITE}.
 * @param url request url; not used by the stubbing.
 * @param httpOperation spied operation to stub.
 * @throws IOException declared by the stubbed methods.
 */
private void mockHttpOperation(final AppendRequestParameters appendRequestParameters,
    final byte[] buffer,
    final URL url,
    final AbfsHttpOperation httpOperation) throws IOException {
  // Report the expect header on the connection only when it is enabled.
  if (expectHeaderEnabled) {
    Mockito.doReturn(HUNDRED_CONTINUE)
        .when(httpOperation)
        .getConnProperty(EXPECT);
  }
  Mockito.doNothing()
      .when(httpOperation)
      .setRequestProperty(Mockito.any(), Mockito.any());

  switch (errorType) {
  case OUTPUTSTREAM:
    stubOutputStreamFailure(httpOperation);
    break;
  case WRITE:
    stubWriteFailure(appendRequestParameters, buffer, httpOperation);
    break;
  default:
    break;
  }
}

/**
 * Stubs the OUTPUTSTREAM scenario: the status code and response message come
 * from the test parameters, and obtaining the output stream (JDK) or
 * executing the request (Apache client) throws.
 */
private void stubOutputStreamFailure(final AbfsHttpOperation httpOperation)
    throws IOException {
  // When getOutputStream() throws and the expect header is enabled, control
  // returns to processResponse, so the response code and message are mocked
  // to exercise the behaviour for each response code.
  Mockito.doReturn(responseCode).when(httpOperation).getStatusCode();
  if (responseCode == HTTP_UNAVAILABLE) {
    Mockito.doReturn(EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage())
        .when(httpOperation)
        .getStorageErrorMessage();
  }
  Mockito.doReturn(responseMessage)
      .when(httpOperation)
      .getConnResponseMessage();

  if (httpOperation instanceof AbfsJdkHttpOperation) {
    final AbfsJdkHttpOperation jdkOperation = (AbfsJdkHttpOperation) httpOperation;
    Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
        .when(jdkOperation)
        .getConnOutputStream();
  }
  if (httpOperation instanceof AbfsAHCHttpOperation) {
    final AbfsAHCHttpOperation ahcOperation = (AbfsAHCHttpOperation) httpOperation;
    Mockito.doNothing()
        .when(ahcOperation)
        .parseResponseHeaderAndBody(Mockito.any(byte[].class),
            Mockito.anyInt(), Mockito.anyInt());
    Mockito.doReturn(HTTP_NOT_FOUND)
        .when(ahcOperation)
        .parseStatusCode(Mockito.nullable(HttpResponse.class));
    Mockito.doThrow(
        new AbfsApacheHttpExpect100Exception(Mockito.mock(HttpResponse.class)))
        .when(ahcOperation)
        .executeRequest();
  }
}

/**
 * Stubs the WRITE scenario: an IOException is thrown by executeRequest()
 * (Apache client) or by writing the payload to the output stream (JDK),
 * whether or not the expect header is enabled.
 */
private void stubWriteFailure(final AppendRequestParameters appendRequestParameters,
    final byte[] buffer,
    final AbfsHttpOperation httpOperation) throws IOException {
  if (httpOperation instanceof AbfsAHCHttpOperation) {
    Mockito.doThrow(new IOException())
        .when((AbfsAHCHttpOperation) httpOperation)
        .executeRequest();
    return;
  }

  // No-op stream wrapped in a spy so that only the payload write fails.
  final OutputStream failingStream = Mockito.spy(new OutputStream() {
    @Override
    public void write(final int i) throws IOException {
    }
  });
  Mockito.doReturn(failingStream)
      .when((AbfsJdkHttpOperation) httpOperation)
      .getConnOutputStream();
  Mockito.doThrow(new IOException())
      .when(failingStream)
      .write(buffer, appendRequestParameters.getoffset(),
          appendRequestParameters.getLength());
}
/**
 * Asserts the retry count and the two byte counters, in that order, each
 * against its expected value.
 *
 * @param retryCount actual retry count.
 * @param assertRetryCount expected retry count.
 * @param bytesSent actual bytes sent.
 * @param assertBytesSent expected bytes sent.
 * @param expectedBytesSent actual value of the expected-bytes counter.
 * @param assertExpectedBytesSent expected value of the expected-bytes counter.
 */
void assertTraceContextState(int retryCount, int assertRetryCount, int bytesSent, int assertBytesSent,
    int expectedBytesSent, int assertExpectedBytesSent) {
  // Whether the request was retried the expected number of times.
  assertCounter("The retry count is incorrect", retryCount, assertRetryCount);
  // Whether the metrics were updated correctly.
  assertCounter("The bytes sent is incorrect", bytesSent, assertBytesSent);
  assertCounter("The expected bytes sent is incorrect",
      expectedBytesSent, assertExpectedBytesSent);
}

/** Asserts that {@code actual} equals {@code expected}, failing with the given description. */
private void assertCounter(final String description, final int actual, final int expected) {
  Assertions.assertThat(actual)
      .describedAs(description)
      .isEqualTo(expected);
}
/**
 * Executes the append operation with the failure selected by the test
 * parameters (getOutputStream() or write() throwing) and checks the thrown
 * exception type, the retry count, the byte counters and the calls made to
 * the throttling intercept for the corresponding response code.
 */
@Test
public void testExpectHundredContinue() throws Exception {
  final AbfsRestOperation op = getRestOperation();
  final TracingContext tracingContext = Mockito.spy(
      new TracingContext("abcd", "abcde", FSOperationType.APPEND,
          TracingHeaderFormat.ALL_ID_FORMAT, null));
  Mockito.doReturn(tracingContext)
      .when(op)
      .createNewTracingContext(Mockito.any());

  switch (errorType) {
  case WRITE:
    verifyWriteFailure(op, tracingContext);
    break;
  case OUTPUTSTREAM:
    verifyOutputStreamFailure(op, tracingContext);
    break;
  default:
    break;
  }
}

/**
 * WRITE scenario: the IOException thrown by write() is propagated, with or
 * without the expect header, after the exponential retry logic has run.
 */
private void verifyWriteFailure(final AbfsRestOperation op,
    final TracingContext tracingContext) throws Exception {
  intercept(IOException.class,
      () -> op.execute(tracingContext));

  // Retries must be exhausted and the whole buffer counted as sent.
  // NOTE(review): the last pair compares the literals 0 and 0, so the
  // expected-bytes counter is not actually checked here.
  final int retries = tracingContext.getRetryCount();
  final int sent = op.getResult().getBytesSent();
  assertTraceContextState(retries, REDUCED_RETRY_COUNT, sent, BUFFER_LENGTH,
      0, 0);
}

/**
 * OUTPUTSTREAM scenario: the expected outcome depends on the mocked
 * response code.
 */
private void verifyOutputStreamFailure(final AbfsRestOperation op,
    final TracingContext tracingContext) throws Exception {
  if (responseCode == HTTP_UNAVAILABLE) {
    // 503, i.e. the throttled case: the request must be retried.
    intercept(IOException.class,
        () -> op.execute(tracingContext));

    final int retries = tracingContext.getRetryCount();
    final int sent = op.getResult().getBytesSent();
    final int expectedToBeSent = op.getResult().getExpectedBytesToBeSent();
    assertTraceContextState(retries, REDUCED_RETRY_COUNT, sent, ZERO,
        expectedToBeSent, BUFFER_LENGTH);

    // updateMetrics is called once for the initial attempt and once per retry.
    Mockito.verify(intercept, times(REDUCED_RETRY_COUNT + 1))
        .updateMetrics(Mockito.any(), Mockito.any());
  } else if (responseCode == HTTP_ERROR) {
    // Status code 0, i.e. the error case: the request must be retried.
    intercept(IOException.class,
        () -> op.execute(tracingContext));

    // NOTE(review): the last pair compares the literals 0 and 0.
    final int retries = tracingContext.getRetryCount();
    final int sent = op.getResult().getBytesSent();
    assertTraceContextState(retries, REDUCED_RETRY_COUNT, sent,
        ZERO, 0, 0);

    // updateMetrics is called once for the initial attempt and once per retry.
    Mockito.verify(intercept, times(REDUCED_RETRY_COUNT + 1))
        .updateMetrics(Mockito.any(), Mockito.any());
  } else if (responseCode == HTTP_NOT_FOUND
      || responseCode == HTTP_EXPECTATION_FAILED) {
    // 4xx, i.e. a user error: no retry must happen.
    intercept(AzureBlobFileSystemException.class,
        () -> op.execute(tracingContext));

    // NOTE(review): only the retry count is checked against a real value;
    // both byte-counter pairs compare the literals 0 and 0.
    final int retries = tracingContext.getRetryCount();
    assertTraceContextState(retries, ZERO, 0,
        0, 0, 0);

    // updateMetrics must not be called for a user error.
    Mockito.verify(intercept, never())
        .updateMetrics(Mockito.any(), Mockito.any());
  }
}
}