FlightSqlIngestionScenario.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.flight.integration.tests;

import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.flight.sql.FlightSqlClient.ExecuteIngestOptions;
import org.apache.arrow.flight.sql.FlightSqlProducer;
import org.apache.arrow.flight.sql.impl.FlightSql;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementIngest.TableDefinitionOptions;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;

/**
 * Integration test scenario for validating Flight SQL specs across multiple implementations. This
 * should ensure that RPC objects are being built and parsed correctly for multiple languages and
 * that the Arrow schemas are returned as expected.
 */
public class FlightSqlIngestionScenario extends FlightSqlScenario {

  @Override
  public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception {
    FlightSqlScenarioProducer producer =
        (FlightSqlScenarioProducer) super.producer(allocator, location);
    producer
        .getSqlInfoBuilder()
        .withFlightSqlServerBulkIngestionTransaction(true)
        .withFlightSqlServerBulkIngestion(true);
    return producer;
  }

  @Override
  public void client(BufferAllocator allocator, Location location, FlightClient client)
      throws Exception {
    try (final FlightSqlClient sqlClient = new FlightSqlClient(client)) {
      validateMetadataRetrieval(sqlClient);
      validateIngestion(allocator, sqlClient);
    }
  }

  private void validateMetadataRetrieval(FlightSqlClient sqlClient) throws Exception {
    validate(
        FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA,
        sqlClient.getSqlInfo(
            FlightSql.SqlInfo.FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED,
            FlightSql.SqlInfo.FLIGHT_SQL_SERVER_BULK_INGESTION),
        sqlClient,
        s -> {
          Map<Integer, Object> infoValues = readSqlInfoStream(s);
          IntegrationAssertions.assertEquals(
              Boolean.TRUE,
              infoValues.get(
                  FlightSql.SqlInfo.FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED_VALUE));
          IntegrationAssertions.assertEquals(
              Boolean.TRUE,
              infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_BULK_INGESTION_VALUE));
        });
  }

  private VectorSchemaRoot getIngestVectorRoot(BufferAllocator allocator) {
    Schema schema = FlightSqlScenarioProducer.getIngestSchema();
    VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
    root.setRowCount(3);
    return root;
  }

  private void validateIngestion(BufferAllocator allocator, FlightSqlClient sqlClient) {
    try (VectorSchemaRoot data = getIngestVectorRoot(allocator)) {
      TableDefinitionOptions tableDefinitionOptions =
          TableDefinitionOptions.newBuilder()
              .setIfExists(TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_REPLACE)
              .setIfNotExist(
                  TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE)
              .build();
      Map<String, String> options = new HashMap<>(ImmutableMap.of("key1", "val1", "key2", "val2"));
      ExecuteIngestOptions executeIngestOptions =
          new ExecuteIngestOptions(
              "test_table", tableDefinitionOptions, true, "test_catalog", "test_schema", options);
      FlightSqlClient.Transaction transaction =
          new FlightSqlClient.Transaction(BULK_INGEST_TRANSACTION_ID);
      long updatedRows = sqlClient.executeIngest(data, executeIngestOptions, transaction);

      IntegrationAssertions.assertEquals(3L, updatedRows);
    }
  }
}