AzureIngressHandler.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.store.DataBlocks;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.BLOB_OPERATION_NOT_SUPPORTED;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.INVALID_APPEND_OPERATION;
/**
* Abstract base class for handling ingress operations for Azure Data Lake Storage (ADLS).
*/
public abstract class AzureIngressHandler {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsOutputStream.class);
/** The output stream associated with this handler */
private AbfsOutputStream abfsOutputStream;
/**
* Constructs an AzureIngressHandler.
*
* @param abfsOutputStream the output stream associated with this handler
*/
protected AzureIngressHandler(AbfsOutputStream abfsOutputStream) {
this.abfsOutputStream = abfsOutputStream;
}
/**
* Gets the AbfsOutputStream associated with this handler.
*
* @return the AbfsOutputStream
*/
public AbfsOutputStream getAbfsOutputStream() {
return abfsOutputStream;
}
/**
* Sets the AbfsOutputStream associated with this handler.
*
* @param abfsOutputStream the AbfsOutputStream to set
*/
public void setAbfsOutputStream(final AbfsOutputStream abfsOutputStream) {
this.abfsOutputStream = abfsOutputStream;
}
/**
* Gets the eTag value of the blob.
*
* @return the eTag.
*/
public abstract String getETag();
/**
* Buffers data into the specified block.
*
* @param block the block to buffer data into
* @param data the data to buffer
* @param off the start offset in the data
* @param length the number of bytes to buffer
* @return the number of bytes buffered
* @throws IOException if an I/O error occurs
*/
protected abstract int bufferData(AbfsBlock block,
byte[] data, int off, int length) throws IOException;
/**
* Performs a remote write operation to upload a block.
*
* @param blockToUpload the block to upload
* @param uploadData the data to upload
* @param reqParams the request parameters for the append operation
* @param tracingContext the tracing context
* @return the result of the REST operation
* @throws IOException if an I/O error occurs
*/
protected abstract AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
DataBlocks.BlockUploadData uploadData,
AppendRequestParameters reqParams,
TracingContext tracingContext) throws IOException;
/**
* Performs a remote flush operation.
*
* @param offset the offset to flush to
* @param retainUncommittedData whether to retain uncommitted data
* @param isClose whether this is a close operation
* @param leaseId the lease ID
* @param tracingContext the tracing context
* @return the result of the REST operation
* @throws IOException if an I/O error occurs
*/
protected abstract AbfsRestOperation remoteFlush(long offset,
boolean retainUncommittedData,
boolean isClose,
String leaseId,
TracingContext tracingContext) throws IOException;
/**
* Writes the current buffer to the service for an append blob.
*
* @throws IOException if an I/O error occurs
*/
protected abstract void writeAppendBlobCurrentBufferToService()
throws IOException;
/**
* Abstract method to perform a remote write operation for appending data to an append blob in Azure Blob Storage.
*
* <p>This method is intended to be implemented by subclasses to handle the specific
* case of appending data to an append blob. It takes in the path of the append blob,
* the data to be uploaded, the block of data, and additional parameters required for
* the append operation.</p>
*
* @param path The path of the append blob to which data is to be appended.
* @param uploadData The data to be uploaded as part of the append operation.
* @param block The block of data to append.
* @param reqParams The additional parameters required for the append operation.
* @param tracingContext The tracing context for the operation.
* @return An {@link AbfsRestOperation} object representing the remote write operation.
* @throws IOException If an I/O error occurs during the append operation.
*/
protected abstract AbfsRestOperation remoteAppendBlobWrite(String path,
DataBlocks.BlockUploadData uploadData,
AbfsBlock block,
AppendRequestParameters reqParams,
TracingContext tracingContext) throws IOException;
/**
* Determines if the ingress handler should be switched based on the given exception.
*
* @param ex the exception that occurred
* @return true if the ingress handler should be switched, false otherwise
*/
protected boolean shouldIngressHandlerBeSwitched(AbfsRestOperationException ex) {
if (ex == null || ex.getErrorCode() == null) {
return false;
}
String errorCode = ex.getErrorCode().getErrorCode();
if (errorCode != null) {
return ex.getStatusCode() == HTTP_CONFLICT
&& (Objects.equals(errorCode, AzureServiceErrorCode.BLOB_OPERATION_NOT_SUPPORTED.getErrorCode())
|| Objects.equals(errorCode, AzureServiceErrorCode.INVALID_APPEND_OPERATION.getErrorCode()));
}
return false;
}
/**
* Constructs an InvalidIngressServiceException that includes the current handler class name in the exception message.
*
* @param e the original AbfsRestOperationException that triggered this exception.
* @return an InvalidIngressServiceException with the status code, error code, original message, and handler class name.
*/
protected InvalidIngressServiceException getIngressHandlerSwitchException(
AbfsRestOperationException e) {
if (e.getMessage().contains(BLOB_OPERATION_NOT_SUPPORTED)) {
return new InvalidIngressServiceException(e.getStatusCode(),
AzureServiceErrorCode.BLOB_OPERATION_NOT_SUPPORTED.getErrorCode(),
BLOB_OPERATION_NOT_SUPPORTED + " " + getClass().getName(), e);
} else {
return new InvalidIngressServiceException(e.getStatusCode(),
AzureServiceErrorCode.INVALID_APPEND_OPERATION.getErrorCode(),
INVALID_APPEND_OPERATION + " " + getClass().getName(), e);
}
}
/**
* Gets the block manager associated with this handler.
*
* @return the block manager
*/
protected abstract AzureBlockManager getBlockManager();
/**
* Gets the client associated with this handler.
*
* @return the block manager
*/
public abstract AbfsClient getClient();
/**
* Computes the Base64-encoded MD5 hash of the full blob content.
*
* <p>This method clones the current state of the {@link MessageDigest} instance
* associated with the blob content to avoid resetting its original state. It then
* calculates the MD5 digest and encodes it into a Base64 string.</p>
*
* @return A Base64-encoded string representing the MD5 hash of the full blob content,
* or {@code null} if the digest could not be computed.
*/
protected String computeFullBlobMd5() {
byte[] digest = null;
String fullBlobMd5 = null;
try {
// Clone the MessageDigest to avoid resetting the original state
MessageDigest clonedMd5
= (MessageDigest) getAbfsOutputStream().getFullBlobContentMd5()
.clone();
digest = clonedMd5.digest();
} catch (CloneNotSupportedException e) {
LOG.warn("Failed to clone MessageDigest instance", e);
}
if (digest != null && digest.length != 0) {
fullBlobMd5 = Base64.getEncoder().encodeToString(digest);
}
return fullBlobMd5;
}
}