// AbfsOutputStream.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Future;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
import org.apache.hadoop.fs.azurebfs.utils.Listener;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MD5;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_WRITE_WITHOUT_LEASE;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.io.IOUtils.wrapException;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE;
import static org.apache.hadoop.util.Preconditions.checkState;
/**
* The output stream for the REST AbfsClient, writing data to
* Azure Blob File System (ABFS) storage.
*/
public class AbfsOutputStream extends OutputStream implements Syncable,
StreamCapabilities, IOStatisticsSource {
private volatile AbfsClient client;
private final String path;
/**
* The position in the file at which the next block will be uploaded.
* This is used in constructing the AbfsClient requests to ensure that,
* even if blocks are uploaded out of order, they are reassembled in
* the correct order.
*/
private long position;
private boolean closed;
private boolean supportFlush;
private boolean disableOutputStreamFlush;
private boolean enableSmallWriteOptimization;
private boolean isAppendBlob;
private boolean isExpectHeaderEnabled;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
private final int bufferSize;
private byte[] buffer;
private int bufferIndex;
private int numOfAppendsToServerSinceLastFlush;
private final int maxConcurrentRequestCount;
private final int maxRequestsThatCanBeQueued;
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
private final ContextEncryptionAdapter contextEncryptionAdapter;
// SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken;
private final String outputStreamId;
private final TracingContext tracingContext;
private Listener listener;
private AbfsLease lease;
private String leaseId;
private final Statistics statistics;
private final AbfsOutputStreamStatistics outputStreamStatistics;
private IOStatistics ioStatistics;
private static final Logger LOG =
LoggerFactory.getLogger(AbfsOutputStream.class);
/** Factory for blocks. */
private final DataBlocks.BlockFactory blockFactory;
/** Count of blocks uploaded. */
private long blockCount = 0;
/** Executor service to carry out the parallel upload requests. */
private final ListeningExecutorService executorService;
/** The etag of the blob. */
private final String eTag;
/** ABFS instance to be held by the output stream to avoid GC close. */
private final BackReference fsBackRef;
/** The service type at initialization. */
private final AbfsServiceType serviceTypeAtInit;
/** Indicates whether DFS to Blob fallback is enabled. */
private final boolean isDFSToBlobFallbackEnabled;
/** The current executing service type. */
private AbfsServiceType currentExecutingServiceType;
/** The handler for managing Azure ingress, marked as volatile to ensure visibility across threads. */
private volatile AzureIngressHandler ingressHandler;
/** The handler for managing Abfs client operations. */
private final AbfsClientHandler clientHandler;
/**
* The {@code MessageDigest} instance used for computing the incremental MD5 hash
* of the data written so far. This is updated as data is written to the stream.
*/
private MessageDigest md5 = null;
/**
* The {@code MessageDigest} instance used for computing the MD5 hash
* of the full blob content. This is updated with all data written to the stream
* and represents the complete MD5 checksum of the blob.
*/
private MessageDigest fullBlobContentMd5 = null;
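/*
* Illustrative sketch (not part of this class): feeding a MessageDigest
* incrementally, as the write path does for both digests above, yields the
* same hash as a single one-shot digest over the concatenated bytes.
*
*   MessageDigest incremental = MessageDigest.getInstance("MD5");
*   incremental.update("hello ".getBytes(StandardCharsets.UTF_8));
*   incremental.update("world".getBytes(StandardCharsets.UTF_8));
*   byte[] a = incremental.digest();
*   byte[] b = MessageDigest.getInstance("MD5")
*       .digest("hello world".getBytes(StandardCharsets.UTF_8));
*   // MessageDigest.isEqual(a, b) == true
*/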
public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
throws IOException {
this.statistics = abfsOutputStreamContext.getStatistics();
this.path = abfsOutputStreamContext.getPath();
this.position = abfsOutputStreamContext.getPosition();
this.closed = false;
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
this.isExpectHeaderEnabled = abfsOutputStreamContext.isExpectHeaderEnabled();
this.disableOutputStreamFlush = abfsOutputStreamContext
.isDisableOutputStreamFlush();
this.enableSmallWriteOptimization
= abfsOutputStreamContext.isSmallWriteSupported();
this.isAppendBlob = abfsOutputStreamContext.isAppendBlob();
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
this.bufferIndex = 0;
this.numOfAppendsToServerSinceLastFlush = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics();
this.fsBackRef = abfsOutputStreamContext.getFsBackRef();
this.contextEncryptionAdapter = abfsOutputStreamContext.getEncryptionAdapter();
this.eTag = abfsOutputStreamContext.getETag();
if (this.isAppendBlob) {
this.maxConcurrentRequestCount = 1;
} else {
this.maxConcurrentRequestCount = abfsOutputStreamContext
.getWriteMaxConcurrentRequestCount();
}
this.maxRequestsThatCanBeQueued = abfsOutputStreamContext
.getMaxWriteRequestsToQueue();
this.lease = abfsOutputStreamContext.getLease();
this.leaseId = abfsOutputStreamContext.getLeaseId();
this.executorService =
MoreExecutors.listeningDecorator(abfsOutputStreamContext.getExecutorService());
this.cachedSasToken = new CachedSASToken(
abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
this.outputStreamId = createOutputStreamId();
this.tracingContext = new TracingContext(abfsOutputStreamContext.getTracingContext());
this.tracingContext.setStreamID(outputStreamId);
this.tracingContext.setOperation(FSOperationType.WRITE);
this.ioStatistics = outputStreamStatistics.getIOStatistics();
this.blockFactory = abfsOutputStreamContext.getBlockFactory();
this.isDFSToBlobFallbackEnabled
= abfsOutputStreamContext.isDFSToBlobFallbackEnabled();
this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType();
this.currentExecutingServiceType = abfsOutputStreamContext.getIngressServiceType();
this.clientHandler = abfsOutputStreamContext.getClientHandler();
createIngressHandler(serviceTypeAtInit,
abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null);
try {
md5 = MessageDigest.getInstance(MD5);
fullBlobContentMd5 = MessageDigest.getInstance(MD5);
} catch (NoSuchAlgorithmException e) {
if (client.isChecksumValidationEnabled()) {
throw new IOException("MD5 algorithm not available", e);
}
}
}
/**
* Retrieves the current ingress handler.
*
* @return the current {@link AzureIngressHandler}.
*/
public AzureIngressHandler getIngressHandler() {
return ingressHandler;
}
private final Lock lock = new ReentrantLock();
private volatile boolean switchCompleted = false;
/**
* Creates or retrieves an existing Azure ingress handler based on the service type and provided parameters.
* <p>
* If the {@code ingressHandler} is already initialized and the switch operation is complete, the existing
* handler is returned without acquiring a lock to minimize performance overhead.
* If the {@code ingressHandler} is initialized but the switch is incomplete, a lock is acquired to safely modify
* or create a new handler. Double-checked locking is used to ensure thread safety while minimizing the
* time spent in the critical section.
* If the {@code ingressHandler} is {@code null}, the handler is safely initialized outside of the lock as no other
* thread would be modifying it.
* </p>
*
* @param serviceType The type of Azure service to handle (e.g., ABFS, Blob, etc.).
* @param blockFactory The factory to create data blocks used in the handler.
* @param bufferSize The buffer size used by the handler for data processing.
* @param isSwitch A flag indicating whether a switch operation is in progress.
* @param blockManager The manager responsible for handling blocks of data during processing.
*
* @return The initialized or existing Azure ingress handler.
* @throws IOException If an I/O error occurs during handler creation or data processing.
*/
private AzureIngressHandler createIngressHandler(AbfsServiceType serviceType,
DataBlocks.BlockFactory blockFactory,
int bufferSize, boolean isSwitch, AzureBlockManager blockManager) throws IOException {
if (ingressHandler != null) {
if (switchCompleted) {
return ingressHandler; // Return the handler if it's already initialized and the switch is completed
}
// If the switch is incomplete, lock to safely modify
lock.lock();
try {
// Double-check the condition after acquiring the lock
if (switchCompleted) {
return ingressHandler; // Return the handler if it's now completed
}
// If the switch is still incomplete, create a new handler
return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager);
} finally {
lock.unlock();
}
}
// If ingressHandler is null, no lock is needed; safely initialize it outside the lock
return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager);
}
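/*
* Minimal sketch (illustrative, using a hypothetical volatile 'instance'
* field) of the double-checked locking shape used above:
*
*   if (initialized) {
*     return instance;            // fast path, no lock taken
*   }
*   lock.lock();
*   try {
*     if (initialized) {          // re-check now that we hold the lock
*       return instance;
*     }
*     instance = create();        // at most one thread runs the slow path
*     initialized = true;
*     return instance;
*   } finally {
*     lock.unlock();
*   }
*/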
// Helper method to create a new handler, used in both scenarios (locked and unlocked)
private AzureIngressHandler createNewHandler(AbfsServiceType serviceType,
DataBlocks.BlockFactory blockFactory,
int bufferSize,
boolean isSwitch,
AzureBlockManager blockManager) throws IOException {
this.client = clientHandler.getClient(serviceType);
if (isDFSToBlobFallbackEnabled && serviceTypeAtInit != AbfsServiceType.DFS) {
throw new InvalidConfigurationValueException(
"The ingress service type must be configured as DFS");
}
if (isDFSToBlobFallbackEnabled && !isSwitch) {
ingressHandler = new AzureDfsToBlobIngressFallbackHandler(this,
blockFactory, bufferSize, eTag, clientHandler);
} else if (serviceType == AbfsServiceType.BLOB) {
ingressHandler = new AzureBlobIngressHandler(this, blockFactory,
bufferSize, eTag, clientHandler, blockManager);
} else {
ingressHandler = new AzureDFSIngressHandler(this, blockFactory,
bufferSize, eTag, clientHandler);
}
if (isSwitch) {
switchCompleted = true;
}
return ingressHandler;
}
/**
* Switches the current ingress handler and service type if necessary.
*
* @throws IOException if there is an error creating the new ingress handler.
*/
protected void switchHandler() throws IOException {
if (serviceTypeAtInit != currentExecutingServiceType) {
LOG.debug("Handler switch not required as serviceTypeAtInit {} is different from currentExecutingServiceType {}. "
+ "This check prevents the handler from being switched more than once.",
serviceTypeAtInit, currentExecutingServiceType);
return;
}
if (serviceTypeAtInit == AbfsServiceType.BLOB) {
currentExecutingServiceType = AbfsServiceType.DFS;
} else {
currentExecutingServiceType = AbfsServiceType.BLOB;
}
LOG.info("Switching ingress handler to different service type: {}", currentExecutingServiceType);
ingressHandler = createIngressHandler(currentExecutingServiceType,
blockFactory, bufferSize, true, getBlockManager());
}
/**
* Buffers data in the given block.
*
* @param block the block to buffer data into.
* @param data the data to buffer.
* @param off the offset in the data array.
* @param length the length of data to buffer.
* @return the number of bytes buffered.
* @throws IOException if there is an error buffering the data.
*/
private int bufferData(AbfsBlock block,
final byte[] data,
final int off,
final int length)
throws IOException {
return getIngressHandler().bufferData(block, data, off, length);
}
/**
* Performs a remote write operation.
*
* @param blockToUpload the block to upload.
* @param uploadData the data to upload.
* @param reqParams the parameters for the append request.
* @param tracingContext the tracing context for the operation.
* @return the result of the remote write operation.
* @throws IOException if there is an error during the remote write.
*/
private AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
DataBlocks.BlockUploadData uploadData,
AppendRequestParameters reqParams,
TracingContext tracingContext)
throws IOException {
return getIngressHandler().remoteWrite(blockToUpload, uploadData, reqParams,
tracingContext);
}
/**
* Flushes data remotely.
*
* @param offset the offset to flush.
* @param retainUncommitedData whether to retain uncommitted data.
* @param isClose whether this is a close operation.
* @param leaseId the lease ID for the operation.
* @param tracingContext the tracing context for the operation.
* @return the result of the remote flush operation.
* @throws IOException if there is an error during the remote flush.
*/
private AbfsRestOperation remoteFlush(final long offset,
final boolean retainUncommitedData,
final boolean isClose,
final String leaseId,
TracingContext tracingContext)
throws IOException {
return getIngressHandler().remoteFlush(offset, retainUncommitedData,
isClose, leaseId, tracingContext);
}
/**
* Creates a new output stream ID.
*
* @return the newly created output stream ID.
*/
private String createOutputStreamId() {
return StringUtils.right(UUID.randomUUID().toString(), STREAM_ID_LEN);
}
/**
* Query the stream for a specific capability.
*
* @param capability string to query the stream support for.
* @return true for hsync and hflush when flush is enabled; false otherwise.
*/
@Override
public boolean hasCapability(String capability) {
return supportFlush && isProbeForSyncable(capability);
}
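/*
* Usage sketch (illustrative): callers normally probe the wrapping
* FSDataOutputStream before relying on sync semantics.
*
*   FSDataOutputStream out = fs.create(path);
*   if (out.hasCapability(StreamCapabilities.HFLUSH)) {
*     out.hflush(); // safe: the stream advertises flush support
*   }
*/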
/**
* Writes the specified byte to this output stream. The general contract for
* write is that one byte is written to the output stream. The byte to be
* written is the eight low-order bits of {@code byteVal}. The 24 high-order
* bits of {@code byteVal} are ignored.
*
* @param byteVal the byte value to write.
* @throws IOException if an I/O error occurs. In particular, an IOException may be
* thrown if the output stream has been closed.
*/
@Override
public void write(final int byteVal) throws IOException {
write(new byte[]{(byte) (byteVal & 0xFF)});
}
/**
* Writes length bytes from the specified byte array starting at off to
* this output stream.
*
* @param data the byte array to write.
* @param off the start off in the data.
* @param length the number of bytes to write.
* @throws IOException if an I/O error occurs. In particular, an IOException may be
* thrown if the output stream has been closed.
*/
@Override
public synchronized void write(final byte[] data, final int off, final int length)
throws IOException {
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
// validate if data is not null and index out of bounds.
DataBlocks.validateWriteArgs(data, off, length);
maybeThrowLastError();
if (off < 0 || length < 0 || length > data.length - off) {
throw new IndexOutOfBoundsException();
}
if (hasLease() && isLeaseFreed()) {
throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
}
if (length == 0) {
LOG.debug("No data to write, length is 0 for path: {}", path);
return;
}
AbfsBlock block = createBlockIfNeeded(position);
int written = bufferData(block, data, off, length);
// Update the incremental MD5 hash with the written data.
getMessageDigest().update(data, off, written);
// Update the full blob MD5 hash with the written data.
getFullBlobContentMd5().update(data, off, written);
int remainingCapacity = block.remainingCapacity();
if (written < length) {
// Number of bytes to write is more than the data block capacity;
// trigger an upload and then write the remainder to the next block.
LOG.debug("writing more data than block capacity - triggering upload");
uploadCurrentBlock();
// Tail recursion is mildly expensive, but given buffer sizes must be MB,
// it's unlikely to recurse very deeply.
this.write(data, off + written, length - written);
} else {
if (remainingCapacity == 0) {
// the whole buffer is done, trigger an upload
uploadCurrentBlock();
}
}
incrementWriteOps();
}
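/*
* Illustrative sketch: a single write larger than the block capacity is
* split across blocks by the recursion above. Assuming a 4 MB write buffer,
* this caller-side write uploads two full blocks and leaves 512 bytes in
* the active block, which close() then flushes:
*
*   byte[] chunk = new byte[8 * 1024 * 1024 + 512];
*   try (FSDataOutputStream out = fs.create(path)) {
*     out.write(chunk, 0, chunk.length);
*   }
*/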
/**
* Demand create a destination block.
*
* @return the active block, created on demand if none exists.
* @throws IOException on any failure to create
*/
private synchronized AbfsBlock createBlockIfNeeded(long position)
throws IOException {
return getBlockManager().createBlock(position);
}
/**
* Start an asynchronous upload of the current block.
*
* @throws IOException Problems opening the destination for upload,
* initializing the upload, or if a previous operation has failed.
*/
private synchronized void uploadCurrentBlock() throws IOException {
checkState(getBlockManager().hasActiveBlock(),
"No active block");
LOG.debug("Writing block # {}", getBlockManager().getBlockCount());
try {
uploadBlockAsync(getBlockManager().getActiveBlock(),
false, false);
} finally {
if (getBlockManager().hasActiveBlock()) {
// set the block to null, so the next write will create a new block.
getBlockManager().clearActiveBlock();
}
}
}
/**
* Upload a block of data.
* This will take ownership of the block.
*
* @param blockToUpload block to upload.
* @param isFlush whether this upload is part of a flush.
* @param isClose whether this upload is part of a close operation.
* @throws IOException upload failure
*/
private void uploadBlockAsync(AbfsBlock blockToUpload,
boolean isFlush, boolean isClose)
throws IOException {
if (this.isAppendBlob) {
getIngressHandler().writeAppendBlobCurrentBufferToService();
return;
}
if (!blockToUpload.hasData()) {
return;
}
numOfAppendsToServerSinceLastFlush++;
final int bytesLength = blockToUpload.dataSize();
final long offset = position;
position += bytesLength;
outputStreamStatistics.bytesToUpload(bytesLength);
outputStreamStatistics.writeCurrentBuffer();
DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload();
String md5Hash = getMd5();
final Future<Void> job =
executorService.submit(() -> {
AbfsPerfTracker tracker =
getClient().getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", APPEND_ACTION)) {
AppendRequestParameters.Mode
mode = APPEND_MODE;
if (isFlush && isClose) {
mode = FLUSH_CLOSE_MODE;
} else if (isFlush) {
mode = FLUSH_MODE;
}
/*
* Parameters required for an APPEND call.
* offset - the position in the file where this block starts.
* bytesLength - the amount of data to be uploaded from the block.
* mode - whether this is an append, flush or flush_close request.
* leaseId - the AbfsLeaseId for this request.
*/
AppendRequestParameters reqParams = new AppendRequestParameters(
offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled, md5Hash);
AbfsRestOperation op;
try {
op = remoteWrite(blockToUpload, blockUploadData, reqParams, tracingContext);
} catch (InvalidIngressServiceException ex) {
LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteWrite.", getPath());
switchHandler();
// retry the operation with switched handler.
op = remoteWrite(blockToUpload, blockUploadData, reqParams, tracingContext);
}
cachedSasToken.update(op.getSasToken());
perfInfo.registerResult(op.getResult());
perfInfo.registerSuccess(true);
outputStreamStatistics.uploadSuccessful(bytesLength);
return null;
} finally {
cleanupWithLogger(LOG, blockUploadData, blockToUpload);
}
});
writeOperations.add(new WriteOperation(job, offset, bytesLength));
// Try to shrink the queue
shrinkWriteOperationQueue();
}
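/*
* Minimal sketch (generic, illustrative) of the submit-and-track pattern
* used above: uploads run on an executor while the stream records a Future
* per block so completed work can later be drained in FIFO order.
*
*   Deque<Future<Void>> pending = new ConcurrentLinkedDeque<>();
*   pending.add(executor.submit(() -> { upload(block); return null; }));
*   // later, e.g. in shrinkWriteOperationQueue():
*   while (!pending.isEmpty() && pending.peek().isDone()) {
*     pending.remove().get(); // rethrows any stored upload failure
*   }
*/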
/**
* A method to set the lastError if an exception is caught.
* @param ex Exception caught.
* @throws IOException Throws the lastError.
*/
void failureWhileSubmit(Exception ex) throws IOException {
if (ex instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex).getStatusCode()
== HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ex.getMessage());
}
}
if (ex instanceof IOException) {
lastError = (IOException) ex;
} else {
lastError = new IOException(ex);
}
throw lastError;
}
/**
* Is there an active block and is there any data in it to upload?
*
* @return true if there is some data to upload in an active block else false.
*/
boolean hasActiveBlockDataToUpload() {
AzureBlockManager blockManager = getBlockManager();
AbfsBlock activeBlock = blockManager.getActiveBlock();
return blockManager.hasActiveBlock() && activeBlock.hasData();
}
/**
* Increment Write Operations.
*/
private void incrementWriteOps() {
if (statistics != null) {
statistics.incrementWriteOps(1);
}
}
/**
* Throw the last error recorded if not null.
* After the stream is closed, this is always set to
* an exception, so acts as a guard against method invocation once
* closed.
* @throws IOException if lastError is set
*/
private void maybeThrowLastError() throws IOException {
if (lastError != null) {
throw lastError;
}
}
/**
* Flushes this output stream and forces any buffered output bytes to be
* written out. If any data remains in the payload, it is committed to the
* service. Data is queued for writing and forced out to the service
* before the call returns.
*/
@Override
public void flush() throws IOException {
if (!disableOutputStreamFlush) {
flushInternalAsync();
}
}
/** Similar to posix fsync, flush out the data in client's user buffer
* all the way to the disk device (but the disk may have it in its cache).
* @throws IOException if error occurs
*/
@Override
public void hsync() throws IOException {
if (supportFlush) {
flushInternal(false);
}
}
/** Flush out the data in client's user buffer. After the return of
* this call, new readers will see the data.
* @throws IOException if any error occurs
*/
@Override
public void hflush() throws IOException {
if (supportFlush) {
flushInternal(false);
}
}
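/*
* Usage sketch (illustrative, assuming flush support is enabled via the
* fs.azure.enable.flush configuration): hsync() lets a writer make records
* durable mid-stream, e.g. for write-ahead-log style workloads.
*
*   try (FSDataOutputStream out = fs.create(path)) {
*     out.write(record);
*     out.hsync(); // data is committed on the service before returning
*     out.write(moreRecords);
*   }
*/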
/**
* Retrieves the stream ID associated with this output stream.
*
* @return the stream ID of this output stream.
*/
public String getStreamID() {
return outputStreamId;
}
/**
* Registers a listener for this output stream.
*
* @param listener1 the listener to register.
*/
public void registerListener(Listener listener1) {
listener = listener1;
tracingContext.setListener(listener);
}
/**
* Force all data in the output stream to be written to Azure storage.
* Wait to return until this is complete. Close the access to the stream and
* shutdown the upload thread pool.
* If the blob was created, its lease will be released.
* Any error caught in the upload threads and stored will be rethrown here
* after cleanup.
*/
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
try {
// Check if Executor Service got shutdown before the writes could be
// completed.
if (hasActiveBlockDataToUpload() && executorService.isShutdown()) {
throw new PathIOException(path, "Executor Service closed before "
+ "writes could be completed.");
}
flushInternal(true);
} catch (IOException e) {
// Problems surface in try-with-resources clauses if
// the exception thrown in a close == the one already thrown,
// so we wrap any exception with a new one.
// See HADOOP-16785
throw wrapException(path, e.getMessage(), e);
} finally {
if (contextEncryptionAdapter != null) {
contextEncryptionAdapter.destroy();
}
if (hasLease()) {
lease.free();
lease = null;
}
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
buffer = null;
bufferIndex = 0;
closed = true;
writeOperations.clear();
getBlockManager().close();
}
LOG.debug("Closing AbfsOutputStream : {}", this);
}
/**
* Flushes the buffered data to the Azure Blob Storage service.
* This method checks if a small write optimization can be applied, and if so, delegates
* the flush operation to {@link #smallWriteOptimizedFlushInternal(boolean)}.
* Otherwise, it uploads the active block synchronously, flushes the written bytes to
* the service, and resets the number of appends to the server since the last flush.
*
* @param isClose indicates whether this flush operation is part of a close operation.
* @throws IOException if an I/O error occurs during the flush operation.
*/
private synchronized void flushInternal(boolean isClose) throws IOException {
maybeThrowLastError();
// If it's a flush after writing less than bufferSize, send the flush
// parameter in the append call itself.
if (!isAppendBlob
&& enableSmallWriteOptimization
&& (numOfAppendsToServerSinceLastFlush == 0) // there are no ongoing store writes
&& (writeOperations.size() == 0) // double checking no appends in progress
&& hasActiveBlockDataToUpload()) { // there is some data pending to be written
smallWriteOptimizedFlushInternal(isClose);
return;
}
if (hasActiveBlockDataToUpload()) {
uploadCurrentBlock();
}
flushWrittenBytesToService(isClose);
numOfAppendsToServerSinceLastFlush = 0;
}
/**
* Flushes the buffered data to the Azure Blob Storage service with small write optimization.
* This method uploads the active block asynchronously, waits for appends to complete, shrinks
* the write operation queue, checks for any previous errors, and resets the number of appends
* to the server since the last flush.
*
* @param isClose indicates whether this flush operation is part of a close operation.
* @throws IOException if an I/O error occurs during the flush operation.
*/
private synchronized void smallWriteOptimizedFlushInternal(boolean isClose) throws IOException {
// writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush
uploadBlockAsync(getBlockManager().getActiveBlock(),
true, isClose);
waitForAppendsToComplete();
shrinkWriteOperationQueue();
maybeThrowLastError();
numOfAppendsToServerSinceLastFlush = 0;
}
/**
* Asynchronously flushes the buffered data to the Azure Blob Storage service.
* This method checks for any previous errors, uploads the current block if needed,
* waits for appends to complete, and then performs an async flush operation.
*
* @throws IOException if an I/O error occurs during the flush operation.
*/
private synchronized void flushInternalAsync() throws IOException {
maybeThrowLastError();
// Upload the current block if there is active block data.
if (hasActiveBlockDataToUpload()) {
uploadCurrentBlock();
}
waitForAppendsToComplete();
flushWrittenBytesToServiceAsync();
}
/**
* Waits for all write operations (appends) to complete.
* This method iterates through the list of write operations and waits for their tasks
* to finish. If an error occurs during the operation, it is handled appropriately.
*
* @throws IOException if an I/O error occurs while waiting for appends to complete.
*/
private synchronized void waitForAppendsToComplete() throws IOException {
for (WriteOperation writeOperation : writeOperations) {
try {
// Wait for the write operation task to complete.
writeOperation.task.get();
} catch (Exception ex) {
outputStreamStatistics.uploadFailed(writeOperation.length);
if (ex.getCause() instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ex.getMessage());
}
}
if (ex.getCause() instanceof AzureBlobFileSystemException) {
ex = (AzureBlobFileSystemException) ex.getCause();
}
lastError = new IOException(ex);
throw lastError;
}
}
}
/**
* Flushes the written bytes to the Azure Blob Storage service, ensuring all
* appends are completed. This method is typically called during a close operation.
*
* @param isClose indicates whether this flush is happening as part of a close operation.
* @throws IOException if an I/O error occurs during the flush operation.
*/
private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
// Ensure all appends are completed before flushing.
waitForAppendsToComplete();
// Flush the written bytes to the service.
flushWrittenBytesToServiceInternal(position, false, isClose);
}
/**
* Asynchronously flushes the written bytes to the Azure Blob Storage service.
* This method ensures that the write operation queue is managed and only flushes
* if there is uncommitted data beyond the last flush offset.
*
* @throws IOException if an I/O error occurs during the flush operation.
*/
private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
// Manage the write operation queue to ensure efficient writes
shrinkWriteOperationQueue();
// Only flush if there is uncommitted data beyond the last flush offset
if (this.lastTotalAppendOffset > this.lastFlushOffset) {
this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true,
false/*Async flush on close not permitted*/);
}
}
/**
* Flushes the written bytes to the Azure Blob Storage service.
*
* @param offset the offset up to which data needs to be flushed.
* @param retainUncommitedData whether to retain uncommitted data after flush.
* @param isClose whether this flush is happening as part of a close operation.
* @throws IOException if an I/O error occurs.
*/
private synchronized void flushWrittenBytesToServiceInternal(final long offset,
final boolean retainUncommitedData, final boolean isClose) throws IOException {
// flush is called for appendblob only on close
if (this.isAppendBlob && !isClose) {
return;
}
// Tracker to monitor performance metrics
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"flushWrittenBytesToServiceInternal", "flush")) {
AbfsRestOperation op;
try {
// Attempt to flush data to the remote service.
op = remoteFlush(offset, retainUncommitedData, isClose, leaseId,
tracingContext);
} catch (InvalidIngressServiceException ex) {
LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteFlush.", getPath());
// If an invalid ingress service is encountered, switch handler and retry.
switchHandler();
op = remoteFlush(offset, retainUncommitedData, isClose, leaseId,
tracingContext);
} catch (AzureBlobFileSystemException ex) {
// Handle specific Azure Blob FileSystem exceptions
if (ex instanceof AbfsRestOperationException
&& ((AbfsRestOperationException) ex).getStatusCode()
== HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ex.getMessage());
}
// Store the last error and rethrow it
lastError = new IOException(ex);
throw lastError;
}
if (op != null) {
// Update the cached SAS token if the operation was successful
cachedSasToken.update(op.getSasToken());
// Register the result and mark the operation as successful
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
// Update the last flush offset
this.lastFlushOffset = offset;
}
}
/**
* Try to remove the completed write operations from the beginning of write
* operation FIFO queue.
*/
private synchronized void shrinkWriteOperationQueue() throws IOException {
try {
WriteOperation peek = writeOperations.peek();
while (peek != null && peek.task.isDone()) {
peek.task.get();
lastTotalAppendOffset += peek.length;
writeOperations.remove();
peek = writeOperations.peek();
// Incrementing statistics to indicate queue has been shrunk.
outputStreamStatistics.queueShrunk();
}
} catch (Exception e) {
if (e.getCause() instanceof AzureBlobFileSystemException) {
lastError = (AzureBlobFileSystemException) e.getCause();
} else {
lastError = new IOException(e);
}
throw lastError;
}
}
private static class WriteOperation {
private final Future<Void> task;
private final long startOffset;
private final long length;
WriteOperation(final Future<Void> task, final long startOffset, final long length) {
Preconditions.checkNotNull(task, "task");
Preconditions.checkArgument(startOffset >= 0, "startOffset");
Preconditions.checkArgument(length >= 0, "length");
this.task = task;
this.startOffset = startOffset;
this.length = length;
}
}
@VisibleForTesting
public synchronized void waitForPendingUploads() throws IOException {
waitForAppendsToComplete();
}
/**
* Getter method for AbfsOutputStream statistics.
*
* @return statistics for AbfsOutputStream.
*/
@VisibleForTesting
public AbfsOutputStreamStatistics getOutputStreamStatistics() {
return outputStreamStatistics;
}
/**
* Getter to get the size of the task queue.
*
* @return the number of writeOperations in AbfsOutputStream.
*/
@VisibleForTesting
public int getWriteOperationsSize() {
return writeOperations.size();
}
@VisibleForTesting
int getMaxConcurrentRequestCount() {
return this.maxConcurrentRequestCount;
}
@VisibleForTesting
int getMaxRequestsThatCanBeQueued() {
return maxRequestsThatCanBeQueued;
}
@VisibleForTesting
Boolean isAppendBlobStream() {
return isAppendBlob;
}
@Override
public IOStatistics getIOStatistics() {
return ioStatistics;
}
@VisibleForTesting
public boolean isLeaseFreed() {
if (lease == null) {
return true;
}
return lease.isFreed();
}
@VisibleForTesting
public boolean hasLease() {
return lease != null;
}
/**
* Appending AbfsOutputStream statistics to base toString().
*
* @return String with AbfsOutputStream statistics.
*/
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
sb.append("AbfsOutputStream@").append(this.hashCode());
sb.append("){");
sb.append(outputStreamStatistics.toString());
sb.append("}");
return sb.toString();
}
/**
* Gets the reference to the file system back.
*
* @return The back reference to the file system.
*/
@VisibleForTesting
BackReference getFsBackRef() {
return fsBackRef;
}
/**
* Gets the executor service used for asynchronous operations.
*
* @return The executor service.
*/
@VisibleForTesting
ListeningExecutorService getExecutorService() {
return executorService;
}
/**
* Gets the Azure Blob Storage client.
*
* @return The Azure Blob Storage client.
*/
@VisibleForTesting
AbfsClient getClient() {
return client;
}
/**
* Gets the Azure Blob Storage clientHandler.
*
* @return The Azure Blob Storage clientHandler.
*/
public AbfsClientHandler getClientHandler() {
return clientHandler;
}
/**
* Gets the path associated with this stream.
*
* @return The path of the stream.
*/
public String getPath() {
return this.path;
}
/**
* Gets the current position in the stream.
*
* @return The current position in the stream.
*/
public synchronized long getPosition() {
return position;
}
/**
* Sets the position in the stream.
*
* @param position The position to set.
*/
public synchronized void setPosition(final long position) {
this.position = position;
}
/**
* Gets the cached SAS token string for authentication.
*
* @return The cached SAS token string.
*/
public String getCachedSasTokenString() {
return cachedSasToken.get();
}
/**
* Gets the context encryption adapter.
*
* @return The context encryption adapter.
*/
public ContextEncryptionAdapter getContextEncryptionAdapter() {
return contextEncryptionAdapter;
}
/**
* Gets the Azure Block Manager associated with this stream.
*
* @return The Azure Block Manager.
*/
public AzureBlockManager getBlockManager() {
return getIngressHandler().getBlockManager();
}
/**
* Gets the tracing context for operations.
*
* @return The tracing context.
*/
public TracingContext getTracingContext() {
return tracingContext;
}
/**
* Checks if the DFS to blob fallback mechanism is enabled.
*
* @return True if the DFS to blob fallback is enabled, otherwise false.
*/
public boolean isDFSToBlobFallbackEnabled() {
return isDFSToBlobFallbackEnabled;
}
/**
* Checks if the 'Expect' header is enabled for HTTP requests.
*
* @return True if the 'Expect' header is enabled, otherwise false.
*/
public boolean isExpectHeaderEnabled() {
return isExpectHeaderEnabled;
}
/**
* Gets the lease ID associated with the stream.
*
* @return The lease ID.
*/
public String getLeaseId() {
return leaseId;
}
/**
* Gets the cached SAS token object.
*
* @return The cached SAS token object.
*/
public CachedSASToken getCachedSasToken() {
return cachedSasToken;
}
/**
* Checks if the stream is associated with an append blob.
*
* @return True if the stream is for an append blob, otherwise false.
*/
public boolean isAppendBlob() {
return isAppendBlob;
}
/**
* Checks if all write operation tasks are done.
*
* @return True if all write operation tasks are done, false otherwise.
*/
@VisibleForTesting
public Boolean areWriteOperationsTasksDone() {
for (WriteOperation writeOperation : writeOperations) {
if (!writeOperation.task.isDone()) {
return false;
}
}
return true;
}
/**
* Returns the MessageDigest used for computing the incremental MD5 hash
* of the data written so far.
*
* @return the MessageDigest used for partial MD5 calculation.
*/
public MessageDigest getMessageDigest() {
return md5;
}
/**
* Returns the MessageDigest used for computing the MD5 hash
* of the full blob content.
*
* @return the MessageDigest used for full blob MD5 calculation.
*/
public MessageDigest getFullBlobContentMd5() {
return fullBlobContentMd5;
}
/**
* Returns the Base64-encoded MD5 checksum based on the current digest state.
* This finalizes the digest calculation. Returns null if the digest is empty.
*
* @return the Base64-encoded MD5 string, or null if no digest is available.
*/
public String getMd5() {
byte[] digest = getMessageDigest().digest();
String md5Checksum = null;
if (digest.length != 0) {
md5Checksum = Base64.getEncoder().encodeToString(digest);
}
return md5Checksum;
}
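/*
* Illustrative sketch: the Base64 form above matches the Content-MD5 wire
* format, i.e. Base64 over the 16 raw digest bytes, not over a hex string.
* For empty input, for example:
*
*   byte[] d = MessageDigest.getInstance("MD5").digest(new byte[0]);
*   String s = Base64.getEncoder().encodeToString(d);
*   // s is "1B2M2Y8AsgTpgAmY7PhCfg==" (16 bytes -> 24 Base64 chars)
*/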
}