TestNettyAllocator.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.memory.netty;

import static org.junit.jupiter.api.Assertions.assertTrue;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import io.netty.buffer.PooledByteBufAllocatorL;
import java.util.Collections;
import java.util.stream.Collectors;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.ReferenceManager;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;

/** Test netty allocators. */
public class TestNettyAllocator {

  @Test
  @SuppressWarnings("SynchronizeOnNonFinalField")
  public void testMemoryUsage() {
    ListAppender<ILoggingEvent> memoryLogsAppender = new ListAppender<>();
    memoryLogsAppender.list = Collections.synchronizedList(memoryLogsAppender.list);
    Logger logger = (Logger) LoggerFactory.getLogger("arrow.allocator");
    try {
      logger.setLevel(Level.TRACE);
      logger.addAppender(memoryLogsAppender);
      memoryLogsAppender.start();
      try (ArrowBuf buf =
          new ArrowBuf(
              ReferenceManager.NO_OP,
              null,
              1024,
              new PooledByteBufAllocatorL().empty.memoryAddress())) {
        buf.memoryAddress();
      }
      boolean result = false;
      long startTime = System.currentTimeMillis();
      while ((System.currentTimeMillis() - startTime)
          < 10000) { // 10 seconds maximum for time to read logs
        // Lock on the list backing the appender since a background thread might try to add more
        // logs
        // while stream() is iterating over list elements. This would throw a flakey
        // ConcurrentModificationException.
        synchronized (memoryLogsAppender.list) {
          result =
              memoryLogsAppender.list.stream()
                  .anyMatch(
                      log ->
                          log.toString().contains("Memory Usage: \n")
                              && log.toString().contains("Large buffers outstanding: ")
                              && log.toString().contains("Normal buffers outstanding: ")
                              && log.getLevel().equals(Level.TRACE));
        }
        if (result) {
          break;
        }
      }
      synchronized (memoryLogsAppender.list) {
        assertTrue(
            result,
            "Log messages are:\n"
                + memoryLogsAppender.list.stream()
                    .map(ILoggingEvent::toString)
                    .collect(Collectors.joining("\n")));
      }

    } finally {
      memoryLogsAppender.stop();
      logger.detachAppender(memoryLogsAppender);
      logger.setLevel(null);
    }
  }
}