AzureBlobIngressHandler.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_APPEND;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOB_FLUSH;
import static org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient.generateBlockListXml;

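/**
 * Ingress handler that writes data through the Azure Blob endpoint. For block
 * blobs, each block is uploaded with an append call that carries its block ID
 * and the current eTag, and the staged blocks are committed by flushing a
 * generated block list; append blobs are written directly with append-block
 * calls.
 *
 * <p>Simplified sketch of the write/flush lifecycle as driven from the output
 * stream in this package (variable names here are illustrative only):</p>
 * <pre>{@code
 * AzureIngressHandler handler = new AzureBlobIngressHandler(
 *     outputStream, blockFactory, bufferSize, eTag, clientHandler, blockManager);
 * handler.bufferData(block, data, 0, data.length);        // stage bytes locally
 * handler.remoteWrite(block, uploadData, reqParams, tc);  // upload one block
 * handler.remoteFlush(offset, true, false, leaseId, tc);  // commit the block list
 * }</pre>
 */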
public class AzureBlobIngressHandler extends AzureIngressHandler {

  private static final Logger LOG = LoggerFactory.getLogger(
      AzureBlobIngressHandler.class);

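  /** Latest known eTag of the target blob, supplied at construction and refreshed after each successful flush. */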
  private volatile String eTag;

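  /** Block manager that tracks the blocks buffered for this stream and the block IDs pending commit. */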
  private final AzureBlobBlockManager blobBlockManager;

  private final AbfsBlobClient blobClient;

  private final AbfsClientHandler clientHandler;

  /**
   * Constructs an AzureBlobIngressHandler.
   *
   * @param abfsOutputStream the AbfsOutputStream.
   * @param blockFactory the block factory.
   * @param bufferSize the buffer size.
   * @param eTag the eTag.
   * @param clientHandler the client handler.
   * @param blockManager the block manager.
   * @throws AzureBlobFileSystemException if an error occurs.
   */
  public AzureBlobIngressHandler(AbfsOutputStream abfsOutputStream,
      DataBlocks.BlockFactory blockFactory,
      int bufferSize, String eTag, AbfsClientHandler clientHandler, AzureBlockManager blockManager)
      throws AzureBlobFileSystemException {
    super(abfsOutputStream);
    this.eTag = eTag;
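    // Reuse the supplied block manager if it is already blob-based;
    // otherwise create a fresh one for this stream.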
    if (blockManager instanceof AzureBlobBlockManager) {
      this.blobBlockManager = (AzureBlobBlockManager) blockManager;
    } else {
      this.blobBlockManager = new AzureBlobBlockManager(abfsOutputStream,
          blockFactory, bufferSize);
    }
    this.clientHandler = clientHandler;
    this.blobClient = clientHandler.getBlobClient();
    LOG.trace("Created a new BlobIngress Handler for AbfsOutputStream instance {} for path {}",
        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
  }

  /**
   * Buffers data into the specified block.
   *
   * @param block the block to buffer data into.
   * @param data  the data to be buffered.
   * @param off   the start offset in the data.
   * @param length the number of bytes to buffer.
   * @return the number of bytes buffered.
   * @throws IOException if an I/O error occurs.
   */
  @Override
  protected int bufferData(AbfsBlock block,
      final byte[] data,
      final int off,
      final int length)
      throws IOException {
    LOG.trace("Buffering data of length {} to block at offset {}", length, off);
    return block.write(data, off, length);
  }

  /**
   * Performs a remote write operation.
   *
   * @param blockToUpload the block to upload.
   * @param uploadData    the data to upload.
   * @param reqParams     the request parameters.
   * @param tracingContext the tracing context.
   * @return the resulting AbfsRestOperation.
   * @throws IOException if an I/O error occurs.
   */
  @Override
  protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
      DataBlocks.BlockUploadData uploadData,
      AppendRequestParameters reqParams,
      TracingContext tracingContext)
      throws IOException {
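    // Attach the block ID and the current eTag so the append targets the expected blob state.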
    BlobAppendRequestParameters blobParams = new BlobAppendRequestParameters(blockToUpload.getBlockId(), getETag());
    reqParams.setBlobParams(blobParams);
    AbfsRestOperation op;
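    // Tag the tracing context with the ingress handler name, the uploading thread and the block offset.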
    String threadIdStr = String.valueOf(Thread.currentThread().getId());
    TracingContext tracingContextAppend = new TracingContext(tracingContext);
    tracingContextAppend.setIngressHandler(BLOB_APPEND + " T " + threadIdStr);
    tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
    try {
      LOG.trace("Starting remote write for block with ID {} and offset {}",
          blockToUpload.getBlockId(), blockToUpload.getOffset());
      op = getClient().append(getAbfsOutputStream().getPath(), uploadData.toByteArray(),
          reqParams,
          getAbfsOutputStream().getCachedSasTokenString(),
          getAbfsOutputStream().getContextEncryptionAdapter(),
          tracingContextAppend);
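      // Record the successfully uploaded block with the block manager.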
      blobBlockManager.updateEntry(blockToUpload);
    } catch (AbfsRestOperationException ex) {
      LOG.error("Error in remote write requiring handler switch for path {}", getAbfsOutputStream().getPath(), ex);
      if (shouldIngressHandlerBeSwitched(ex)) {
        throw getIngressHandlerSwitchException(ex);
      }
      LOG.error("Error in remote write for path {} and offset {}", getAbfsOutputStream().getPath(),
          blockToUpload.getOffset(), ex);
      throw ex;
    }
    return op;
  }

  /**
   * Flushes data to the remote store.
   *
   * @param offset               the offset to flush.
   * @param retainUncommitedData whether to retain uncommitted data.
   * @param isClose              whether this is a close operation.
   * @param leaseId              the lease ID.
   * @param tracingContext       the tracing context.
   * @return the resulting AbfsRestOperation.
   * @throws IOException if an I/O error occurs.
   */
  @Override
  protected synchronized AbfsRestOperation remoteFlush(final long offset,
      final boolean retainUncommitedData,
      final boolean isClose,
      final String leaseId,
      TracingContext tracingContext)
      throws IOException {
    AbfsRestOperation op;
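    // Append blobs are committed on each append call; there is no block list to flush.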
    if (getAbfsOutputStream().isAppendBlob()) {
      return null;
    }
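    // No blocks staged for commit; nothing to flush.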
    if (!blobBlockManager.hasBlocksToCommit()) {
      return null;
    }
    try {
      // Generate the XML block list of block IDs for the PutBlockList (flush) call.
      String blockListXml = generateBlockListXml(
          blobBlockManager.getBlockIdToCommit());
      TracingContext tracingContextFlush = new TracingContext(tracingContext);
      tracingContextFlush.setIngressHandler(BLOB_FLUSH);
      tracingContextFlush.setPosition(String.valueOf(offset));
      LOG.trace("Flushing data at offset {} for path {}", offset, getAbfsOutputStream().getPath());
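      // MD5 of the full blob content, sent along with the block list commit.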
      String fullBlobMd5 = computeFullBlobMd5();
      op = getClient().flush(blockListXml.getBytes(StandardCharsets.UTF_8),
          getAbfsOutputStream().getPath(),
          isClose, getAbfsOutputStream().getCachedSasTokenString(), leaseId,
          getETag(), getAbfsOutputStream().getContextEncryptionAdapter(), tracingContextFlush, fullBlobMd5);
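      // Cache the eTag returned by the commit so subsequent appends and flushes see the latest blob state.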
      setETag(op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG));
    } catch (AbfsRestOperationException ex) {
      LOG.error("Error in remote flush requiring handler switch for path {}", getAbfsOutputStream().getPath(), ex);
      if (shouldIngressHandlerBeSwitched(ex)) {
        throw getIngressHandlerSwitchException(ex);
      }
      LOG.error("Error in remote flush for path {} and offset {}", getAbfsOutputStream().getPath(), offset, ex);
      throw ex;
    } finally {
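      // Reset the accumulated MD5 digest once the flush attempt completes.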
      getAbfsOutputStream().getFullBlobContentMd5().reset();
    }
    return op;
  }

  /**
   * Performs a remote write that appends data to an append blob in Azure Blob Storage.
   *
   * <p>The append is issued through the blob client for the given path. If the failure
   * indicates that the ingress service should be switched, a handler-switch exception is
   * thrown; otherwise the original error is propagated.</p>
   *
   * @param path           The path of the append blob to which data is to be appended.
   * @param uploadData     The data to be uploaded as part of the append operation.
   * @param block          The block of data to append.
   * @param reqParams      The additional parameters required for the append operation.
   * @param tracingContext The tracing context for the operation.
   * @return An {@link AbfsRestOperation} object representing the remote write operation.
   * @throws IOException If an I/O error occurs during the append operation.
   */
  protected AbfsRestOperation remoteAppendBlobWrite(String path,
      DataBlocks.BlockUploadData uploadData,
      AbfsBlock block,
      AppendRequestParameters reqParams,
      TracingContext tracingContext) throws IOException {
    // Perform the remote append operation using the blob client.
    AbfsRestOperation op = null;
    try {
      op = blobClient.appendBlock(path, reqParams, uploadData.toByteArray(), tracingContext);
    } catch (AbfsRestOperationException ex) {
      LOG.error("Error in remote write requiring handler switch for path {}",
          getAbfsOutputStream().getPath(), ex);
      if (shouldIngressHandlerBeSwitched(ex)) {
        throw getIngressHandlerSwitchException(ex);
      }
      LOG.error("Error in remote write for path {} and offset {}",
          getAbfsOutputStream().getPath(),
          block.getOffset(), ex);
      throw ex;
    }
    return op;
  }

  /**
   * Sets the eTag of the blob.
   *
   * @param eTag the eTag to set.
   */
  void setETag(String eTag) {
    this.eTag = eTag;
  }

  /**
   * Gets the eTag value of the blob.
   *
   * @return the eTag.
   */
  @VisibleForTesting
  @Override
  public String getETag() {
    return eTag;
  }

  /**
   * Writes the current buffer of an append blob to the service.
   *
   * @throws IOException if an I/O error occurs.
   */
  @Override
  protected void writeAppendBlobCurrentBufferToService() throws IOException {
    AbfsBlock activeBlock = blobBlockManager.getActiveBlock();

    // No data, return immediately.
    if (!getAbfsOutputStream().hasActiveBlockDataToUpload()) {
      return;
    }

    // Prepare data for upload.
    final int bytesLength = activeBlock.dataSize();
    DataBlocks.BlockUploadData uploadData = activeBlock.startUpload();

    // Clear active block and update statistics.
    if (blobBlockManager.hasActiveBlock()) {
      blobBlockManager.clearActiveBlock();
    }
    getAbfsOutputStream().getOutputStreamStatistics().writeCurrentBuffer();
    getAbfsOutputStream().getOutputStreamStatistics().bytesToUpload(bytesLength);

    // Update the stream position.
    final long offset = getAbfsOutputStream().getPosition();
    getAbfsOutputStream().setPosition(offset + bytesLength);

    // Perform the upload within a performance tracking context.
    try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(
        blobClient.getAbfsPerfTracker(),
        "writeCurrentBufferToService", APPEND_ACTION)) {
      LOG.trace("Writing current buffer to service at offset {} and path {}", offset, getAbfsOutputStream().getPath());
      AppendRequestParameters reqParams = new AppendRequestParameters(
          offset, 0, bytesLength, AppendRequestParameters.Mode.APPEND_MODE,
          true, getAbfsOutputStream().getLeaseId(),
          getAbfsOutputStream().isExpectHeaderEnabled(),
          getAbfsOutputStream().getMd5());

      AbfsRestOperation op;
      try {
        op = remoteAppendBlobWrite(getAbfsOutputStream().getPath(), uploadData,
            activeBlock, reqParams,
            new TracingContext(getAbfsOutputStream().getTracingContext()));
      } catch (InvalidIngressServiceException ex) {
        LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.",
            getAbfsOutputStream().getPath());
        getAbfsOutputStream().switchHandler();
        op = getAbfsOutputStream().getIngressHandler()
            .remoteAppendBlobWrite(getAbfsOutputStream().getPath(), uploadData,
                activeBlock, reqParams,
                new TracingContext(getAbfsOutputStream().getTracingContext()));
      } finally {
        // Ensure the upload data stream is closed.
        IOUtils.closeStreams(uploadData, activeBlock);
      }

      if (op != null) {
        // Update the SAS token and log the successful upload.
        getAbfsOutputStream().getCachedSasToken().update(op.getSasToken());
        getAbfsOutputStream().getOutputStreamStatistics()
            .uploadSuccessful(bytesLength);

        // Register performance information.
        perfInfo.registerResult(op.getResult());
        perfInfo.registerSuccess(true);
      }
    }
  }

  /**
   * Gets the block manager.
   *
   * @return the block manager.
   */
  @Override
  public AzureBlockManager getBlockManager() {
    return blobBlockManager;
  }

  /**
   * Gets the blob client.
   *
   * @return the blob client.
   */
  @Override
  public AbfsBlobClient getClient() {
    return blobClient;
  }

  @VisibleForTesting
  public AbfsClientHandler getClientHandler() {
    return clientHandler;
  }
}