AzureDFSIngressHandler.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DFS_APPEND;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DFS_FLUSH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
/**
 * Ingress handler that sends the append and flush requests of an
 * {@link AbfsOutputStream} through the DFS client ({@link AbfsDfsClient}).
 */
public class AzureDFSIngressHandler extends AzureIngressHandler {

  // NOTE(review): the logger is registered under AbfsOutputStream rather than
  // this class, so every message below is emitted under that logger name.
  // Presumably intentional (shared log category) - confirm before changing.
  private static final Logger LOG =
      LoggerFactory.getLogger(AbfsOutputStream.class);

  /** Block manager; only assigned by the five-argument constructor. */
  private AzureDFSBlockManager dfsBlockManager;

  /** DFS client taken from the client handler at construction time. */
  private final AbfsDfsClient dfsClient;

  /** ETag passed to the five-argument constructor; otherwise left unset. */
  private String eTag;

  /**
   * Creates a handler that is bound to the given stream and uses the DFS
   * client exposed by the client handler. The block manager and the eTag are
   * not initialised by this constructor.
   *
   * @param abfsOutputStream the stream this handler serves.
   * @param clientHandler the handler supplying the DFS client.
   */
  public AzureDFSIngressHandler(AbfsOutputStream abfsOutputStream,
      AbfsClientHandler clientHandler) {
    super(abfsOutputStream);
    dfsClient = clientHandler.getDfsClient();
  }

  /**
   * Creates a fully initialised handler: in addition to the DFS client it
   * sets up an {@link AzureDFSBlockManager} and records the eTag.
   *
   * @param abfsOutputStream the stream this handler serves.
   * @param blockFactory the factory handed to the block manager.
   * @param bufferSize the buffer size handed to the block manager.
   * @param eTag the eTag to remember.
   * @param clientHandler the handler supplying the DFS client.
   */
  public AzureDFSIngressHandler(AbfsOutputStream abfsOutputStream,
      DataBlocks.BlockFactory blockFactory,
      int bufferSize, String eTag, AbfsClientHandler clientHandler) {
    this(abfsOutputStream, clientHandler);
    this.dfsBlockManager =
        new AzureDFSBlockManager(abfsOutputStream, blockFactory, bufferSize);
    this.eTag = eTag;
    LOG.trace(
        "Created a new DFSIngress Handler for AbfsOutputStream instance {} for path {}",
        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
  }

  /**
   * Writes a slice of the given byte array into the block.
   *
   * @param block the destination block.
   * @param data the source bytes.
   * @param off the index in {@code data} to start from.
   * @param length how many bytes to write.
   * @return the value returned by {@code block.write}.
   * @throws IOException if the block write fails.
   */
  @Override
  public int bufferData(AbfsBlock block,
      final byte[] data,
      final int off,
      final int length)
      throws IOException {
    LOG.trace("Buffering data of length {} to block at offset {}", length, off);
    return block.write(data, off, length);
  }

  /**
   * Sends the block's upload data to the store with an append call on the
   * DFS client. The call runs with a copy of the supplied tracing context;
   * when that copy carries no ingress handler yet, it is tagged with the DFS
   * append marker, the current thread id and the block offset.
   *
   * @param blockToUpload the block being uploaded.
   * @param uploadData the data of that block.
   * @param reqParams the append request parameters.
   * @param tracingContext the caller's tracing context (copied, not modified).
   * @return the append operation returned by the client.
   * @throws IOException if the append fails.
   */
  @Override
  protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
      DataBlocks.BlockUploadData uploadData,
      AppendRequestParameters reqParams,
      TracingContext tracingContext) throws IOException {
    final TracingContext appendContext = new TracingContext(tracingContext);
    final boolean handlerUnset =
        appendContext.getIngressHandler().equals(EMPTY_STRING);
    if (handlerUnset) {
      appendContext.setIngressHandler(
          DFS_APPEND + " T " + Thread.currentThread().getId());
      appendContext.setPosition(String.valueOf(blockToUpload.getOffset()));
    }

    final AbfsOutputStream stream = getAbfsOutputStream();
    LOG.trace("Starting remote write for block with offset {} and path {}",
        blockToUpload.getOffset(), stream.getPath());

    return getClient().append(
        stream.getPath(),
        uploadData.toByteArray(),
        reqParams,
        stream.getCachedSasTokenString(),
        stream.getContextEncryptionAdapter(),
        appendContext);
  }

  /**
   * Remote write used for append blobs. This implementation simply forwards
   * to {@link #remoteWrite}; the {@code path} argument is not used here.
   *
   * @param path the path of the append blob (ignored by this implementation).
   * @param uploadData the data to upload.
   * @param block the block the data belongs to.
   * @param reqParams the append request parameters.
   * @param tracingContext the tracing context for the operation.
   * @return the operation returned by {@link #remoteWrite}.
   * @throws IOException if the underlying write fails.
   */
  @Override
  protected AbfsRestOperation remoteAppendBlobWrite(String path,
      DataBlocks.BlockUploadData uploadData,
      AbfsBlock block, AppendRequestParameters reqParams,
      TracingContext tracingContext) throws IOException {
    return remoteWrite(block, uploadData, reqParams, tracingContext);
  }

  /**
   * Issues a flush call on the DFS client while holding this handler's
   * monitor. The call runs with a copy of the supplied tracing context; when
   * that copy carries no ingress handler yet, it is tagged with the DFS flush
   * marker and the offset. Whether or not the flush succeeds, the stream's
   * full-blob MD5 digest is reset afterwards.
   *
   * @param offset the offset passed to the flush call.
   * @param retainUncommitedData forwarded to the flush call.
   * @param isClose forwarded to the flush call.
   * @param leaseId the lease id forwarded to the flush call.
   * @param tracingContext the caller's tracing context (copied, not modified).
   * @return the flush operation returned by the client.
   * @throws IOException if the flush fails; an
   *     {@link AbfsRestOperationException} is logged and rethrown as is.
   */
  @Override
  protected synchronized AbfsRestOperation remoteFlush(final long offset,
      final boolean retainUncommitedData,
      final boolean isClose,
      final String leaseId,
      TracingContext tracingContext)
      throws IOException {
    final TracingContext flushContext = new TracingContext(tracingContext);
    if (flushContext.getIngressHandler().equals(EMPTY_STRING)) {
      flushContext.setIngressHandler(DFS_FLUSH);
      flushContext.setPosition(String.valueOf(offset));
    }

    // computeFullBlobMd5() is not defined in this class (presumably inherited).
    final String fullBlobMd5 = computeFullBlobMd5();
    final AbfsOutputStream stream = getAbfsOutputStream();
    LOG.trace("Flushing data at offset {} and path {}", offset, stream.getPath());

    try {
      return getClient().flush(
          stream.getPath(),
          offset,
          retainUncommitedData,
          isClose,
          stream.getCachedSasTokenString(),
          leaseId,
          stream.getContextEncryptionAdapter(),
          flushContext,
          fullBlobMd5);
    } catch (AbfsRestOperationException flushFailure) {
      LOG.error("Error in remote flush for path {} and offset {}",
          stream.getPath(), offset, flushFailure);
      throw flushFailure;
    } finally {
      // The digest is reset on success and on failure alike.
      stream.getFullBlobContentMd5().reset();
    }
  }

  /**
   * Uploads the currently active block with an append request and releases
   * it. Returns without doing anything when the stream reports no active
   * block data to upload.
   *
   * @throws IOException if starting the block upload fails, or if one is
   *     raised while the stream's {@code failureWhileSubmit} handles an
   *     upload failure.
   */
  @Override
  protected void writeAppendBlobCurrentBufferToService() throws IOException {
    final AbfsBlock blockToSend = dfsBlockManager.getActiveBlock();
    final AbfsOutputStream stream = getAbfsOutputStream();

    // Nothing buffered: there is nothing to send.
    if (!stream.hasActiveBlockDataToUpload()) {
      return;
    }

    // Capture the size and switch the block into upload mode.
    final int pendingBytes = blockToSend.dataSize();
    final DataBlocks.BlockUploadData payload = blockToSend.startUpload();

    // Detach the block from the manager and record the pending upload.
    if (dfsBlockManager.hasActiveBlock()) {
      dfsBlockManager.clearActiveBlock();
    }
    stream.getOutputStreamStatistics().writeCurrentBuffer();
    stream.getOutputStreamStatistics().bytesToUpload(pendingBytes);

    // The stream position is advanced before the request is issued.
    final long startOffset = stream.getPosition();
    stream.setPosition(startOffset + pendingBytes);

    try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(
        dfsClient.getAbfsPerfTracker(),
        "writeCurrentBufferToService", APPEND_ACTION)) {
      LOG.trace("Writing current buffer to service at offset {} and path {}",
          startOffset, stream.getPath());

      final AppendRequestParameters reqParams = new AppendRequestParameters(
          startOffset, 0, pendingBytes,
          AppendRequestParameters.Mode.APPEND_MODE,
          true,
          stream.getLeaseId(),
          stream.isExpectHeaderEnabled(),
          stream.getMd5());
      final TracingContext uploadContext =
          new TracingContext(stream.getTracingContext());

      final AbfsRestOperation op =
          remoteWrite(blockToSend, payload, reqParams, uploadContext);

      // Success path: refresh the cached SAS token, then record the outcome.
      stream.getCachedSasToken().update(op.getSasToken());
      stream.getOutputStreamStatistics().uploadSuccessful(pendingBytes);
      perfInfo.registerResult(op.getResult());
      perfInfo.registerSuccess(true);
    } catch (Exception uploadFailure) {
      LOG.error("Failed to upload current buffer of length {} and path {}",
          pendingBytes, stream.getPath(), uploadFailure);
      stream.getOutputStreamStatistics().uploadFailed(pendingBytes);
      stream.failureWhileSubmit(uploadFailure);
    } finally {
      // Release the upload data and the block on every path.
      IOUtils.closeStreams(payload, blockToSend);
    }
  }

  /**
   * Returns the block manager held by this handler.
   *
   * @return the DFS block manager.
   */
  @Override
  public AzureBlockManager getBlockManager() {
    return this.dfsBlockManager;
  }

  /**
   * Returns the client this handler sends its requests through.
   *
   * @return the DFS client.
   */
  @Override
  public AbfsDfsClient getClient() {
    return this.dfsClient;
  }

  /**
   * Returns the eTag stored in this handler.
   *
   * @return the eTag.
   */
  @Override
  public String getETag() {
    return this.eTag;
  }
}