TestWeakReferencedElasticByteBufferPool.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.hadoop.test.HadoopTestBase;

/**
 * Unit tests for {@code WeakReferencedElasticByteBufferPool}.
 */
@RunWith(Parameterized.class)
public class TestWeakReferencedElasticByteBufferPool
        extends HadoopTestBase {

  private final boolean isDirect;

  private final String type;

  @Parameterized.Parameters(name = "Buffer type : {0}")
  public static List<String> params() {
    return Arrays.asList("direct", "array");
  }

  public TestWeakReferencedElasticByteBufferPool(String type) {
    this.type = type;
    this.isDirect = !"array".equals(type);
  }

  @Test
  public void testGetAndPutBasic() {
    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
    int bufferSize = 5;
    ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize);
    Assertions.assertThat(buffer.isDirect())
            .describedAs("Buffered returned should be of correct type {}", type)
            .isEqualTo(isDirect);
    Assertions.assertThat(buffer.capacity())
            .describedAs("Initial capacity of returned buffer from pool")
            .isEqualTo(bufferSize);
    Assertions.assertThat(buffer.position())
            .describedAs("Initial position of returned buffer from pool")
            .isEqualTo(0);

    byte[] arr = createByteArray(bufferSize);
    buffer.put(arr, 0, arr.length);
    buffer.flip();
    validateBufferContent(buffer, arr);
    Assertions.assertThat(buffer.position())
            .describedAs("Buffer's position after filling bytes in it")
            .isEqualTo(bufferSize);
    // releasing buffer to the pool.
    pool.putBuffer(buffer);
    Assertions.assertThat(buffer.position())
            .describedAs("Position should be reset to 0 after returning buffer to the pool")
            .isEqualTo(0);

  }

  @Test
  public void testPoolingWithDifferentSizes() {
    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);

    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
            .describedAs("Number of buffers in the pool")
            .isEqualTo(0);

    pool.putBuffer(buffer1);
    pool.putBuffer(buffer2);
    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
            .describedAs("Number of buffers in the pool")
            .isEqualTo(2);
    ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
    Assertions.assertThat(buffer3.capacity())
            .describedAs("Pooled buffer should have older capacity")
            .isEqualTo(15);
    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
            .describedAs("Number of buffers in the pool")
            .isEqualTo(1);
    pool.putBuffer(buffer);
    ByteBuffer buffer4 = pool.getBuffer(isDirect, 6);
    Assertions.assertThat(buffer4.capacity())
            .describedAs("Pooled buffer should have older capacity")
            .isEqualTo(10);
    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
            .describedAs("Number of buffers in the pool")
            .isEqualTo(1);

    pool.release();
    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
            .describedAs("Number of buffers in the pool post release")
            .isEqualTo(0);
  }

  @Test
  public void testPoolingWithDifferentInsertionTime() {
    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
    ByteBuffer buffer = pool.getBuffer(isDirect, 10);
    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
    ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);

    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
            .describedAs("Number of buffers in the pool")
            .isEqualTo(0);

    pool.putBuffer(buffer1);
    pool.putBuffer(buffer2);
    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
            .describedAs("Number of buffers in the pool")
            .isEqualTo(2);
    ByteBuffer buffer3 = pool.getBuffer(isDirect, 10);
    // As buffer1 is returned to the pool before buffer2, it should
    // be returned when buffer of same size is asked again from
    // the pool. Memory references must match not just content
    // that is why {@code Assertions.isSameAs} is used here rather
    // than usual {@code Assertions.isEqualTo}.
    Assertions.assertThat(buffer3)
            .describedAs("Buffers should be returned in order of their " +
                    "insertion time")
            .isSameAs(buffer1);
    pool.putBuffer(buffer);
    ByteBuffer buffer4 = pool.getBuffer(isDirect, 10);
    Assertions.assertThat(buffer4)
            .describedAs("Buffers should be returned in order of their " +
                    "insertion time")
            .isSameAs(buffer2);
  }

  @Test
  public void testGarbageCollection() {
    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
    ByteBuffer buffer = pool.getBuffer(isDirect, 5);
    ByteBuffer buffer1 = pool.getBuffer(isDirect, 10);
    ByteBuffer buffer2 = pool.getBuffer(isDirect, 15);
    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
            .describedAs("Number of buffers in the pool")
            .isEqualTo(0);
    pool.putBuffer(buffer1);
    pool.putBuffer(buffer2);
    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
            .describedAs("Number of buffers in the pool")
            .isEqualTo(2);
    // Before GC.
    ByteBuffer buffer4 = pool.getBuffer(isDirect, 12);
    Assertions.assertThat(buffer4.capacity())
            .describedAs("Pooled buffer should have older capacity")
            .isEqualTo(15);
    pool.putBuffer(buffer4);
    // Removing the references
    buffer1 = null;
    buffer2 = null;
    buffer4 = null;
    System.gc();
    ByteBuffer buffer3 = pool.getBuffer(isDirect, 12);
    Assertions.assertThat(buffer3.capacity())
            .describedAs("After garbage collection new buffer should be " +
                    "returned with fixed capacity")
            .isEqualTo(12);
  }

  @Test
  public void testWeakReferencesPruning() {
    WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
    ByteBuffer buffer1 = pool.getBuffer(isDirect, 5);
    ByteBuffer buffer2 = pool.getBuffer(isDirect, 10);
    ByteBuffer buffer3 = pool.getBuffer(isDirect, 15);

    pool.putBuffer(buffer2);
    pool.putBuffer(buffer3);
    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
            .describedAs("Number of buffers in the pool")
            .isEqualTo(2);

    // marking only buffer2 to be garbage collected.
    buffer2 = null;
    System.gc();
    ByteBuffer buffer4 = pool.getBuffer(isDirect, 10);
    // Number of buffers in the pool is 0 as one got garbage
    // collected and other got returned in above call.
    Assertions.assertThat(pool.getCurrentBuffersCount(isDirect))
            .describedAs("Number of buffers in the pool")
            .isEqualTo(0);
    Assertions.assertThat(buffer4.capacity())
            .describedAs("After gc, pool should return next greater than " +
                    "available buffer")
            .isEqualTo(15);

  }

  private void validateBufferContent(ByteBuffer buffer, byte[] arr) {
    for (int i=0; i<arr.length; i++) {
      Assertions.assertThat(buffer.get())
              .describedAs("Content of buffer at index {} should match " +
                      "with content of byte array", i)
              .isEqualTo(arr[i]);
    }
  }

  private byte[] createByteArray(int length) {
    byte[] arr = new byte[length];
    Random r = new Random();
    r.nextBytes(arr);
    return arr;
  }
}