AzureDfsToBlobIngressFallbackHandler.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FALLBACK_APPEND;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FALLBACK_FLUSH;
/**
* Handles the fallback mechanism for Azure Blob Ingress operations.
*/
public class AzureDfsToBlobIngressFallbackHandler extends AzureDFSIngressHandler {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsOutputStream.class);
private final AzureBlobBlockManager blobBlockManager;
private final String eTag;
private final Lock lock = new ReentrantLock();
/**
* Constructs an AzureDfsToBlobIngressFallbackHandler.
*
* @param abfsOutputStream the AbfsOutputStream.
* @param blockFactory the block factory.
* @param bufferSize the buffer size.
* @param eTag the eTag.
* @param clientHandler the client handler.
* @throws AzureBlobFileSystemException if an error occurs.
*/
public AzureDfsToBlobIngressFallbackHandler(AbfsOutputStream abfsOutputStream,
DataBlocks.BlockFactory blockFactory,
int bufferSize, String eTag, AbfsClientHandler clientHandler) throws AzureBlobFileSystemException {
super(abfsOutputStream, clientHandler);
this.eTag = eTag;
this.blobBlockManager = new AzureBlobBlockManager(abfsOutputStream,
blockFactory, bufferSize);
LOG.trace(
"Created a new BlobFallbackIngress Handler for AbfsOutputStream instance {} for path {}",
abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
}
/**
* Buffers data into the specified block.
*
* @param block the block to buffer data into.
* @param data the data to be buffered.
* @param off the start offset in the data.
* @param length the number of bytes to buffer.
* @return the number of bytes buffered.
* @throws IOException if an I/O error occurs.
*/
@Override
public int bufferData(AbfsBlock block,
final byte[] data,
final int off,
final int length) throws IOException {
LOG.trace("Buffering data of length {} to block at offset {}", length, off);
return super.bufferData(block, data, off, length);
}
/**
* Performs a remote write operation.
*
* @param blockToUpload the block to upload.
* @param uploadData the data to upload.
* @param reqParams the request parameters.
* @param tracingContext the tracing context.
* @return the resulting AbfsRestOperation.
* @throws IOException if an I/O error occurs.
*/
@Override
protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
DataBlocks.BlockUploadData uploadData,
AppendRequestParameters reqParams,
TracingContext tracingContext) throws IOException {
AbfsRestOperation op;
TracingContext tracingContextAppend = new TracingContext(tracingContext);
String threadIdStr = String.valueOf(Thread.currentThread().getId());
tracingContextAppend.setIngressHandler(FALLBACK_APPEND + " T " + threadIdStr);
tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
try {
op = super.remoteWrite(blockToUpload, uploadData, reqParams,
tracingContextAppend);
blobBlockManager.updateEntry(blockToUpload);
} catch (AbfsRestOperationException ex) {
if (shouldIngressHandlerBeSwitched(ex)) {
LOG.error("Error in remote write requiring handler switch for path {}", getAbfsOutputStream().getPath(), ex);
throw getIngressHandlerSwitchException(ex);
}
LOG.error("Error in remote write for path {} and offset {}", getAbfsOutputStream().getPath(),
blockToUpload.getOffset(), ex);
throw ex;
}
return op;
}
/**
* Flushes data to the remote store.
*
* @param offset the offset to flush.
* @param retainUncommitedData whether to retain uncommitted data.
* @param isClose whether this is a close operation.
* @param leaseId the lease ID.
* @param tracingContext the tracing context.
* @return the resulting AbfsRestOperation.
* @throws IOException if an I/O error occurs.
*/
@Override
protected synchronized AbfsRestOperation remoteFlush(final long offset,
final boolean retainUncommitedData,
final boolean isClose,
final String leaseId,
TracingContext tracingContext) throws IOException {
AbfsRestOperation op;
if (!blobBlockManager.hasBlocksToCommit()) {
return null;
}
try {
TracingContext tracingContextFlush = new TracingContext(tracingContext);
tracingContextFlush.setIngressHandler(FALLBACK_FLUSH);
tracingContextFlush.setPosition(String.valueOf(offset));
op = super.remoteFlush(offset, retainUncommitedData, isClose, leaseId,
tracingContextFlush);
} catch (AbfsRestOperationException ex) {
if (shouldIngressHandlerBeSwitched(ex)) {
LOG.error("Error in remote flush requiring handler switch for path {}", getAbfsOutputStream().getPath(), ex);
throw getIngressHandlerSwitchException(ex);
}
LOG.error("Error in remote flush for path {} and offset {}", getAbfsOutputStream().getPath(), offset, ex);
throw ex;
}
return op;
}
/**
* Gets the block manager.
*
* @return the block manager.
*/
@Override
public AzureBlockManager getBlockManager() {
return blobBlockManager;
}
/**
* Gets the eTag value of the blob.
*
* @return the eTag.
*/
@VisibleForTesting
public String getETag() {
lock.lock();
try {
return eTag;
} finally {
lock.unlock();
}
}
/**
* Appending the current active data block to the service. Clearing the active
* data block and releasing all buffered data.
*
* @throws IOException if there is any failure while starting an upload for
* the data block or while closing the BlockUploadData.
*/
@Override
protected void writeAppendBlobCurrentBufferToService() throws IOException {
AbfsBlock activeBlock = blobBlockManager.getActiveBlock();
// No data, return immediately.
if (!getAbfsOutputStream().hasActiveBlockDataToUpload()) {
return;
}
// Prepare data for upload.
final int bytesLength = activeBlock.dataSize();
DataBlocks.BlockUploadData uploadData = activeBlock.startUpload();
// Clear active block and update statistics.
if (blobBlockManager.hasActiveBlock()) {
blobBlockManager.clearActiveBlock();
}
getAbfsOutputStream().getOutputStreamStatistics().writeCurrentBuffer();
getAbfsOutputStream().getOutputStreamStatistics().bytesToUpload(bytesLength);
// Update the stream position.
final long offset = getAbfsOutputStream().getPosition();
getAbfsOutputStream().setPosition(offset + bytesLength);
// Perform the upload within a performance tracking context.
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(
getClient().getAbfsPerfTracker(),
"writeCurrentBufferToService", APPEND_ACTION)) {
LOG.trace("Writing current buffer to service at offset {} and path {}", offset, getAbfsOutputStream().getPath());
AppendRequestParameters reqParams = new AppendRequestParameters(
offset, 0, bytesLength, AppendRequestParameters.Mode.APPEND_MODE,
true, getAbfsOutputStream().getLeaseId(),
getAbfsOutputStream().isExpectHeaderEnabled(),
getAbfsOutputStream().getMd5());
// Perform the remote write operation.
AbfsRestOperation op;
try {
op = remoteAppendBlobWrite(getAbfsOutputStream().getPath(), uploadData,
activeBlock, reqParams,
new TracingContext(getAbfsOutputStream().getTracingContext()));
} catch (InvalidIngressServiceException ex) {
LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.", getAbfsOutputStream().getPath());
getAbfsOutputStream().switchHandler();
op = getAbfsOutputStream().getIngressHandler()
.remoteAppendBlobWrite(getAbfsOutputStream().getPath(), uploadData,
activeBlock, reqParams,
new TracingContext(getAbfsOutputStream().getTracingContext()));
} finally {
// Ensure the upload data stream is closed.
IOUtils.closeStreams(uploadData, activeBlock);
}
if (op != null) {
// Update the SAS token and log the successful upload.
getAbfsOutputStream().getCachedSasToken().update(op.getSasToken());
getAbfsOutputStream().getOutputStreamStatistics()
.uploadSuccessful(bytesLength);
// Register performance information.
perfInfo.registerResult(op.getResult());
perfInfo.registerSuccess(true);
}
}
}
}