TestFlightSql.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.flight.sql.test;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.arrow.flight.sql.util.FlightStreamUtils.getResults;
import static org.apache.arrow.util.AutoCloseables.close;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.IntStream;
import org.apache.arrow.flight.CancelFlightInfoRequest;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStatusCode;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.RenewFlightEndpointRequest;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.flight.sql.FlightSqlClient.PreparedStatement;
import org.apache.arrow.flight.sql.FlightSqlColumnMetadata;
import org.apache.arrow.flight.sql.FlightSqlProducer;
import org.apache.arrow.flight.sql.example.FlightSqlExample;
import org.apache.arrow.flight.sql.impl.FlightSql;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementIngest.TableDefinitionOptions;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementIngest.TableDefinitionOptions.TableExistsOption;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption;
import org.apache.arrow.flight.sql.impl.FlightSql.SqlSupportedCaseSensitivity;
import org.apache.arrow.flight.sql.util.TableRef;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Text;
import org.apache.arrow.vector.util.VectorBatchAppender;
import org.assertj.core.api.Condition;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;

/** Test direct usage of Flight SQL workflows. */
public class TestFlightSql {

  protected static final Schema SCHEMA_INT_TABLE =
      new Schema(
          asList(
              new Field("ID", new FieldType(false, MinorType.INT.getType(), null), null),
              Field.nullable("KEYNAME", MinorType.VARCHAR.getType()),
              Field.nullable("VALUE", MinorType.INT.getType()),
              Field.nullable("FOREIGNID", MinorType.INT.getType())));
  private static final List<List<String>> EXPECTED_RESULTS_FOR_STAR_SELECT_QUERY =
      ImmutableList.of(
          asList("1", "one", "1", "1"),
          asList("2", "zero", "0", "1"),
          asList("3", "negative one", "-1", "1"));
  protected static final List<List<String>> EXPECTED_RESULTS_FOR_PARAMETER_BINDING =
      ImmutableList.of(asList("1", "one", "1", "1"));
  private static final Map<String, String> GET_SQL_INFO_EXPECTED_RESULTS_MAP =
      new LinkedHashMap<>();
  protected static final String LOCALHOST = "localhost";
  protected static BufferAllocator allocator;
  protected static FlightServer server;
  protected static FlightSqlClient sqlClient;

  private static void populateNext10RowsInIngestRootBatch(
      int startRowNumber,
      IntVector valueVector,
      VarCharVector keyNameVector,
      IntVector foreignIdVector,
      VarCharVector keyNamesToBeDeletedVector,
      VectorSchemaRoot ingestRoot) {

    final int NumRowsInBatch = 10;

    valueVector.reset();
    keyNameVector.reset();
    foreignIdVector.reset();

    final IntStream range = IntStream.range(1, NumRowsInBatch);

    range.forEach(
        i -> {
          valueVector.setSafe(i - 1, (i + startRowNumber - 1) * NumRowsInBatch);
          keyNameVector.setSafe(i - 1, new Text("value" + (i + startRowNumber - 1)));
          foreignIdVector.setSafe(i - 1, 1);
        });
    // put some comma and double-quote containing string as well
    valueVector.setSafe(NumRowsInBatch - 1, (NumRowsInBatch + startRowNumber - 1) * NumRowsInBatch);
    keyNameVector.setSafe(
        NumRowsInBatch - 1,
        new Text(
            String.format(
                "value%d, is \"%d\"",
                (NumRowsInBatch + startRowNumber - 1),
                (NumRowsInBatch + startRowNumber - 1) * NumRowsInBatch)));
    foreignIdVector.setSafe(NumRowsInBatch - 1, 1);
    ingestRoot.setRowCount(NumRowsInBatch);

    VectorBatchAppender.batchAppend(keyNamesToBeDeletedVector, keyNameVector);
  }

  @BeforeAll
  public static void setUp() throws Exception {
    setUpClientServer();
    setUpExpectedResultsMap();
  }

  private static void setUpClientServer() throws Exception {
    allocator = new RootAllocator(Integer.MAX_VALUE);

    final Location serverLocation = Location.forGrpcInsecure(LOCALHOST, 0);
    server =
        FlightServer.builder(
                allocator,
                serverLocation,
                new FlightSqlExample(serverLocation, FlightSqlExample.DB_NAME))
            .build()
            .start();

    final Location clientLocation = Location.forGrpcInsecure(LOCALHOST, server.getPort());
    sqlClient = new FlightSqlClient(FlightClient.builder(allocator, clientLocation).build());
  }

  protected static void setUpExpectedResultsMap() {
    GET_SQL_INFO_EXPECTED_RESULTS_MAP.put(
        Integer.toString(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME_VALUE), "Apache Derby");
    GET_SQL_INFO_EXPECTED_RESULTS_MAP.put(
        Integer.toString(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_VERSION_VALUE),
        "10.15.2.0 - (1873585)");
    GET_SQL_INFO_EXPECTED_RESULTS_MAP.put(
        Integer.toString(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_ARROW_VERSION_VALUE),
        "10.15.2.0 - (1873585)");
    GET_SQL_INFO_EXPECTED_RESULTS_MAP.put(
        Integer.toString(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_READ_ONLY_VALUE), "false");
    GET_SQL_INFO_EXPECTED_RESULTS_MAP.put(
        Integer.toString(FlightSql.SqlInfo.SQL_ALL_TABLES_ARE_SELECTABLE_VALUE), "true");
    GET_SQL_INFO_EXPECTED_RESULTS_MAP.put(
        Integer.toString(FlightSql.SqlInfo.SQL_NULL_ORDERING_VALUE),
        Integer.toString(FlightSql.SqlNullOrdering.SQL_NULLS_SORTED_AT_END_VALUE));
    GET_SQL_INFO_EXPECTED_RESULTS_MAP.put(
        Integer.toString(FlightSql.SqlInfo.SQL_DDL_CATALOG_VALUE), "false");
    GET_SQL_INFO_EXPECTED_RESULTS_MAP.put(
        Integer.toString(FlightSql.SqlInfo.SQL_DDL_SCHEMA_VALUE), "true");
    GET_SQL_INFO_EXPECTED_RESULTS_MAP.put(
        Integer.toString(FlightSql.SqlInfo.SQL_DDL_TABLE_VALUE), "true");
    GET_SQL_INFO_EXPECTED_RESULTS_MAP.put(
        Integer.toString(FlightSql.SqlInfo.SQL_IDENTIFIER_CASE_VALUE),
        Integer.toString(SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_UPPERCASE_VALUE));
    GET_SQL_INFO_EXPECTED_RESULTS_MAP.put(
        Integer.toString(FlightSql.SqlInfo.SQL_IDENTIFIER_QUOTE_CHAR_VALUE), "\"");
    GET_SQL_INFO_EXPECTED_RESULTS_MAP.put(
        Integer.toString(FlightSql.SqlInfo.SQL_QUOTED_IDENTIFIER_CASE_VALUE),
        Integer.toString(SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_CASE_INSENSITIVE_VALUE));
    GET_SQL_INFO_EXPECTED_RESULTS_MAP.put(
        Integer.toString(FlightSql.SqlInfo.SQL_MAX_COLUMNS_IN_TABLE_VALUE), "42");
  }

  @AfterAll
  public static void tearDown() throws Exception {
    close(sqlClient, server, allocator);
    FlightSqlExample.removeDerbyDatabaseIfExists(FlightSqlExample.DB_NAME);
  }

  private static List<List<String>> getNonConformingResultsForGetSqlInfo(
      final List<? extends List<String>> results) {
    return getNonConformingResultsForGetSqlInfo(
        results,
        FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME,
        FlightSql.SqlInfo.FLIGHT_SQL_SERVER_VERSION,
        FlightSql.SqlInfo.FLIGHT_SQL_SERVER_ARROW_VERSION,
        FlightSql.SqlInfo.FLIGHT_SQL_SERVER_READ_ONLY,
        FlightSql.SqlInfo.SQL_ALL_TABLES_ARE_SELECTABLE,
        FlightSql.SqlInfo.SQL_NULL_ORDERING,
        FlightSql.SqlInfo.SQL_DDL_CATALOG,
        FlightSql.SqlInfo.SQL_DDL_SCHEMA,
        FlightSql.SqlInfo.SQL_DDL_TABLE,
        FlightSql.SqlInfo.SQL_IDENTIFIER_CASE,
        FlightSql.SqlInfo.SQL_IDENTIFIER_QUOTE_CHAR,
        FlightSql.SqlInfo.SQL_QUOTED_IDENTIFIER_CASE,
        FlightSql.SqlInfo.SQL_MAX_COLUMNS_IN_TABLE);
  }

  private static List<List<String>> getNonConformingResultsForGetSqlInfo(
      final List<? extends List<String>> results, final FlightSql.SqlInfo... args) {
    final List<List<String>> nonConformingResults = new ArrayList<>();
    if (results.size() == args.length) {
      for (int index = 0; index < results.size(); index++) {
        final List<String> result = results.get(index);
        final String providedName = result.get(0);
        final String expectedName = Integer.toString(args[index].getNumber());
        System.err.println(expectedName);
        if (!(GET_SQL_INFO_EXPECTED_RESULTS_MAP.get(providedName).equals(result.get(1))
            && providedName.equals(expectedName))) {
          nonConformingResults.add(result);
          break;
        }
      }
    }
    return nonConformingResults;
  }

  @Test
  public void testGetTablesSchema() {
    final FlightInfo info = sqlClient.getTables(null, null, null, null, true);
    assertThat(info.getSchemaOptional())
        .isEqualTo(Optional.of(FlightSqlProducer.Schemas.GET_TABLES_SCHEMA));
  }

  @Test
  public void testGetTablesSchemaExcludeSchema() {
    final FlightInfo info = sqlClient.getTables(null, null, null, null, false);
    assertThat(info.getSchemaOptional())
        .isEqualTo(Optional.of(FlightSqlProducer.Schemas.GET_TABLES_SCHEMA_NO_SCHEMA));
  }

  @Test
  public void testGetTablesResultNoSchema() throws Exception {
    try (final FlightStream stream =
        sqlClient.getStream(
            sqlClient.getTables(null, null, null, null, false).getEndpoints().get(0).getTicket())) {
      assertAll(
          () -> {
            assertThat(stream.getSchema())
                .isEqualTo(FlightSqlProducer.Schemas.GET_TABLES_SCHEMA_NO_SCHEMA);
          },
          () -> {
            final List<List<String>> results = getResults(stream);
            final List<List<String>> expectedResults =
                ImmutableList.of(
                    // catalog_name | schema_name | table_name | table_type | table_schema
                    asList(null /* TODO No catalog yet */, "SYS", "SYSALIASES", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSCHECKS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSCOLPERMS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSCOLUMNS", "SYSTEM TABLE"),
                    asList(
                        null /* TODO No catalog yet */, "SYS", "SYSCONGLOMERATES", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSCONSTRAINTS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSDEPENDS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSFILES", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSFOREIGNKEYS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSKEYS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSPERMS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSROLES", "SYSTEM TABLE"),
                    asList(
                        null /* TODO No catalog yet */, "SYS", "SYSROUTINEPERMS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSSCHEMAS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSSEQUENCES", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSSTATEMENTS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSSTATISTICS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSTABLEPERMS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSTABLES", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSTRIGGERS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSUSERS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYS", "SYSVIEWS", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "SYSIBM", "SYSDUMMY1", "SYSTEM TABLE"),
                    asList(null /* TODO No catalog yet */, "APP", "FOREIGNTABLE", "TABLE"),
                    asList(null /* TODO No catalog yet */, "APP", "INTTABLE", "TABLE"));
            assertThat(results).isEqualTo(expectedResults);
          });
    }
  }

  @Test
  public void testGetTablesResultFilteredNoSchema() throws Exception {
    try (final FlightStream stream =
        sqlClient.getStream(
            sqlClient
                .getTables(null, null, null, singletonList("TABLE"), false)
                .getEndpoints()
                .get(0)
                .getTicket())) {

      assertAll(
          () ->
              assertThat(stream.getSchema())
                  .isEqualTo(FlightSqlProducer.Schemas.GET_TABLES_SCHEMA_NO_SCHEMA),
          () -> {
            final List<List<String>> results = getResults(stream);
            final List<List<String>> expectedResults =
                ImmutableList.of(
                    // catalog_name | schema_name | table_name | table_type | table_schema
                    asList(null /* TODO No catalog yet */, "APP", "FOREIGNTABLE", "TABLE"),
                    asList(null /* TODO No catalog yet */, "APP", "INTTABLE", "TABLE"));
            assertThat(results).isEqualTo(expectedResults);
          });
    }
  }

  @Test
  public void testGetTablesResultFilteredWithSchema() throws Exception {
    try (final FlightStream stream =
        sqlClient.getStream(
            sqlClient
                .getTables(null, null, null, singletonList("TABLE"), true)
                .getEndpoints()
                .get(0)
                .getTicket())) {
      assertAll(
          () ->
              assertThat(stream.getSchema()).isEqualTo(FlightSqlProducer.Schemas.GET_TABLES_SCHEMA),
          () -> {
            assertThat(stream.getSchema()).isEqualTo(FlightSqlProducer.Schemas.GET_TABLES_SCHEMA);
            final List<List<String>> results = getResults(stream);
            final List<List<String>> expectedResults =
                ImmutableList.of(
                    // catalog_name | schema_name | table_name | table_type | table_schema
                    asList(
                        null /* TODO No catalog yet */,
                        "APP",
                        "FOREIGNTABLE",
                        "TABLE",
                        new Schema(
                                asList(
                                    new Field(
                                        "ID",
                                        new FieldType(
                                            false,
                                            MinorType.INT.getType(),
                                            null,
                                            new FlightSqlColumnMetadata.Builder()
                                                .catalogName("")
                                                .typeName("INTEGER")
                                                .schemaName("APP")
                                                .tableName("FOREIGNTABLE")
                                                .precision(10)
                                                .scale(0)
                                                .isAutoIncrement(true)
                                                .build()
                                                .getMetadataMap()),
                                        null),
                                    new Field(
                                        "FOREIGNNAME",
                                        new FieldType(
                                            true,
                                            MinorType.VARCHAR.getType(),
                                            null,
                                            new FlightSqlColumnMetadata.Builder()
                                                .catalogName("")
                                                .typeName("VARCHAR")
                                                .schemaName("APP")
                                                .tableName("FOREIGNTABLE")
                                                .precision(100)
                                                .scale(0)
                                                .isAutoIncrement(false)
                                                .build()
                                                .getMetadataMap()),
                                        null),
                                    new Field(
                                        "VALUE",
                                        new FieldType(
                                            true,
                                            MinorType.INT.getType(),
                                            null,
                                            new FlightSqlColumnMetadata.Builder()
                                                .catalogName("")
                                                .typeName("INTEGER")
                                                .schemaName("APP")
                                                .tableName("FOREIGNTABLE")
                                                .precision(10)
                                                .scale(0)
                                                .isAutoIncrement(false)
                                                .build()
                                                .getMetadataMap()),
                                        null)))
                            .toJson()),
                    asList(
                        null /* TODO No catalog yet */,
                        "APP",
                        "INTTABLE",
                        "TABLE",
                        new Schema(
                                asList(
                                    new Field(
                                        "ID",
                                        new FieldType(
                                            false,
                                            MinorType.INT.getType(),
                                            null,
                                            new FlightSqlColumnMetadata.Builder()
                                                .catalogName("")
                                                .typeName("INTEGER")
                                                .schemaName("APP")
                                                .tableName("INTTABLE")
                                                .precision(10)
                                                .scale(0)
                                                .isAutoIncrement(true)
                                                .build()
                                                .getMetadataMap()),
                                        null),
                                    new Field(
                                        "KEYNAME",
                                        new FieldType(
                                            true,
                                            MinorType.VARCHAR.getType(),
                                            null,
                                            new FlightSqlColumnMetadata.Builder()
                                                .catalogName("")
                                                .typeName("VARCHAR")
                                                .schemaName("APP")
                                                .tableName("INTTABLE")
                                                .precision(100)
                                                .scale(0)
                                                .isAutoIncrement(false)
                                                .build()
                                                .getMetadataMap()),
                                        null),
                                    new Field(
                                        "VALUE",
                                        new FieldType(
                                            true,
                                            MinorType.INT.getType(),
                                            null,
                                            new FlightSqlColumnMetadata.Builder()
                                                .catalogName("")
                                                .typeName("INTEGER")
                                                .schemaName("APP")
                                                .tableName("INTTABLE")
                                                .precision(10)
                                                .scale(0)
                                                .isAutoIncrement(false)
                                                .build()
                                                .getMetadataMap()),
                                        null),
                                    new Field(
                                        "FOREIGNID",
                                        new FieldType(
                                            true,
                                            MinorType.INT.getType(),
                                            null,
                                            new FlightSqlColumnMetadata.Builder()
                                                .catalogName("")
                                                .typeName("INTEGER")
                                                .schemaName("APP")
                                                .tableName("INTTABLE")
                                                .precision(10)
                                                .scale(0)
                                                .isAutoIncrement(false)
                                                .build()
                                                .getMetadataMap()),
                                        null)))
                            .toJson()));
            assertThat(results).isEqualTo(expectedResults);
          });
    }
  }

  @Test
  public void testSimplePreparedStatementSchema() throws Exception {
    try (final PreparedStatement preparedStatement = sqlClient.prepare("SELECT * FROM intTable")) {
      assertAll(
          () -> {
            final Schema actualSchema = preparedStatement.getResultSetSchema();
            assertThat(actualSchema).isEqualTo(SCHEMA_INT_TABLE);
          },
          () -> {
            final FlightInfo info = preparedStatement.execute();
            assertThat(info.getSchemaOptional()).isEqualTo(Optional.of(SCHEMA_INT_TABLE));
          });
    }
  }

  @Test
  public void testSimplePreparedStatementResults() throws Exception {
    try (final PreparedStatement preparedStatement = sqlClient.prepare("SELECT * FROM intTable");
        final FlightStream stream =
            sqlClient.getStream(preparedStatement.execute().getEndpoints().get(0).getTicket())) {
      assertAll(
          () -> assertThat(stream.getSchema()).isEqualTo(SCHEMA_INT_TABLE),
          () -> assertThat(getResults(stream)).isEqualTo(EXPECTED_RESULTS_FOR_STAR_SELECT_QUERY));
    }
  }

  @Test
  public void testSimplePreparedStatementResultsWithParameterBinding() throws Exception {
    try (PreparedStatement prepare = sqlClient.prepare("SELECT * FROM intTable WHERE id = ?")) {
      final Schema parameterSchema = prepare.getParameterSchema();
      try (final VectorSchemaRoot insertRoot =
          VectorSchemaRoot.create(parameterSchema, allocator)) {
        insertRoot.allocateNew();

        final IntVector valueVector = (IntVector) insertRoot.getVector(0);
        valueVector.setSafe(0, 1);
        insertRoot.setRowCount(1);

        prepare.setParameters(insertRoot);
        FlightInfo flightInfo = prepare.execute();

        FlightStream stream = sqlClient.getStream(flightInfo.getEndpoints().get(0).getTicket());

        assertAll(
            () -> assertThat(stream.getSchema()).isEqualTo(SCHEMA_INT_TABLE),
            () -> assertThat(getResults(stream)).isEqualTo(EXPECTED_RESULTS_FOR_PARAMETER_BINDING));
      }
    }
  }

  @Test
  public void testSimplePreparedStatementUpdateResults() throws SQLException {
    try (PreparedStatement prepare =
            sqlClient.prepare("INSERT INTO INTTABLE (keyName, value ) VALUES (?, ?)");
        PreparedStatement deletePrepare =
            sqlClient.prepare("DELETE FROM INTTABLE WHERE keyName = ?")) {
      final Schema parameterSchema = prepare.getParameterSchema();
      try (final VectorSchemaRoot insertRoot =
          VectorSchemaRoot.create(parameterSchema, allocator)) {
        final VarCharVector varCharVector = (VarCharVector) insertRoot.getVector(0);
        final IntVector valueVector = (IntVector) insertRoot.getVector(1);
        final int counter = 10;
        insertRoot.allocateNew();

        final IntStream range = IntStream.range(0, counter);

        range.forEach(
            i -> {
              valueVector.setSafe(i, i * counter);
              varCharVector.setSafe(i, new Text("value" + i));
            });

        insertRoot.setRowCount(counter);

        prepare.setParameters(insertRoot);
        final long updatedRows = prepare.executeUpdate();

        final long deletedRows;
        try (final VectorSchemaRoot deleteRoot = VectorSchemaRoot.of(varCharVector)) {
          deletePrepare.setParameters(deleteRoot);
          deletedRows = deletePrepare.executeUpdate();
        }
        assertAll(
            () -> assertThat(updatedRows).isEqualTo(10L),
            () -> assertThat(deletedRows).isEqualTo(10L));
      }
    }
  }

  @Test
  public void testBulkIngest() throws IOException {
    // For bulk ingest DerbyDB requires uppercase column names
    var keyName = new Field("KEYNAME", FieldType.nullable(new ArrowType.Utf8()), null);
    var value = new Field("VALUE", FieldType.nullable(new ArrowType.Int(32, true)), null);
    var foreignId = new Field("FOREIGNID", FieldType.nullable(new ArrowType.Int(32, true)), null);

    Schema dataSchema = new Schema(List.of(keyName, value, foreignId));

    try (final VectorSchemaRoot ingestRoot = VectorSchemaRoot.create(dataSchema, allocator);
        final VarCharVector keyNamesToBeDeletedVector = new VarCharVector(keyName, allocator)) {
      final VarCharVector keyNameVector = (VarCharVector) ingestRoot.getVector(0);
      final IntVector valueVector = (IntVector) ingestRoot.getVector(1);
      final IntVector foreignIdVector = (IntVector) ingestRoot.getVector(2);
      ingestRoot.allocateNew();
      keyNamesToBeDeletedVector.allocateNew();

      try (PipedInputStream inPipe = new PipedInputStream(1024);
          PipedOutputStream outPipe = new PipedOutputStream(inPipe);
          ArrowStreamReader reader = new ArrowStreamReader(inPipe, allocator)) {

        new Thread(
                () -> {
                  try (ArrowStreamWriter writer =
                      new ArrowStreamWriter(ingestRoot, null, outPipe)) {
                    writer.start();
                    populateNext10RowsInIngestRootBatch(
                        1,
                        valueVector,
                        keyNameVector,
                        foreignIdVector,
                        keyNamesToBeDeletedVector,
                        ingestRoot);
                    writer.writeBatch();
                    populateNext10RowsInIngestRootBatch(
                        11,
                        valueVector,
                        keyNameVector,
                        foreignIdVector,
                        keyNamesToBeDeletedVector,
                        ingestRoot);
                    writer.writeBatch();
                  } catch (Exception e) {
                    throw new RuntimeException(e);
                  }
                })
            .start();

        // Ingest from a stream
        final long updatedRows =
            sqlClient.executeIngest(
                reader,
                new FlightSqlClient.ExecuteIngestOptions(
                    "INTTABLE",
                    TableDefinitionOptions.newBuilder()
                        .setIfExists(TableExistsOption.TABLE_EXISTS_OPTION_APPEND)
                        .setIfNotExist(TableNotExistOption.TABLE_NOT_EXIST_OPTION_FAIL)
                        .build(),
                    null,
                    null,
                    null));

        assertThat(updatedRows).isEqualTo(-1L);

        // Ingest directly using VectorSchemaRoot
        populateNext10RowsInIngestRootBatch(
            21, valueVector, keyNameVector, foreignIdVector, keyNamesToBeDeletedVector, ingestRoot);
        sqlClient.executeIngest(
            ingestRoot,
            new FlightSqlClient.ExecuteIngestOptions(
                "INTTABLE",
                TableDefinitionOptions.newBuilder()
                    .setIfExists(TableExistsOption.TABLE_EXISTS_OPTION_APPEND)
                    .setIfNotExist(TableNotExistOption.TABLE_NOT_EXIST_OPTION_FAIL)
                    .build(),
                null,
                null,
                null));

        try (PreparedStatement deletePrepare =
            sqlClient.prepare("DELETE FROM INTTABLE WHERE keyName = ?")) {
          final long deletedRows;
          try (final VectorSchemaRoot deleteRoot = VectorSchemaRoot.of(keyNamesToBeDeletedVector)) {
            deletePrepare.setParameters(deleteRoot);
            deletedRows = deletePrepare.executeUpdate();
          }

          assertThat(deletedRows).isEqualTo(30L);
        }
      }
    }
  }

  @Test
  public void testBulkIngestTransaction() {
    assertThrows(
        RuntimeException.class,
        () -> {
          sqlClient.executeIngest(
              VectorSchemaRoot.create(new Schema(List.of()), allocator),
              new FlightSqlClient.ExecuteIngestOptions(
                  "INTTABLE",
                  TableDefinitionOptions.newBuilder()
                      .setIfExists(TableExistsOption.TABLE_EXISTS_OPTION_APPEND)
                      .setIfNotExist(TableNotExistOption.TABLE_NOT_EXIST_OPTION_FAIL)
                      .build(),
                  null,
                  null,
                  null),
              new FlightSqlClient.Transaction("123".getBytes(StandardCharsets.UTF_8)));
        });
  }

  @Test
  public void testSimplePreparedStatementUpdateResultsWithoutParameters() throws SQLException {
    try (PreparedStatement prepare =
            sqlClient.prepare("INSERT INTO INTTABLE (keyName, value ) VALUES ('test', 1000)");
        PreparedStatement deletePrepare =
            sqlClient.prepare("DELETE FROM INTTABLE WHERE keyName = 'test'")) {
      final long updatedRows = prepare.executeUpdate();

      final long deletedRows = deletePrepare.executeUpdate();

      assertAll(
          () -> assertThat(updatedRows).isEqualTo(1L), () -> assertThat(deletedRows).isEqualTo(1L));
    }
  }

  @Test
  public void testSimplePreparedStatementClosesProperly() {
    final PreparedStatement preparedStatement = sqlClient.prepare("SELECT * FROM intTable");
    assertAll(
        () -> {
          assertThat(preparedStatement.isClosed()).isEqualTo(false);
        },
        () -> {
          preparedStatement.close();
          assertThat(preparedStatement.isClosed()).isEqualTo(true);
        });
  }

  @Test
  public void testGetCatalogsSchema() {
    final FlightInfo info = sqlClient.getCatalogs();
    assertThat(info.getSchemaOptional())
        .isEqualTo(Optional.of(FlightSqlProducer.Schemas.GET_CATALOGS_SCHEMA));
  }

  @Test
  public void testGetCatalogsResults() throws Exception {
    try (final FlightStream stream =
        sqlClient.getStream(sqlClient.getCatalogs().getEndpoints().get(0).getTicket())) {
      assertAll(
          () ->
              assertThat(stream.getSchema())
                  .isEqualTo(FlightSqlProducer.Schemas.GET_CATALOGS_SCHEMA),
          () -> {
            List<List<String>> catalogs = getResults(stream);
            assertThat(catalogs).isEqualTo(emptyList());
          });
    }
  }

  @Test
  public void testGetTableTypesSchema() {
    final FlightInfo info = sqlClient.getTableTypes();
    assertThat(info.getSchemaOptional())
        .isEqualTo(Optional.of(FlightSqlProducer.Schemas.GET_TABLE_TYPES_SCHEMA));
  }

  @Test
  public void testGetTableTypesResult() throws Exception {
    try (final FlightStream stream =
        sqlClient.getStream(sqlClient.getTableTypes().getEndpoints().get(0).getTicket())) {
      assertAll(
          () -> {
            assertThat(stream.getSchema())
                .isEqualTo(FlightSqlProducer.Schemas.GET_TABLE_TYPES_SCHEMA);
          },
          () -> {
            final List<List<String>> tableTypes = getResults(stream);
            final List<List<String>> expectedTableTypes =
                ImmutableList.of(
                    // table_type
                    singletonList("SYNONYM"),
                    singletonList("SYSTEM TABLE"),
                    singletonList("TABLE"),
                    singletonList("VIEW"));
            assertThat(tableTypes).isEqualTo(expectedTableTypes);
          });
    }
  }

  @Test
  public void testGetSchemasSchema() {
    final FlightInfo info = sqlClient.getSchemas(null, null);
    assertThat(info.getSchemaOptional())
        .isEqualTo(Optional.of(FlightSqlProducer.Schemas.GET_SCHEMAS_SCHEMA));
  }

  @Test
  public void testGetSchemasResult() throws Exception {
    try (final FlightStream stream =
        sqlClient.getStream(sqlClient.getSchemas(null, null).getEndpoints().get(0).getTicket())) {
      assertAll(
          () -> {
            assertThat(stream.getSchema()).isEqualTo(FlightSqlProducer.Schemas.GET_SCHEMAS_SCHEMA);
          },
          () -> {
            final List<List<String>> schemas = getResults(stream);
            final List<List<String>> expectedSchemas =
                ImmutableList.of(
                    // catalog_name | schema_name
                    asList(null /* TODO Add catalog. */, "APP"),
                    asList(null /* TODO Add catalog. */, "NULLID"),
                    asList(null /* TODO Add catalog. */, "SQLJ"),
                    asList(null /* TODO Add catalog. */, "SYS"),
                    asList(null /* TODO Add catalog. */, "SYSCAT"),
                    asList(null /* TODO Add catalog. */, "SYSCS_DIAG"),
                    asList(null /* TODO Add catalog. */, "SYSCS_UTIL"),
                    asList(null /* TODO Add catalog. */, "SYSFUN"),
                    asList(null /* TODO Add catalog. */, "SYSIBM"),
                    asList(null /* TODO Add catalog. */, "SYSPROC"),
                    asList(null /* TODO Add catalog. */, "SYSSTAT"));
            assertThat(schemas).isEqualTo(expectedSchemas);
          });
    }
  }

  @Test
  public void testGetPrimaryKey() {
    final FlightInfo flightInfo = sqlClient.getPrimaryKeys(TableRef.of(null, null, "INTTABLE"));
    final FlightStream stream = sqlClient.getStream(flightInfo.getEndpoints().get(0).getTicket());

    final List<List<String>> results = getResults(stream);

    assertAll(
        () -> assertThat(results.size()).isEqualTo(1),
        () -> {
          final List<String> result = results.get(0);
          assertAll(
              () -> assertThat(result.get(0)).isEqualTo(""),
              () -> assertThat(result.get(1)).isEqualTo("APP"),
              () -> assertThat(result.get(2)).isEqualTo("INTTABLE"),
              () -> assertThat(result.get(3)).isEqualTo("ID"),
              () -> assertThat(result.get(4)).isEqualTo("1"),
              () -> assertThat(result.get(5)).isNotNull());
        });
  }

  @Test
  public void testGetSqlInfoSchema() {
    final FlightInfo info = sqlClient.getSqlInfo();
    assertThat(info.getSchemaOptional())
        .isEqualTo(Optional.of(FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA));
  }

  @Test
  public void testGetSqlInfoResults() throws Exception {
    final FlightInfo info = sqlClient.getSqlInfo();
    try (final FlightStream stream = sqlClient.getStream(info.getEndpoints().get(0).getTicket())) {
      assertAll(
          () ->
              assertThat(stream.getSchema())
                  .isEqualTo(FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA),
          () ->
              assertThat(getNonConformingResultsForGetSqlInfo(getResults(stream)))
                  .isEqualTo(emptyList()));
    }
  }

  @Test
  public void testGetSqlInfoResultsWithSingleArg() throws Exception {
    final FlightSql.SqlInfo arg = FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME;
    final FlightInfo info = sqlClient.getSqlInfo(arg);
    try (final FlightStream stream = sqlClient.getStream(info.getEndpoints().get(0).getTicket())) {
      assertAll(
          () ->
              assertThat(stream.getSchema())
                  .isEqualTo(FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA),
          () ->
              assertThat(getNonConformingResultsForGetSqlInfo(getResults(stream), arg))
                  .isEqualTo(emptyList()));
    }
  }

  @Test
  public void testGetSqlInfoResultsWithManyArgs() throws Exception {
    final FlightSql.SqlInfo[] args = {
      FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME,
      FlightSql.SqlInfo.FLIGHT_SQL_SERVER_VERSION,
      FlightSql.SqlInfo.FLIGHT_SQL_SERVER_ARROW_VERSION,
      FlightSql.SqlInfo.FLIGHT_SQL_SERVER_READ_ONLY,
      FlightSql.SqlInfo.SQL_ALL_TABLES_ARE_SELECTABLE,
      FlightSql.SqlInfo.SQL_NULL_ORDERING,
      FlightSql.SqlInfo.SQL_DDL_CATALOG,
      FlightSql.SqlInfo.SQL_DDL_SCHEMA,
      FlightSql.SqlInfo.SQL_DDL_TABLE,
      FlightSql.SqlInfo.SQL_IDENTIFIER_CASE,
      FlightSql.SqlInfo.SQL_IDENTIFIER_QUOTE_CHAR,
      FlightSql.SqlInfo.SQL_QUOTED_IDENTIFIER_CASE,
      FlightSql.SqlInfo.SQL_MAX_COLUMNS_IN_TABLE
    };
    final FlightInfo info = sqlClient.getSqlInfo(args);
    try (final FlightStream stream = sqlClient.getStream(info.getEndpoints().get(0).getTicket())) {
      assertAll(
          () ->
              assertThat(stream.getSchema())
                  .isEqualTo(FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA),
          () ->
              assertThat(getNonConformingResultsForGetSqlInfo(getResults(stream), args))
                  .isEqualTo(emptyList()));
    }
  }

  @Test
  public void testGetCommandExportedKeys() throws Exception {
    try (final FlightStream stream =
        sqlClient.getStream(
            sqlClient
                .getExportedKeys(TableRef.of(null, null, "FOREIGNTABLE"))
                .getEndpoints()
                .get(0)
                .getTicket())) {

      final List<List<String>> results = getResults(stream);

      final List<Condition<String>> matchers =
          asList(
              new Condition<>(Objects::isNull, "pk_catalog_name expected to be null"),
              new Condition<>(c -> c.equals("APP"), "pk_schema_name expected to equal APP"),
              new Condition<>(
                  c -> c.equals("FOREIGNTABLE"), "pk_table_name should equal FOREIGNTABLE"),
              new Condition<>(c -> c.equals("ID"), "pk_column_name should equal ID"),
              new Condition<>(Objects::isNull, "fk_catalog_name expected to be null"),
              new Condition<>(c -> c.equals("APP"), "fk_schema_name expected to be APP"),
              new Condition<>(c -> c.equals("INTTABLE"), "fk_table_name expeced to be INTTABLE"),
              new Condition<>(
                  c -> c.equals("FOREIGNID"), "fk_column_name expected to equal FOREIGNID"),
              new Condition<>(c -> c.equals("1"), "key_sequence expected to equal 1"),
              new Condition<>(c -> c.contains("SQL"), "fk_key_name expected to contain SQL"),
              new Condition<>(c -> c.contains("SQL"), "pk_key_name expected to contain SQL"),
              new Condition<>(c -> c.equals("3"), "update_rule expected to equal 3"),
              new Condition<>(c -> c.equals("3"), "delete_rule expected to equal 3"));

      final List<Executable> assertions = new ArrayList<>();
      assertEquals(1, results.size());
      for (int i = 0; i < matchers.size(); i++) {
        final String actual = results.get(0).get(i);
        final Condition<String> expected = matchers.get(i);
        assertions.add(() -> assertThat(actual).satisfies(expected));
      }
      assertAll(assertions);
    }
  }

  @Test
  public void testGetCommandImportedKeys() throws Exception {
    try (final FlightStream stream =
        sqlClient.getStream(
            sqlClient
                .getImportedKeys(TableRef.of(null, null, "INTTABLE"))
                .getEndpoints()
                .get(0)
                .getTicket())) {

      final List<List<String>> results = getResults(stream);

      final List<Condition<String>> matchers =
          asList(
              new Condition<>(Objects::isNull, "pk_catalog_name expected to be null"),
              new Condition<>(c -> c.equals("APP"), "pk_schema_name expected to equal APP"),
              new Condition<>(
                  c -> c.equals("FOREIGNTABLE"), "pk_table_name should equal FOREIGNTABLE"),
              new Condition<>(c -> c.equals("ID"), "pk_column_name should equal ID"),
              new Condition<>(Objects::isNull, "fk_catalog_name expected to be null"),
              new Condition<>(c -> c.equals("APP"), "fk_schema_name expected to be APP"),
              new Condition<>(c -> c.equals("INTTABLE"), "fk_table_name expeced to be INTTABLE"),
              new Condition<>(
                  c -> c.equals("FOREIGNID"), "fk_column_name expected to equal FOREIGNID"),
              new Condition<>(c -> c.equals("1"), "key_sequence expected to equal 1"),
              new Condition<>(c -> c.contains("SQL"), "fk_key_name expected to contain SQL"),
              new Condition<>(c -> c.contains("SQL"), "pk_key_name expected to contain SQL"),
              new Condition<>(c -> c.equals("3"), "update_rule expected to equal 3"),
              new Condition<>(c -> c.equals("3"), "delete_rule expected to equal 3"));

      assertEquals(1, results.size());
      final List<Executable> assertions = new ArrayList<>();
      for (int i = 0; i < matchers.size(); i++) {
        final String actual = results.get(0).get(i);
        final Condition<String> expected = matchers.get(i);
        assertions.add(() -> assertThat(actual).satisfies(expected));
      }
      assertAll(assertions);
    }
  }

  @Test
  public void testGetTypeInfo() throws Exception {
    FlightInfo flightInfo = sqlClient.getXdbcTypeInfo();

    try (FlightStream stream = sqlClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) {

      final List<List<String>> results = getResults(stream);

      final List<List<String>> matchers =
          ImmutableList.of(
              asList(
                  "BIGINT",
                  "-5",
                  "19",
                  null,
                  null,
                  emptyList().toString(),
                  "1",
                  "false",
                  "2",
                  "false",
                  "false",
                  "true",
                  "BIGINT",
                  "0",
                  "0",
                  null,
                  null,
                  "10",
                  null),
              asList(
                  "LONG VARCHAR FOR BIT DATA",
                  "-4",
                  "32700",
                  "X'",
                  "'",
                  emptyList().toString(),
                  "1",
                  "false",
                  "0",
                  "true",
                  "false",
                  "false",
                  "LONG VARCHAR FOR BIT DATA",
                  null,
                  null,
                  null,
                  null,
                  null,
                  null),
              asList(
                  "VARCHAR () FOR BIT DATA",
                  "-3",
                  "32672",
                  "X'",
                  "'",
                  singletonList("length").toString(),
                  "1",
                  "false",
                  "2",
                  "true",
                  "false",
                  "false",
                  "VARCHAR () FOR BIT DATA",
                  null,
                  null,
                  null,
                  null,
                  null,
                  null),
              asList(
                  "CHAR () FOR BIT DATA",
                  "-2",
                  "254",
                  "X'",
                  "'",
                  singletonList("length").toString(),
                  "1",
                  "false",
                  "2",
                  "true",
                  "false",
                  "false",
                  "CHAR () FOR BIT DATA",
                  null,
                  null,
                  null,
                  null,
                  null,
                  null),
              asList(
                  "LONG VARCHAR",
                  "-1",
                  "32700",
                  "'",
                  "'",
                  emptyList().toString(),
                  "1",
                  "true",
                  "1",
                  "true",
                  "false",
                  "false",
                  "LONG VARCHAR",
                  null,
                  null,
                  null,
                  null,
                  null,
                  null),
              asList(
                  "CHAR",
                  "1",
                  "254",
                  "'",
                  "'",
                  singletonList("length").toString(),
                  "1",
                  "true",
                  "3",
                  "true",
                  "false",
                  "false",
                  "CHAR",
                  null,
                  null,
                  null,
                  null,
                  null,
                  null),
              asList(
                  "NUMERIC",
                  "2",
                  "31",
                  null,
                  null,
                  Arrays.asList("precision", "scale").toString(),
                  "1",
                  "false",
                  "2",
                  "false",
                  "true",
                  "false",
                  "NUMERIC",
                  "0",
                  "31",
                  null,
                  null,
                  "10",
                  null),
              asList(
                  "DECIMAL",
                  "3",
                  "31",
                  null,
                  null,
                  Arrays.asList("precision", "scale").toString(),
                  "1",
                  "false",
                  "2",
                  "false",
                  "true",
                  "false",
                  "DECIMAL",
                  "0",
                  "31",
                  null,
                  null,
                  "10",
                  null),
              asList(
                  "INTEGER",
                  "4",
                  "10",
                  null,
                  null,
                  emptyList().toString(),
                  "1",
                  "false",
                  "2",
                  "false",
                  "false",
                  "true",
                  "INTEGER",
                  "0",
                  "0",
                  null,
                  null,
                  "10",
                  null),
              asList(
                  "SMALLINT",
                  "5",
                  "5",
                  null,
                  null,
                  emptyList().toString(),
                  "1",
                  "false",
                  "2",
                  "false",
                  "false",
                  "true",
                  "SMALLINT",
                  "0",
                  "0",
                  null,
                  null,
                  "10",
                  null),
              asList(
                  "FLOAT",
                  "6",
                  "52",
                  null,
                  null,
                  singletonList("precision").toString(),
                  "1",
                  "false",
                  "2",
                  "false",
                  "false",
                  "false",
                  "FLOAT",
                  null,
                  null,
                  null,
                  null,
                  "2",
                  null),
              asList(
                  "REAL",
                  "7",
                  "23",
                  null,
                  null,
                  emptyList().toString(),
                  "1",
                  "false",
                  "2",
                  "false",
                  "false",
                  "false",
                  "REAL",
                  null,
                  null,
                  null,
                  null,
                  "2",
                  null),
              asList(
                  "DOUBLE",
                  "8",
                  "52",
                  null,
                  null,
                  emptyList().toString(),
                  "1",
                  "false",
                  "2",
                  "false",
                  "false",
                  "false",
                  "DOUBLE",
                  null,
                  null,
                  null,
                  null,
                  "2",
                  null),
              asList(
                  "VARCHAR",
                  "12",
                  "32672",
                  "'",
                  "'",
                  singletonList("length").toString(),
                  "1",
                  "true",
                  "3",
                  "true",
                  "false",
                  "false",
                  "VARCHAR",
                  null,
                  null,
                  null,
                  null,
                  null,
                  null),
              asList(
                  "BOOLEAN",
                  "16",
                  "1",
                  null,
                  null,
                  emptyList().toString(),
                  "1",
                  "false",
                  "2",
                  "true",
                  "false",
                  "false",
                  "BOOLEAN",
                  null,
                  null,
                  null,
                  null,
                  null,
                  null),
              asList(
                  "DATE",
                  "91",
                  "10",
                  "DATE'",
                  "'",
                  emptyList().toString(),
                  "1",
                  "false",
                  "2",
                  "true",
                  "false",
                  "false",
                  "DATE",
                  "0",
                  "0",
                  null,
                  null,
                  "10",
                  null),
              asList(
                  "TIME",
                  "92",
                  "8",
                  "TIME'",
                  "'",
                  emptyList().toString(),
                  "1",
                  "false",
                  "2",
                  "true",
                  "false",
                  "false",
                  "TIME",
                  "0",
                  "0",
                  null,
                  null,
                  "10",
                  null),
              asList(
                  "TIMESTAMP",
                  "93",
                  "29",
                  "TIMESTAMP'",
                  "'",
                  emptyList().toString(),
                  "1",
                  "false",
                  "2",
                  "true",
                  "false",
                  "false",
                  "TIMESTAMP",
                  "0",
                  "9",
                  null,
                  null,
                  "10",
                  null),
              asList(
                  "OBJECT",
                  "2000",
                  null,
                  null,
                  null,
                  emptyList().toString(),
                  "1",
                  "false",
                  "2",
                  "true",
                  "false",
                  "false",
                  "OBJECT",
                  null,
                  null,
                  null,
                  null,
                  null,
                  null),
              asList(
                  "BLOB",
                  "2004",
                  "2147483647",
                  null,
                  null,
                  singletonList("length").toString(),
                  "1",
                  "false",
                  "0",
                  null,
                  "false",
                  null,
                  "BLOB",
                  null,
                  null,
                  null,
                  null,
                  null,
                  null),
              asList(
                  "CLOB",
                  "2005",
                  "2147483647",
                  "'",
                  "'",
                  singletonList("length").toString(),
                  "1",
                  "true",
                  "1",
                  null,
                  "false",
                  null,
                  "CLOB",
                  null,
                  null,
                  null,
                  null,
                  null,
                  null),
              asList(
                  "XML",
                  "2009",
                  null,
                  null,
                  null,
                  emptyList().toString(),
                  "1",
                  "true",
                  "0",
                  "false",
                  "false",
                  "false",
                  "XML",
                  null,
                  null,
                  null,
                  null,
                  null,
                  null));
      assertThat(results).isEqualTo(matchers);
    }
  }

  @Test
  public void testGetTypeInfoWithFiltering() throws Exception {
    FlightInfo flightInfo = sqlClient.getXdbcTypeInfo(-5);

    try (FlightStream stream = sqlClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) {

      final List<List<String>> results = getResults(stream);

      final List<List<String>> matchers =
          ImmutableList.of(
              asList(
                  "BIGINT",
                  "-5",
                  "19",
                  null,
                  null,
                  emptyList().toString(),
                  "1",
                  "false",
                  "2",
                  "false",
                  "false",
                  "true",
                  "BIGINT",
                  "0",
                  "0",
                  null,
                  null,
                  "10",
                  null));
      assertThat(results).isEqualTo(matchers);
    }
  }

  @Test
  public void testGetCommandCrossReference() throws Exception {
    final FlightInfo flightInfo =
        sqlClient.getCrossReference(
            TableRef.of(null, null, "FOREIGNTABLE"), TableRef.of(null, null, "INTTABLE"));
    try (final FlightStream stream =
        sqlClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) {

      final List<List<String>> results = getResults(stream);

      final List<Condition<String>> matchers =
          asList(
              new Condition<>(Objects::isNull, "pk_catalog_name expected to be null"),
              new Condition<>(c -> c.equals("APP"), "pk_schema_name expected to equal APP"),
              new Condition<>(
                  c -> c.equals("FOREIGNTABLE"), "pk_table_name should equal FOREIGNTABLE"),
              new Condition<>(c -> c.equals("ID"), "pk_column_name should equal ID"),
              new Condition<>(Objects::isNull, "fk_catalog_name expected to be null"),
              new Condition<>(c -> c.equals("APP"), "fk_schema_name expected to be APP"),
              new Condition<>(c -> c.equals("INTTABLE"), "fk_table_name expeced to be INTTABLE"),
              new Condition<>(
                  c -> c.equals("FOREIGNID"), "fk_column_name expected to equal FOREIGNID"),
              new Condition<>(c -> c.equals("1"), "key_sequence expected to equal 1"),
              new Condition<>(c -> c.contains("SQL"), "fk_key_name expected to contain SQL"),
              new Condition<>(c -> c.contains("SQL"), "pk_key_name expected to contain SQL"),
              new Condition<>(c -> c.equals("3"), "update_rule expected to equal 3"),
              new Condition<>(c -> c.equals("3"), "delete_rule expected to equal 3"));

      assertEquals(1, results.size());
      final List<Executable> assertions = new ArrayList<>();
      for (int i = 0; i < matchers.size(); i++) {
        final String actual = results.get(0).get(i);
        final Condition<String> expected = matchers.get(i);
        assertions.add(() -> assertThat(actual).satisfies(expected));
      }
      assertAll(assertions);
    }
  }

  @Test
  public void testCreateStatementSchema() throws Exception {
    final FlightInfo info = sqlClient.execute("SELECT * FROM intTable");
    assertThat(info.getSchemaOptional()).isEqualTo(Optional.of(SCHEMA_INT_TABLE));

    // Consume statement to close connection before cache eviction
    try (FlightStream stream = sqlClient.getStream(info.getEndpoints().get(0).getTicket())) {
      while (stream.next()) {
        // Do nothing
      }
    }
  }

  @Test
  public void testCreateStatementResults() throws Exception {
    try (final FlightStream stream =
        sqlClient.getStream(
            sqlClient.execute("SELECT * FROM intTable").getEndpoints().get(0).getTicket())) {
      assertAll(
          () -> {
            assertThat(stream.getSchema()).isEqualTo(SCHEMA_INT_TABLE);
          },
          () -> {
            assertThat(getResults(stream)).isEqualTo(EXPECTED_RESULTS_FOR_STAR_SELECT_QUERY);
          });
    }
  }

  @Test
  public void testExecuteUpdate() {
    assertAll(
        () -> {
          long insertedCount =
              sqlClient.executeUpdate(
                  "INSERT INTO INTTABLE (keyName, value) VALUES "
                      + "('KEYNAME1', 1001), ('KEYNAME2', 1002), ('KEYNAME3', 1003)");
          assertThat(insertedCount).isEqualTo(3L);
        },
        () -> {
          long updatedCount =
              sqlClient.executeUpdate(
                  "UPDATE INTTABLE SET keyName = 'KEYNAME1' "
                      + "WHERE keyName = 'KEYNAME2' OR keyName = 'KEYNAME3'");
          assertThat(updatedCount).isEqualTo(2L);
        },
        () -> {
          long deletedCount =
              sqlClient.executeUpdate("DELETE FROM INTTABLE WHERE keyName = 'KEYNAME1'");
          assertThat(deletedCount).isEqualTo(3L);
        });
  }

  @Test
  public void testQueryWithNoResultsShouldNotHang() throws Exception {
    try (final PreparedStatement preparedStatement =
            sqlClient.prepare("SELECT * FROM intTable WHERE 1 = 0");
        final FlightStream stream =
            sqlClient.getStream(preparedStatement.execute().getEndpoints().get(0).getTicket())) {
      assertAll(
          () -> assertThat(stream.getSchema()).isEqualTo(SCHEMA_INT_TABLE),
          () -> {
            final List<List<String>> result = getResults(stream);
            assertThat(result).isEqualTo(emptyList());
          });
    }
  }

  @Test
  public void testCancelFlightInfo() {
    FlightInfo info = sqlClient.getSqlInfo();
    CancelFlightInfoRequest request = new CancelFlightInfoRequest(info);
    FlightRuntimeException fre =
        assertThrows(FlightRuntimeException.class, () -> sqlClient.cancelFlightInfo(request));
    assertEquals(FlightStatusCode.UNIMPLEMENTED, fre.status().code());
  }

  @Test
  public void testCancelQuery() {
    FlightInfo info = sqlClient.getSqlInfo();
    FlightRuntimeException fre =
        assertThrows(FlightRuntimeException.class, () -> sqlClient.cancelQuery(info));
    assertEquals(FlightStatusCode.UNIMPLEMENTED, fre.status().code());
  }

  @Test
  public void testRenewEndpoint() {
    FlightInfo info = sqlClient.getSqlInfo();
    FlightRuntimeException fre =
        assertThrows(
            FlightRuntimeException.class,
            () ->
                sqlClient.renewFlightEndpoint(
                    new RenewFlightEndpointRequest(info.getEndpoints().get(0))));
    assertEquals(FlightStatusCode.UNIMPLEMENTED, fre.status().code());
  }
}