TestBaseAllocator.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.memory;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.arrow.memory.AllocationOutcomeDetails.Entry;
import org.apache.arrow.memory.rounding.RoundingPolicy;
import org.apache.arrow.memory.rounding.SegmentRoundingPolicy;
import org.apache.arrow.memory.util.AssertionUtil;
import org.apache.arrow.memory.util.MemoryUtil;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class TestBaseAllocator {

  /**
   * Size, in bytes, passed as the allocation limit to most root and child allocators created by
   * these tests (8 KiB).
   */
  private static final int MAX_ALLOCATION = 8 * 1024;

  /*
  // ---------------------------------------- DEBUG -----------------------------------

  @After
  public void checkBuffers() {
    final int bufferCount = UnsafeDirectLittleEndian.getBufferCount();
    if (bufferCount != 0) {
      UnsafeDirectLittleEndian.logBuffers(logger);
      UnsafeDirectLittleEndian.releaseBuffers();
    }

    assertEquals(0, bufferCount);
  }

  //  @AfterClass
  //  public static void dumpBuffers() {
  //    UnsafeDirectLittleEndian.logBuffers(logger);
  //  }

  // ---------------------------------------- DEBUG ------------------------------------
  */

  /**
   * Allocates half of {@code MAX_ALLOCATION} from the root allocator and, while that buffer is
   * still held, another half from a child allocator; both allocations must succeed.
   */
  @Test
  public void test_privateMax() throws Exception {
    final int halfOfMax = MAX_ALLOCATION / 2;
    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
      final ArrowBuf fromRoot = root.buffer(halfOfMax);
      assertNotNull(fromRoot, "allocation failed");

      // The child buffer is released before the child allocator is closed.
      try (final BufferAllocator child = root.newChildAllocator("noLimits", 0, MAX_ALLOCATION)) {
        final ArrowBuf fromChild = child.buffer(halfOfMax);
        assertNotNull(fromChild, "allocation failed");
        fromChild.getReferenceManager().release();
      }

      fromRoot.getReferenceManager().release();
    }
  }

  /**
   * Closing a root allocator while a buffer allocated from it has not been released is expected to
   * raise {@link IllegalStateException}.
   */
  @Test
  public void testRootAllocator_closeWithOutstanding() throws Exception {
    assertThrows(
        IllegalStateException.class,
        () -> {
          // The buffer is deliberately never released: we expect one unreleased underlying buffer
          // when the try-with-resources block closes the allocator.
          try (final RootAllocator leakyRoot = new RootAllocator(MAX_ALLOCATION)) {
            final ArrowBuf outstanding = leakyRoot.buffer(512);
            assertNotNull(outstanding, "allocation failed");
          }
        });
  }

  /**
   * (Currently disabled.) Requests a zero-length buffer from the root allocator and checks that it
   * is non-null, has zero capacity and reports a non-zero memory address.
   */
  @Test
  @Disabled
  public void testRootAllocator_getEmpty() throws Exception {
    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
      final ArrowBuf emptyBuf = root.buffer(0);
      assertNotNull(emptyBuf, "allocation failed");
      assertEquals(0, emptyBuf.capacity(), "capacity was non-zero");

      final boolean addressIsSet = emptyBuf.memoryAddress() != 0;
      assertTrue(addressIsSet, "address should be valid");

      emptyBuf.getReferenceManager().release();
    }
  }

  /**
   * (Currently disabled.) Expects closing a root allocator to raise {@link IllegalStateException}
   * when a zero-length buffer obtained from it was never released.
   */
  @Disabled // TODO(DRILL-2740)
  @Test
  public void testAllocator_unreleasedEmpty() throws Exception {
    assertThrows(
        IllegalStateException.class,
        () -> {
          try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
            // The returned buffer is intentionally dropped without being released.
            root.buffer(0);
          }
        });
  }

  /**
   * Allocates a buffer from one child allocator, transfers its ownership to a sibling allocator,
   * and checks that the transferred buffer is equivalent to the source and that the transfer
   * reported a fit. The root allocator is verified after each step.
   */
  @Test
  public void testAllocator_transferOwnership() throws Exception {
    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
      final BufferAllocator source =
          root.newChildAllocator("changeOwnership1", 0, MAX_ALLOCATION);
      final BufferAllocator target =
          root.newChildAllocator("changeOwnership2", 0, MAX_ALLOCATION);

      final ArrowBuf sourceBuf = source.buffer(MAX_ALLOCATION / 4);
      root.verify();

      final ReferenceManager sourceRefs = sourceBuf.getReferenceManager();
      final OwnershipTransferResult transfer = sourceRefs.transferOwnership(sourceBuf, target);
      assertEquiv(sourceBuf, transfer.getTransferredBuffer());
      final boolean fit = transfer.getAllocationFit();
      root.verify();
      assertTrue(fit);

      // Once the source buffer is released, the source allocator can be closed.
      sourceBuf.getReferenceManager().release();
      source.close();
      root.verify();

      // Likewise for the transferred buffer and the target allocator.
      transfer.getTransferredBuffer().getReferenceManager().release();
      target.close();
    }
  }

  /**
   * Compares two collections as multisets: they are equal when they contain the same elements the
   * same number of times, regardless of iteration order.
   *
   * <p>Elements are matched with {@code equals}; duplicates are counted, so {@code [a, a, b]} and
   * {@code [a, b, b]} are not equal even though each contains every element of the other.
   *
   * @param c1 first collection, must not be null
   * @param c2 second collection, must not be null
   * @param <T> element type
   * @return {@code true} if both collections hold the same elements with the same multiplicities
   */
  static <T> boolean equalsIgnoreOrder(Collection<T> c1, Collection<T> c2) {
    if (c1.size() != c2.size()) {
      return false;
    }
    // Tick off one matching element of c2 for every element of c1. Because the sizes are equal,
    // getting through all of c1 without a miss leaves nothing unmatched in c2.
    final List<T> unmatched = new ArrayList<>(c2);
    for (T element : c1) {
      if (!unmatched.remove(element)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Checks the parent/child links reported by allocators: the root has no parent, each child
   * reports the allocator it was created from as its parent, and each parent lists exactly the
   * children that are currently open.
   */
  @Test
  public void testAllocator_getParentAndChild() throws Exception {
    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
      assertNull(rootAllocator.getParentAllocator());

      try (final BufferAllocator childAllocator1 =
          rootAllocator.newChildAllocator("child1", 0, MAX_ALLOCATION)) {
        // assertEquals takes (expected, actual); keep that order so failure messages read right.
        assertEquals(rootAllocator, childAllocator1.getParentAllocator());
        assertTrue(
            equalsIgnoreOrder(Arrays.asList(childAllocator1), rootAllocator.getChildAllocators()));

        try (final BufferAllocator childAllocator2 =
            rootAllocator.newChildAllocator("child2", 0, MAX_ALLOCATION)) {
          assertEquals(rootAllocator, childAllocator2.getParentAllocator());
          assertTrue(
              equalsIgnoreOrder(
                  Arrays.asList(childAllocator1, childAllocator2),
                  rootAllocator.getChildAllocators()));

          // A grandchild is registered with child1, not with the root.
          try (final BufferAllocator grandChildAllocator =
              childAllocator1.newChildAllocator("grand-child", 0, MAX_ALLOCATION)) {
            assertEquals(childAllocator1, grandChildAllocator.getParentAllocator());
            assertTrue(
                equalsIgnoreOrder(
                    Arrays.asList(grandChildAllocator), childAllocator1.getChildAllocators()));
          }
        }
      }
    }
  }

  /**
   * Checks that a child allocator disappears from its parent's {@code getChildAllocators()} once
   * it has been closed, at both the child and grandchild level.
   */
  @Test
  public void testAllocator_childRemovedOnClose() throws Exception {
    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
      try (final BufferAllocator childAllocator1 =
          rootAllocator.newChildAllocator("child1", 0, MAX_ALLOCATION)) {
        try (final BufferAllocator childAllocator2 =
            rootAllocator.newChildAllocator("child2", 0, MAX_ALLOCATION)) {

          // root has two child allocators
          assertTrue(
              equalsIgnoreOrder(
                  Arrays.asList(childAllocator1, childAllocator2),
                  rootAllocator.getChildAllocators()));

          try (final BufferAllocator grandChildAllocator =
              childAllocator1.newChildAllocator("grand-child", 0, MAX_ALLOCATION)) {

            // child1 has one child allocator, i.e. the grand-child
            assertTrue(
                equalsIgnoreOrder(
                    Arrays.asList(grandChildAllocator), childAllocator1.getChildAllocators()));
          }

          // grand-child closed; Collections.emptySet() is used instead of the raw-typed
          // Collections.EMPTY_SET so the call type-checks without an unchecked conversion.
          assertTrue(
              equalsIgnoreOrder(Collections.emptySet(), childAllocator1.getChildAllocators()));
        }
        // root has only one child left
        assertTrue(
            equalsIgnoreOrder(Arrays.asList(childAllocator1), rootAllocator.getChildAllocators()));
      }
      // all child allocators closed.
      assertTrue(equalsIgnoreOrder(Collections.emptySet(), rootAllocator.getChildAllocators()));
    }
  }

  /**
   * Shares one buffer between several child allocators via {@code ReferenceManager.retain} and
   * releases the references one allocator at a time, verifying the root allocator after each step
   * and closing each child allocator once its reference has been released.
   */
  @Test
  public void testAllocator_shareOwnership() throws Exception {
    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
      final BufferAllocator owner1 =
          root.newChildAllocator("shareOwnership1", 0, MAX_ALLOCATION);
      final BufferAllocator owner2 =
          root.newChildAllocator("shareOwnership2", 0, MAX_ALLOCATION);
      final ArrowBuf original = owner1.buffer(MAX_ALLOCATION / 4);
      root.verify();

      // Share the buffer with the second allocator: a distinct but equivalent ArrowBuf comes back.
      final ArrowBuf sharedWith2 = original.getReferenceManager().retain(original, owner2);
      root.verify();
      assertNotNull(sharedWith2);
      assertNotEquals(sharedWith2, original);
      assertEquiv(original, sharedWith2);

      // Release the original reference, leaving the second allocator holding the buffer; the
      // first allocator should then be empty and closeable.
      original.getReferenceManager().release();
      root.verify();
      owner1.close();
      root.verify();

      // Share again, this time with a third allocator, going through the original buffer's
      // reference manager.
      final BufferAllocator owner3 =
          root.newChildAllocator("shareOwnership3", 0, MAX_ALLOCATION);
      final ArrowBuf sharedWith3 = original.getReferenceManager().retain(original, owner3);
      assertNotNull(sharedWith3);
      assertNotEquals(sharedWith3, original);
      assertNotEquals(sharedWith3, sharedWith2);
      assertEquiv(original, sharedWith3);
      root.verify();

      sharedWith2.getReferenceManager().release();
      root.verify();
      owner2.close();
      root.verify();

      sharedWith3.getReferenceManager().release();
      root.verify();
      owner3.close();
    }
  }

  /**
   * Creates a child allocator, allocates a 512-byte buffer from it and releases the buffer before
   * the child and the root are closed.
   */
  @Test
  public void testRootAllocator_createChildAndUse() throws Exception {
    // Resources are closed in reverse order of declaration: child first, then root.
    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION);
        final BufferAllocator child =
            root.newChildAllocator("createChildAndUse", 0, MAX_ALLOCATION)) {
      final ArrowBuf buf = child.buffer(512);
      assertNotNull(buf, "allocation failed");
      buf.getReferenceManager().release();
    }
  }

  /**
   * Closing a root allocator is expected to raise {@link IllegalStateException} when a child
   * allocator was neither closed nor had its outstanding buffer released.
   */
  @Test
  public void testRootAllocator_createChildDontClose() throws Exception {
    assertThrows(
        IllegalStateException.class,
        () -> {
          // Neither the child allocator nor its buffer is cleaned up, so one underlying buffer is
          // still outstanding when the try-with-resources block closes the root allocator.
          try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
            final BufferAllocator abandonedChild =
                root.newChildAllocator("createChildDontClose", 0, MAX_ALLOCATION);
            final ArrowBuf outstanding = abandonedChild.buffer(512);
            assertNotNull(outstanding, "allocation failed");
          }
        });
  }

  /**
   * With a 1024-byte {@link SegmentRoundingPolicy} on the root allocator, requested sizes are
   * expected to be rounded up to a multiple of the segment size, and the resulting buffers must be
   * readable and writable.
   */
  @Test
  public void testSegmentAllocator() {
    final RoundingPolicy segmentPolicy = new SegmentRoundingPolicy(1024L);
    try (RootAllocator root =
        new RootAllocator(AllocationListener.NOOP, 1024 * 1024, segmentPolicy)) {
      // 798 bytes: expected capacity is one segment.
      final ArrowBuf oneSegment = root.buffer(798);
      assertEquals(1024, oneSegment.capacity());
      oneSegment.setInt(333, 959);
      assertEquals(959, oneSegment.getInt(333));
      oneSegment.close();

      // 1025 bytes: expected capacity is two segments.
      final ArrowBuf twoSegments = root.buffer(1025);
      assertEquals(2048, twoSegments.capacity());
      twoSegments.setInt(193, 939);
      assertEquals(939, twoSegments.getInt(193));
      twoSegments.close();
    }
  }

  /**
   * Same checks as the root-level segment test, but allocating through a child of a root allocator
   * that was created with a 1024-byte {@link SegmentRoundingPolicy}.
   */
  @Test
  public void testSegmentAllocator_childAllocator() {
    final RoundingPolicy segmentPolicy = new SegmentRoundingPolicy(1024L);
    try (RootAllocator root =
            new RootAllocator(AllocationListener.NOOP, 1024 * 1024, segmentPolicy);
        BufferAllocator child = root.newChildAllocator("child", 0, 512 * 1024)) {

      assertEquals("child", child.getName());

      // 798 bytes: expected capacity is one segment.
      final ArrowBuf oneSegment = child.buffer(798);
      assertEquals(1024, oneSegment.capacity());
      oneSegment.setInt(333, 959);
      assertEquals(959, oneSegment.getInt(333));
      oneSegment.close();

      // 1025 bytes: expected capacity is two segments.
      final ArrowBuf twoSegments = child.buffer(1025);
      assertEquals(2048, twoSegments.capacity());
      twoSegments.setInt(193, 939);
      assertEquals(939, twoSegments.getInt(193));
      twoSegments.close();
    }
  }

  /** A segment size of 128 must be rejected with the "cannot be smaller than 1024" message. */
  @Test
  public void testSegmentAllocator_smallSegment() {
    final String message =
        assertThrows(IllegalArgumentException.class, () -> new SegmentRoundingPolicy(128L))
            .getMessage();
    assertEquals("The segment size cannot be smaller than 1024", message);
  }

  /** A segment size of 4097 must be rejected with the "must be a power of 2" message. */
  @Test
  public void testSegmentAllocator_segmentSizeNotPowerOf2() {
    final String message =
        assertThrows(IllegalArgumentException.class, () -> new SegmentRoundingPolicy(4097L))
            .getMessage();
    assertEquals("The segment size must be a power of 2", message);
  }

  /**
   * Exercises an allocator backed by the custom {@code AllocationManager.Factory} built by {@code
   * createAllocatorWithCustomizedAllocationManager()}: a buffer of the full {@code MAX_ALLOCATION}
   * can be written and read, a further allocation fails with {@link OutOfMemoryException}, and
   * reading the buffer after it was released throws a {@link RuntimeException}.
   */
  @Test
  public void testCustomizedAllocationManager() {
    try (BaseAllocator allocator = createAllocatorWithCustomizedAllocationManager()) {
      final ArrowBuf arrowBuf1 = allocator.buffer(MAX_ALLOCATION);
      assertNotNull(arrowBuf1, "allocation failed");

      arrowBuf1.setInt(0, 1);
      assertEquals(1, arrowBuf1.getInt(0));

      // The allocator is already at its limit, so even one more byte must be refused.
      assertThrows(
          OutOfMemoryException.class,
          () -> allocator.buffer(1),
          "allocated memory beyond max allowed");
      arrowBuf1.getReferenceManager().release();

      assertThrows(
          RuntimeException.class, () -> arrowBuf1.getInt(0), "data read from released buffer");
    }
  }

  /**
   * Builds a {@link RootAllocator} limited to {@code MAX_ALLOCATION} that uses a custom {@code
   * AllocationManager.Factory}. Each manager it creates obtains its memory from {@code
   * MemoryUtil.allocateMemory} and, on release, zeroes that memory before freeing it.
   *
   * @return the configured allocator; the caller is responsible for closing it
   */
  private BaseAllocator createAllocatorWithCustomizedAllocationManager() {
    final AllocationManager.Factory customFactory =
        new AllocationManager.Factory() {
          @Override
          public AllocationManager create(
              BufferAllocator accountingAllocator, long requestedSize) {
            return new AllocationManager(accountingAllocator) {
              // Allocated eagerly, when the manager is constructed.
              private final long address = MemoryUtil.allocateMemory(requestedSize);

              @Override
              protected long memoryAddress() {
                return address;
              }

              @Override
              protected void release0() {
                // Zero the region first, then hand it back.
                MemoryUtil.setMemory(address, requestedSize, (byte) 0);
                MemoryUtil.freeMemory(address);
              }

              @Override
              public long getSize() {
                return requestedSize;
              }
            };
          }

          @Override
          public ArrowBuf empty() {
            // NOTE(review): this factory supplies no empty buffer; confirm callers tolerate null.
            return null;
          }
        };

    return new RootAllocator(
        BaseAllocator.configBuilder()
            .maxAllocation(MAX_ALLOCATION)
            .allocationManagerFactory(customFactory)
            .build());
  }

  /**
   * Exercises {@code CountingAllocationListener} callbacks across a three-level allocator
   * hierarchy.
   *
   * <p>{@code listener1} is passed to the root allocator and {@code listener2} is passed
   * explicitly when creating {@code c2}; {@code c1} and {@code c3} are created without a listener
   * argument. The assertions expect {@code c1}'s events to be counted by {@code listener1} and
   * {@code c3}'s events by {@code listener2}, and expect the child counts to go back down as each
   * child allocator is closed.
   */
  @Test
  public void testRootAllocator_listeners() throws Exception {
    // Both listeners must start with all counters at zero.
    CountingAllocationListener listener1 = new CountingAllocationListener();
    assertEquals(0, listener1.getNumPreCalls());
    assertEquals(0, listener1.getNumCalls());
    assertEquals(0, listener1.getNumReleaseCalls());
    assertEquals(0, listener1.getNumChildren());
    assertEquals(0, listener1.getTotalMem());
    CountingAllocationListener listener2 = new CountingAllocationListener();
    assertEquals(0, listener2.getNumPreCalls());
    assertEquals(0, listener2.getNumCalls());
    assertEquals(0, listener2.getNumReleaseCalls());
    assertEquals(0, listener2.getNumChildren());
    assertEquals(0, listener2.getTotalMem());
    // root and first-level child share the first listener
    // second-level and third-level child share the second listener
    try (final RootAllocator rootAllocator = new RootAllocator(listener1, MAX_ALLOCATION)) {
      try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", 0, MAX_ALLOCATION)) {
        assertEquals(1, listener1.getNumChildren());
        // A 16-byte allocation from c1 is recorded by listener1.
        final ArrowBuf buf1 = c1.buffer(16);
        assertNotNull(buf1, "allocation failed");
        assertEquals(1, listener1.getNumPreCalls());
        assertEquals(1, listener1.getNumCalls());
        assertEquals(0, listener1.getNumReleaseCalls());
        assertEquals(16, listener1.getTotalMem());
        buf1.getReferenceManager().release();
        try (final BufferAllocator c2 = c1.newChildAllocator("c2", listener2, 0, MAX_ALLOCATION)) {
          assertEquals(
              2, listener1.getNumChildren()); // c1 got a new child, so listener1 is notified.
          assertEquals(0, listener2.getNumChildren());
          // A 32-byte allocation from c2 is recorded by listener2 only; listener1's allocation
          // counters stay where they were.
          final ArrowBuf buf2 = c2.buffer(32);
          assertNotNull(buf2, "allocation failed");
          assertEquals(1, listener1.getNumCalls());
          assertEquals(16, listener1.getTotalMem());
          assertEquals(1, listener2.getNumPreCalls());
          assertEquals(1, listener2.getNumCalls());
          assertEquals(0, listener2.getNumReleaseCalls());
          assertEquals(32, listener2.getTotalMem());
          buf2.getReferenceManager().release();
          try (final BufferAllocator c3 = c2.newChildAllocator("c3", 0, MAX_ALLOCATION)) {
            assertEquals(2, listener1.getNumChildren());
            assertEquals(1, listener2.getNumChildren());
            // A 64-byte allocation from c3 is added to listener2's totals (32 + 64); by now each
            // listener has also seen one release (buf1 and buf2 respectively).
            final ArrowBuf buf3 = c3.buffer(64);
            assertNotNull(buf3, "allocation failed");
            assertEquals(1, listener1.getNumPreCalls());
            assertEquals(1, listener1.getNumCalls());
            assertEquals(1, listener1.getNumReleaseCalls());
            assertEquals(16, listener1.getTotalMem());
            assertEquals(2, listener2.getNumPreCalls());
            assertEquals(2, listener2.getNumCalls());
            assertEquals(1, listener2.getNumReleaseCalls());
            assertEquals(32 + 64, listener2.getTotalMem());
            buf3.getReferenceManager().release();
          }
          assertEquals(2, listener1.getNumChildren());
          assertEquals(0, listener2.getNumChildren()); // third-level child removed
        }
        assertEquals(1, listener1.getNumChildren()); // second-level child removed
        assertEquals(0, listener2.getNumChildren());
      }
      assertEquals(0, listener1.getNumChildren()); // first-level child removed

      // buf2 and buf3 were both released through allocators reporting to listener2.
      assertEquals(2, listener2.getNumReleaseCalls());
    }
  }

  /**
   * Checks the listener's behaviour around a failed allocation. A child limited to half of {@code
   * MAX_ALLOCATION} first refuses a request for the full {@code MAX_ALLOCATION} without the
   * listener recording an allocation; after {@code setExpandOnFail} is configured on the listener,
   * the same request succeeds and is recorded.
   */
  @Test
  public void testRootAllocator_listenerAllocationFail() throws Exception {
    CountingAllocationListener listener1 = new CountingAllocationListener();
    assertEquals(0, listener1.getNumCalls());
    assertEquals(0, listener1.getTotalMem());
    // Test attempts to allocate too much from a child whose limit is set to half of the max
    // allocation. The listener's callback triggers, expanding the child allocator's limit, so then
    // the allocation succeeds.
    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
      try (final BufferAllocator c1 =
          rootAllocator.newChildAllocator("c1", listener1, 0, MAX_ALLOCATION / 2)) {
        // Without expand-on-fail configured, the oversized request must be rejected.
        assertThrows(
            OutOfMemoryException.class,
            () -> c1.buffer(MAX_ALLOCATION),
            "allocated memory beyond max allowed");
        assertEquals(0, listener1.getNumCalls());
        assertEquals(0, listener1.getTotalMem());

        listener1.setExpandOnFail(c1, MAX_ALLOCATION);
        ArrowBuf arrowBuf = c1.buffer(MAX_ALLOCATION);
        assertNotNull(arrowBuf, "allocation failed");
        assertEquals(1, listener1.getNumCalls());
        assertEquals(MAX_ALLOCATION, listener1.getTotalMem());
        arrowBuf.getReferenceManager().release();
      }
    }
  }

  /**
   * Runs a fixed allocate/release pattern against {@code allocator}: one 512-byte buffer, one
   * buffer of the full {@code MAX_ALLOCATION}, and then eight buffers of {@code MAX_ALLOCATION /
   * 8} held at the same time. Every buffer is released before the method returns.
   *
   * @param allocator the allocator to allocate from
   */
  private static void allocateAndFree(final BufferAllocator allocator) {
    final ArrowBuf small = allocator.buffer(512);
    assertNotNull(small, "allocation failed");
    small.getReferenceManager().release();

    final ArrowBuf full = allocator.buffer(MAX_ALLOCATION);
    assertNotNull(full, "allocation failed");
    full.getReferenceManager().release();

    // Hold all the pieces simultaneously before releasing any of them.
    final int pieces = 8;
    final ArrowBuf[] held = new ArrowBuf[pieces];
    for (int slot = 0; slot < pieces; slot++) {
      final ArrowBuf piece = allocator.buffer(MAX_ALLOCATION / pieces);
      assertNotNull(piece, "allocation failed");
      held[slot] = piece;
    }
    for (int slot = 0; slot < pieces; slot++) {
      held[slot].getReferenceManager().release();
    }
  }

  /** Runs the {@code allocateAndFree} pattern against a child of the root allocator. */
  @Test
  public void testAllocator_manyAllocations() throws Exception {
    // Resources are closed in reverse order of declaration: child first, then root.
    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION);
        final BufferAllocator child =
            root.newChildAllocator("manyAllocations", 0, MAX_ALLOCATION)) {
      allocateAndFree(child);
    }
  }

  /**
   * After a normal allocate/release cycle, a request one byte larger than the child allocator's
   * limit must fail with {@link OutOfMemoryException}.
   */
  @Test
  public void testAllocator_overAllocate() throws Exception {
    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
      try (final BufferAllocator childAllocator =
          rootAllocator.newChildAllocator("overAllocate", 0, MAX_ALLOCATION)) {
        allocateAndFree(childAllocator);

        assertThrows(
            OutOfMemoryException.class,
            () -> childAllocator.buffer(MAX_ALLOCATION + 1),
            "allocated memory beyond max allowed");
      }
    }
  }

  /**
   * With half of {@code MAX_ALLOCATION} held by the root and the other half held by a child, a
   * further request from the child must fail with {@link OutOfMemoryException} even though the
   * child's own limit is {@code MAX_ALLOCATION}.
   */
  @Test
  public void testAllocator_overAllocateParent() throws Exception {
    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
      try (final BufferAllocator childAllocator =
          rootAllocator.newChildAllocator("overAllocateParent", 0, MAX_ALLOCATION)) {
        final ArrowBuf arrowBuf1 = rootAllocator.buffer(MAX_ALLOCATION / 2);
        assertNotNull(arrowBuf1, "allocation failed");
        final ArrowBuf arrowBuf2 = childAllocator.buffer(MAX_ALLOCATION / 2);
        assertNotNull(arrowBuf2, "allocation failed");

        // The two outstanding buffers already add up to the root allocator's limit.
        assertThrows(
            OutOfMemoryException.class,
            () -> childAllocator.buffer(MAX_ALLOCATION / 4),
            "allocated memory beyond max allowed");

        arrowBuf1.getReferenceManager().release();
        arrowBuf2.getReferenceManager().release();
      }
    }
  }

  /**
   * Checks the {@link AllocationOutcomeDetails} attached to an {@link OutOfMemoryException} when
   * the request fails at an intermediate allocator.
   *
   * <p>The hierarchy is root (limit {@code MAX_ALLOCATION}), child (created with {@code 0,
   * MAX_ALLOCATION / 2}) and grandchild (created with {@code MAX_ALLOCATION / 4, MAX_ALLOCATION}).
   * A request for {@code MAX_ALLOCATION} from the grandchild is expected to report the child as the
   * failed allocator, with exactly two entries.
   */
  @Test
  public void testAllocator_failureAtParentLimitOutcomeDetails() throws Exception {
    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
      try (final BufferAllocator childAllocator =
          rootAllocator.newChildAllocator("child", 0, MAX_ALLOCATION / 2)) {
        try (final BufferAllocator grandChildAllocator =
            childAllocator.newChildAllocator("grandchild", MAX_ALLOCATION / 4, MAX_ALLOCATION)) {
          OutOfMemoryException e =
              assertThrows(
                  OutOfMemoryException.class, () -> grandChildAllocator.buffer(MAX_ALLOCATION));
          assertTrue(e.getMessage().contains("Unable to allocate buffer"));

          assertTrue(e.getOutcomeDetails().isPresent(), "missing outcome details");
          AllocationOutcomeDetails outcomeDetails = e.getOutcomeDetails().get();

          // assertEquals takes (expected, actual); keep that order so failure messages read right.
          assertEquals(childAllocator, outcomeDetails.getFailedAllocator());

          // The order of allocators should be child to root (request propagates to parent if
          // child cannot satisfy the request).
          Iterator<Entry> iterator = outcomeDetails.allocEntries.iterator();
          // First entry: MAX_ALLOCATION / 4 is allocated out of the full request.
          AllocationOutcomeDetails.Entry first = iterator.next();
          assertEquals(MAX_ALLOCATION / 4, first.getAllocatedSize());
          assertEquals(MAX_ALLOCATION, first.getRequestedSize());
          assertFalse(first.isAllocationFailed());

          // Second entry: the remainder is requested, and that request fails.
          AllocationOutcomeDetails.Entry second = iterator.next();
          assertEquals(MAX_ALLOCATION - MAX_ALLOCATION / 4, second.getRequestedSize());
          assertEquals(0, second.getAllocatedSize());
          assertTrue(second.isAllocationFailed());

          assertFalse(iterator.hasNext());
        }
      }
    }
  }

  /**
   * Checks the {@link AllocationOutcomeDetails} attached to an {@link OutOfMemoryException} when
   * the request fails at the root allocator.
   *
   * <p>The hierarchy is root (limit {@code MAX_ALLOCATION}), child (created with {@code
   * MAX_ALLOCATION / 2, Long.MAX_VALUE}) and grandchild (created with {@code MAX_ALLOCATION / 4,
   * Long.MAX_VALUE}). A request for twice {@code MAX_ALLOCATION} from the grandchild is expected
   * to report the root as the failed allocator, with exactly three entries.
   */
  @Test
  public void testAllocator_failureAtRootLimitOutcomeDetails() throws Exception {
    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
      try (final BufferAllocator childAllocator =
          rootAllocator.newChildAllocator("child", MAX_ALLOCATION / 2, Long.MAX_VALUE)) {
        try (final BufferAllocator grandChildAllocator =
            childAllocator.newChildAllocator("grandchild", MAX_ALLOCATION / 4, Long.MAX_VALUE)) {
          OutOfMemoryException e =
              assertThrows(
                  OutOfMemoryException.class, () -> grandChildAllocator.buffer(MAX_ALLOCATION * 2));

          assertTrue(e.getMessage().contains("Unable to allocate buffer"));
          assertTrue(e.getOutcomeDetails().isPresent(), "missing outcome details");
          AllocationOutcomeDetails outcomeDetails = e.getOutcomeDetails().get();

          // assertEquals takes (expected, actual); keep that order so failure messages read right.
          assertEquals(rootAllocator, outcomeDetails.getFailedAllocator());

          // The order of allocators should be child to root (request propagates to parent if
          // child cannot satisfy the request).
          Iterator<Entry> iterator = outcomeDetails.allocEntries.iterator();
          // First entry: MAX_ALLOCATION / 4 is allocated out of the full request.
          AllocationOutcomeDetails.Entry first = iterator.next();
          assertEquals(MAX_ALLOCATION / 4, first.getAllocatedSize());
          assertEquals(2 * MAX_ALLOCATION, first.getRequestedSize());
          assertFalse(first.isAllocationFailed());

          // Second entry: another MAX_ALLOCATION / 4 is allocated out of the remainder.
          AllocationOutcomeDetails.Entry second = iterator.next();
          assertEquals(MAX_ALLOCATION / 4, second.getAllocatedSize());
          assertEquals(2 * MAX_ALLOCATION - MAX_ALLOCATION / 4, second.getRequestedSize());
          assertFalse(second.isAllocationFailed());

          // Third entry: nothing is allocated and the request fails.
          AllocationOutcomeDetails.Entry third = iterator.next();
          assertEquals(0, third.getAllocatedSize());
          assertTrue(third.isAllocationFailed());

          assertFalse(iterator.hasNext());
        }
      }
    }
  }

  /**
   * Allocates a buffer of {@code MAX_ALLOCATION / 2} from {@code bufferAllocator}, takes three
   * nested slices of it, and then releases through one of the slices. {@code rootAllocator} is
   * verified after every step.
   *
   * @param rootAllocator allocator whose {@code verify()} is called after each step
   * @param bufferAllocator allocator the base buffer is taken from
   */
  private static void testAllocator_sliceUpBufferAndRelease(
      final RootAllocator rootAllocator, final BufferAllocator bufferAllocator) {
    final ArrowBuf base = bufferAllocator.buffer(MAX_ALLOCATION / 2);
    rootAllocator.verify();

    // Each slice starts 16 bytes in and is 32 bytes shorter than the buffer it is taken from.
    final ArrowBuf level1 = base.slice(16, base.capacity() - 32);
    rootAllocator.verify();
    final ArrowBuf level2 = level1.slice(16, level1.capacity() - 32);
    rootAllocator.verify();
    // The innermost slice is created only for its effect on the allocator; it is not used again.
    level2.slice(16, level2.capacity() - 32);
    rootAllocator.verify();

    // since they share refcounts, one is enough to release them all
    level2.getReferenceManager().release();
    rootAllocator.verify();
  }

  /**
   * Runs the slice-and-release helper against the root allocator and against child allocators,
   * including while a grandchild allocator holds a sliced buffer of its own, verifying the root
   * allocator after each stage.
   */
  @Test
  public void testAllocator_createSlices() throws Exception {
    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
      testAllocator_sliceUpBufferAndRelease(root, root);

      try (final BufferAllocator child =
          root.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) {
        testAllocator_sliceUpBufferAndRelease(root, child);
      }
      root.verify();

      testAllocator_sliceUpBufferAndRelease(root, root);

      try (final BufferAllocator outer =
          root.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) {
        try (final BufferAllocator inner =
            outer.newChildAllocator("createSlices", 0, MAX_ALLOCATION)) {
          final ArrowBuf innerBuf = inner.buffer(MAX_ALLOCATION / 8);
          // Slice the second half of the inner buffer; the slice itself is not used again.
          innerBuf.slice(MAX_ALLOCATION / 16, MAX_ALLOCATION / 16);
          // The helper runs on the outer allocator while the inner buffer is still outstanding.
          testAllocator_sliceUpBufferAndRelease(root, outer);
          innerBuf.getReferenceManager().release();
          root.verify();
        }
        root.verify();

        testAllocator_sliceUpBufferAndRelease(root, outer);
      }
      root.verify();
    }
  }

  /**
   * Checks reader/writer index bookkeeping on a 256-byte buffer and on slices taken from it, both
   * before and after the buffer has been filled with byte values matching their indices.
   */
  @Test
  public void testAllocator_sliceRanges() throws Exception {
    // final AllocatorOwner allocatorOwner = new NamedOwner("sliceRanges");
    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
      final int size = 256;

      // A freshly allocated buffer: nothing readable, everything writable.
      final ArrowBuf source = root.buffer(size);
      assertEquals(size, source.capacity());
      assertEquals(0, source.readerIndex());
      assertEquals(0, source.readableBytes());
      assertEquals(0, source.writerIndex());
      assertEquals(size, source.writableBytes());

      // A slice taken before anything has been written.
      final ArrowBuf sliceBeforeWrite = source.slice();
      assertEquals(0, sliceBeforeWrite.readerIndex());
      assertEquals(0, sliceBeforeWrite.readableBytes());
      assertEquals(0, sliceBeforeWrite.writerIndex());
      // assertEquals(256, sliceBeforeWrite.capacity());
      // assertEquals(256, sliceBeforeWrite.writableBytes());

      // Populate the buffer with byte values corresponding to their indices.
      for (int value = 0; value < size; value++) {
        source.writeByte(value);
      }
      assertEquals(0, source.readerIndex());
      assertEquals(size, source.readableBytes());
      assertEquals(size, source.writerIndex());
      assertEquals(0, source.writableBytes());

      // A slice taken after the writes: sequential reads consume readable bytes, absolute reads
      // by index do not.
      final ArrowBuf sliceAfterWrite = source.slice();
      assertEquals(0, sliceAfterWrite.readerIndex());
      assertEquals(size, sliceAfterWrite.readableBytes());
      for (int expected = 0; expected < 10; expected++) {
        assertEquals(expected, sliceAfterWrite.readByte());
      }
      assertEquals(size - 10, sliceAfterWrite.readableBytes());
      for (int index = 0; index < size; index++) {
        assertEquals((byte) index, sliceAfterWrite.getByte(index));
      }

      // A 25-byte window starting at index 25 reads back the values 25..49.
      final ArrowBuf window = source.slice(25, 25);
      assertEquals(0, window.readerIndex());
      assertEquals(25, window.readableBytes());
      for (int expected = 25; expected < 50; expected++) {
        assertEquals(expected, window.readByte());
      }

      /*
      for(int i = 256; i > 0; --i) {
        sliceBeforeWrite.writeByte(i - 1);
      }
      for(int i = 0; i < 256; ++i) {
        assertEquals(255 - i, sliceAfterWrite.getByte(i));
      }
      */

      source.getReferenceManager().release(); // all the derived buffers share this fate
    }
  }

  /**
   * Fills a 256-byte buffer with byte values matching their indices, then takes a chain of slices
   * (each one a slice of the previous) and checks that every slice exposes the expected bytes at
   * the expected offsets.
   */
  @Test
  public void testAllocator_slicesOfSlices() throws Exception {
    // final AllocatorOwner allocatorOwner = new NamedOwner("slicesOfSlices");
    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
      // Populate a buffer with byte values corresponding to their indices.
      final ArrowBuf arrowBuf = rootAllocator.buffer(256);
      for (int i = 0; i < 256; ++i) {
        arrowBuf.writeByte(i);
      }

      // Slice it up.
      final ArrowBuf slice0 = arrowBuf.slice(0, arrowBuf.capacity());
      for (int i = 0; i < 256; ++i) {
        // Check the slice itself; asserting against arrowBuf here would leave slice0 untested.
        assertEquals((byte) i, slice0.getByte(i));
      }

      // Each further slice starts 10 bytes into the previous one.
      final ArrowBuf slice10 = slice0.slice(10, arrowBuf.capacity() - 10);
      for (int i = 10; i < 256; ++i) {
        assertEquals((byte) i, slice10.getByte(i - 10));
      }

      final ArrowBuf slice20 = slice10.slice(10, arrowBuf.capacity() - 20);
      for (int i = 20; i < 256; ++i) {
        assertEquals((byte) i, slice20.getByte(i - 20));
      }

      final ArrowBuf slice30 = slice20.slice(10, arrowBuf.capacity() - 30);
      for (int i = 30; i < 256; ++i) {
        assertEquals((byte) i, slice30.getByte(i - 30));
      }

      arrowBuf.getReferenceManager().release();
    }
  }

  /**
   * Swaps ownership of half-length slices between two child allocators and checks that each
   * transferred buffer keeps the reader/writer indices of the slice it came from.
   */
  @Test
  public void testAllocator_transferSliced() throws Exception {
    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
      final BufferAllocator ownerA =
          root.newChildAllocator("transferSliced1", 0, MAX_ALLOCATION);
      final BufferAllocator ownerB =
          root.newChildAllocator("transferSliced2", 0, MAX_ALLOCATION);

      final ArrowBuf bufA = ownerA.buffer(MAX_ALLOCATION / 8);
      final ArrowBuf bufB = ownerB.buffer(MAX_ALLOCATION / 8);

      // Slice off the first half of each buffer.
      final ArrowBuf sliceA = bufA.slice(0, bufA.capacity() / 2);
      final ArrowBuf sliceB = bufB.slice(0, bufB.capacity() / 2);

      root.verify();

      // Hand B's slice over to allocator A.
      final ReferenceManager sliceBManager = sliceB.getReferenceManager();
      final OwnershipTransferResult toOwnerA = sliceBManager.transferOwnership(sliceB, ownerA);
      assertEquiv(sliceB, toOwnerA.getTransferredBuffer());
      root.verify();

      // Hand A's slice over to allocator B.
      final ReferenceManager sliceAManager = sliceA.getReferenceManager();
      final OwnershipTransferResult toOwnerB = sliceAManager.transferOwnership(sliceA, ownerB);
      assertEquiv(sliceA, toOwnerB.getTransferredBuffer());
      root.verify();

      // Release the transferred buffers first, then the original slices.
      toOwnerA.getTransferredBuffer().getReferenceManager().release();
      toOwnerB.getTransferredBuffer().getReferenceManager().release();

      sliceA.getReferenceManager().release(); // releases bufA
      sliceB.getReferenceManager().release(); // releases bufB

      ownerA.close();
      ownerB.close();
    }
  }

  /**
   * Shares half-length slices across two child allocators via {@code retain} and checks that
   * each shared buffer keeps the reader/writer indices of the slice it was created from.
   */
  @Test
  public void testAllocator_shareSliced() throws Exception {
    try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
      // Distinct names matching this test, so the two children can be told apart by name.
      final BufferAllocator childAllocator1 =
          rootAllocator.newChildAllocator("shareSliced1", 0, MAX_ALLOCATION);
      final BufferAllocator childAllocator2 =
          rootAllocator.newChildAllocator("shareSliced2", 0, MAX_ALLOCATION);

      final ArrowBuf arrowBuf1 = childAllocator1.buffer(MAX_ALLOCATION / 8);
      final ArrowBuf arrowBuf2 = childAllocator2.buffer(MAX_ALLOCATION / 8);

      // Slice off the first half of each buffer.
      final ArrowBuf arrowBuf1s = arrowBuf1.slice(0, arrowBuf1.capacity() / 2);
      final ArrowBuf arrowBuf2s = arrowBuf2.slice(0, arrowBuf2.capacity() / 2);

      rootAllocator.verify();

      // Cross-share: each slice is retained on behalf of the *other* child allocator.
      final ArrowBuf arrowBuf2s1 =
          arrowBuf2s.getReferenceManager().retain(arrowBuf2s, childAllocator1);
      assertEquiv(arrowBuf2s, arrowBuf2s1);
      final ArrowBuf arrowBuf1s2 =
          arrowBuf1s.getReferenceManager().retain(arrowBuf1s, childAllocator2);
      assertEquiv(arrowBuf1s, arrowBuf1s2);
      rootAllocator.verify();

      // Drop the original references first; the shared buffers are still outstanding.
      arrowBuf1s.getReferenceManager().release(); // releases arrowBuf1
      arrowBuf2s.getReferenceManager().release(); // releases arrowBuf2
      rootAllocator.verify();

      arrowBuf2s1.getReferenceManager().release(); // releases the shared arrowBuf2 slice
      arrowBuf1s2.getReferenceManager().release(); // releases the shared arrowBuf1 slice

      childAllocator1.close();
      childAllocator2.close();
    }
  }

  /**
   * Transfers ownership of a buffer that is also shared with a second allocator: the buffer is
   * retained by allocator 2, transferred to allocator 3 and then to allocator 4, and every
   * earlier holder is released and closed along the way.
   */
  @Test
  public void testAllocator_transferShared() throws Exception {
    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION)) {
      final BufferAllocator child1 = root.newChildAllocator("transferShared1", 0, MAX_ALLOCATION);
      final BufferAllocator child2 = root.newChildAllocator("transferShared2", 0, MAX_ALLOCATION);
      final BufferAllocator child3 = root.newChildAllocator("transferShared3", 0, MAX_ALLOCATION);

      final ArrowBuf buf1 = child1.buffer(MAX_ALLOCATION / 8);

      // Share buf1 with child2: a distinct buffer object with matching indices is expected.
      final ArrowBuf buf2 = buf1.getReferenceManager().retain(buf1, child2);
      root.verify();
      assertNotNull(buf2);
      assertNotEquals(buf2, buf1);
      assertEquiv(buf1, buf2);

      // Transfer buf1 from child1 to child3.
      final OwnershipTransferResult firstTransfer =
          buf1.getReferenceManager().transferOwnership(buf1, child3);
      assertTrue(firstTransfer.getAllocationFit());
      final ArrowBuf buf3 = firstTransfer.getTransferredBuffer();
      assertEquiv(buf1, buf3);
      root.verify();

      // child3 now holds the buffer that came from child1, so child1 can release and close.
      buf1.getReferenceManager().release();
      child1.close();
      root.verify();

      buf2.getReferenceManager().release();
      child2.close();
      root.verify();

      // Transfer once more, this time from child3 to a freshly created child4.
      final BufferAllocator child4 = root.newChildAllocator("transferShared4", 0, MAX_ALLOCATION);
      final OwnershipTransferResult secondTransfer =
          buf3.getReferenceManager().transferOwnership(buf3, child4);
      assertTrue(secondTransfer.getAllocationFit());
      final ArrowBuf buf4 = secondTransfer.getTransferredBuffer();
      assertEquiv(buf3, buf4);
      root.verify();

      buf3.getReferenceManager().release();
      child3.close();
      root.verify();

      buf4.getReferenceManager().release();
      child4.close();
      root.verify();
    }
  }

  /** Adds 64 bytes to a reservation on a child allocator and closes it without allocating. */
  @Test
  public void testAllocator_unclaimedReservation() throws Exception {
    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION);
        final BufferAllocator child =
            root.newChildAllocator("unclaimedReservation", 0, MAX_ALLOCATION)) {
      try (final AllocationReservation unclaimed = child.newReservation()) {
        assertTrue(unclaimed.add(64L));
      }
      // The reservation has been closed by now; the allocator tree must still verify.
      root.verify();
    }
  }

  /**
   * Builds up a reservation in two 32-byte steps, allocates a buffer from it, and verifies the
   * allocator tree after allocation, after release, and after the reservation is closed.
   */
  @Test
  public void testAllocator_claimedReservation() throws Exception {
    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION);
        final BufferAllocator child =
            root.newChildAllocator("claimedReservation", 0, MAX_ALLOCATION)) {
      try (final AllocationReservation claimed = child.newReservation()) {
        // Two 32-byte additions are expected to yield one 64-byte buffer.
        assertTrue(claimed.add(32L));
        assertTrue(claimed.add(32L));

        final ArrowBuf reservedBuf = claimed.allocateBuffer();
        assertEquals(64, reservedBuf.capacity());
        root.verify();

        reservedBuf.getReferenceManager().release();
        root.verify();
      }
      // Verify once more now that the reservation itself has been closed.
      root.verify();
    }
  }

  /** Checks that a child allocator reports the init reservation and limit it was created with. */
  @Test
  public void testInitReservationAndLimit() throws Exception {
    final long initReservation = 2048;
    final long limit = 4096;
    try (final RootAllocator root = new RootAllocator(MAX_ALLOCATION);
        final BufferAllocator child = root.newChildAllocator("child", initReservation, limit)) {
      assertEquals(initReservation, child.getInitReservation());
      assertEquals(limit, child.getLimit());
    }
  }

  /**
   * Builds a multi-level allocator tree (root, "fragment" children, and grandchildren), allocates
   * buffers of various sizes from the grandchildren, and then releases and closes everything,
   * calling {@code verify()} on the root after most steps.
   */
  @Test
  public void multiple() throws Exception {
    // Every child allocator in this test is created with the same name.
    final String owner = "test";
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {

      // Initial reservation used for every grandchild allocator.
      final int op = 100000;

      // Two first-level children with different initial reservations and no effective limit.
      BufferAllocator frag1 = allocator.newChildAllocator(owner, 1500000, Long.MAX_VALUE);
      BufferAllocator frag2 = allocator.newChildAllocator(owner, 500000, Long.MAX_VALUE);

      allocator.verify();

      BufferAllocator allocator11 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE);
      ArrowBuf b11 = allocator11.buffer(1000000);

      allocator.verify();

      BufferAllocator allocator12 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE);
      ArrowBuf b12 = allocator12.buffer(500000);

      allocator.verify();

      // NOTE(review): the name suggests a child of frag2, but this is created from frag1 —
      // confirm whether that is intentional. No buffer is ever allocated from it.
      BufferAllocator allocator21 = frag1.newChildAllocator(owner, op, Long.MAX_VALUE);

      allocator.verify();

      // This allocation (2000000) is larger than frag2's initial reservation (500000).
      BufferAllocator allocator22 = frag2.newChildAllocator(owner, op, Long.MAX_VALUE);
      ArrowBuf b22 = allocator22.buffer(2000000);

      allocator.verify();

      // A third first-level child is added while buffers from the first two are outstanding.
      BufferAllocator frag3 = allocator.newChildAllocator(owner, 1000000, Long.MAX_VALUE);

      allocator.verify();

      BufferAllocator allocator31 = frag3.newChildAllocator(owner, op, Long.MAX_VALUE);
      ArrowBuf b31a = allocator31.buffer(200000);

      allocator.verify();

      // Previously running operator completes
      b22.getReferenceManager().release();

      allocator.verify();

      allocator22.close();

      // Tear down the remaining grandchildren: release each buffer, then close its allocator.
      b31a.getReferenceManager().release();
      allocator31.close();

      b12.getReferenceManager().release();
      allocator12.close();

      allocator21.close();

      b11.getReferenceManager().release();
      allocator11.close();

      // First-level children are closed only after all of their own children are closed.
      frag1.close();
      frag2.close();
      frag3.close();
    }
  }

  // This test needs to run in non-debug mode, so it disables the assertion status for
  // AssertionUtil through the class loader.
  // The test passes if run individually with -Dtest=TestBaseAllocator#testMemoryLeakWithReservation
  // but fails generally, since the assertion status cannot be changed once the class is
  // initialized. The test is therefore marked @Disabled.
  @Test
  @Disabled
  public void testMemoryLeakWithReservation() throws Exception {
    // disabling assertion status
    AssertionUtil.class
        .getClassLoader()
        .setClassAssertionStatus(AssertionUtil.class.getName(), false);

    try (RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
      // Two nested child allocators, each created with a 1024-byte initial reservation.
      ChildAllocator childAllocator1 =
          (ChildAllocator) rootAllocator.newChildAllocator("child1", 1024, MAX_ALLOCATION);
      rootAllocator.verify();

      ChildAllocator childAllocator2 =
          (ChildAllocator) childAllocator1.newChildAllocator("child2", 1024, MAX_ALLOCATION);
      rootAllocator.verify();

      // Allocate 256 bytes and deliberately never release them.
      childAllocator2.buffer(256);

      // Closing the allocator that owns the leaked buffer is expected to report the leak.
      Exception exception = assertThrows(IllegalStateException.class, childAllocator2::close);
      String exMessage = exception.getMessage();
      assertTrue(exMessage.contains("Memory leaked: (256)"));

      // The parent is expected to report the same 256 bytes when it is closed
      // (presumably because the leaked memory is also accounted against it — TODO confirm).
      exception = assertThrows(IllegalStateException.class, childAllocator1::close);
      exMessage = exception.getMessage();
      assertTrue(exMessage.contains("Memory leaked: (256)"));
    }
  }

  /**
   * Checks that a sibling's up-front reservation counts against the root's limit: with ChildB
   * reserving all 1024 bytes of the root, an 8-byte allocation from ChildA must fail.
   */
  @Test
  public void testOverlimit() {
    try (BufferAllocator root = new RootAllocator(1024);
        BufferAllocator unreserved = root.newChildAllocator("ChildA", 0, 1024);
        BufferAllocator fullyReserved = root.newChildAllocator("ChildB", 1024, 1024)) {
      // The buffer is closed right away in case the allocation unexpectedly succeeds.
      assertThrows(OutOfMemoryException.class, () -> unreserved.buffer(8).close());

      // Neither child has allocated anything; the root only accounts for ChildB's reservation.
      assertEquals(0, unreserved.getAllocatedMemory());
      assertEquals(0, fullyReserved.getAllocatedMemory());
      assertEquals(1024, root.getAllocatedMemory());
    }
  }

  /**
   * Same scenario as the over-limit check, but with every limit and ChildB's reservation set to
   * {@code Long.MAX_VALUE}; a 1024-byte allocation from ChildA must still be rejected.
   */
  @Test
  public void testOverlimitOverflow() {
    // Regression test for https://github.com/apache/arrow/issues/35960
    final long max = Long.MAX_VALUE;
    try (BufferAllocator root = new RootAllocator(max);
        BufferAllocator unreserved = root.newChildAllocator("ChildA", 0, max);
        BufferAllocator fullyReserved = root.newChildAllocator("ChildB", max, max)) {
      // The buffer is closed right away in case the allocation unexpectedly succeeds.
      assertThrows(OutOfMemoryException.class, () -> unreserved.buffer(1024).close());

      // Neither child has allocated anything; the root only accounts for ChildB's reservation.
      assertEquals(0, unreserved.getAllocatedMemory());
      assertEquals(0, fullyReserved.getAllocatedMemory());
      assertEquals(max, root.getAllocatedMemory());
    }
  }

  /**
   * Asserts that two buffers have the same reader index and the same writer index.
   *
   * @param origBuf the buffer supplying the expected indices
   * @param newBuf the buffer whose indices are checked
   */
  public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) {
    final long expectedReader = origBuf.readerIndex();
    final long actualReader = newBuf.readerIndex();
    assertEquals(expectedReader, actualReader);

    final long expectedWriter = origBuf.writerIndex();
    final long actualWriter = newBuf.writerIndex();
    assertEquals(expectedWriter, actualWriter);
  }
}