AzureBlobBlockManager.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.store.DataBlocks;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA;
/**
 * Manages Azure Blob blocks for append operations.
 *
 * <p>The manager tracks every block created for the owning
 * {@link AbfsOutputStream} in an ordered list of {@link BlockEntry} objects
 * and accumulates the comma-separated block IDs that are to be committed.
 * The methods that mutate the block entry list are {@code synchronized} on
 * this instance.
 */
public class AzureBlobBlockManager extends AzureBlockManager {

  private static final Logger LOG = LoggerFactory.getLogger(
      AzureBlobBlockManager.class);

  /**
   * Comma-separated block IDs to commit: seeded in the constructor with the
   * IDs returned by the block-list call (when the stream position is greater
   * than zero and the blob is not an append blob) and extended by
   * {@link #hasBlocksToCommit()}.
   */
  private final StringBuilder committedBlockEntries = new StringBuilder();

  /** The list to store blockId, position, and status. */
  private final LinkedList<BlockEntry> blockEntryList = new LinkedList<>();

  /**
   * Length in bytes of the Base64-decoded first committed block ID;
   * stays 0 when no committed block list was read or it was empty.
   */
  private int blockIdLength = 0;

  /**
   * Constructs an AzureBlobBlockManager.
   *
   * <p>When the stream position is greater than zero and the stream is not
   * an append blob, the committed block list is fetched and its IDs are
   * cached so that later commits include them.
   *
   * @param abfsOutputStream the output stream
   * @param blockFactory the block factory
   * @param bufferSize the buffer size
   * @throws AzureBlobFileSystemException if an error occurs
   */
  public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
      DataBlocks.BlockFactory blockFactory,
      int bufferSize)
      throws AzureBlobFileSystemException {
    super(abfsOutputStream, blockFactory, bufferSize);
    if (abfsOutputStream.getPosition() > 0 && !abfsOutputStream.isAppendBlob()) {
      List<String> committedBlocks = getBlockList(abfsOutputStream.getTracingContext());
      if (!committedBlocks.isEmpty()) {
        committedBlockEntries.append(String.join(COMMA, committedBlocks));
      }
    }
    LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance {} for path {}",
        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
  }

  /**
   * Retrieves the length of the block ID.
   *
   * @return the length of the block ID in bytes; 0 if it has not been
   *         derived from a committed block list.
   */
  public int getBlockIdLength() {
    return blockIdLength;
  }

  /**
   * Creates a new block if there is no active block; otherwise returns the
   * current active block unchanged.
   *
   * <p>Creating a block increments the block count, registers a new
   * {@link BlockEntry} for it, resets the stream's message digest and makes
   * the block the active one.
   *
   * @param position the position
   * @return the active block (newly created or pre-existing)
   * @throws IOException if an I/O error occurs, including when the new
   *         block's offset is not greater than that of the last tracked block
   */
  @Override
  protected synchronized AbfsBlock createBlockInternal(long position)
      throws IOException {
    if (getActiveBlock() == null) {
      setBlockCount(getBlockCount() + 1);
      AbfsBlock activeBlock = new AbfsBlobBlock(getAbfsOutputStream(), position,
          getBlockIdLength(), getBlockCount());
      activeBlock.setBlockEntry(
          addNewEntry(activeBlock.getBlockId(), activeBlock.getOffset()));
      getAbfsOutputStream().getMessageDigest().reset();
      setActiveBlock(activeBlock);
    }
    return getActiveBlock();
  }

  /**
   * Returns block id's which are committed for the blob.
   *
   * <p>As a side effect, when the returned list is non-empty,
   * {@code blockIdLength} is set to the byte length of the Base64-decoded
   * first block ID.
   *
   * @param tracingContext Tracing context object.
   * @return list of committed block id's; empty if the operation or its
   *         result is {@code null}.
   * @throws AzureBlobFileSystemException if an error occurs
   */
  private List<String> getBlockList(TracingContext tracingContext)
      throws AzureBlobFileSystemException {
    List<String> committedBlockIdList = new ArrayList<>();
    AbfsBlobClient blobClient = getAbfsOutputStream().getClientHandler().getBlobClient();
    final AbfsRestOperation op = blobClient
        .getBlockList(getAbfsOutputStream().getPath(), tracingContext);
    if (op != null && op.getResult() != null) {
      committedBlockIdList = op.getResult().getBlockIdList();
      if (!committedBlockIdList.isEmpty()) {
        blockIdLength = Base64.decodeBase64(committedBlockIdList.get(0)).length;
      }
    }
    return committedBlockIdList;
  }

  /**
   * Adds a new block entry to the block entry list.
   * The block entry is added only if the position of the new block
   * is greater than the position of the last block in the list.
   *
   * @param blockId The ID of the new block to be added.
   * @param position The position of the new block in the stream.
   * @return The newly added {@link BlockEntry}.
   * @throws IOException If the position of the new block is not greater than the last block in the list.
   */
  private synchronized BlockEntry addNewEntry(String blockId, long position) throws IOException {
    if (!blockEntryList.isEmpty()) {
      BlockEntry lastEntry = blockEntryList.getLast();
      if (position <= lastEntry.getPosition()) {
        throw new IOException("New block position " + position + " must be greater than the last block position "
            + lastEntry.getPosition() + " for path " + getAbfsOutputStream().getPath());
      }
    }
    BlockEntry blockEntry = new BlockEntry(blockId, position, AbfsBlockStatus.NEW);
    blockEntryList.addLast(blockEntry);
    LOG.debug("Added block {} at position {} with status NEW.", blockId, position);
    return blockEntry;
  }

  /**
   * Updates the status of an existing block entry to SUCCESS.
   * This method is used to mark a block as successfully processed.
   *
   * @param block The {@link AbfsBlock} whose status needs to be updated to SUCCESS.
   */
  protected synchronized void updateEntry(AbfsBlock block) {
    BlockEntry blockEntry = block.getBlockEntry();
    blockEntry.setStatus(AbfsBlockStatus.SUCCESS);
    LOG.debug("Updated block {} at position {} to status SUCCESS.",
        block.getBlockId(), blockEntry.getPosition());
  }

  /**
   * Prepares the list of blocks to commit.
   *
   * <p>Drains the block entry list from the head. Each entry must have
   * status SUCCESS and a position strictly lower than the entry that follows
   * it; its block ID is then appended to the comma-separated commit list.
   * Entries are removed from the list as they are examined, so entries
   * processed before a failure are not restored.
   *
   * @return whether we have some data to commit or not.
   * @throws IOException if a block is not in SUCCESS state or block
   *         positions are not strictly increasing
   */
  protected synchronized boolean hasBlocksToCommit() throws IOException {
    // Adds all the committed blocks if available to the list of blocks to be added in putBlockList.
    if (blockEntryList.isEmpty()) {
      return false; // No entries to commit
    }
    while (!blockEntryList.isEmpty()) {
      BlockEntry current = blockEntryList.poll();
      if (current.getStatus() != AbfsBlockStatus.SUCCESS) {
        LOG.debug(
            "Block {} with position {} has status {}, flush cannot proceed.",
            current.getBlockId(), current.getPosition(), current.getStatus());
        throw new IOException("Flush failed. Block " + current.getBlockId()
            + " with position " + current.getPosition() + " has status "
            + current.getStatus() + " for path " + getAbfsOutputStream().getPath());
      }
      if (!blockEntryList.isEmpty()) {
        // Peek (do not remove) the following entry to validate ordering.
        BlockEntry next = blockEntryList.getFirst();
        if (current.getPosition() >= next.getPosition()) {
          String errorMessage =
              "Position check failed. Current block position is greater than or equal to the next block's position.\n"
                  + "Current Block Entry:\n"
                  + "Block ID: " + current.getBlockId()
                  + ", Position: " + current.getPosition()
                  + ", Status: " + current.getStatus()
                  + ", Path: " + getAbfsOutputStream().getPath()
                  + ", StreamID: " + getAbfsOutputStream().getStreamID()
                  + ", Next block position: " + next.getPosition()
                  + "\n";
          throw new IOException(errorMessage);
        }
      }
      // Append the current block's ID to committedBlockEntries,
      // comma-separating it from any IDs already present.
      if (committedBlockEntries.length() > 0) {
        committedBlockEntries.append(COMMA);
      }
      committedBlockEntries.append(current.getBlockId());
      LOG.debug("Block {} added to committed entries.", current.getBlockId());
    }
    return true;
  }

  /**
   * Returns the block ID list.
   *
   * @return the comma-separated block IDs accumulated so far; an empty
   *         string if there are none
   */
  protected String getBlockIdToCommit() {
    return committedBlockEntries.toString();
  }

  /**
   * Closes the manager via the superclass and then clears the accumulated
   * list of block IDs to commit.
   */
  @Override
  public void close() {
    super.close();
    committedBlockEntries.setLength(0);
  }
}