AzureBlockManager.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.store.DataBlocks;

/**
 * Abstract base class for managing Azure Data Lake Storage (ADLS) blocks.
 *
 * <p>Holds the state shared by concrete block managers: the block factory,
 * the configured block size, the owning {@link AbfsOutputStream}, the
 * currently active block and a running block count. Subclasses supply the
 * actual block creation logic through {@link #createBlockInternal(long)}.
 *
 * <p>Access to the active block is guarded by this object's monitor. The
 * block count accessors are not synchronized themselves.
 */
public abstract class AzureBlockManager {

  // Bound to this class (not AbfsOutputStream) so that block manager
  // messages are reported under, and can be configured via, their own
  // logger name.
  private static final Logger LOG = LoggerFactory.getLogger(
      AzureBlockManager.class);

  /** Factory for blocks. */
  private final DataBlocks.BlockFactory blockFactory;

  /** Current data block. Null means none currently active. */
  private AbfsBlock activeBlock;

  /** Count of blocks uploaded. */
  private long blockCount = 0;

  /** The size of a single block. */
  private final int blockSize;

  /** The output stream associated with this block manager. */
  private final AbfsOutputStream abfsOutputStream;

  /**
   * Constructs an AzureBlockManager.
   *
   * @param abfsOutputStream the output stream associated with this block manager
   * @param blockFactory the factory to create blocks
   * @param blockSize the size of each block
   */
  protected AzureBlockManager(AbfsOutputStream abfsOutputStream,
      DataBlocks.BlockFactory blockFactory,
      final int blockSize) {
    this.abfsOutputStream = abfsOutputStream;
    this.blockFactory = blockFactory;
    this.blockSize = blockSize;
  }

  /**
   * Creates a new block at the given position.
   *
   * <p>Holds this manager's monitor while delegating to
   * {@link #createBlockInternal(long)}.
   *
   * @param position the position in the output stream where the block should be created
   * @return the created block
   * @throws IOException if an I/O error occurs
   */
  protected final synchronized AbfsBlock createBlock(final long position)
      throws IOException {
    return createBlockInternal(position);
  }

  /**
   * Internal method to create a new block at the given position.
   *
   * <p>When reached through {@link #createBlock(long)} this method runs
   * with this manager's monitor held.
   *
   * @param position the position in the output stream where the block should be created.
   * @return the created block.
   * @throws IOException if an I/O error occurs.
   */
  protected abstract AbfsBlock createBlockInternal(long position)
      throws IOException;

  /**
   * Gets the active block.
   *
   * @return the active block, or {@code null} if none is currently active
   */
  public synchronized AbfsBlock getActiveBlock() {
    return activeBlock;
  }

  /**
   * Sets the active block.
   *
   * @param activeBlock the block to set as active
   */
  public synchronized void setActiveBlock(final AbfsBlock activeBlock) {
    this.activeBlock = activeBlock;
  }

  /**
   * Checks if there is an active block.
   *
   * @return true if there is an active block, false otherwise
   */
  protected synchronized boolean hasActiveBlock() {
    return activeBlock != null;
  }

  /**
   * Gets the block factory.
   *
   * @return the block factory
   */
  protected DataBlocks.BlockFactory getBlockFactory() {
    return blockFactory;
  }

  /**
   * Gets the count of blocks uploaded.
   *
   * <p>This accessor does not take this manager's monitor.
   *
   * @return the block count
   */
  public long getBlockCount() {
    return blockCount;
  }

  /**
   * Sets the count of blocks uploaded.
   *
   * <p>This mutator does not take this manager's monitor.
   *
   * @param blockCount the count of blocks to set
   */
  protected void setBlockCount(final long blockCount) {
    this.blockCount = blockCount;
  }

  /**
   * Gets the block size.
   *
   * @return the block size
   */
  protected int getBlockSize() {
    return blockSize;
  }

  /**
   * Gets the AbfsOutputStream associated with this block manager.
   *
   * @return the AbfsOutputStream
   */
  protected AbfsOutputStream getAbfsOutputStream() {
    return abfsOutputStream;
  }

  /**
   * Clears the active block.
   *
   * <p>A debug message is logged only when a block was actually active;
   * the reference is reset to {@code null} in either case.
   */
  void clearActiveBlock() {
    synchronized (this) {
      if (activeBlock != null) {
        LOG.debug("Clearing active block");
      }
      activeBlock = null;
    }
  }

  /**
   * Clears any resources used by the block manager.
   *
   * <p>Drops the active block, if there is one, and logs that the manager
   * has been closed.
   */
  void close() {
    if (hasActiveBlock()) {
      clearActiveBlock();
    }
    LOG.debug("AzureBlockManager closed.");
  }
}