FlightSqlStatelessExample.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.flight.sql.example;
import static java.lang.String.format;
import static org.apache.arrow.adapter.jdbc.JdbcToArrow.sqlToArrowVectorIterator;
import static org.apache.arrow.adapter.jdbc.JdbcToArrowUtils.jdbcToArrowSchema;
import static org.apache.arrow.flight.sql.impl.FlightSql.*;
import static org.slf4j.LoggerFactory.getLogger;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcParameterBinder;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.sql.FlightSqlProducer;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.SeekableReadChannel;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
import org.slf4j.Logger;
/**
* Example {@link FlightSqlProducer} implementation showing an Apache Derby backed Flight SQL server
* that generally supports all current features of Flight SQL.
*/
public class FlightSqlStatelessExample extends FlightSqlExample {
private static final Logger LOGGER = getLogger(FlightSqlStatelessExample.class);
public static final String DB_NAME = "derbyStatelessDB";
public FlightSqlStatelessExample(final Location location, final String dbName) {
super(location, dbName);
}
@Override
public Runnable acceptPutPreparedStatementQuery(
CommandPreparedStatementQuery command,
CallContext context,
FlightStream flightStream,
StreamListener<PutResult> ackStream) {
return () -> {
final String query = new String(command.getPreparedStatementHandle().toStringUtf8());
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = createPreparedStatement(connection, query)) {
while (flightStream.next()) {
final VectorSchemaRoot root = flightStream.getRoot();
final JdbcParameterBinder binder =
JdbcParameterBinder.builder(preparedStatement, root).bindAll().build();
while (binder.next()) {
// Do not execute() - will be done in a getStream call
}
final ByteArrayOutputStream parametersStream = new ByteArrayOutputStream();
try (ArrowFileWriter writer =
new ArrowFileWriter(root, null, Channels.newChannel(parametersStream))) {
writer.start();
writer.writeBatch();
}
if (parametersStream.size() > 0) {
final DoPutPreparedStatementResultPOJO doPutPreparedStatementResultPOJO =
new DoPutPreparedStatementResultPOJO(query, parametersStream.toByteArray());
final byte[] doPutPreparedStatementResultPOJOArr =
serializePOJO(doPutPreparedStatementResultPOJO);
final DoPutPreparedStatementResult doPutPreparedStatementResult =
DoPutPreparedStatementResult.newBuilder()
.setPreparedStatementHandle(
ByteString.copyFrom(ByteBuffer.wrap(doPutPreparedStatementResultPOJOArr)))
.build();
try (final ArrowBuf buffer =
rootAllocator.buffer(doPutPreparedStatementResult.getSerializedSize())) {
buffer.writeBytes(doPutPreparedStatementResult.toByteArray());
ackStream.onNext(PutResult.metadata(buffer));
}
}
}
} catch (SQLException | IOException e) {
ackStream.onError(
CallStatus.INTERNAL
.withDescription("Failed to bind parameters: " + e.getMessage())
.withCause(e)
.toRuntimeException());
return;
}
ackStream.onCompleted();
};
}
@Override
public void getStreamPreparedStatement(
final CommandPreparedStatementQuery command,
final CallContext context,
final ServerStreamListener listener) {
final byte[] handle = command.getPreparedStatementHandle().toByteArray();
try {
// Case where there are parameters
try {
final DoPutPreparedStatementResultPOJO doPutPreparedStatementResultPOJO =
deserializePOJO(handle);
final String query = doPutPreparedStatementResultPOJO.getQuery();
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = createPreparedStatement(connection, query);
ArrowFileReader reader =
new ArrowFileReader(
new SeekableReadChannel(
new ByteArrayReadableSeekableByteChannel(
doPutPreparedStatementResultPOJO.getParameters())),
rootAllocator)) {
for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
reader.loadRecordBatch(arrowBlock);
VectorSchemaRoot vectorSchemaRootRecover = reader.getVectorSchemaRoot();
JdbcParameterBinder binder =
JdbcParameterBinder.builder(statement, vectorSchemaRootRecover).bindAll().build();
while (binder.next()) {
executeQuery(statement, listener);
}
}
}
} catch (StreamCorruptedException e) {
// Case where there are no parameters
final String query = new String(command.getPreparedStatementHandle().toStringUtf8());
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = createPreparedStatement(connection, query)) {
executeQuery(preparedStatement, listener);
}
}
} catch (final SQLException | IOException | ClassNotFoundException e) {
LOGGER.error(format("Failed to getStreamPreparedStatement: <%s>.", e.getMessage()), e);
listener.error(
CallStatus.INTERNAL
.withDescription("Failed to prepare statement: " + e)
.toRuntimeException());
} finally {
listener.completed();
}
}
private void executeQuery(PreparedStatement statement, final ServerStreamListener listener)
throws IOException, SQLException {
try (final ResultSet resultSet = statement.executeQuery()) {
final Schema schema = jdbcToArrowSchema(resultSet.getMetaData(), DEFAULT_CALENDAR);
try (final VectorSchemaRoot vectorSchemaRoot =
VectorSchemaRoot.create(schema, rootAllocator)) {
final VectorLoader loader = new VectorLoader(vectorSchemaRoot);
listener.start(vectorSchemaRoot);
final ArrowVectorIterator iterator = sqlToArrowVectorIterator(resultSet, rootAllocator);
while (iterator.hasNext()) {
final VectorSchemaRoot batch = iterator.next();
if (batch.getRowCount() == 0) {
break;
}
final VectorUnloader unloader = new VectorUnloader(batch);
loader.load(unloader.getRecordBatch());
listener.putNext();
vectorSchemaRoot.clear();
}
listener.putNext();
}
}
}
@Override
public FlightInfo getFlightInfoPreparedStatement(
final CommandPreparedStatementQuery command,
final CallContext context,
final FlightDescriptor descriptor) {
final byte[] handle = command.getPreparedStatementHandle().toByteArray();
try {
String query;
try {
query = deserializePOJO(handle).getQuery();
} catch (StreamCorruptedException e) {
query = new String(command.getPreparedStatementHandle().toStringUtf8());
}
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = createPreparedStatement(connection, query)) {
ResultSetMetaData metaData = statement.getMetaData();
return getFlightInfoForSchema(
command, descriptor, jdbcToArrowSchema(metaData, DEFAULT_CALENDAR));
}
} catch (final SQLException | IOException | ClassNotFoundException e) {
LOGGER.error(
format("There was a problem executing the prepared statement: <%s>.", e.getMessage()), e);
throw CallStatus.INTERNAL.withCause(e).toRuntimeException();
}
}
private DoPutPreparedStatementResultPOJO deserializePOJO(byte[] handle)
throws IOException, ClassNotFoundException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(handle);
ObjectInputStream ois = new ObjectInputStream(bis)) {
return (DoPutPreparedStatementResultPOJO) ois.readObject();
}
}
private byte[] serializePOJO(DoPutPreparedStatementResultPOJO doPutPreparedStatementResultPOJO)
throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(doPutPreparedStatementResultPOJO);
return bos.toByteArray();
}
}
private PreparedStatement createPreparedStatement(Connection connection, String query)
throws SQLException {
return connection.prepareStatement(
query, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
}
}