// BoundedResourcePool.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hadoop.fs.impl.prefetch;

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;

import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;

/**
 * Manages a fixed pool of resources.
 *
 * Avoids creating a new resource if a previously created instance is already available.
 */
public abstract class BoundedResourcePool<T> extends ResourcePool<T> {
  /**
   * The size of this pool. Fixed at creation time.
   */
  private final int size;

  /**
   * Items currently available in the pool.
   * The reference never changes; the queue is emptied (not nulled) on close.
   */
  private final ArrayBlockingQueue<T> items;

  /**
   * Items that have been created so far (regardless of whether they are currently available).
   * Membership is based on object identity. This set also serves as the monitor guarding
   * item creation, release and close, so the reference must never be reassigned.
   */
  private final Set<T> createdItems;

  /**
   * Set once {@link #close()} has run. Guarded by the {@code createdItems} monitor.
   */
  private boolean closed;

  /**
   * Constructs a resource pool of the given size.
   *
   * @param size the size of this pool. Cannot be changed post creation.
   *
   * @throws IllegalArgumentException if size is zero or negative.
   */
  public BoundedResourcePool(int size) {
    Validate.checkPositiveInteger(size, "size");

    this.size = size;
    this.items = new ArrayBlockingQueue<>(size);

    // The created items are identified based on their object reference.
    this.createdItems = Collections.newSetFromMap(new IdentityHashMap<T, Boolean>());
  }

  /**
   * Acquires a resource, blocking if necessary until one becomes available.
   *
   * @return the acquired resource, or null if the calling thread is interrupted
   *         while waiting (the interrupt flag is re-asserted in that case).
   *
   * @throws IllegalStateException if this pool has been closed.
   */
  @Override
  public T acquire() {
    return this.acquireHelper(true);
  }

  /**
   * Acquires a resource without blocking if one is immediately available.
   * Otherwise returns null.
   *
   * @return the acquired resource, or null if none is immediately available.
   *
   * @throws IllegalStateException if this pool has been closed.
   */
  @Override
  public T tryAcquire() {
    return this.acquireHelper(false);
  }

  /**
   * Releases a previously acquired resource.
   * Releasing an item that is already back in the pool is a no-op.
   *
   * @param item the resource to return to this pool.
   *
   * @throws IllegalArgumentException if item is null or was not created by this pool.
   * @throws IllegalStateException if the item unexpectedly does not fit in the pool.
   */
  @Override
  public void release(T item) {
    checkNotNull(item, "item");

    // The membership check, the duplicate check and the enqueue happen under one lock
    // so that two concurrent releases of the same item cannot both enqueue it.
    synchronized (createdItems) {
      if (!createdItems.contains(item)) {
        throw new IllegalArgumentException("This item is not a part of this pool");
      }

      // Return if this item was released earlier.
      // We cannot use items.contains() because that check is not based on reference equality.
      for (T entry : items) {
        if (entry == item) {
          return;
        }
      }

      // offer() is used instead of put(): put() is interruptible and would fail
      // (dropping the item from circulation) when the caller's interrupt flag is set,
      // even though no blocking is needed. The queue capacity equals the pool size and
      // the queue never holds duplicates, so there is always room for this item.
      if (!items.offer(item)) {
        throw new IllegalStateException("release() should never block");
      }
    }
  }

  /**
   * Closes every item created by this pool and empties the pool.
   * Calling this method more than once has no further effect.
   */
  @Override
  public synchronized void close() {
    synchronized (createdItems) {
      if (closed) {
        return;
      }
      closed = true;

      try {
        for (T item : createdItems) {
          close(item);
        }
      } finally {
        // Drop all references even if closing one of the items failed.
        items.clear();
        createdItems.clear();
      }
    }
  }

  /**
   * Derived classes may implement a way to cleanup each item.
   *
   * @param item the item being closed.
   */
  @Override
  protected synchronized void close(T item) {
    // Do nothing in this class. Allow overriding classes to take any cleanup action.
  }

  /**
   * Number of items created so far. Mostly for testing purposes.
   * @return the count.
   */
  public int numCreated() {
    synchronized (createdItems) {
      return createdItems.size();
    }
  }

  /**
   * Number of items available to be acquired. Mostly for testing purposes.
   * Counts both the items waiting in the queue and the items not yet created.
   * @return the number available.
   */
  public synchronized int numAvailable() {
    return (size - numCreated()) + items.size();
  }

  // For debugging purposes.
  @Override
  public synchronized String toString() {
    return String.format(
        "size = %d, #created = %d, #in-queue = %d, #available = %d",
        size, numCreated(), items.size(), numAvailable());
  }

  /**
   * Derived classes must implement a way to create an instance of a resource.
   *
   * @return a newly created resource.
   */
  protected abstract T createNew();

  /**
   * Shared implementation of {@link #acquire()} and {@link #tryAcquire()}.
   *
   * @param canBlock whether to wait for a released item when none is queued
   *                 and the pool has already created {@code size} items.
   * @return an item, or null if none could be obtained without blocking
   *         (when {@code canBlock} is false) or the wait was interrupted.
   *
   * @throws IllegalStateException if this pool has been closed.
   */
  private T acquireHelper(boolean canBlock) {

    // Prefer reusing an item if one is available.
    // That avoids unnecessarily creating new instances.
    T result = items.poll();
    if (result != null) {
      return result;
    }

    synchronized (createdItems) {
      if (closed) {
        throw new IllegalStateException("This pool has been closed");
      }

      // Create a new instance if allowed by the capacity of this pool.
      if (createdItems.size() < size) {
        T item = createNew();
        createdItems.add(item);
        return item;
      }
    }

    if (canBlock) {
      try {
        // Block for an instance to be available.
        return items.take();
      } catch (InterruptedException e) {
        // Preserve the interrupt for the caller and report that nothing was acquired.
        Thread.currentThread().interrupt();
        return null;
      }
    } else {
      return null;
    }
  }
}