IntegrationTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.flight.integration.tests;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.junit.jupiter.api.Test;

/** Run the integration test scenarios in-process. */
class IntegrationTest {
  @Test
  void authBasicProto() throws Exception {
    testScenario("auth:basic_proto");
  }

  @Test
  void expirationTimeCancelFlightInfo() throws Exception {
    testScenario("expiration_time:cancel_flight_info");
  }

  @Test
  void expirationTimeDoGet() throws Exception {
    testScenario("expiration_time:do_get");
  }

  @Test
  void expirationTimeListActions() throws Exception {
    testScenario("expiration_time:list_actions");
  }

  @Test
  void expirationTimeRenewFlightEndpoint() throws Exception {
    testScenario("expiration_time:renew_flight_endpoint");
  }

  @Test
  void locationReuseConnection() throws Exception {
    testScenario("location:reuse_connection");
  }

  @Test
  void middleware() throws Exception {
    testScenario("middleware");
  }

  @Test
  void ordered() throws Exception {
    testScenario("ordered");
  }

  @Test
  void pollFlightInfo() throws Exception {
    testScenario("poll_flight_info");
  }

  @Test
  void flightSql() throws Exception {
    testScenario("flight_sql");
  }

  @Test
  void flightSqlExtension() throws Exception {
    testScenario("flight_sql:extension");
  }

  @Test
  void flightSqlIngestion() throws Exception {
    testScenario("flight_sql:ingestion");
  }

  @Test
  void appMetadataFlightInfoEndpoint() throws Exception {
    testScenario("app_metadata_flight_info_endpoint");
  }

  @Test
  void sessionOptions() throws Exception {
    testScenario("session_options");
  }

  @Test
  void doExchangeEcho() throws Exception {
    testScenario("do_exchange:echo");
  }

  void testScenario(String scenarioName) throws Exception {
    TestBufferAllocationListener listener = new TestBufferAllocationListener();
    try (final BufferAllocator allocator = new RootAllocator(listener, Long.MAX_VALUE)) {
      final ExecutorService exec =
          Executors.newCachedThreadPool(
              new ThreadFactoryBuilder()
                  .setNameFormat("integration-test-flight-server-executor-%d")
                  .build());
      final FlightServer.Builder builder =
          FlightServer.builder()
              .executor(exec)
              .allocator(allocator)
              .location(Location.forGrpcInsecure("0.0.0.0", 0));
      final Scenario scenario = Scenarios.getScenario(scenarioName);
      scenario.buildServer(builder);
      builder.producer(scenario.producer(allocator, Location.forGrpcInsecure("0.0.0.0", 0)));

      try (final FlightServer server = builder.build()) {
        server.start();

        final Location location = Location.forGrpcInsecure("localhost", server.getPort());
        try (final FlightClient client = FlightClient.builder(allocator, location).build()) {
          scenario.client(allocator, location, client);
        }
      }

      // Shutdown the executor while allowing existing tasks to finish.
      // Without this wait, allocator.close() may get invoked earlier than an executor thread may
      // have finished freeing up resources
      // In that case, allocator.close() can throw an IllegalStateException for memory leak, leading
      // to flaky tests
      exec.shutdown();
      final boolean unused = exec.awaitTermination(3, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      // this could be due to Allocator detecting memory leak. Add allocation trail to help debug
      listener.reThrowWithAddedAllocatorInfo(e);
    }
  }
}