TestFlightGrpcUtils.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.flight;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import com.google.protobuf.Empty;
import io.grpc.BindableService;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.arrow.flight.auth.ServerAuthHandler;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/** Unit test which adds 2 services to same server end point. */
public class TestFlightGrpcUtils {
  private Server server;
  private BufferAllocator allocator;
  private String serverName;

  @BeforeEach
  public void setup() throws IOException {
    // Defines flight service
    allocator = new RootAllocator(Integer.MAX_VALUE);
    final NoOpFlightProducer producer = new NoOpFlightProducer();
    final ServerAuthHandler authHandler = ServerAuthHandler.NO_OP;
    final ExecutorService exec = Executors.newCachedThreadPool();
    final BindableService flightBindingService =
        FlightGrpcUtils.createFlightService(allocator, producer, authHandler, exec);

    // initializes server with 2 services - FlightBindingService & TestService
    serverName = InProcessServerBuilder.generateName();
    server =
        InProcessServerBuilder.forName(serverName)
            .directExecutor()
            .addService(flightBindingService)
            .addService(new TestServiceAdapter())
            .build();
    server.start();
  }

  @AfterEach
  public void cleanup() {
    server.shutdownNow();
  }

  /**
   * This test checks if multiple gRPC services can be added to the same server endpoint and if they
   * can be used by different clients via the same channel.
   *
   * @throws IOException If server fails to start.
   */
  @Test
  public void testMultipleGrpcServices() throws IOException {
    // Initializes channel so that multiple clients can communicate with server
    final ManagedChannel managedChannel =
        InProcessChannelBuilder.forName(serverName).directExecutor().build();

    // Defines flight client and calls service method. Since we use a NoOpFlightProducer we expect
    // the service
    // to throw a RunTimeException
    final FlightClient flightClient = FlightGrpcUtils.createFlightClient(allocator, managedChannel);
    final Iterable<ActionType> actionTypes = flightClient.listActions();
    assertThrows(
        FlightRuntimeException.class,
        () -> actionTypes.forEach(actionType -> System.out.println(actionType.toString())));

    // Define Test client as a blocking stub and call test method which correctly returns an empty
    // protobuf object
    final TestServiceGrpc.TestServiceBlockingStub blockingStub =
        TestServiceGrpc.newBlockingStub(managedChannel);
    assertEquals(Empty.newBuilder().build(), blockingStub.test(Empty.newBuilder().build()));
  }

  @Test
  public void testShutdown() throws IOException, InterruptedException {
    // Initializes channel so that multiple clients can communicate with server
    final ManagedChannel managedChannel =
        InProcessChannelBuilder.forName(serverName).directExecutor().build();

    // Defines flight client and calls service method. Since we use a NoOpFlightProducer we expect
    // the service
    // to throw a RunTimeException
    final FlightClient flightClient =
        FlightGrpcUtils.createFlightClientWithSharedChannel(allocator, managedChannel);

    // Should be a no-op.
    flightClient.close();
    assertFalse(managedChannel.isShutdown());
    assertFalse(managedChannel.isTerminated());
    assertEquals(ConnectivityState.IDLE, managedChannel.getState(false));
    managedChannel.shutdownNow();
  }

  @Test
  public void testProxyChannel() throws IOException, InterruptedException {
    // Initializes channel so that multiple clients can communicate with server
    final ManagedChannel managedChannel =
        InProcessChannelBuilder.forName(serverName).directExecutor().build();

    final FlightGrpcUtils.NonClosingProxyManagedChannel proxyChannel =
        new FlightGrpcUtils.NonClosingProxyManagedChannel(managedChannel);
    assertFalse(proxyChannel.isShutdown());
    assertFalse(proxyChannel.isTerminated());
    proxyChannel.shutdown();
    assertTrue(proxyChannel.isShutdown());
    assertTrue(proxyChannel.isTerminated());
    assertEquals(ConnectivityState.SHUTDOWN, proxyChannel.getState(false));
    try {
      proxyChannel.newCall(null, null);
      fail();
    } catch (IllegalStateException e) {
      // This is expected, since the proxy channel is shut down.
    }

    assertFalse(managedChannel.isShutdown());
    assertFalse(managedChannel.isTerminated());
    assertEquals(ConnectivityState.IDLE, managedChannel.getState(false));

    managedChannel.shutdownNow();
  }

  @Test
  public void testProxyChannelWithClosedChannel() throws IOException, InterruptedException {
    // Initializes channel so that multiple clients can communicate with server
    final ManagedChannel managedChannel =
        InProcessChannelBuilder.forName(serverName).directExecutor().build();

    final FlightGrpcUtils.NonClosingProxyManagedChannel proxyChannel =
        new FlightGrpcUtils.NonClosingProxyManagedChannel(managedChannel);
    assertFalse(proxyChannel.isShutdown());
    assertFalse(proxyChannel.isTerminated());
    managedChannel.shutdownNow();
    assertTrue(proxyChannel.isShutdown());
    assertTrue(proxyChannel.isTerminated());
    assertEquals(ConnectivityState.SHUTDOWN, proxyChannel.getState(false));
    try {
      proxyChannel.newCall(null, null);
      fail();
    } catch (IllegalStateException e) {
      // This is expected, since the proxy channel is shut down.
    }

    assertTrue(managedChannel.isShutdown());
    assertTrue(managedChannel.isTerminated());
    assertEquals(ConnectivityState.SHUTDOWN, managedChannel.getState(false));
  }

  /** Private class used for testing purposes that overrides service behavior. */
  private static class TestServiceAdapter extends TestServiceGrpc.TestServiceImplBase {

    /**
     * gRPC service that receives an empty object & returns and empty protobuf object.
     *
     * @param request google.protobuf.Empty
     * @param responseObserver google.protobuf.Empty
     */
    @Override
    public void test(Empty request, StreamObserver<Empty> responseObserver) {
      responseObserver.onNext(Empty.newBuilder().build());
      responseObserver.onCompleted();
    }
  }
}