TrackingByteBufferPool.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hadoop.fs.impl;

import java.nio.ByteBuffer;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.io.ByteBufferPool;

import static java.lang.System.identityHashCode;
import static java.util.Objects.requireNonNull;

/**
 * A wrapper {@link ByteBufferPool} implementation that tracks whether all allocated buffers
 * are released.
 * <p>
 * It throws the related exception at {@link #close()} if any buffer remains un-released.
 * It also clears the buffers at release, so that continued use of a released buffer
 * is more likely to generate errors.
 * <p>
 * To be used for testing.
 * <p>
 * The stacktraces of the allocation are not stored by default because
 * it can significantly decrease the unit test performance.
 * Configuring this class to log at DEBUG will trigger their collection.
 * <p>
 * Thread safety: the allocation map is a plain {@link IdentityHashMap}, so every
 * method which reads or updates it is synchronized on this instance.
 * <p>
 * Adapted from Parquet class {@code org.apache.parquet.bytes.TrackingByteBufferAllocator}.
 * @see ByteBufferAllocationStacktraceException
 */
@VisibleForTesting
public final class TrackingByteBufferPool implements ByteBufferPool, AutoCloseable {

  private static final Logger LOG = LoggerFactory.getLogger(TrackingByteBufferPool.class);

  /**
   * Wrap an existing allocator with this tracking allocator.
   * @param allocator allocator to wrap.
   * @return a new allocator.
   */
  public static TrackingByteBufferPool wrap(ByteBufferPool allocator) {
    return new TrackingByteBufferPool(allocator);
  }

  /**
   * Base class of all exceptions raised by this pool.
   * <p>
   * The constructors are private: only the nested subclasses declared
   * in this file can be instantiated.
   */
  public static class LeakDetectorHeapByteBufferPoolException
      extends RuntimeException {

    private LeakDetectorHeapByteBufferPoolException(String msg) {
      super(msg);
    }

    private LeakDetectorHeapByteBufferPoolException(String msg, Throwable cause) {
      super(msg, cause);
    }

    private LeakDetectorHeapByteBufferPoolException(
        String message,
        Throwable cause,
        boolean enableSuppression,
        boolean writableStackTrace) {
      super(message, cause, enableSuppression, writableStackTrace);
    }
  }

  /**
   * Stack trace of allocation as saved in the tracking map.
   */
  public static final class ByteBufferAllocationStacktraceException
      extends LeakDetectorHeapByteBufferPoolException {

    /**
     * Single stack trace instance to use when DEBUG is not enabled.
     */
    private static final ByteBufferAllocationStacktraceException WITHOUT_STACKTRACE =
        new ByteBufferAllocationStacktraceException(false);

    /**
     * Create a stack trace for the map, either using the shared static one
     * or a dynamically created one.
     * @return the shared instance if DEBUG logging is disabled; a new exception,
     * carrying the stack of the caller, if it is enabled.
     */
    private static ByteBufferAllocationStacktraceException create() {
      return LOG.isDebugEnabled()
          ? new ByteBufferAllocationStacktraceException()
          : WITHOUT_STACKTRACE;
    }

    /**
     * Constructor for the per-allocation instances created when DEBUG is enabled.
     */
    private ByteBufferAllocationStacktraceException() {
      super("Allocation stacktrace of the first ByteBuffer:");
    }

    /**
     * Private constructor for the singleton {@link #WITHOUT_STACKTRACE},
     * telling developers how to see a trace per buffer.
     * <p>
     * Suppression and the writable stack trace are both disabled.
     * @param unused ignored; only present to give this constructor a distinct signature.
     */
    private ByteBufferAllocationStacktraceException(boolean unused) {
      super("Log org.apache.hadoop.fs.impl.TrackingByteBufferPool at DEBUG for stack traces",
          null,
          false,
          false);
    }
  }

  /**
   * Exception raised in {@link TrackingByteBufferPool#putBuffer(ByteBuffer)} if the
   * buffer to release was not in the hash map.
   */
  public static final class ReleasingUnallocatedByteBufferException
      extends LeakDetectorHeapByteBufferPoolException {

    private ReleasingUnallocatedByteBufferException(final ByteBuffer b) {
      super(String.format("Releasing a ByteBuffer instance that is not allocated"
          + " by this buffer pool or already been released: %s size %d; hash code %s",
          b, b.capacity(), identityHashCode(b)));
    }
  }

  /**
   * Exception raised in {@link TrackingByteBufferPool#close()} if there
   * was an unreleased buffer.
   */
  public static final class LeakedByteBufferException
      extends LeakDetectorHeapByteBufferPoolException {

    private final int count;

    private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceException e) {
      super(count + " ByteBuffer object(s) is/are remained unreleased"
          + " after closing this buffer pool.", e);
      this.count = count;
    }

    /**
     * Get the number of unreleased buffers.
     * @return number of unreleased buffers
     */
    public int getCount() {
      return count;
    }
  }

  /**
   * Tracker of allocations.
   * <p>
   * The key maps by the object id of the buffer, and refers to either a common stack trace
   * or one dynamically created for each allocation.
   * <p>
   * Not a concurrent collection: only access it while holding the lock on this instance.
   */
  private final Map<ByteBuffer, ByteBufferAllocationStacktraceException> allocated =
      new IdentityHashMap<>();

  /**
   * Wrapped buffer pool.
   */
  private final ByteBufferPool allocator;

  /**
   * Number of buffer allocations.
   * <p>
   * This is incremented in {@link #getBuffer(boolean, int)}.
   */
  private final AtomicInteger bufferAllocations = new AtomicInteger();

  /**
   * Number of buffer releases.
   * <p>
   * This is incremented in {@link #putBuffer(ByteBuffer)}.
   */
  private final AtomicInteger bufferReleases = new AtomicInteger();

  /**
   * private constructor.
   * @param allocator pool allocator.
   */
  private TrackingByteBufferPool(ByteBufferPool allocator) {
    this.allocator = allocator;
  }

  /**
   * Get the number of calls made to {@link #getBuffer(boolean, int)}.
   * @return the allocation counter.
   */
  public int getBufferAllocations() {
    return bufferAllocations.get();
  }

  /**
   * Get the number of calls made to {@link #putBuffer(ByteBuffer)}.
   * <p>
   * The counter is incremented on entry to that method, so calls which
   * went on to fail are included.
   * @return the release counter.
   */
  public int getBufferReleases() {
    return bufferReleases.get();
  }

  /**
   * Get a buffer from the pool.
   * <p>
   * This increments the {@link #bufferAllocations} counter and stores the
   * singleton or local allocation stack trace in the {@link #allocated} map.
   * @param direct whether to allocate a direct buffer or not
   * @param size size of the buffer to allocate
   * @return a ByteBuffer instance
   */
  @Override
  public synchronized ByteBuffer getBuffer(final boolean direct, final int size) {
    bufferAllocations.incrementAndGet();
    ByteBuffer buffer = allocator.getBuffer(direct, size);
    final ByteBufferAllocationStacktraceException ex =
        ByteBufferAllocationStacktraceException.create();
    allocated.put(buffer, ex);
    // the exception is the trailing argument, beyond the three placeholders
    LOG.debug("Creating ByteBuffer:{} size {} {}",
        identityHashCode(buffer), size, buffer, ex);
    return buffer;
  }

  /**
   * Release a buffer back to the pool.
   * <p>
   * This increments the {@link #bufferReleases} counter and removes the
   * buffer from the {@link #allocated} map.
   * <p>
   * If the buffer was not allocated by this pool, it throws
   * {@link ReleasingUnallocatedByteBufferException}.
   *
   * @param buffer buffer to release
   * @throws ReleasingUnallocatedByteBufferException if the buffer was not allocated by this pool
   * @throws NullPointerException if the buffer is null
   */
  @Override
  public synchronized void putBuffer(ByteBuffer buffer)
      throws ReleasingUnallocatedByteBufferException {

    bufferReleases.incrementAndGet();
    requireNonNull(buffer);
    LOG.debug("Releasing ByteBuffer: {}: {}", identityHashCode(buffer), buffer);
    if (allocated.remove(buffer) == null) {
      throw new ReleasingUnallocatedByteBufferException(buffer);
    }
    // Clearing the buffer so subsequent access would probably generate errors.
    // This is done before the buffer is handed back: once the wrapped pool
    // has it, this class must no longer modify it.
    buffer.clear();
    allocator.putBuffer(buffer);
  }

  /**
   * Check if the buffer is in the pool.
   * @param buffer buffer
   * @return true if the buffer is in the pool
   * @throws NullPointerException if the buffer is null
   */
  public synchronized boolean containsBuffer(ByteBuffer buffer) {
    return allocated.containsKey(requireNonNull(buffer));
  }

  /**
   * Get the number of allocated buffers.
   * @return number of allocated buffers
   */
  public synchronized int size() {
    return allocated.size();
  }

  /**
   * Expect all buffers to be released -if not, log unreleased ones
   * and then raise an exception with the stack trace of the first
   * unreleased buffer.
   * <p>
   * The tracking map is emptied before the exception is thrown.
   * @throws LeakedByteBufferException if at least one buffer was not released
   */
  @Override
  public synchronized void close() throws LeakedByteBufferException {
    if (!allocated.isEmpty()) {
      allocated.keySet().forEach(buffer ->
          LOG.warn("Unreleased ByteBuffer {}; {}", identityHashCode(buffer), buffer));
      LeakedByteBufferException ex = new LeakedByteBufferException(
          allocated.size(),
          allocated.values().iterator().next());
      allocated.clear(); // Drop the references to the ByteBuffers, so they can be gc'd
      throw ex;
    }
  }
}