AbfsAHCHttpOperation.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPatch;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.util.EntityUtils;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH;
import static org.apache.http.entity.ContentType.TEXT_PLAIN;
/**
* Implementation of {@link AbfsHttpOperation} for orchestrating server calls using
* Apache Http Client.
*/
public class AbfsAHCHttpOperation extends AbfsHttpOperation {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsAHCHttpOperation.class);
/**
* Request object for network call over ApacheHttpClient.
*/
private final HttpRequestBase httpRequestBase;
/**
* Response object received from a server call over ApacheHttpClient.
*/
private HttpResponse httpResponse;
/**
* Flag to indicate if the request is a payload request. HTTP methods PUT, POST,
* PATCH qualify for payload requests.
*/
private final boolean isPayloadRequest;
/**
* ApacheHttpClient to make network calls.
*/
private final AbfsApacheHttpClient abfsApacheHttpClient;
public AbfsAHCHttpOperation(final URL url,
final String method,
final List<AbfsHttpHeader> requestHeaders,
final Duration connectionTimeout,
final Duration readTimeout,
final AbfsApacheHttpClient abfsApacheHttpClient,
final AbfsClient abfsClient) throws IOException {
super(LOG, url, method, requestHeaders, connectionTimeout, readTimeout, abfsClient);
this.isPayloadRequest = HTTP_METHOD_PUT.equals(method)
|| HTTP_METHOD_PATCH.equals(method)
|| HTTP_METHOD_POST.equals(method);
this.abfsApacheHttpClient = abfsApacheHttpClient;
LOG.debug("Creating AbfsAHCHttpOperation for URL: {}, method: {}",
url, method);
final URI requestUri;
try {
requestUri = url.toURI();
} catch (URISyntaxException e) {
throw new IOException(e);
}
switch (getMethod()) {
case HTTP_METHOD_PUT:
httpRequestBase = new HttpPut(requestUri);
break;
case HTTP_METHOD_PATCH:
httpRequestBase = new HttpPatch(requestUri);
break;
case HTTP_METHOD_POST:
httpRequestBase = new HttpPost(requestUri);
break;
case HTTP_METHOD_GET:
httpRequestBase = new HttpGet(requestUri);
break;
case HTTP_METHOD_DELETE:
httpRequestBase = new HttpDelete(requestUri);
break;
case HTTP_METHOD_HEAD:
httpRequestBase = new HttpHead(requestUri);
break;
default:
/*
* This would not happen as the AbfsClient would always be sending valid
* method.
*/
throw new PathIOException(getUrl().toString(),
"Unsupported HTTP method: " + getMethod());
}
}
/**
* @return AbfsManagedHttpClientContext instance that captures latencies at
* different phases of network call.
*/
@VisibleForTesting
AbfsManagedHttpClientContext getHttpClientContext() {
return new AbfsManagedHttpClientContext();
}
/**{@inheritDoc}*/
@Override
protected InputStream getErrorStream() throws IOException {
HttpEntity entity = httpResponse.getEntity();
if (entity == null) {
return null;
}
return entity.getContent();
}
/**{@inheritDoc}*/
@Override
String getConnProperty(final String key) {
for (AbfsHttpHeader header : getRequestHeaders()) {
if (header.getName().equals(key)) {
return header.getValue();
}
}
return null;
}
/**{@inheritDoc}*/
@Override
URL getConnUrl() {
return getUrl();
}
/**{@inheritDoc}*/
@Override
Integer getConnResponseCode() throws IOException {
return getStatusCode();
}
/**{@inheritDoc}*/
@Override
String getConnResponseMessage() throws IOException {
return getStatusDescription();
}
/**{@inheritDoc}*/
@Override
public void processResponse(final byte[] buffer,
final int offset,
final int length) throws IOException {
try {
if (!isPayloadRequest) {
prepareRequest();
LOG.debug("Sending request: {}", httpRequestBase);
httpResponse = executeRequest();
LOG.debug("Request sent: {}; response {}", httpRequestBase,
httpResponse);
}
parseResponseHeaderAndBody(buffer, offset, length);
} finally {
if (httpResponse != null) {
try {
EntityUtils.consume(httpResponse.getEntity());
} finally {
if (httpResponse instanceof CloseableHttpResponse) {
((CloseableHttpResponse) httpResponse).close();
}
}
}
}
}
/**
* Parse response stream for headers and body.
*
* @param buffer byte array to store response body.
* @param offset offset in the buffer to start storing the response body.
* @param length length of the response body.
*
* @throws IOException network error while read response stream
*/
@VisibleForTesting
void parseResponseHeaderAndBody(final byte[] buffer,
final int offset,
final int length) throws IOException {
setStatusCode(parseStatusCode(httpResponse));
setStatusDescription(httpResponse.getStatusLine().getReasonPhrase());
setRequestId();
// dump the headers
if (LOG.isDebugEnabled()) {
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
getRequestProperties());
}
parseResponse(buffer, offset, length);
}
/**
* Parse status code from response
*
* @param httpResponse response object
* @return status code
*/
@VisibleForTesting
int parseStatusCode(HttpResponse httpResponse) {
return httpResponse.getStatusLine().getStatusCode();
}
/**
* Execute network call for the request
*
* @return response object
* @throws IOException network error while executing the request
*/
@VisibleForTesting
HttpResponse executeRequest() throws IOException {
AbfsManagedHttpClientContext abfsHttpClientContext
= getHttpClientContext();
try {
LOG.debug("Executing request: {}", httpRequestBase);
HttpResponse response = abfsApacheHttpClient.execute(httpRequestBase,
abfsHttpClientContext, getConnectionTimeout(), getReadTimeout());
setConnectionTimeMs(abfsHttpClientContext.getConnectTime());
setSendRequestTimeMs(abfsHttpClientContext.getSendTime());
setRecvResponseTimeMs(abfsHttpClientContext.getReadTime());
return response;
} catch (IOException e) {
LOG.debug("Failed to execute request: {}", httpRequestBase, e);
throw e;
}
}
/**{@inheritDoc}*/
@Override
public void setRequestProperty(final String key, final String value) {
List<AbfsHttpHeader> headers = getRequestHeaders();
if (headers != null) {
headers.add(new AbfsHttpHeader(key, value));
}
}
/**{@inheritDoc}*/
@Override
Map<String, List<String>> getRequestProperties() {
Map<String, List<String>> map = new HashMap<>();
for (AbfsHttpHeader header : getRequestHeaders()) {
map.put(header.getName(),
new ArrayList<String>() {{
add(header.getValue());
}});
}
return map;
}
/**{@inheritDoc}*/
@Override
public String getResponseHeader(final String headerName) {
if (httpResponse == null) {
return null;
}
Header header = httpResponse.getFirstHeader(headerName);
if (header != null) {
return header.getValue();
}
return null;
}
/**{@inheritDoc}*/
@Override
public Map<String, List<String>> getResponseHeaders() {
Map<String, List<String>> headers = new HashMap<>();
if (httpResponse == null) {
return headers;
}
for (Header header : httpResponse.getAllHeaders()) {
headers.computeIfAbsent(header.getName(), k -> new ArrayList<>())
.add(header.getValue());
}
return headers;
}
/**{@inheritDoc}*/
@Override
public String getResponseHeaderIgnoreCase(final String headerName) {
Map<String, List<String>> responseHeaders = getResponseHeaders();
if (responseHeaders == null || responseHeaders.isEmpty()) {
return null;
}
// Search for the header value case-insensitively
return responseHeaders.entrySet().stream()
.filter(entry -> entry.getKey() != null
&& entry.getKey().equalsIgnoreCase(headerName))
.flatMap(entry -> entry.getValue().stream())
.findFirst()
.orElse(null); // Return null if no match is found
}
/**{@inheritDoc}*/
@Override
protected InputStream getContentInputStream()
throws IOException {
if (httpResponse == null || httpResponse.getEntity() == null) {
return null;
}
return httpResponse.getEntity().getContent();
}
/**{@inheritDoc}*/
@Override
public void sendPayload(final byte[] buffer,
final int offset,
final int length)
throws IOException {
if (!isPayloadRequest) {
return;
}
setExpectedBytesToBeSent(length);
if (buffer != null) {
HttpEntity httpEntity = new ByteArrayEntity(buffer, offset, length,
TEXT_PLAIN);
((HttpEntityEnclosingRequestBase) httpRequestBase).setEntity(
httpEntity);
}
prepareRequest();
try {
LOG.debug("Sending request: {}", httpRequestBase);
httpResponse = executeRequest();
} catch (AbfsApacheHttpExpect100Exception ex) {
LOG.debug(
"Getting output stream failed with expect header enabled, returning back."
+ "Expect 100 assertion failed for uri {} with status code: {}",
getMaskedUrl(), parseStatusCode(ex.getHttpResponse()),
ex);
setConnectionDisconnectedOnError();
httpResponse = ex.getHttpResponse();
} catch (IOException ex) {
LOG.debug("Getting output stream failed for uri {}, exception: {}",
getMaskedUrl(), ex);
throw ex;
} finally {
if (httpResponse != null) {
LOG.debug("Request sent: {}; response {}", httpRequestBase,
httpResponse);
}
if (!isConnectionDisconnectedOnError()
&& httpRequestBase instanceof HttpEntityEnclosingRequestBase) {
setBytesSent(length);
}
}
}
/**
* Sets the header on the request.
*/
private void prepareRequest() {
final boolean isEntityBasedRequest
= httpRequestBase instanceof HttpEntityEnclosingRequestBase;
for (AbfsHttpHeader header : getRequestHeaders()) {
if (CONTENT_LENGTH.equals(header.getName()) && isEntityBasedRequest) {
continue;
}
httpRequestBase.setHeader(header.getName(), header.getValue());
}
}
/**{@inheritDoc}*/
@Override
public String getRequestProperty(String name) {
for (AbfsHttpHeader header : getRequestHeaders()) {
if (header.getName().equals(name)) {
String val = header.getValue();
val = val == null ? EMPTY_STRING : val;
if (EMPTY_STRING.equals(val)) {
continue;
}
return val;
}
}
return EMPTY_STRING;
}
/**{@inheritDoc}*/
@Override
public String getTracingContextSuffix() {
return APACHE_IMPL;
}
}