// FlightSqlExample.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.flight.sql.example;
import static com.google.common.base.Strings.emptyToNull;
import static com.google.protobuf.Any.pack;
import static com.google.protobuf.ByteString.copyFrom;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.Objects.isNull;
import static java.util.UUID.randomUUID;
import static java.util.stream.IntStream.range;
import static org.apache.arrow.adapter.jdbc.JdbcToArrow.sqlToArrowVectorIterator;
import static org.apache.arrow.adapter.jdbc.JdbcToArrowUtils.jdbcToArrowSchema;
import static org.apache.arrow.flight.sql.impl.FlightSql.*;
import static org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCrossReference;
import static org.apache.arrow.flight.sql.impl.FlightSql.CommandGetDbSchemas;
import static org.apache.arrow.flight.sql.impl.FlightSql.CommandGetExportedKeys;
import static org.apache.arrow.flight.sql.impl.FlightSql.CommandGetImportedKeys;
import static org.apache.arrow.flight.sql.impl.FlightSql.DoPutUpdateResult;
import static org.apache.arrow.flight.sql.impl.FlightSql.TicketStatementQuery;
import static org.apache.arrow.util.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.ProtocolStringList;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLSyntaxErrorException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcParameterBinder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.SchemaResult;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.sql.FlightSqlColumnMetadata;
import org.apache.arrow.flight.sql.FlightSqlProducer;
import org.apache.arrow.flight.sql.SqlInfoBuilder;
import org.apache.arrow.flight.sql.impl.FlightSql.ActionClosePreparedStatementRequest;
import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementRequest;
import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementResult;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCatalogs;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetPrimaryKeys;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetSqlInfo;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTableTypes;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetTables;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementUpdate;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementIngest;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementIngest.TableDefinitionOptions;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementIngest.TableDefinitionOptions.TableExistsOption;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementQuery;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementUpdate;
import org.apache.arrow.flight.sql.impl.FlightSql.SqlSupportedCaseSensitivity;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.UInt1Vector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.impl.UnionListWriter;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Text;
import org.apache.commons.dbcp2.ConnectionFactory;
import org.apache.commons.dbcp2.DriverManagerConnectionFactory;
import org.apache.commons.dbcp2.PoolableConnection;
import org.apache.commons.dbcp2.PoolableConnectionFactory;
import org.apache.commons.dbcp2.PoolingDataSource;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.text.StringEscapeUtils;
import org.slf4j.Logger;
/**
* Example {@link FlightSqlProducer} implementation showing an Apache Derby backed Flight SQL server
* that generally supports all current features of Flight SQL.
*/
public class FlightSqlExample implements FlightSqlProducer, AutoCloseable {
private static final Logger LOGGER = getLogger(FlightSqlExample.class);
// UTC calendar shared by all JDBC <-> Arrow date/time conversions.
protected static final Calendar DEFAULT_CALENDAR = JdbcToArrowUtils.getUtcCalendar();
// Derby database name; the database lives under target/<DB_NAME>.
public static final String DB_NAME = "derbyDB";
// JDBC URI of the backing Derby database (set in the constructor).
private final String databaseUri;
// ARROW-15315: Use ExecutorService to simulate an async scenario
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
// Location advertised to clients in FlightInfo endpoints.
private final Location location;
// Pooled JDBC connections; ownership of individual connections is handed to statement contexts.
protected final PoolingDataSource<PoolableConnection> dataSource;
protected final BufferAllocator rootAllocator = new RootAllocator();
// Handle -> open prepared statement; eviction closes resources via StatementRemovalListener.
private final Cache<ByteString, StatementContext<PreparedStatement>>
    preparedStatementLoadingCache;
// Handle -> open ad-hoc statement; same eviction-driven cleanup as above.
private final Cache<ByteString, StatementContext<Statement>> statementLoadingCache;
// Pre-built answers for CommandGetSqlInfo, derived from JDBC DatabaseMetaData.
private final SqlInfoBuilder sqlInfoBuilder;
/**
 * Starts a standalone Flight SQL server on port 55555 and blocks until it terminates.
 *
 * @param args unused.
 * @throws Exception if the server fails to start or is interrupted.
 */
public static void main(String[] args) throws Exception {
  // Location advertised to clients inside FlightInfo endpoints.
  Location advertisedLocation = Location.forGrpcInsecure("localhost", 55555);
  // Location the server actually binds to (all interfaces, same port).
  Location bindLocation = Location.forGrpcInsecure("0.0.0.0", 55555);
  final FlightSqlExample producer = new FlightSqlExample(advertisedLocation, DB_NAME);
  try (final BufferAllocator serverAllocator = new RootAllocator();
      final FlightServer flightServer =
          FlightServer.builder(serverAllocator, bindLocation, producer).build()) {
    flightServer.start();
    flightServer.awaitTermination();
  }
}
/**
 * Creates the example producer, (re)creating and seeding the backing Derby database,
 * setting up a JDBC connection pool, statement caches, and SQL-info metadata.
 *
 * @param location the location advertised to clients in FlightInfo endpoints.
 * @param dbName name of the Derby database directory under {@code target/}.
 */
public FlightSqlExample(final Location location, final String dbName) {
  // TODO Constructor should not be doing work.
  // Start from a clean slate: drop any leftover database, then re-create and seed it.
  checkState(removeDerbyDatabaseIfExists(dbName), "Failed to clear Derby database!");
  checkState(populateDerbyDatabase(dbName), "Failed to populate Derby database!");
  databaseUri = "jdbc:derby:target/" + dbName;
  // Wire up a commons-dbcp2 connection pool over the Derby JDBC driver.
  final ConnectionFactory connectionFactory =
      new DriverManagerConnectionFactory(databaseUri, new Properties());
  final PoolableConnectionFactory poolableConnectionFactory =
      new PoolableConnectionFactory(connectionFactory, null);
  final ObjectPool<PoolableConnection> connectionPool =
      new GenericObjectPool<>(poolableConnectionFactory);
  poolableConnectionFactory.setPool(connectionPool);
  // PoolingDataSource takes ownership of `connectionPool`
  dataSource = new PoolingDataSource<>(connectionPool);
  // Bounded caches of open statements; eviction (size or 10-minute TTL) triggers
  // resource cleanup via StatementRemovalListener.
  preparedStatementLoadingCache =
      CacheBuilder.newBuilder()
          .maximumSize(100)
          .expireAfterWrite(10, TimeUnit.MINUTES)
          .removalListener(new StatementRemovalListener<PreparedStatement>())
          .build();
  statementLoadingCache =
      CacheBuilder.newBuilder()
          .maximumSize(100)
          .expireAfterWrite(10, TimeUnit.MINUTES)
          .removalListener(new StatementRemovalListener<>())
          .build();
  this.location = location;
  // Populate the CommandGetSqlInfo answers from the JDBC driver's metadata.
  sqlInfoBuilder = new SqlInfoBuilder();
  try (final Connection connection = dataSource.getConnection()) {
    final DatabaseMetaData metaData = connection.getMetaData();
    sqlInfoBuilder
        .withFlightSqlServerName(metaData.getDatabaseProductName())
        .withFlightSqlServerVersion(metaData.getDatabaseProductVersion())
        .withFlightSqlServerArrowVersion(metaData.getDriverVersion())
        .withFlightSqlServerReadOnly(metaData.isReadOnly())
        .withFlightSqlServerSql(true)
        .withFlightSqlServerSubstrait(false)
        .withFlightSqlServerTransaction(SqlSupportedTransaction.SQL_SUPPORTED_TRANSACTION_NONE)
        .withSqlIdentifierQuoteChar(metaData.getIdentifierQuoteString())
        .withSqlDdlCatalog(metaData.supportsCatalogsInDataManipulation())
        .withSqlDdlSchema(metaData.supportsSchemasInDataManipulation())
        .withSqlDdlTable(metaData.allTablesAreSelectable())
        // Map JDBC identifier-case storage behavior onto the Flight SQL enum:
        // mixed -> case-insensitive, otherwise upper/lower/unknown.
        .withSqlIdentifierCase(
            metaData.storesMixedCaseIdentifiers()
                ? SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_CASE_INSENSITIVE
                : metaData.storesUpperCaseIdentifiers()
                    ? SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_UPPERCASE
                    : metaData.storesLowerCaseIdentifiers()
                        ? SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_LOWERCASE
                        : SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_UNKNOWN)
        .withSqlQuotedIdentifierCase(
            metaData.storesMixedCaseQuotedIdentifiers()
                ? SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_CASE_INSENSITIVE
                : metaData.storesUpperCaseQuotedIdentifiers()
                    ? SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_UPPERCASE
                    : metaData.storesLowerCaseQuotedIdentifiers()
                        ? SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_LOWERCASE
                        : SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_UNKNOWN)
        .withSqlAllTablesAreSelectable(true)
        .withSqlNullOrdering(SqlNullOrdering.SQL_NULLS_SORTED_AT_END)
        // NOTE(review): 42 looks like a placeholder rather than a real Derby limit -- confirm.
        .withSqlMaxColumnsInTable(42)
        .withFlightSqlServerBulkIngestion(true)
        .withFlightSqlServerBulkIngestionTransaction(false);
  } catch (SQLException e) {
    throw new RuntimeException(e);
  }
}
/**
 * Deletes the Derby database directory under {@code target/<dbName>}, if present.
 *
 * @param dbName name of the database directory to remove.
 * @return true if the directory was fully deleted (or did not exist); false otherwise.
 */
public static boolean removeDerbyDatabaseIfExists(final String dbName) {
  final Path path = Paths.get("target" + File.separator + dbName);
  try (final Stream<Path> walk = Files.walk(path)) {
    /*
     * Iterate over all paths to delete, mapping each path to the outcome of its own
     * deletion as a boolean representing whether each individual operation was
     * successful; then reduce all booleans into a single answer, which is the
     * method's result. (Previously the reduced value was discarded and the method
     * returned true even when a deletion failed.)
     * If for whatever reason the resulting `Stream<Boolean>` is empty, throw an
     * `IOException`; this is not expected (`Files.walk` always yields the root).
     */
    return walk.sorted(Comparator.reverseOrder()) // children before parents
        .map(Path::toFile)
        .map(File::delete)
        .reduce(Boolean::logicalAnd)
        .orElseThrow(IOException::new);
  } catch (NoSuchFileException e) {
    /*
     * The only acceptable scenario for an `IOException` to be thrown here is if
     * an attempt to delete an non-existing file takes place -- which should be
     * alright, since they would be deleted anyway.
     */
    LOGGER.error(format("No existing Derby database to delete.: <%s>", e.getMessage()), e);
    return true;
  } catch (Exception e) {
    LOGGER.error(format("Failed attempt to clear DerbyDB.: <%s>", e.getMessage()), e);
    return false;
  }
}
/**
 * Creates the demo schema (foreignTable, intTable) in the given Derby database and seeds
 * both tables with a few rows.
 *
 * @param dbName name of the Derby database (created under target/ if absent).
 * @return true on success; false if any SQL statement failed.
 */
private static boolean populateDerbyDatabase(final String dbName) {
  try (final Connection connection =
          DriverManager.getConnection("jdbc:derby:target/" + dbName + ";create=true");
      Statement statement = connection.createStatement()) {
    // Drop in dependency order: intTable references foreignTable.
    dropTable(statement, "intTable");
    dropTable(statement, "foreignTable");
    final String[] setupSql = {
      "CREATE TABLE foreignTable ("
          + "id INT not null primary key GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
          + "foreignName varchar(100), "
          + "value int)",
      "CREATE TABLE intTable ("
          + "id INT not null primary key GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
          + "keyName varchar(100), "
          + "value int, "
          + "foreignId int references foreignTable(id))",
      "INSERT INTO foreignTable (foreignName, value) VALUES ('keyOne', 1)",
      "INSERT INTO foreignTable (foreignName, value) VALUES ('keyTwo', 0)",
      "INSERT INTO foreignTable (foreignName, value) VALUES ('keyThree', -1)",
      "INSERT INTO intTable (keyName, value, foreignId) VALUES ('one', 1, 1)",
      "INSERT INTO intTable (keyName, value, foreignId) VALUES ('zero', 0, 1)",
      "INSERT INTO intTable (keyName, value, foreignId) VALUES ('negative one', -1, 1)"
    };
    for (final String sql : setupSql) {
      statement.execute(sql);
    }
  } catch (final SQLException e) {
    LOGGER.error(format("Failed attempt to populate DerbyDB: <%s>", e.getMessage()), e);
    return false;
  }
  return true;
}
/**
 * Drops the given table, treating "table does not exist" as success.
 *
 * @param statement statement to execute the DROP with.
 * @param tableName name of the table to drop.
 * @throws SQLException for any SQL failure other than the table being absent.
 */
private static void dropTable(final Statement statement, final String tableName)
    throws SQLException {
  try {
    statement.execute("DROP TABLE " + tableName);
  } catch (SQLException e) {
    // "42Y55" is Derby's SQL state for "object does not exist"; an absent table is
    // exactly the outcome we want, so only that specific error is swallowed.
    // see https://db.apache.org/derby/docs/10.17/ref/rrefexcept71493.html
    if ("42Y55".equals(e.getSQLState())) {
      return;
    }
    throw e;
  }
}
/**
 * Maps a JDBC type to its Arrow equivalent, falling back to UTF-8 text for JDBC types
 * the adapter has no mapping for.
 */
private static ArrowType getArrowTypeFromJdbcType(
    final int jdbcDataType, final int precision, final int scale) {
  final JdbcFieldInfo fieldInfo = new JdbcFieldInfo(jdbcDataType, precision, scale);
  try {
    return JdbcToArrowUtils.getArrowTypeFromJdbcType(fieldInfo, DEFAULT_CALENDAR);
  } catch (UnsupportedOperationException ignored) {
    // No Arrow mapping for this JDBC type: represent the value as a string.
    return ArrowType.Utf8.INSTANCE;
  }
}
/**
 * Writes {@code data} at {@code index} of the given UInt1 vector, storing null when
 * {@code data} is null. Validates arguments like every other {@code saveToVector}
 * overload (this one previously skipped the precondition check).
 */
private static void saveToVector(final Byte data, final UInt1Vector vector, final int index) {
  preconditionCheckSaveToVector(vector, index);
  vectorConsumer(
      data,
      vector,
      fieldVector -> fieldVector.setNull(index),
      (theData, fieldVector) -> fieldVector.setSafe(index, theData));
}
/**
 * Writes {@code data} at {@code index} of the given bit vector, storing null when
 * {@code data} is null. Validates arguments like every other {@code saveToVector}
 * overload (this one previously skipped the precondition check).
 */
private static void saveToVector(final Byte data, final BitVector vector, final int index) {
  preconditionCheckSaveToVector(vector, index);
  vectorConsumer(
      data,
      vector,
      fieldVector -> fieldVector.setNull(index),
      (theData, fieldVector) -> fieldVector.setSafe(index, theData));
}
/**
 * Writes {@code data} at {@code index} of the given varchar vector, storing null when
 * {@code data} is null.
 */
private static void saveToVector(final String data, final VarCharVector vector, final int index) {
  preconditionCheckSaveToVector(vector, index);
  vectorConsumer(
      data,
      vector,
      target -> target.setNull(index),
      (value, target) -> target.setSafe(index, new Text(value)));
}
/**
 * Writes {@code data} at {@code index} of the given int vector, storing null when
 * {@code data} is null.
 */
private static void saveToVector(final Integer data, final IntVector vector, final int index) {
  preconditionCheckSaveToVector(vector, index);
  vectorConsumer(
      data,
      vector,
      target -> target.setNull(index),
      (value, target) -> target.setSafe(index, value));
}
/**
 * Writes {@code data} at {@code index} of the given varbinary vector, storing null when
 * {@code data} is null.
 */
private static void saveToVector(
    final byte[] data, final VarBinaryVector vector, final int index) {
  preconditionCheckSaveToVector(vector, index);
  vectorConsumer(
      data,
      vector,
      target -> target.setNull(index),
      (value, target) -> target.setSafe(index, value));
}
/**
 * Validates the common {@code saveToVector} arguments: a non-null vector and a
 * non-negative index.
 */
private static void preconditionCheckSaveToVector(final FieldVector vector, final int index) {
  Objects.requireNonNull(vector, "vector cannot be null.");
  // Index 0 is valid; the old message ("positive") contradicted the >= 0 check.
  checkState(index >= 0, "Index must be a non-negative number!");
}
/**
 * Applies {@code consumerIfNullable} to the vector when {@code data} is null; otherwise
 * applies {@code defaultConsumer} to both data and vector.
 */
private static <T, V extends FieldVector> void vectorConsumer(
    final T data,
    final V vector,
    final Consumer<V> consumerIfNullable,
    final BiConsumer<T, V> defaultConsumer) {
  if (data == null) {
    consumerIfNullable.accept(vector);
  } else {
    defaultConsumer.accept(data, vector);
  }
}
/**
 * Builds the GetDbSchemas result root (catalog_name, db_schema_name) from a JDBC
 * {@code getSchemas} ResultSet.
 */
private static VectorSchemaRoot getSchemasRoot(
    final ResultSet data, final BufferAllocator allocator) throws SQLException {
  final VarCharVector catalogNameVector = new VarCharVector("catalog_name", allocator);
  final VarCharVector schemaNameVector =
      new VarCharVector(
          "db_schema_name", FieldType.notNullable(MinorType.VARCHAR.getType()), allocator);
  final List<FieldVector> vectors = ImmutableList.of(catalogNameVector, schemaNameVector);
  vectors.forEach(FieldVector::allocateNew);
  saveToVectors(
      ImmutableMap.of(
          catalogNameVector, "TABLE_CATALOG",
          schemaNameVector, "TABLE_SCHEM"),
      data);
  // All vectors were filled row-by-row together, so any one of them knows the row count.
  final int rowCount =
      vectors.stream()
          .map(FieldVector::getValueCount)
          .findAny()
          .orElseThrow(IllegalStateException::new);
  vectors.forEach(vector -> vector.setValueCount(rowCount));
  return new VectorSchemaRoot(vectors);
}
/**
 * Copies every row of {@code data} into the mapped vectors (no row filtering).
 *
 * @return the number of rows written.
 */
private static <T extends FieldVector> int saveToVectors(
    final Map<T, String> vectorToColumnName, final ResultSet data, boolean emptyToNull)
    throws SQLException {
  // Accept every row: delegate with an always-true predicate.
  return saveToVectors(vectorToColumnName, data, emptyToNull, resultSet -> true);
}
@SuppressWarnings("StringSplitter")
/*
 * Copies rows from `data` into the mapped vectors (vector -> source column name),
 * keeping every vector aligned on the same row index. Rows rejected by
 * `resultSetPredicate` are skipped entirely. Returns the number of rows written.
 */
private static <T extends FieldVector> int saveToVectors(
    final Map<T, String> vectorToColumnName,
    final ResultSet data,
    boolean emptyToNull,
    Predicate<ResultSet> resultSetPredicate)
    throws SQLException {
  Objects.requireNonNull(vectorToColumnName, "vectorToColumnName cannot be null.");
  Objects.requireNonNull(data, "data cannot be null.");
  final Set<Entry<T, String>> entrySet = vectorToColumnName.entrySet();
  int rows = 0;
  while (data.next()) {
    if (!resultSetPredicate.test(data)) {
      continue;
    }
    // Dispatch on the concrete vector type to pick the matching JDBC getter.
    for (final Entry<T, String> vectorToColumn : entrySet) {
      final T vector = vectorToColumn.getKey();
      final String columnName = vectorToColumn.getValue();
      if (vector instanceof VarCharVector) {
        String thisData = data.getString(columnName);
        // Optionally normalize empty strings to null (works around DerbyDB returning
        // "" instead of null for catalog names -- see getTablesRoot).
        saveToVector(
            emptyToNull ? emptyToNull(thisData) : thisData, (VarCharVector) vector, rows);
      } else if (vector instanceof IntVector) {
        final int intValue = data.getInt(columnName);
        // `wasNull` distinguishes SQL NULL from an actual 0.
        saveToVector(data.wasNull() ? null : intValue, (IntVector) vector, rows);
      } else if (vector instanceof UInt1Vector) {
        final byte byteValue = data.getByte(columnName);
        saveToVector(data.wasNull() ? null : byteValue, (UInt1Vector) vector, rows);
      } else if (vector instanceof BitVector) {
        final byte byteValue = data.getByte(columnName);
        saveToVector(data.wasNull() ? null : byteValue, (BitVector) vector, rows);
      } else if (vector instanceof ListVector) {
        // Comma-separated column value becomes one list<varchar> entry per element.
        String createParamsValues = data.getString(columnName);
        UnionListWriter writer = ((ListVector) vector).getWriter();
        BufferAllocator allocator = vector.getAllocator();
        final ArrowBuf buf = allocator.buffer(1024);
        writer.setPosition(rows);
        writer.startList();
        if (createParamsValues != null) {
          String[] split = createParamsValues.split(",");
          range(0, split.length)
              .forEach(
                  i -> {
                    byte[] bytes = split[i].getBytes(StandardCharsets.UTF_8);
                    // Scratch buffer is fixed at 1024 bytes; larger elements would overflow.
                    Preconditions.checkState(
                        bytes.length < 1024,
                        "The amount of bytes is greater than what the ArrowBuf supports");
                    buf.setBytes(0, bytes);
                    writer.varChar().writeVarChar(0, bytes.length, buf);
                  });
        }
        buf.close();
        writer.endList();
      } else {
        throw CallStatus.INVALID_ARGUMENT
            .withDescription("Provided vector not supported")
            .toRuntimeException();
      }
    }
    rows++;
  }
  // Finalize the logical length of every vector to the number of rows written.
  for (final Entry<T, String> vectorToColumn : entrySet) {
    vectorToColumn.getKey().setValueCount(rows);
  }
  return rows;
}
/**
 * Copies every row of {@code data} into the mapped vectors, keeping empty strings as-is
 * (no empty-to-null normalization).
 */
private static <T extends FieldVector> void saveToVectors(
    final Map<T, String> vectorToColumnName, final ResultSet data) throws SQLException {
  final boolean normalizeEmptyToNull = false;
  saveToVectors(vectorToColumnName, data, normalizeEmptyToNull);
}
/** Builds the single-column GetTableTypes result root from a JDBC ResultSet. */
private static VectorSchemaRoot getTableTypesRoot(
    final ResultSet data, final BufferAllocator allocator) throws SQLException {
  return getRoot(data, allocator, "table_type", "TABLE_TYPE");
}
/** Builds the single-column GetCatalogs result root from a JDBC ResultSet. */
private static VectorSchemaRoot getCatalogsRoot(
    final ResultSet data, final BufferAllocator allocator) throws SQLException {
  return getRoot(data, allocator, "catalog_name", "TABLE_CATALOG");
}
/**
 * Builds a one-column varchar root named {@code fieldVectorName}, filled from column
 * {@code columnName} of the given ResultSet.
 */
private static VectorSchemaRoot getRoot(
    final ResultSet data,
    final BufferAllocator allocator,
    final String fieldVectorName,
    final String columnName)
    throws SQLException {
  final FieldType varCharNotNull = FieldType.notNullable(MinorType.VARCHAR.getType());
  final VarCharVector dataVector = new VarCharVector(fieldVectorName, varCharNotNull, allocator);
  saveToVectors(ImmutableMap.of(dataVector, columnName), data);
  final int rowCount = dataVector.getValueCount();
  dataVector.setValueCount(rowCount);
  return new VectorSchemaRoot(singletonList(dataVector));
}
/*
 * Builds the GetXdbcTypeInfo result root by mapping the JDBC getTypeInfo ResultSet
 * columns onto the Flight SQL GET_TYPE_INFO_SCHEMA vectors, optionally filtered to a
 * single requested data type.
 */
private static VectorSchemaRoot getTypeInfoRoot(
    CommandGetXdbcTypeInfo request, ResultSet typeInfo, final BufferAllocator allocator)
    throws SQLException {
  Preconditions.checkNotNull(allocator, "BufferAllocator cannot be null.");
  VectorSchemaRoot root = VectorSchemaRoot.create(Schemas.GET_TYPE_INFO_SCHEMA, allocator);
  // Flight SQL vector name -> JDBC getTypeInfo column name.
  Map<FieldVector, String> mapper = new HashMap<>();
  mapper.put(root.getVector("type_name"), "TYPE_NAME");
  mapper.put(root.getVector("data_type"), "DATA_TYPE");
  mapper.put(root.getVector("column_size"), "PRECISION");
  mapper.put(root.getVector("literal_prefix"), "LITERAL_PREFIX");
  mapper.put(root.getVector("literal_suffix"), "LITERAL_SUFFIX");
  mapper.put(root.getVector("create_params"), "CREATE_PARAMS");
  mapper.put(root.getVector("nullable"), "NULLABLE");
  mapper.put(root.getVector("case_sensitive"), "CASE_SENSITIVE");
  mapper.put(root.getVector("searchable"), "SEARCHABLE");
  mapper.put(root.getVector("unsigned_attribute"), "UNSIGNED_ATTRIBUTE");
  mapper.put(root.getVector("fixed_prec_scale"), "FIXED_PREC_SCALE");
  mapper.put(root.getVector("auto_increment"), "AUTO_INCREMENT");
  mapper.put(root.getVector("local_type_name"), "LOCAL_TYPE_NAME");
  mapper.put(root.getVector("minimum_scale"), "MINIMUM_SCALE");
  mapper.put(root.getVector("maximum_scale"), "MAXIMUM_SCALE");
  mapper.put(root.getVector("sql_data_type"), "SQL_DATA_TYPE");
  mapper.put(root.getVector("datetime_subcode"), "SQL_DATETIME_SUB");
  mapper.put(root.getVector("num_prec_radix"), "NUM_PREC_RADIX");
  // When the request names a specific data type, keep only matching rows.
  Predicate<ResultSet> predicate;
  if (request.hasDataType()) {
    predicate =
        (resultSet) -> {
          try {
            return resultSet.getInt("DATA_TYPE") == request.getDataType();
          } catch (SQLException e) {
            // Predicate cannot throw checked exceptions; surface as unchecked.
            throw new RuntimeException(e);
          }
        };
  } else {
    predicate = resultSet -> true;
  }
  // Empty strings are normalized to null here (emptyToNull = true).
  int rows = saveToVectors(mapper, typeInfo, true, predicate);
  root.setRowCount(rows);
  return root;
}
/*
 * Builds the GetTables result root (catalog_name, db_schema_name, table_name,
 * table_type[, table_schema]) from JDBC DatabaseMetaData. When `includeSchema` is true,
 * each row additionally carries the serialized Arrow schema of the table, reconstructed
 * from `getColumns`.
 */
private static VectorSchemaRoot getTablesRoot(
    final DatabaseMetaData databaseMetaData,
    final BufferAllocator allocator,
    final boolean includeSchema,
    final String catalog,
    final String schemaFilterPattern,
    final String tableFilterPattern,
    final String... tableTypes)
    throws SQLException, IOException {
  /*
   * TODO Fix DerbyDB inconsistency if possible.
   * During the early development of this prototype, an inconsistency has been found in the database
   * used for this demonstration; as DerbyDB does not operate with the concept of catalogs, fetching
   * the catalog name for a given table from `DatabaseMetadata#getColumns` and `DatabaseMetadata#getSchemas`
   * returns null, as expected. However, the inconsistency lies in the fact that accessing the same
   * information -- that is, the catalog name for a given table -- from `DatabaseMetadata#getSchemas`
   * returns an empty String.The temporary workaround for this was making sure we convert the empty Strings
   * to null using `com.google.common.base.Strings#emptyToNull`.
   */
  Objects.requireNonNull(allocator, "BufferAllocator cannot be null.");
  final VarCharVector catalogNameVector = new VarCharVector("catalog_name", allocator);
  final VarCharVector schemaNameVector = new VarCharVector("db_schema_name", allocator);
  final VarCharVector tableNameVector =
      new VarCharVector(
          "table_name", FieldType.notNullable(MinorType.VARCHAR.getType()), allocator);
  final VarCharVector tableTypeVector =
      new VarCharVector(
          "table_type", FieldType.notNullable(MinorType.VARCHAR.getType()), allocator);
  final List<FieldVector> vectors = new ArrayList<>(4);
  vectors.add(catalogNameVector);
  vectors.add(schemaNameVector);
  vectors.add(tableNameVector);
  vectors.add(tableTypeVector);
  vectors.forEach(FieldVector::allocateNew);
  // Result vector -> JDBC getTables column name.
  final Map<FieldVector, String> vectorToColumnName =
      ImmutableMap.of(
          catalogNameVector, "TABLE_CAT",
          schemaNameVector, "TABLE_SCHEM",
          tableNameVector, "TABLE_NAME",
          tableTypeVector, "TABLE_TYPE");
  try (final ResultSet data =
      Objects.requireNonNull(
              databaseMetaData,
              format("%s cannot be null.", databaseMetaData.getClass().getName()))
          .getTables(catalog, schemaFilterPattern, tableFilterPattern, tableTypes)) {
    // emptyToNull = true: see the DerbyDB catalog-name workaround comment above.
    saveToVectors(vectorToColumnName, data, true);
    final int rows =
        vectors.stream()
            .map(FieldVector::getValueCount)
            .findAny()
            .orElseThrow(IllegalStateException::new);
    vectors.forEach(vector -> vector.setValueCount(rows));
    if (includeSchema) {
      final VarBinaryVector tableSchemaVector =
          new VarBinaryVector(
              "table_schema", FieldType.notNullable(MinorType.VARBINARY.getType()), allocator);
      tableSchemaVector.allocateNew(rows);
      // First pass: group a Field per column under its table name.
      try (final ResultSet columnsData =
          databaseMetaData.getColumns(catalog, schemaFilterPattern, tableFilterPattern, null)) {
        final Map<String, List<Field>> tableToFields = new HashMap<>();
        while (columnsData.next()) {
          final String catalogName = columnsData.getString("TABLE_CAT");
          final String schemaName = columnsData.getString("TABLE_SCHEM");
          final String tableName = columnsData.getString("TABLE_NAME");
          final String typeName = columnsData.getString("TYPE_NAME");
          final String fieldName = columnsData.getString("COLUMN_NAME");
          final int dataType = columnsData.getInt("DATA_TYPE");
          final boolean isNullable =
              columnsData.getInt("NULLABLE") != DatabaseMetaData.columnNoNulls;
          final int precision = columnsData.getInt("COLUMN_SIZE");
          final int scale = columnsData.getInt("DECIMAL_DIGITS");
          boolean isAutoIncrement =
              Objects.equals(columnsData.getString("IS_AUTOINCREMENT"), "YES");
          final List<Field> fields =
              tableToFields.computeIfAbsent(tableName, tableName_ -> new ArrayList<>());
          // Carry the original JDBC metadata alongside each Arrow field.
          final FlightSqlColumnMetadata columnMetadata =
              new FlightSqlColumnMetadata.Builder()
                  .catalogName(catalogName)
                  .schemaName(schemaName)
                  .tableName(tableName)
                  .typeName(typeName)
                  .precision(precision)
                  .scale(scale)
                  .isAutoIncrement(isAutoIncrement)
                  .build();
          final Field field =
              new Field(
                  fieldName,
                  new FieldType(
                      isNullable,
                      getArrowTypeFromJdbcType(dataType, precision, scale),
                      null,
                      columnMetadata.getMetadataMap()),
                  null);
          fields.add(field);
        }
        // Second pass: serialize one Schema per table row, in table-row order.
        // NOTE(review): assumes every table returned by getTables also appears in
        // getColumns; a column-less table would make `get(tableName)` null -- confirm.
        for (int index = 0; index < rows; index++) {
          final String tableName = tableNameVector.getObject(index).toString();
          final Schema schema = new Schema(tableToFields.get(tableName));
          saveToVector(
              copyFrom(serializeMetadata(schema)).toByteArray(), tableSchemaVector, index);
        }
      }
      tableSchemaVector.setValueCount(rows);
      vectors.add(tableSchemaVector);
    }
  }
  return new VectorSchemaRoot(vectors);
}
/**
 * Serializes an Arrow schema to its IPC byte representation.
 *
 * @throws RuntimeException wrapping the IOException if serialization fails.
 */
private static ByteBuffer serializeMetadata(final Schema schema) {
  final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  try {
    MessageSerializer.serialize(new WriteChannel(Channels.newChannel(buffer)), schema);
  } catch (final IOException e) {
    throw new RuntimeException("Failed to serialize schema", e);
  }
  return ByteBuffer.wrap(buffer.toByteArray());
}
/**
 * Renders every row of the root as CSV (no header line), one line per row, with rows
 * separated by a newline.
 */
private static String getRootAsCSVNoHeader(final VectorSchemaRoot root) {
  final StringBuilder output = new StringBuilder();
  final List<FieldVector> columns = root.getFieldVectors();
  final int rowCount = root.getRowCount();
  // Reused per-row scratch list, sized to the column count.
  final List<Object> rowValues = new ArrayList<>(root.getSchema().getFields().size());
  for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
    if (rowIndex > 0) {
      output.append("\n");
    }
    rowValues.clear();
    for (final FieldVector column : columns) {
      rowValues.add(column.getObject(rowIndex));
    }
    printRowAsCSV(output, rowValues);
  }
  return output.toString();
}
/** Appends one comma-joined, CSV-escaped line for the given row values (null -> empty). */
private static void printRowAsCSV(StringBuilder sb, List<Object> values) {
  final String line =
      values.stream()
          .map(value -> value == null ? "" : value.toString())
          .map(StringEscapeUtils::escapeCsv)
          .collect(Collectors.joining(","));
  sb.append(line);
}
@Override
public void getStreamPreparedStatement(
    final CommandPreparedStatementQuery command,
    final CallContext context,
    final ServerStreamListener listener) {
  // Stream the result set of a previously-created prepared statement, batch by batch.
  final ByteString handle = command.getPreparedStatementHandle();
  StatementContext<PreparedStatement> statementContext =
      preparedStatementLoadingCache.getIfPresent(handle);
  Objects.requireNonNull(statementContext, "Prepared statement not found for the given handle.");
  final PreparedStatement statement = statementContext.getStatement();
  try (final ResultSet resultSet = statement.executeQuery()) {
    final Schema schema = jdbcToArrowSchema(resultSet.getMetaData(), DEFAULT_CALENDAR);
    // The iterator is AutoCloseable and was previously leaked; close it with the root.
    try (final VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator);
        final ArrowVectorIterator iterator = sqlToArrowVectorIterator(resultSet, rootAllocator)) {
      final VectorLoader loader = new VectorLoader(vectorSchemaRoot);
      listener.start(vectorSchemaRoot);
      while (iterator.hasNext()) {
        final VectorSchemaRoot batch = iterator.next();
        if (batch.getRowCount() == 0) {
          break;
        }
        final VectorUnloader unloader = new VectorUnloader(batch);
        loader.load(unloader.getRecordBatch());
        listener.putNext();
        vectorSchemaRoot.clear();
      }
      // Emit a final (empty) batch so clients always observe the schema.
      listener.putNext();
    }
  } catch (final SQLException | IOException e) {
    LOGGER.error(format("Failed to getStreamPreparedStatement: <%s>.", e.getMessage()), e);
    listener.error(
        CallStatus.INTERNAL
            .withDescription("Failed to prepare statement: " + e)
            .toRuntimeException());
  } finally {
    listener.completed();
  }
}
@Override
public void closePreparedStatement(
    final ActionClosePreparedStatementRequest request,
    final CallContext context,
    final StreamListener<Result> listener) {
  // Evict asynchronously; the cache's removal listener performs the actual cleanup.
  Future<?> unused =
      executorService.submit(
          () -> {
            final ByteString handle = request.getPreparedStatementHandle();
            try {
              preparedStatementLoadingCache.invalidate(handle);
            } catch (final Exception e) {
              listener.onError(e);
              return;
            }
            listener.onCompleted();
          });
}
@Override
public FlightInfo getFlightInfoStatement(
    final CommandStatementQuery request,
    final CallContext context,
    final FlightDescriptor descriptor) {
  // Execute the ad-hoc query eagerly so the response can carry the result schema;
  // the statement is cached under a fresh UUID handle for the later DoGet.
  final ByteString handle = copyFrom(randomUUID().toString().getBytes(StandardCharsets.UTF_8));
  try {
    // Ownership of the connection will be passed to the context. Do NOT close!
    final Connection connection = dataSource.getConnection();
    final Statement statement =
        connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
    final String query = request.getQuery();
    final StatementContext<Statement> statementContext = new StatementContext<>(statement, query);
    statementLoadingCache.put(handle, statementContext);
    final ResultSet resultSet = statement.executeQuery(query);
    TicketStatementQuery ticket =
        TicketStatementQuery.newBuilder().setStatementHandle(handle).build();
    return getFlightInfoForSchema(
        ticket, descriptor, jdbcToArrowSchema(resultSet.getMetaData(), DEFAULT_CALENDAR));
  } catch (final SQLException e) {
    // This method handles an ad-hoc statement, not a prepared one; the old message
    // wrongly said "prepared statement".
    LOGGER.error(format("There was a problem executing the statement: <%s>.", e.getMessage()), e);
    throw CallStatus.INTERNAL.withCause(e).toRuntimeException();
  }
}
@Override
public FlightInfo getFlightInfoPreparedStatement(
    final CommandPreparedStatementQuery command,
    final CallContext context,
    final FlightDescriptor descriptor) {
  final ByteString preparedStatementHandle = command.getPreparedStatementHandle();
  final StatementContext<PreparedStatement> statementContext =
      preparedStatementLoadingCache.getIfPresent(preparedStatementHandle);
  // FIX: the original used `assert statementContext != null`, which is a no-op unless the
  // JVM runs with -ea and would otherwise surface as an opaque NullPointerException.
  // Report a proper Flight status for an unknown/expired handle instead.
  if (statementContext == null) {
    throw CallStatus.NOT_FOUND
        .withDescription("Prepared statement does not exist")
        .toRuntimeException();
  }
  try {
    final PreparedStatement statement = statementContext.getStatement();
    // getMetaData() describes the result set the prepared query will produce.
    final ResultSetMetaData metaData = statement.getMetaData();
    return getFlightInfoForSchema(
        command, descriptor, jdbcToArrowSchema(metaData, DEFAULT_CALENDAR));
  } catch (final SQLException e) {
    LOGGER.error(
        format("There was a problem executing the prepared statement: <%s>.", e.getMessage()), e);
    throw CallStatus.INTERNAL.withCause(e).toRuntimeException();
  }
}
@Override
public SchemaResult getSchemaStatement(
    final CommandStatementQuery command,
    final CallContext context,
    final FlightDescriptor descriptor) {
  // Retrieving a statement's schema without executing it is not supported by this example.
  throw CallStatus.UNIMPLEMENTED.toRuntimeException();
}
@Override
public void close() throws Exception {
  // Flush the cache first so its removal listener closes any live prepared statements;
  // a failure here must not prevent the data source and allocator from being released.
  try {
    preparedStatementLoadingCache.cleanUp();
  } catch (final Throwable t) {
    LOGGER.error(format("Failed to close resources: <%s>", t.getMessage()), t);
  }
  AutoCloseables.close(dataSource, rootAllocator);
}
@Override
public void listFlights(
    CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
  // TODO - build example implementation
  throw CallStatus.UNIMPLEMENTED.toRuntimeException();
}
/**
 * Prepares the client's query asynchronously and replies with the dataset schema, the
 * parameter schema, and a handle the client uses for subsequent calls.
 */
@Override
public void createPreparedStatement(
    final ActionCreatePreparedStatementRequest request,
    final CallContext context,
    final StreamListener<Result> listener) {
  // Running on another thread
  Future<?> unused =
      executorService.submit(
          () -> {
            try {
              // NOTE(review): the handle is derived from the query text itself, so two
              // clients preparing identical SQL share one cache entry — confirm intended.
              final ByteString preparedStatementHandle =
                  copyFrom(request.getQuery().getBytes(StandardCharsets.UTF_8));
              // Ownership of the connection will be passed to the context. Do NOT close!
              final Connection connection = dataSource.getConnection();
              final PreparedStatement preparedStatement =
                  connection.prepareStatement(
                      request.getQuery(),
                      ResultSet.TYPE_SCROLL_INSENSITIVE,
                      ResultSet.CONCUR_READ_ONLY);
              final StatementContext<PreparedStatement> preparedStatementContext =
                  new StatementContext<>(preparedStatement, request.getQuery());
              preparedStatementLoadingCache.put(
                  preparedStatementHandle, preparedStatementContext);
              final Schema parameterSchema =
                  jdbcToArrowSchema(preparedStatement.getParameterMetaData(), DEFAULT_CALENDAR);
              // getMetaData() may be null (e.g. for updates); send an empty dataset schema then.
              final ResultSetMetaData metaData = preparedStatement.getMetaData();
              final ByteString bytes =
                  isNull(metaData)
                      ? ByteString.EMPTY
                      : ByteString.copyFrom(
                          serializeMetadata(jdbcToArrowSchema(metaData, DEFAULT_CALENDAR)));
              final ActionCreatePreparedStatementResult result =
                  ActionCreatePreparedStatementResult.newBuilder()
                      .setDatasetSchema(bytes)
                      .setParameterSchema(copyFrom(serializeMetadata(parameterSchema)))
                      .setPreparedStatementHandle(preparedStatementHandle)
                      .build();
              listener.onNext(new Result(pack(result).toByteArray()));
            } catch (final SQLException e) {
              listener.onError(
                  CallStatus.INTERNAL
                      .withDescription("Failed to create prepared statement: " + e)
                      .toRuntimeException());
              return;
            } catch (final Throwable t) {
              listener.onError(
                  CallStatus.INTERNAL
                      .withDescription("Unknown error: " + t)
                      .toRuntimeException());
              return;
            }
            listener.onCompleted();
          });
}
@Override
public void doExchange(CallContext context, FlightStream reader, ServerStreamListener writer) {
  // TODO - build example implementation
  throw CallStatus.UNIMPLEMENTED.toRuntimeException();
}
@Override
public Runnable acceptPutStatement(
    CommandStatementUpdate command,
    CallContext context,
    FlightStream flightStream,
    StreamListener<PutResult> ackStream) {
  final String query = command.getQuery();
  // Returned runnable executes the update and acks with the affected-row count.
  return () -> {
    try (final Connection connection = dataSource.getConnection();
        final Statement statement = connection.createStatement()) {
      final int updatedRows = statement.executeUpdate(query);
      final DoPutUpdateResult ack =
          DoPutUpdateResult.newBuilder().setRecordCount(updatedRows).build();
      try (final ArrowBuf buffer = rootAllocator.buffer(ack.getSerializedSize())) {
        buffer.writeBytes(ack.toByteArray());
        ackStream.onNext(PutResult.metadata(buffer));
        ackStream.onCompleted();
      }
    } catch (SQLSyntaxErrorException e) {
      ackStream.onError(
          CallStatus.INVALID_ARGUMENT
              .withDescription("Failed to execute statement (invalid syntax): " + e)
              .toRuntimeException());
    } catch (SQLException e) {
      ackStream.onError(
          CallStatus.INTERNAL
              .withDescription("Failed to execute statement: " + e)
              .toRuntimeException());
    }
  };
}
/**
 * Bulk-ingests incoming Arrow batches into {@code table} by spooling them to a temporary
 * CSV file and importing it with Derby's {@code SYSCS_IMPORT_DATA} procedure.
 */
@Override
public Runnable acceptPutStatementBulkIngest(
    CommandStatementIngest command,
    CallContext context,
    FlightStream flightStream,
    StreamListener<PutResult> ackStream) {
  final String schema = command.hasSchema() ? command.getSchema() : null;
  final String table = command.getTable();
  final boolean temporary = command.getTemporary();
  final boolean hasTransactionId = command.hasTransactionId();
  final TableDefinitionOptions tableDefinitionOptions =
      command.hasTableDefinitionOptions() ? command.getTableDefinitionOptions() : null;
  return () -> {
    // FIX: each rejected request must stop here. The original called ackStream.onError()
    // and then fell through into the ingest logic anyway, terminating the ack stream twice.
    if (temporary) {
      ackStream.onError(
          CallStatus.UNIMPLEMENTED
              .withDescription("Bulk ingestion using temporary tables is not supported")
              .toRuntimeException());
      return;
    }
    if (hasTransactionId) {
      ackStream.onError(
          CallStatus.UNIMPLEMENTED
              .withDescription(
                  "Bulk ingestion automatically happens in a transaction. Specifying explicit transaction is not supported.")
              .toRuntimeException());
      return;
    }
    if (isNull(tableDefinitionOptions)) {
      ackStream.onError(
          CallStatus.INVALID_ARGUMENT
              .withDescription("TableDefinitionOptions not provided.")
              .toRuntimeException());
      return;
    }
    final TableNotExistOption ifNotExist = tableDefinitionOptions.getIfNotExist();
    final TableExistsOption ifExists = tableDefinitionOptions.getIfExists();
    if (!TableNotExistOption.TABLE_NOT_EXIST_OPTION_FAIL.equals(ifNotExist)) {
      ackStream.onError(
          CallStatus.UNIMPLEMENTED
              .withDescription(
                  "Only supported option is TABLE_NOT_EXIST_OPTION_FAIL for TableNotExistsOption.")
              .toRuntimeException());
      return;
    }
    if (TableExistsOption.TABLE_EXISTS_OPTION_UNSPECIFIED.equals(ifExists)) {
      ackStream.onError(
          CallStatus.INVALID_ARGUMENT
              .withDescription("TableExistsOption must be specified")
              .toRuntimeException());
      return;
    }
    if (TableExistsOption.TABLE_EXISTS_OPTION_FAIL.equals(ifExists)) {
      ackStream.onError(
          CallStatus.UNIMPLEMENTED
              .withDescription("TABLE_EXISTS_OPTION_FAIL is not supported.")
              .toRuntimeException());
      return;
    }
    // Spool every incoming batch to a temp CSV file, then import in one shot.
    Path tempFile = null;
    try {
      tempFile = Files.createTempFile(null, null);
      VectorSchemaRoot root = null;
      int counter = 0;
      while (flightStream.next()) {
        if (counter > 0) {
          Files.writeString(tempFile, "\n", StandardCharsets.UTF_8, StandardOpenOption.APPEND);
        }
        counter += 1;
        root = flightStream.getRoot();
        Files.writeString(
            tempFile,
            getRootAsCSVNoHeader(root),
            StandardCharsets.UTF_8,
            StandardOpenOption.APPEND);
      }
      if (counter > 0) {
        Files.writeString(tempFile, "\n", StandardCharsets.UTF_8, StandardOpenOption.APPEND);
      }
      if (!isNull(root)) {
        // Import column order follows the schema of the (last) received root.
        String header =
            root.getSchema().getFields().stream()
                .map(Field::getName)
                .collect(Collectors.joining(","));
        try (final Connection connection = dataSource.getConnection();
            final PreparedStatement preparedStatement =
                connection.prepareStatement(
                    "CALL SYSCS_UTIL.SYSCS_IMPORT_DATA (?,?,?,null,?,?,?,?,?)")) {
          preparedStatement.setString(1, schema);
          preparedStatement.setString(2, table);
          preparedStatement.setString(3, header);
          preparedStatement.setString(4, tempFile.toString());
          preparedStatement.setString(5, ",");
          preparedStatement.setString(6, "\"");
          preparedStatement.setString(7, "UTF-8");
          // Last argument: 1 = REPLACE existing rows, 0 = APPEND.
          preparedStatement.setInt(
              8, TableExistsOption.TABLE_EXISTS_OPTION_REPLACE.equals(ifExists) ? 1 : 0);
          preparedStatement.execute();
          // Record count is unknown to the import procedure; -1 signals "not provided".
          final DoPutUpdateResult build =
              DoPutUpdateResult.newBuilder().setRecordCount(-1).build();
          try (final ArrowBuf buffer = rootAllocator.buffer(build.getSerializedSize())) {
            buffer.writeBytes(build.toByteArray());
            ackStream.onNext(PutResult.metadata(buffer));
            ackStream.onCompleted();
          }
        } catch (SQLException e) {
          ackStream.onError(
              CallStatus.INTERNAL
                  .withDescription("Failed to execute bulk ingest: " + e)
                  .toRuntimeException());
        }
      }
      // NOTE(review): an empty stream (root == null) leaves the ack stream unterminated,
      // matching the original behavior — TODO confirm whether onCompleted() belongs here.
    } catch (IOException e) {
      ackStream.onError(
          CallStatus.INTERNAL
              .withDescription("Failed to create temp file for bulk loading: " + e)
              .toRuntimeException());
    } finally {
      if (!isNull(tempFile)) {
        try {
          Files.delete(tempFile);
        } catch (IOException e) {
          // best-effort cleanup of the spool file
        }
      }
    }
  };
}
@Override
public Runnable acceptPutPreparedStatementUpdate(
    CommandPreparedStatementUpdate command,
    CallContext context,
    FlightStream flightStream,
    StreamListener<PutResult> ackStream) {
  final StatementContext<PreparedStatement> statementContext =
      preparedStatementLoadingCache.getIfPresent(command.getPreparedStatementHandle());
  return () -> {
    if (statementContext == null) {
      ackStream.onError(
          CallStatus.NOT_FOUND
              .withDescription("Prepared statement does not exist")
              .toRuntimeException());
      return;
    }
    try {
      final PreparedStatement preparedStatement = statementContext.getStatement();
      while (flightStream.next()) {
        final VectorSchemaRoot root = flightStream.getRoot();
        final int rowCount = root.getRowCount();
        final int updatedCount;
        if (rowCount == 0) {
          // No parameter rows: run the statement once, unbatched.
          preparedStatement.execute();
          updatedCount = preparedStatement.getUpdateCount();
        } else {
          // One JDBC batch entry per parameter row, then a single round trip.
          final JdbcParameterBinder binder =
              JdbcParameterBinder.builder(preparedStatement, root).bindAll().build();
          while (binder.next()) {
            preparedStatement.addBatch();
          }
          updatedCount = Arrays.stream(preparedStatement.executeBatch()).sum();
        }
        // Ack each incoming batch with the number of rows it affected.
        final DoPutUpdateResult ack =
            DoPutUpdateResult.newBuilder().setRecordCount(updatedCount).build();
        try (final ArrowBuf buffer = rootAllocator.buffer(ack.getSerializedSize())) {
          buffer.writeBytes(ack.toByteArray());
          ackStream.onNext(PutResult.metadata(buffer));
        }
      }
    } catch (SQLException e) {
      ackStream.onError(
          CallStatus.INTERNAL
              .withDescription("Failed to execute update: " + e)
              .toRuntimeException());
      return;
    }
    ackStream.onCompleted();
  };
}
@Override
public Runnable acceptPutPreparedStatementQuery(
    CommandPreparedStatementQuery command,
    CallContext context,
    FlightStream flightStream,
    StreamListener<PutResult> ackStream) {
  final StatementContext<PreparedStatement> statementContext =
      preparedStatementLoadingCache.getIfPresent(command.getPreparedStatementHandle());
  return () -> {
    // FIX: report a missing handle through the ack stream (matching
    // acceptPutPreparedStatementUpdate) instead of `assert`, which is disabled
    // unless the JVM runs with -ea and would otherwise surface as an NPE.
    if (statementContext == null) {
      ackStream.onError(
          CallStatus.NOT_FOUND
              .withDescription("Prepared statement does not exist")
              .toRuntimeException());
      return;
    }
    final PreparedStatement preparedStatement = statementContext.getStatement();
    try {
      // Bind all parameter rows; execution happens later in the getStream call.
      while (flightStream.next()) {
        final VectorSchemaRoot root = flightStream.getRoot();
        final JdbcParameterBinder binder =
            JdbcParameterBinder.builder(preparedStatement, root).bindAll().build();
        while (binder.next()) {
          // Do not execute() - will be done in a getStream call
        }
      }
    } catch (SQLException e) {
      ackStream.onError(
          CallStatus.INTERNAL
              .withDescription("Failed to bind parameters: " + e.getMessage())
              .withCause(e)
              .toRuntimeException());
      return;
    }
    ackStream.onCompleted();
  };
}
@Override
public FlightInfo getFlightInfoSqlInfo(
    final CommandGetSqlInfo request,
    final CallContext context,
    final FlightDescriptor descriptor) {
  // SQL-info results always use the fixed GET_SQL_INFO_SCHEMA.
  return getFlightInfoForSchema(request, descriptor, Schemas.GET_SQL_INFO_SCHEMA);
}
@Override
public void getStreamSqlInfo(
    final CommandGetSqlInfo command,
    final CallContext context,
    final ServerStreamListener listener) {
  // Delegate entirely to the pre-populated SqlInfoBuilder.
  this.sqlInfoBuilder.send(command.getInfoList(), listener);
}
@Override
public FlightInfo getFlightInfoTypeInfo(
    CommandGetXdbcTypeInfo request, CallContext context, FlightDescriptor descriptor) {
  // Type-info results always use the fixed GET_TYPE_INFO_SCHEMA.
  return getFlightInfoForSchema(request, descriptor, Schemas.GET_TYPE_INFO_SCHEMA);
}
@Override
public void getStreamTypeInfo(
    CommandGetXdbcTypeInfo request, CallContext context, ServerStreamListener listener) {
  // Streams JDBC type information as a single batch.
  try (final Connection connection = dataSource.getConnection();
      final ResultSet typeInfo = connection.getMetaData().getTypeInfo();
      final VectorSchemaRoot vectorSchemaRoot =
          getTypeInfoRoot(request, typeInfo, rootAllocator)) {
    listener.start(vectorSchemaRoot);
    listener.putNext();
  } catch (SQLException e) {
    // FIX: the message previously said "getStreamCatalogs" (copy-paste error).
    LOGGER.error(format("Failed to getStreamTypeInfo: <%s>.", e.getMessage()), e);
    listener.error(e);
  } finally {
    listener.completed();
  }
}
@Override
public FlightInfo getFlightInfoCatalogs(
    final CommandGetCatalogs request,
    final CallContext context,
    final FlightDescriptor descriptor) {
  // Catalog results always use the fixed GET_CATALOGS_SCHEMA.
  return getFlightInfoForSchema(request, descriptor, Schemas.GET_CATALOGS_SCHEMA);
}
@Override
public void getStreamCatalogs(final CallContext context, final ServerStreamListener listener) {
  // Streams the catalog list as a single batch built from DatabaseMetaData.
  try (final Connection connection = dataSource.getConnection();
      final ResultSet catalogs = connection.getMetaData().getCatalogs();
      final VectorSchemaRoot root = getCatalogsRoot(catalogs, rootAllocator)) {
    listener.start(root);
    listener.putNext();
  } catch (SQLException e) {
    LOGGER.error(format("Failed to getStreamCatalogs: <%s>.", e.getMessage()), e);
    listener.error(e);
  } finally {
    listener.completed();
  }
}
@Override
public FlightInfo getFlightInfoSchemas(
    final CommandGetDbSchemas request,
    final CallContext context,
    final FlightDescriptor descriptor) {
  // Schema listing always uses the fixed GET_SCHEMAS_SCHEMA.
  return getFlightInfoForSchema(request, descriptor, Schemas.GET_SCHEMAS_SCHEMA);
}
@Override
public void getStreamSchemas(
    final CommandGetDbSchemas command,
    final CallContext context,
    final ServerStreamListener listener) {
  // Optional filters default to null, which JDBC metadata calls treat as "match all".
  final String catalog = command.hasCatalog() ? command.getCatalog() : null;
  final String dbSchemaPattern =
      command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null;
  try (final Connection connection = dataSource.getConnection();
      final ResultSet schemas = connection.getMetaData().getSchemas(catalog, dbSchemaPattern);
      final VectorSchemaRoot root = getSchemasRoot(schemas, rootAllocator)) {
    listener.start(root);
    listener.putNext();
  } catch (SQLException e) {
    LOGGER.error(format("Failed to getStreamSchemas: <%s>.", e.getMessage()), e);
    listener.error(e);
  } finally {
    listener.completed();
  }
}
@Override
public FlightInfo getFlightInfoTables(
    final CommandGetTables request,
    final CallContext context,
    final FlightDescriptor descriptor) {
  // The table_schema column is present only when the client asked for it.
  final Schema schemaToUse =
      request.getIncludeSchema()
          ? Schemas.GET_TABLES_SCHEMA
          : Schemas.GET_TABLES_SCHEMA_NO_SCHEMA;
  return getFlightInfoForSchema(request, descriptor, schemaToUse);
}
@Override
public void getStreamTables(
    final CommandGetTables command,
    final CallContext context,
    final ServerStreamListener listener) {
  // Optional filters default to null ("match all") per JDBC metadata conventions.
  final String catalog = command.hasCatalog() ? command.getCatalog() : null;
  final String dbSchemaPattern =
      command.hasDbSchemaFilterPattern() ? command.getDbSchemaFilterPattern() : null;
  final String tableNamePattern =
      command.hasTableNameFilterPattern() ? command.getTableNameFilterPattern() : null;
  // JDBC expects null rather than an empty array when no table-type filter applies.
  final ProtocolStringList typeList = command.getTableTypesList();
  final String[] tableTypes =
      typeList.isEmpty() ? null : typeList.toArray(new String[typeList.size()]);
  try (final Connection connection = DriverManager.getConnection(databaseUri);
      final VectorSchemaRoot root =
          getTablesRoot(
              connection.getMetaData(),
              rootAllocator,
              command.getIncludeSchema(),
              catalog,
              dbSchemaPattern,
              tableNamePattern,
              tableTypes)) {
    listener.start(root);
    listener.putNext();
  } catch (SQLException | IOException e) {
    LOGGER.error(format("Failed to getStreamTables: <%s>.", e.getMessage()), e);
    listener.error(e);
  } finally {
    listener.completed();
  }
}
@Override
public FlightInfo getFlightInfoTableTypes(
    final CommandGetTableTypes request,
    final CallContext context,
    final FlightDescriptor descriptor) {
  // Table-type results always use the fixed GET_TABLE_TYPES_SCHEMA.
  return getFlightInfoForSchema(request, descriptor, Schemas.GET_TABLE_TYPES_SCHEMA);
}
@Override
public void getStreamTableTypes(final CallContext context, final ServerStreamListener listener) {
  // Streams the supported table types as a single batch built from DatabaseMetaData.
  try (final Connection connection = dataSource.getConnection();
      final ResultSet tableTypes = connection.getMetaData().getTableTypes();
      final VectorSchemaRoot root = getTableTypesRoot(tableTypes, rootAllocator)) {
    listener.start(root);
    listener.putNext();
  } catch (SQLException e) {
    LOGGER.error(format("Failed to getStreamTableTypes: <%s>.", e.getMessage()), e);
    listener.error(e);
  } finally {
    listener.completed();
  }
}
@Override
public FlightInfo getFlightInfoPrimaryKeys(
    final CommandGetPrimaryKeys request,
    final CallContext context,
    final FlightDescriptor descriptor) {
  // Primary-key results always use the fixed GET_PRIMARY_KEYS_SCHEMA.
  return getFlightInfoForSchema(request, descriptor, Schemas.GET_PRIMARY_KEYS_SCHEMA);
}
/**
 * Streams the primary keys of {@code table} (optionally narrowed by catalog/schema)
 * as a single batch matching GET_PRIMARY_KEYS_SCHEMA.
 */
@Override
public void getStreamPrimaryKeys(
    final CommandGetPrimaryKeys command,
    final CallContext context,
    final ServerStreamListener listener) {
  // Optional fields default to null, which JDBC metadata calls treat as "any".
  final String catalog = command.hasCatalog() ? command.getCatalog() : null;
  final String schema = command.hasDbSchema() ? command.getDbSchema() : null;
  final String table = command.getTable();
  try (Connection connection = DriverManager.getConnection(databaseUri)) {
    final ResultSet primaryKeys = connection.getMetaData().getPrimaryKeys(catalog, schema, table);
    // One vector per output column; ownership passes to the VectorSchemaRoot below,
    // whose close() releases them.
    final VarCharVector catalogNameVector = new VarCharVector("catalog_name", rootAllocator);
    final VarCharVector schemaNameVector = new VarCharVector("db_schema_name", rootAllocator);
    final VarCharVector tableNameVector = new VarCharVector("table_name", rootAllocator);
    final VarCharVector columnNameVector = new VarCharVector("column_name", rootAllocator);
    final IntVector keySequenceVector = new IntVector("key_sequence", rootAllocator);
    final VarCharVector keyNameVector = new VarCharVector("key_name", rootAllocator);
    final List<FieldVector> vectors =
        new ArrayList<>(
            ImmutableList.of(
                catalogNameVector,
                schemaNameVector,
                tableNameVector,
                columnNameVector,
                keySequenceVector,
                keyNameVector));
    vectors.forEach(FieldVector::allocateNew);
    // Copy the JDBC result set row-by-row into the vectors.
    int rows = 0;
    for (; primaryKeys.next(); rows++) {
      saveToVector(primaryKeys.getString("TABLE_CAT"), catalogNameVector, rows);
      saveToVector(primaryKeys.getString("TABLE_SCHEM"), schemaNameVector, rows);
      saveToVector(primaryKeys.getString("TABLE_NAME"), tableNameVector, rows);
      saveToVector(primaryKeys.getString("COLUMN_NAME"), columnNameVector, rows);
      // getInt returns 0 for SQL NULL; wasNull() distinguishes a real 0 from NULL.
      final int key_seq = primaryKeys.getInt("KEY_SEQ");
      saveToVector(primaryKeys.wasNull() ? null : key_seq, keySequenceVector, rows);
      saveToVector(primaryKeys.getString("PK_NAME"), keyNameVector, rows);
    }
    try (final VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(vectors)) {
      vectorSchemaRoot.setRowCount(rows);
      listener.start(vectorSchemaRoot);
      listener.putNext();
    }
  } catch (SQLException e) {
    listener.error(e);
  } finally {
    listener.completed();
  }
}
@Override
public FlightInfo getFlightInfoExportedKeys(
    final CommandGetExportedKeys request,
    final CallContext context,
    final FlightDescriptor descriptor) {
  // Exported-key results always use the fixed GET_EXPORTED_KEYS_SCHEMA.
  return getFlightInfoForSchema(request, descriptor, Schemas.GET_EXPORTED_KEYS_SCHEMA);
}
@Override
public void getStreamExportedKeys(
    final CommandGetExportedKeys command,
    final CallContext context,
    final ServerStreamListener listener) {
  // Optional fields default to null ("any") per JDBC metadata conventions.
  final String catalog = command.hasCatalog() ? command.getCatalog() : null;
  final String schema = command.hasDbSchema() ? command.getDbSchema() : null;
  final String table = command.getTable();
  try (Connection connection = DriverManager.getConnection(databaseUri);
      ResultSet exportedKeys = connection.getMetaData().getExportedKeys(catalog, schema, table);
      VectorSchemaRoot root = createVectors(exportedKeys)) {
    listener.start(root);
    listener.putNext();
  } catch (SQLException e) {
    listener.error(e);
  } finally {
    listener.completed();
  }
}
@Override
public FlightInfo getFlightInfoImportedKeys(
    final CommandGetImportedKeys request,
    final CallContext context,
    final FlightDescriptor descriptor) {
  // Imported-key results always use the fixed GET_IMPORTED_KEYS_SCHEMA.
  return getFlightInfoForSchema(request, descriptor, Schemas.GET_IMPORTED_KEYS_SCHEMA);
}
@Override
public void getStreamImportedKeys(
    final CommandGetImportedKeys command,
    final CallContext context,
    final ServerStreamListener listener) {
  // Optional fields default to null ("any") per JDBC metadata conventions.
  final String catalog = command.hasCatalog() ? command.getCatalog() : null;
  final String schema = command.hasDbSchema() ? command.getDbSchema() : null;
  final String table = command.getTable();
  try (Connection connection = DriverManager.getConnection(databaseUri);
      ResultSet importedKeys = connection.getMetaData().getImportedKeys(catalog, schema, table);
      VectorSchemaRoot root = createVectors(importedKeys)) {
    listener.start(root);
    listener.putNext();
  } catch (final SQLException e) {
    listener.error(e);
  } finally {
    listener.completed();
  }
}
@Override
public FlightInfo getFlightInfoCrossReference(
    CommandGetCrossReference request, CallContext context, FlightDescriptor descriptor) {
  // Cross-reference results always use the fixed GET_CROSS_REFERENCE_SCHEMA.
  return getFlightInfoForSchema(request, descriptor, Schemas.GET_CROSS_REFERENCE_SCHEMA);
}
@Override
public void getStreamCrossReference(
    CommandGetCrossReference command, CallContext context, ServerStreamListener listener) {
  // Optional catalog/schema fields default to null ("any"); tables are required.
  final String pkCatalog = command.hasPkCatalog() ? command.getPkCatalog() : null;
  final String pkSchema = command.hasPkDbSchema() ? command.getPkDbSchema() : null;
  final String fkCatalog = command.hasFkCatalog() ? command.getFkCatalog() : null;
  final String fkSchema = command.hasFkDbSchema() ? command.getFkDbSchema() : null;
  final String pkTable = command.getPkTable();
  final String fkTable = command.getFkTable();
  try (Connection connection = DriverManager.getConnection(databaseUri);
      ResultSet crossReference =
          connection
              .getMetaData()
              .getCrossReference(pkCatalog, pkSchema, pkTable, fkCatalog, fkSchema, fkTable);
      VectorSchemaRoot root = createVectors(crossReference)) {
    listener.start(root);
    listener.putNext();
  } catch (final SQLException e) {
    listener.error(e);
  } finally {
    listener.completed();
  }
}
/**
 * Builds a VectorSchemaRoot in the foreign-key layout (shared by the exported-keys,
 * imported-keys, and cross-reference streams) from a JDBC key ResultSet.
 *
 * <p>The caller owns the returned root and must close it; closing the root releases
 * the vectors created here.
 */
private VectorSchemaRoot createVectors(ResultSet keys) throws SQLException {
  // One vector per column of the foreign-key schemas.
  final VarCharVector pkCatalogNameVector = new VarCharVector("pk_catalog_name", rootAllocator);
  final VarCharVector pkSchemaNameVector = new VarCharVector("pk_db_schema_name", rootAllocator);
  final VarCharVector pkTableNameVector = new VarCharVector("pk_table_name", rootAllocator);
  final VarCharVector pkColumnNameVector = new VarCharVector("pk_column_name", rootAllocator);
  final VarCharVector fkCatalogNameVector = new VarCharVector("fk_catalog_name", rootAllocator);
  final VarCharVector fkSchemaNameVector = new VarCharVector("fk_db_schema_name", rootAllocator);
  final VarCharVector fkTableNameVector = new VarCharVector("fk_table_name", rootAllocator);
  final VarCharVector fkColumnNameVector = new VarCharVector("fk_column_name", rootAllocator);
  final IntVector keySequenceVector = new IntVector("key_sequence", rootAllocator);
  final VarCharVector fkKeyNameVector = new VarCharVector("fk_key_name", rootAllocator);
  final VarCharVector pkKeyNameVector = new VarCharVector("pk_key_name", rootAllocator);
  final UInt1Vector updateRuleVector = new UInt1Vector("update_rule", rootAllocator);
  final UInt1Vector deleteRuleVector = new UInt1Vector("delete_rule", rootAllocator);
  // Map each vector to the JDBC column it is filled from (DatabaseMetaData key columns).
  Map<FieldVector, String> vectorToColumnName = new HashMap<>();
  vectorToColumnName.put(pkCatalogNameVector, "PKTABLE_CAT");
  vectorToColumnName.put(pkSchemaNameVector, "PKTABLE_SCHEM");
  vectorToColumnName.put(pkTableNameVector, "PKTABLE_NAME");
  vectorToColumnName.put(pkColumnNameVector, "PKCOLUMN_NAME");
  vectorToColumnName.put(fkCatalogNameVector, "FKTABLE_CAT");
  vectorToColumnName.put(fkSchemaNameVector, "FKTABLE_SCHEM");
  vectorToColumnName.put(fkTableNameVector, "FKTABLE_NAME");
  vectorToColumnName.put(fkColumnNameVector, "FKCOLUMN_NAME");
  vectorToColumnName.put(keySequenceVector, "KEY_SEQ");
  vectorToColumnName.put(updateRuleVector, "UPDATE_RULE");
  vectorToColumnName.put(deleteRuleVector, "DELETE_RULE");
  vectorToColumnName.put(fkKeyNameVector, "FK_NAME");
  vectorToColumnName.put(pkKeyNameVector, "PK_NAME");
  // NOTE: the vector order here defines the output schema's field order; it differs
  // from the map's insertion order (update/delete rules come last).
  final VectorSchemaRoot vectorSchemaRoot =
      VectorSchemaRoot.of(
          pkCatalogNameVector,
          pkSchemaNameVector,
          pkTableNameVector,
          pkColumnNameVector,
          fkCatalogNameVector,
          fkSchemaNameVector,
          fkTableNameVector,
          fkColumnNameVector,
          keySequenceVector,
          fkKeyNameVector,
          pkKeyNameVector,
          updateRuleVector,
          deleteRuleVector);
  vectorSchemaRoot.allocateNew();
  // Fill all vectors from the result set and record how many rows were read.
  final int rowCount = saveToVectors(vectorToColumnName, keys, true);
  vectorSchemaRoot.setRowCount(rowCount);
  return vectorSchemaRoot;
}
/**
 * Streams the result set of a previously executed ad-hoc statement, identified by the
 * handle packed into the ticket, then evicts the statement from the cache.
 */
@Override
public void getStreamStatement(
    final TicketStatementQuery ticketStatementQuery,
    final CallContext context,
    final ServerStreamListener listener) {
  final ByteString handle = ticketStatementQuery.getStatementHandle();
  final StatementContext<Statement> statementContext =
      Objects.requireNonNull(statementLoadingCache.getIfPresent(handle));
  try (final ResultSet resultSet = statementContext.getStatement().getResultSet()) {
    final Schema schema = jdbcToArrowSchema(resultSet.getMetaData(), DEFAULT_CALENDAR);
    // FIX: ArrowVectorIterator is AutoCloseable and was previously leaked; close it
    // with the root (try-with-resources closes in reverse declaration order).
    try (final VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator);
        final ArrowVectorIterator iterator = sqlToArrowVectorIterator(resultSet, rootAllocator)) {
      final VectorLoader loader = new VectorLoader(vectorSchemaRoot);
      listener.start(vectorSchemaRoot);
      while (iterator.hasNext()) {
        final VectorUnloader unloader = new VectorUnloader(iterator.next());
        loader.load(unloader.getRecordBatch());
        listener.putNext();
        vectorSchemaRoot.clear();
      }
      // Trailing empty batch mirrors the other getStream implementations in this file.
      listener.putNext();
    }
  } catch (SQLException | IOException e) {
    // FIX: the message previously said "getStreamPreparedStatement" (copy-paste error).
    LOGGER.error(format("Failed to getStreamStatement: <%s>.", e.getMessage()), e);
    listener.error(e);
  } finally {
    listener.completed();
    // One-shot ticket: evicting the entry lets the removal listener release the statement.
    statementLoadingCache.invalidate(handle);
  }
}
/**
 * Builds a single-endpoint FlightInfo whose ticket packs the request message itself,
 * so the matching getStream* handler can recover it later.
 */
protected <T extends Message> FlightInfo getFlightInfoForSchema(
    final T request, final FlightDescriptor descriptor, final Schema schema) {
  final Ticket ticket = new Ticket(pack(request).toByteArray());
  // TODO Support multiple endpoints.
  final FlightEndpoint endpoint = new FlightEndpoint(ticket, location);
  // -1 record/byte counts mean "unknown".
  return new FlightInfo(schema, descriptor, singletonList(endpoint), -1, -1);
}
/**
 * Closes the evicted {@link StatementContext} (an AutoCloseable) whenever a statement
 * entry is removed or invalidated from a cache.
 */
private static class StatementRemovalListener<T extends Statement>
    implements RemovalListener<ByteString, StatementContext<T>> {
  @Override
  public void onRemoval(final RemovalNotification<ByteString, StatementContext<T>> notification) {
    try {
      AutoCloseables.close(notification.getValue());
    } catch (final Exception e) {
      // swallow — cleanup is best-effort and must not disturb cache maintenance
    }
  }
}
}