ResultSetTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.driver.jdbc;
import static java.lang.String.format;
import static java.util.Collections.synchronizedSet;
import static org.apache.arrow.flight.Location.forGrpcInsecure;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import com.google.common.collect.ImmutableSet;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.arrow.driver.jdbc.utils.CoreMockedSqlProducers;
import org.apache.arrow.driver.jdbc.utils.FallbackFlightSqlProducer;
import org.apache.arrow.driver.jdbc.utils.PartitionedFlightSqlProducer;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStatusCode;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
public class ResultSetTest {
// Seeded with a fixed value (10) so the pseudo-random column picks are the same on every run.
private static final Random RANDOM = new Random(10);
// JUnit 5 extension built from the legacy mocked SQL producer; setup() obtains the connection
// used by the tests from it.
@RegisterExtension
public static final FlightServerTestExtension FLIGHT_SERVER_TEST_EXTENSION =
FlightServerTestExtension.createStandardTestExtension(
CoreMockedSqlProducers.getLegacyProducer());
// Connection used by the tests of this class; assigned in setup() and closed in tearDown().
private static Connection connection;
/**
 * Obtains the connection used by the tests of this class from the registered test extension.
 *
 * @throws SQLException if the extension cannot provide a connection.
 */
@BeforeAll
public static void setup() throws SQLException {
  final Connection openedConnection = FLIGHT_SERVER_TEST_EXTENSION.getConnection(false);
  connection = openedConnection;
}
/**
 * Closes the connection used by the tests once all of them have run.
 *
 * <p>The connection is still {@code null} when {@link #setup()} failed before assigning it;
 * skipping the close in that case avoids piling a {@link NullPointerException} on top of the
 * original setup failure.
 *
 * @throws SQLException if closing the connection fails.
 */
@AfterAll
public static void tearDown() throws SQLException {
  if (connection != null) {
    connection.close();
  }
}
/**
 * Advances the given result set until {@link ResultSet#next()} reports that no row is left.
 *
 * @param resultSet the result set to drain.
 * @throws SQLException if advancing the result set fails.
 */
private static void resultSetNextUntilDone(ResultSet resultSet) throws SQLException {
  // TODO: implement resultSet.last()
  boolean hasMoreRows;
  do {
    // Nothing is read from the row; only the cursor position matters here.
    hasMoreRows = resultSet.next();
  } while (hasMoreRows);
}
/**
 * Verifies that the {@link ArrowFlightJdbcDriver} can run the regular legacy query and that the
 * returned rows pass {@link CoreMockedSqlProducers#assertLegacyRegularSqlResultSet}.
 *
 * @throws Exception If the connection fails to be established.
 */
@Test
public void testShouldRunSelectQuery() throws Exception {
  final String sql = CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD;
  try (Statement stmt = connection.createStatement()) {
    try (ResultSet rows = stmt.executeQuery(sql)) {
      CoreMockedSqlProducers.assertLegacyRegularSqlResultSet(rows);
    }
  }
}
/**
 * Reads the first 7500 rows of the regular legacy query and then lets try-with-resources close
 * the result set and the statement. Per the test name, closing at that point is expected not to
 * block.
 *
 * @throws Exception If the query cannot be executed or a row is missing.
 */
@Test
public void testShouldExecuteQueryNotBlockIfClosedBeforeEnd() throws Exception {
  final int rowsToRead = 7500;
  try (Statement stmt = connection.createStatement();
      ResultSet rows = stmt.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) {
    int rowsRead = 0;
    while (rowsRead < rowsToRead) {
      assertTrue(rows.next());
      rowsRead++;
    }
  }
}
/**
 * Tests whether a query run through the {@link ArrowFlightJdbcDriver} returns only the number of
 * rows set by {@link org.apache.calcite.avatica.AvaticaStatement#setMaxRows(int)}.
 *
 * @throws Exception If the connection fails to be established.
 */
@Test
public void testShouldRunSelectQuerySettingMaxRowLimit() throws Exception {
  try (Statement statement = connection.createStatement();
      ResultSet resultSet =
          statement.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) {
    final int maxRowsLimit = 3;
    statement.setMaxRows(maxRowsLimit);
    assertThat(statement.getMaxRows(), is(maxRowsLimit));

    int count = 0;
    // Read the width from the metadata (as the large-max-rows test does) instead of hard-coding
    // it, so the test keeps touching every column if the mocked schema changes.
    final int columns = resultSet.getMetaData().getColumnCount();
    for (; resultSet.next(); count++) {
      for (int column = 1; column <= columns; column++) {
        resultSet.getObject(column);
      }
      // Actual value first, matcher on the expected value second, so that a failure message
      // reports "expected"/"was" the right way round.
      assertThat(resultSet.getString(2), is("Test Name #" + count));
    }
    assertThat(count, is(maxRowsLimit));
  }
}
/**
 * Tests whether the {@link ArrowFlightJdbcDriver} fails with a {@link SQLException} upon
 * attempting to run an invalid query.
 */
@Test
public void testShouldThrowExceptionUponAttemptingToExecuteAnInvalidSelectQuery() {
  final String invalidSql = "SELECT * FROM SHOULD-FAIL";
  assertThrows(
      SQLException.class,
      () -> {
        try (Statement stmt = connection.createStatement()) {
          try (ResultSet ignored = stmt.executeQuery(invalidSql)) {
            // Reaching this line means the query was accepted.
            fail();
          }
        }
      });
}
/**
 * Tests whether a query run through the {@link ArrowFlightJdbcDriver} returns only the number of
 * rows set by {@link org.apache.calcite.avatica.AvaticaStatement#setLargeMaxRows(long)}.
 *
 * @throws Exception If the connection fails to be established.
 */
@Test
public void testShouldRunSelectQuerySettingLargeMaxRowLimit() throws Exception {
  try (Statement stmt = connection.createStatement();
      ResultSet rows = stmt.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) {
    final long rowLimit = 3;
    stmt.setLargeMaxRows(rowLimit);
    assertThat(stmt.getLargeMaxRows(), is(rowLimit));

    int rowsSeen = 0;
    final int columnCount = rows.getMetaData().getColumnCount();
    while (rows.next()) {
      // Touch every column of the row before checking the name column.
      for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
        rows.getObject(columnIndex);
      }
      assertEquals("Test Name #" + rowsSeen, rows.getString(2));
      rowsSeen++;
    }
    assertEquals(rowLimit, rowsSeen);
  }
}
/**
 * Collects the column count reported by the metadata on every row of the regular legacy query
 * and checks that the only value ever observed is 6.
 *
 * @throws SQLException If the query or the metadata lookup fails.
 */
@Test
public void testColumnCountShouldRemainConsistentForResultSetThroughoutEntireDuration()
    throws SQLException {
  final Set<Integer> observedCounts = new HashSet<>();
  try (final Statement stmt = connection.createStatement();
      final ResultSet rows = stmt.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) {
    for (boolean hasRow = rows.next(); hasRow; hasRow = rows.next()) {
      final int columnCount = rows.getMetaData().getColumnCount();
      observedCounts.add(columnCount);
    }
  }
  final Set<Integer> expectedCounts = ImmutableSet.of(6);
  assertThat(observedCounts, is(expectedCounts));
}
/**
 * Tests whether the {@link ArrowFlightJdbcDriver} closes the statement once its result set has
 * been read to the end, after {@link
 * org.apache.calcite.avatica.AvaticaStatement#closeOnCompletion()} was called.
 *
 * @throws Exception If the connection fails to be established.
 */
@Test
public void testShouldCloseStatementWhenIsCloseOnCompletion() throws Exception {
  try (Statement stmt = connection.createStatement()) {
    try (ResultSet rows = stmt.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) {
      stmt.closeOnCompletion();
      resultSetNextUntilDone(rows);
      final boolean statementClosed = stmt.isClosed();
      assertThat(statementClosed, is(true));
    }
  }
}
/**
 * Tests whether the {@link ArrowFlightJdbcDriver} closes the statement once its result set has
 * been read to the end under a max-rows limit, after {@link
 * org.apache.calcite.avatica.AvaticaStatement#closeOnCompletion()} was called.
 *
 * @throws Exception If the connection fails to be established.
 */
@Test
public void testShouldCloseStatementWhenIsCloseOnCompletionWithMaxRowsLimit() throws Exception {
  final long rowLimit = 3;
  try (Statement stmt = connection.createStatement()) {
    try (ResultSet rows = stmt.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) {
      stmt.setLargeMaxRows(rowLimit);
      stmt.closeOnCompletion();
      resultSetNextUntilDone(rows);
      final boolean statementClosed = stmt.isClosed();
      assertThat(statementClosed, is(true));
    }
  }
}
/**
 * Tests whether the {@link ArrowFlightJdbcDriver} keeps the statement (and the result set) open
 * after the result set has been read to the end under a max-rows limit, when {@link
 * org.apache.calcite.avatica.AvaticaStatement#closeOnCompletion()} has NOT been called.
 *
 * @throws Exception If the connection fails to be established.
 */
@Test
public void testShouldNotCloseStatementWhenIsNotCloseOnCompletionWithMaxRowsLimit()
    throws Exception {
  try (Statement statement = connection.createStatement();
      ResultSet resultSet =
          statement.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) {
    final long maxRowsLimit = 3;
    statement.setLargeMaxRows(maxRowsLimit);
    assertThat(statement.isClosed(), is(false));

    resultSetNextUntilDone(resultSet);

    // The property named by this test: draining the result set must leave the statement open,
    // because close-on-completion was never requested.
    assertThat(statement.isClosed(), is(false));
    assertThat(resultSet.isClosed(), is(false));
    assertThat(resultSet, is(instanceOf(ArrowFlightJdbcFlightStreamResultSet.class)));
  }
}
/**
 * Reads one value from the first row, cancels the statement, and then checks that both the
 * statement and the result set are still open while the result set metadata reports zero
 * columns.
 *
 * @throws SQLException If the query, the read, or the cancellation fails.
 */
@Test
public void testShouldCancelQueryUponCancelAfterQueryingResultSet() throws SQLException {
  try (final Statement stmt = connection.createStatement();
      final ResultSet rows = stmt.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) {
    // Pick a 1-based column index within the width of the result set.
    final int columnCount = rows.getMetaData().getColumnCount();
    final int pickedColumn = RANDOM.nextInt(columnCount) + 1;

    assertThat(rows.isClosed(), is(false));
    assertThat(rows.next(), is(true));
    assertDoesNotThrow(() -> rows.getObject(pickedColumn));

    stmt.cancel();

    // Should reset `ResultSet`; keep both `ResultSet` and `Statement` open.
    assertThat(stmt.isClosed(), is(false));
    assertThat(rows.isClosed(), is(false));
    final int columnCountAfterCancel = rows.getMetaData().getColumnCount();
    assertThat(columnCountAfterCancel, is(0));
  }
}
/**
 * Starts a worker thread that executes the regular legacy query, waits 300 ms and then iterates
 * over the rows, while the test thread cancels the statement right after starting the worker.
 * The worker is expected to record exactly one failure whose message is
 * {@code "Statement canceled"}.
 *
 * @throws SQLException If creating, cancelling or closing the statement fails.
 * @throws InterruptedException If the test thread is interrupted while joining the worker.
 */
@Test
public void testShouldInterruptFlightStreamsIfQueryIsCancelledMidQuerying()
    throws SQLException, InterruptedException {
  try (final Statement statement = connection.createStatement()) {
    final Set<Exception> exceptions = synchronizedSet(new HashSet<>(1));
    final Thread thread =
        new Thread(
            () -> {
              try (final ResultSet resultSet =
                  statement.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) {
                final int cachedColumnCount = resultSet.getMetaData().getColumnCount();
                // Give the test thread time to issue the cancellation before rows are read.
                Thread.sleep(300);
                while (resultSet.next()) {
                  resultSet.getObject(RANDOM.nextInt(cachedColumnCount) + 1);
                }
              } catch (final SQLException e) {
                exceptions.add(e);
              } catch (final InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it, then record the failure
                // so the assertion below reports it.
                Thread.currentThread().interrupt();
                exceptions.add(e);
              }
            });
    thread.setName("Test Case: interrupt query execution before first retrieval");
    thread.start();
    statement.cancel();
    // join() is the only synchronization needed: once it returns, the worker has finished.
    thread.join();
    assertThat(
        exceptions.stream()
            .map(Exception::getMessage)
            .map(StringBuilder::new)
            .reduce(StringBuilder::append)
            .orElseThrow(
                () ->
                    new IllegalArgumentException(
                        "Expected the worker thread to record a cancellation failure"))
            .toString(),
        is("Statement canceled"));
  }
}
/**
 * Runs the legacy cancellation query on a worker thread, cancels the statement from the test
 * thread five seconds later, and checks the message of the {@link SQLException} recorded by the
 * worker: either the exact "Query canceled" message, or the "Error while executing SQL" prefix
 * combined with "CANCELLED" or "Cancelling".
 *
 * @throws SQLException If creating, cancelling or closing the statement fails.
 * @throws InterruptedException If the test thread is interrupted while sleeping or joining.
 */
@Test
public void
    testShouldInterruptFlightStreamsIfQueryIsCancelledMidProcessingForTimeConsumingQueries()
        throws SQLException, InterruptedException {
  final String query = CoreMockedSqlProducers.LEGACY_CANCELLATION_SQL_CMD;
  try (final Statement stmt = connection.createStatement()) {
    final Set<Exception> recorded = synchronizedSet(new HashSet<>(1));
    final Runnable runQuery =
        () -> {
          try (final ResultSet ignored = stmt.executeQuery(query)) {
            fail();
          } catch (final SQLException sqlFailure) {
            recorded.add(sqlFailure);
          }
        };
    final Thread worker = new Thread(runQuery);
    worker.setName("Test Case: interrupt query execution mid-process");
    worker.setPriority(Thread.MAX_PRIORITY);
    worker.start();
    Thread.sleep(5000); // Let the other thread attempt to retrieve results.
    stmt.cancel();
    worker.join();

    // Concatenate the messages of everything the worker recorded.
    final String recordedMessages =
        recorded.stream()
            .map(Exception::getMessage)
            .map(StringBuilder::new)
            .reduce(StringBuilder::append)
            .orElseThrow(IllegalStateException::new)
            .toString();
    final String exactMessage = format("Error while executing SQL \"%s\": Query canceled", query);
    final String messagePrefix = format("Error while executing SQL \"%s\"", query);
    assertThat(
        recordedMessages,
        anyOf(
            is(exactMessage),
            allOf(
                containsString(messagePrefix),
                anyOf(containsString("CANCELLED"), containsString("Cancelling")))));
  }
}
/**
 * Sets a two second query timeout, runs the legacy cancellation query and expects it to fail
 * with an exception whose second-level cause is a {@link SQLTimeoutException} carrying the
 * message {@code "Query timed out after 2 SECONDS"}.
 *
 * @throws SQLException If creating, configuring or closing the statement fails.
 */
@Test
public void testShouldInterruptFlightStreamsIfQueryTimeoutIsOver() throws SQLException {
  final String query = CoreMockedSqlProducers.LEGACY_CANCELLATION_SQL_CMD;
  final int timeoutValue = 2;
  final String timeoutUnit = "SECONDS";
  try (final Statement statement = connection.createStatement()) {
    statement.setQueryTimeout(timeoutValue);

    // assertThrows reports a readable failure when the query unexpectedly succeeds, instead of
    // a message-less RuntimeException from an empty collection.
    final Exception thrown =
        assertThrows(
            Exception.class,
            () -> statement.executeQuery(query),
            "Expected the query to fail once the timeout elapsed");

    // Walk the cause chain step by step so that a shorter chain fails with an explanation
    // rather than a NullPointerException.
    final Throwable directCause = thrown.getCause();
    assertNotNull(directCause, "Expected the thrown exception to have a cause");
    final Throwable comparisonCause = directCause.getCause();
    assertNotNull(comparisonCause, "Expected the thrown exception to have a nested cause");

    assertThat(comparisonCause, is(instanceOf(SQLTimeoutException.class)));
    assertThat(
        comparisonCause.getMessage(),
        is(format("Query timed out after %d %s", timeoutValue, timeoutUnit)));
  }
}
/**
 * Runs the regular legacy query with a five second query timeout set and checks that the usual
 * rows are returned, i.e. that the timeout does not get in the way.
 *
 * @throws SQLException If the query fails.
 */
@Test
public void testFlightStreamsQueryShouldNotTimeout() throws SQLException {
  final int timeoutSeconds = 5;
  final String sql = CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD;
  try (Statement stmt = connection.createStatement()) {
    stmt.setQueryTimeout(timeoutSeconds);
    try (ResultSet rows = stmt.executeQuery(sql)) {
      CoreMockedSqlProducers.assertLegacyRegularSqlResultSet(rows);
    }
  }
}
/**
 * Starts two data-only Flight servers holding one row each (values 1 and 2), a root server whose
 * producer is given one endpoint per data server, and checks that a query against the root
 * returns both rows.
 *
 * @throws Exception If any server, the connection or the query fails.
 */
@Test
public void testPartitionedFlightServer() throws Exception {
  // Arrange
  final Field intColumn = Field.nullablePrimitive("int_column", new ArrowType.Int(32, true));
  final Schema schema = new Schema(Arrays.asList(intColumn));
  try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
      VectorSchemaRoot partitionOne = VectorSchemaRoot.create(schema, allocator);
      VectorSchemaRoot partitionTwo = VectorSchemaRoot.create(schema, allocator)) {
    partitionOne.setRowCount(1);
    ((IntVector) partitionOne.getVector(0)).set(0, 1);
    partitionTwo.setRowCount(1);
    ((IntVector) partitionTwo.getVector(0)).set(0, 2);

    // Construct the data-only nodes first.
    final FlightProducer producerOne =
        new PartitionedFlightSqlProducer.DataOnlyFlightSqlProducer(
            new Ticket("first".getBytes(StandardCharsets.UTF_8)), partitionOne);
    final FlightProducer producerTwo =
        new PartitionedFlightSqlProducer.DataOnlyFlightSqlProducer(
            new Ticket("second".getBytes(StandardCharsets.UTF_8)), partitionTwo);
    final FlightServer.Builder builderOne =
        FlightServer.builder(allocator, forGrpcInsecure("localhost", 0), producerOne);
    final FlightServer.Builder builderTwo =
        FlightServer.builder(allocator, forGrpcInsecure("localhost", 0), producerTwo);

    // Run the data-only nodes so that we can get the Locations they are running at.
    try (FlightServer serverOne = builderOne.build();
        FlightServer serverTwo = builderTwo.build()) {
      serverOne.start();
      serverTwo.start();
      final FlightEndpoint endpointOne =
          new FlightEndpoint(
              new Ticket("first".getBytes(StandardCharsets.UTF_8)), serverOne.getLocation());
      final FlightEndpoint endpointTwo =
          new FlightEndpoint(
              new Ticket("second".getBytes(StandardCharsets.UTF_8)), serverTwo.getLocation());

      // Finally start the root node.
      try (final PartitionedFlightSqlProducer rootProducer =
              new PartitionedFlightSqlProducer(schema, endpointOne, endpointTwo);
          FlightServer rootServer =
              FlightServer.builder(allocator, forGrpcInsecure("localhost", 0), rootProducer)
                  .build()
                  .start();
          Connection rootConnection =
              DriverManager.getConnection(
                  format(
                      "jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
                      rootServer.getLocation().getUri().getHost(), rootServer.getPort()));
          Statement rootStatement = rootConnection.createStatement();
          // Act
          ResultSet rows = rootStatement.executeQuery("Select partitioned_data")) {
        final List<Integer> received = new ArrayList<>();
        while (rows.next()) {
          received.add(rows.getInt(1));
        }

        // Assert
        final int expectedRowCount = partitionOne.getRowCount() + partitionTwo.getRowCount();
        assertEquals(expectedRowCount, received.size());
        final int valueOne = ((IntVector) partitionOne.getVector(0)).get(0);
        assertTrue(received.contains(valueOne));
        final int valueTwo = ((IntVector) partitionTwo.getVector(0)).get(0);
        assertTrue(received.contains(valueTwo));
      }
    }
  }
}
/**
 * Starts a root server whose single endpoint lists two locations (127.0.0.2:1234 and
 * 127.0.0.3:1234) for which this test starts no server, and checks that running and draining a
 * query fails with a {@link SQLException} caused by a {@link FlightRuntimeException} with status
 * {@link FlightStatusCode#UNAVAILABLE}.
 *
 * @throws Exception If the root server or the connection cannot be set up.
 */
@Test
public void testPartitionedFlightServerIgnoreFailure() throws Exception {
  final Schema schema =
      new Schema(
          Collections.singletonList(
              Field.nullablePrimitive("int_column", new ArrowType.Int(32, true))));
  try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
    final FlightEndpoint firstEndpoint =
        new FlightEndpoint(
            new Ticket("first".getBytes(StandardCharsets.UTF_8)),
            Location.forGrpcInsecure("127.0.0.2", 1234),
            Location.forGrpcInsecure("127.0.0.3", 1234));

    try (final PartitionedFlightSqlProducer rootProducer =
            new PartitionedFlightSqlProducer(schema, firstEndpoint);
        FlightServer rootServer =
            FlightServer.builder(allocator, forGrpcInsecure("localhost", 0), rootProducer)
                .build()
                .start();
        Connection newConnection =
            DriverManager.getConnection(
                String.format(
                    "jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
                    rootServer.getLocation().getUri().getHost(), rootServer.getPort()));
        Statement newStatement = newConnection.createStatement()) {
      final SQLException e =
          assertThrows(
              SQLException.class,
              () -> {
                // try-with-resources releases the result set even though the loop below is
                // expected to fail; the failure from the body stays the primary exception.
                try (ResultSet result = newStatement.executeQuery("Select partitioned_data")) {
                  while (result.next()) {
                    // Drain the result set; the failure is expected while fetching.
                  }
                }
              });
      final Throwable cause = e.getCause();
      assertTrue(
          cause instanceof FlightRuntimeException,
          "Expected the cause to be a FlightRuntimeException but was: " + cause);
      final FlightRuntimeException fre = (FlightRuntimeException) cause;
      assertEquals(FlightStatusCode.UNAVAILABLE, fre.status().code());
    }
  }
}
/**
 * Starts one data-only Flight server holding a single row (value 1) and a root server whose
 * endpoint lists 127.0.0.2:1234 first and the data server second, then checks that a query
 * against the root still returns that single row.
 *
 * @throws Exception If any server, the connection or the query fails.
 */
@Test
public void testPartitionedFlightServerAllFailure() throws Exception {
  // Arrange
  final Field intColumn = Field.nullablePrimitive("int_column", new ArrowType.Int(32, true));
  final Schema schema = new Schema(Collections.singletonList(intColumn));
  try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
      VectorSchemaRoot partition = VectorSchemaRoot.create(schema, allocator)) {
    partition.setRowCount(1);
    ((IntVector) partition.getVector(0)).set(0, 1);

    // Construct the data-only nodes first.
    final FlightProducer dataProducer =
        new PartitionedFlightSqlProducer.DataOnlyFlightSqlProducer(
            new Ticket("first".getBytes(StandardCharsets.UTF_8)), partition);
    final FlightServer.Builder dataServerBuilder =
        FlightServer.builder(allocator, forGrpcInsecure("localhost", 0), dataProducer);

    // Run the data-only nodes so that we can get the Locations they are running at.
    try (FlightServer dataServer = dataServerBuilder.build()) {
      dataServer.start();
      final Location badLocation = forGrpcInsecure("127.0.0.2", 1234);
      final FlightEndpoint endpoint =
          new FlightEndpoint(
              new Ticket("first".getBytes(StandardCharsets.UTF_8)),
              badLocation,
              dataServer.getLocation());

      // Finally start the root node.
      try (final PartitionedFlightSqlProducer rootProducer =
              new PartitionedFlightSqlProducer(schema, endpoint);
          FlightServer rootServer =
              FlightServer.builder(allocator, forGrpcInsecure("localhost", 0), rootProducer)
                  .build()
                  .start();
          Connection rootConnection =
              DriverManager.getConnection(
                  format(
                      "jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
                      rootServer.getLocation().getUri().getHost(), rootServer.getPort()));
          Statement rootStatement = rootConnection.createStatement();
          // Act
          ResultSet rows = rootStatement.executeQuery("Select partitioned_data")) {
        final List<Integer> received = new ArrayList<>();
        while (rows.next()) {
          received.add(rows.getInt(1));
        }

        // Assert
        final int expectedRowCount = partition.getRowCount();
        assertEquals(expectedRowCount, received.size());
        final int expectedValue = ((IntVector) partition.getVector(0)).get(0);
        assertTrue(received.contains(expectedValue));
      }
    }
  }
}
/**
 * Starts a server backed by a {@link FallbackFlightSqlProducer} holding a single row (value 1)
 * and checks that the query {@code "fallback"} returns exactly that row.
 *
 * @throws Exception If the server, the connection or the query fails.
 */
@Test
public void testFallbackFlightServer() throws Exception {
  final Field intColumn = Field.nullable("int_column", Types.MinorType.INT.getType());
  final Schema schema = new Schema(Collections.singletonList(intColumn));
  try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
      VectorSchemaRoot expectedRoot = VectorSchemaRoot.create(schema, allocator)) {
    expectedRoot.setRowCount(1);
    ((IntVector) expectedRoot.getVector(0)).set(0, 1);

    try (final FallbackFlightSqlProducer producer = new FallbackFlightSqlProducer(expectedRoot);
        FlightServer server =
            FlightServer.builder(allocator, forGrpcInsecure("localhost", 0), producer)
                .build()
                .start();
        Connection jdbcConnection =
            DriverManager.getConnection(
                format(
                    "jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
                    server.getLocation().getUri().getHost(), server.getPort()));
        Statement stmt = jdbcConnection.createStatement();
        ResultSet rows = stmt.executeQuery("fallback")) {
      final List<Integer> received = new ArrayList<>();
      while (rows.next()) {
        received.add(rows.getInt(1));
      }

      // Assert
      final int expectedRowCount = expectedRoot.getRowCount();
      assertEquals(expectedRowCount, received.size());
      final int expectedValue = ((IntVector) expectedRoot.getVector(0)).get(0);
      assertTrue(received.contains(expectedValue));
    }
  }
}
/**
 * Starts a server backed by a {@link FallbackFlightSqlProducer} holding a single row (value 1)
 * and checks that the query {@code "fallback with error"} still returns exactly that row.
 *
 * @throws Exception If the server, the connection or the query fails.
 */
@Test
public void testFallbackSecondFlightServer() throws Exception {
  final Field intColumn = Field.nullable("int_column", Types.MinorType.INT.getType());
  final Schema schema = new Schema(Collections.singletonList(intColumn));
  try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
      VectorSchemaRoot expectedRoot = VectorSchemaRoot.create(schema, allocator)) {
    expectedRoot.setRowCount(1);
    ((IntVector) expectedRoot.getVector(0)).set(0, 1);

    try (final FallbackFlightSqlProducer producer = new FallbackFlightSqlProducer(expectedRoot);
        FlightServer server =
            FlightServer.builder(allocator, forGrpcInsecure("localhost", 0), producer)
                .build()
                .start();
        Connection jdbcConnection =
            DriverManager.getConnection(
                format(
                    "jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
                    server.getLocation().getUri().getHost(), server.getPort()));
        Statement stmt = jdbcConnection.createStatement();
        ResultSet rows = stmt.executeQuery("fallback with error")) {
      final List<Integer> received = new ArrayList<>();
      while (rows.next()) {
        received.add(rows.getInt(1));
      }

      // Assert
      final int expectedRowCount = expectedRoot.getRowCount();
      assertEquals(expectedRowCount, received.size());
      final int expectedValue = ((IntVector) expectedRoot.getVector(0)).get(0);
      assertTrue(received.contains(expectedValue));
    }
  }
}
/**
 * Runs the query {@code "fallback with unresolvable"} twice over the same connection against a
 * {@link FallbackFlightSqlProducer} holding a single row (value 1). Both runs must return that
 * row; the first must take at least 5000 ms and the second less than 5000 ms.
 *
 * @throws Exception If the server, the connection or a query fails.
 */
@Test
public void testFallbackUnresolvableFlightServer() throws Exception {
  final Field intColumn = Field.nullable("int_column", Types.MinorType.INT.getType());
  final Schema schema = new Schema(Collections.singletonList(intColumn));
  try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
      VectorSchemaRoot expectedRoot = VectorSchemaRoot.create(schema, allocator)) {
    expectedRoot.setRowCount(1);
    ((IntVector) expectedRoot.getVector(0)).set(0, 1);

    try (final FallbackFlightSqlProducer producer = new FallbackFlightSqlProducer(expectedRoot);
        FlightServer server =
            FlightServer.builder(allocator, forGrpcInsecure("localhost", 0), producer)
                .build()
                .start();
        Connection jdbcConnection =
            DriverManager.getConnection(
                format(
                    "jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
                    server.getLocation().getUri().getHost(), server.getPort()))) {
      // This first attempt should take a measurable amount of time.
      final long firstStartNanos = System.nanoTime();
      try (Statement stmt = jdbcConnection.createStatement();
          ResultSet rows = stmt.executeQuery("fallback with unresolvable")) {
        final List<Integer> received = new ArrayList<>();
        while (rows.next()) {
          received.add(rows.getInt(1));
        }
        // Assert
        assertEquals(expectedRoot.getRowCount(), received.size());
        final int expectedValue = ((IntVector) expectedRoot.getVector(0)).get(0);
        assertTrue(received.contains(expectedValue));
      }
      final long firstEndNanos = System.nanoTime();
      final double firstElapsedMs = (firstEndNanos - firstStartNanos) / 1_000_000.;
      assertTrue(
          firstElapsedMs >= 5000.,
          String.format(
              "Expected first attempt to hit the timeout, but only %f ms elapsed",
              firstElapsedMs));

      // The second attempt is expected to be faster: presumably the failure from the first
      // attempt is remembered by the client cache (GH-661).
      final long secondStartNanos = System.nanoTime();
      try (Statement stmt = jdbcConnection.createStatement();
          ResultSet rows = stmt.executeQuery("fallback with unresolvable")) {
        final List<Integer> received = new ArrayList<>();
        while (rows.next()) {
          received.add(rows.getInt(1));
        }
        // Assert
        assertEquals(expectedRoot.getRowCount(), received.size());
        final int expectedValue = ((IntVector) expectedRoot.getVector(0)).get(0);
        assertTrue(received.contains(expectedValue));
      }
      final long secondEndNanos = System.nanoTime();
      final double secondElapsedMs = (secondEndNanos - secondStartNanos) / 1_000_000.;
      assertTrue(
          secondElapsedMs < 5000.,
          String.format(
              "Expected second attempt to be faster, but %f ms elapsed", secondElapsedMs));
    }
  }
}
/**
 * Runs the query {@code "fallback with unresolvable"} twice over a connection opened with
 * {@code useClientCache=false} against a {@link FallbackFlightSqlProducer} holding a single row
 * (value 1). Both runs must return that row and both must take at least 5000 ms.
 *
 * @throws Exception If the server, the connection or a query fails.
 */
@Test
public void testFallbackUnresolvableFlightServerDisableCache() throws Exception {
  final Field intColumn = Field.nullable("int_column", Types.MinorType.INT.getType());
  final Schema schema = new Schema(Collections.singletonList(intColumn));
  try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
      VectorSchemaRoot expectedRoot = VectorSchemaRoot.create(schema, allocator)) {
    expectedRoot.setRowCount(1);
    ((IntVector) expectedRoot.getVector(0)).set(0, 1);

    try (final FallbackFlightSqlProducer producer = new FallbackFlightSqlProducer(expectedRoot);
        FlightServer server =
            FlightServer.builder(allocator, forGrpcInsecure("localhost", 0), producer)
                .build()
                .start();
        Connection jdbcConnection =
            DriverManager.getConnection(
                format(
                    "jdbc:arrow-flight-sql://%s:%d/?useEncryption=false&useClientCache=false",
                    server.getLocation().getUri().getHost(), server.getPort()))) {
      // This first attempt should take a measurable amount of time.
      final long firstStartNanos = System.nanoTime();
      try (Statement stmt = jdbcConnection.createStatement();
          ResultSet rows = stmt.executeQuery("fallback with unresolvable")) {
        final List<Integer> received = new ArrayList<>();
        while (rows.next()) {
          received.add(rows.getInt(1));
        }
        // Assert
        assertEquals(expectedRoot.getRowCount(), received.size());
        final int expectedValue = ((IntVector) expectedRoot.getVector(0)).get(0);
        assertTrue(received.contains(expectedValue));
      }
      final long firstEndNanos = System.nanoTime();
      final double firstElapsedMs = (firstEndNanos - firstStartNanos) / 1_000_000.;
      assertTrue(
          firstElapsedMs >= 5000.,
          String.format(
              "Expected first attempt to hit the timeout, but only %f ms elapsed",
              firstElapsedMs));

      // This second attempt should take a long time still, since we disabled the cache.
      final long secondStartNanos = System.nanoTime();
      try (Statement stmt = jdbcConnection.createStatement();
          ResultSet rows = stmt.executeQuery("fallback with unresolvable")) {
        final List<Integer> received = new ArrayList<>();
        while (rows.next()) {
          received.add(rows.getInt(1));
        }
        // Assert
        assertEquals(expectedRoot.getRowCount(), received.size());
        final int expectedValue = ((IntVector) expectedRoot.getVector(0)).get(0);
        assertTrue(received.contains(expectedValue));
      }
      final long secondEndNanos = System.nanoTime();
      final double secondElapsedMs = (secondEndNanos - secondStartNanos) / 1_000_000.;
      assertTrue(
          secondElapsedMs >= 5000.,
          String.format(
              "Expected second attempt to hit the timeout, but only %f ms elapsed",
              secondElapsedMs));
    }
  }
}
/**
 * Runs the legacy "regular with empty" query and checks that iterating over the result yields
 * exactly two rows.
 *
 * @throws Exception If the query fails.
 */
@Test
public void testShouldRunSelectQueryWithEmptyVectorsEmbedded() throws Exception {
  final String sql = CoreMockedSqlProducers.LEGACY_REGULAR_WITH_EMPTY_SQL_CMD;
  try (Statement stmt = connection.createStatement()) {
    try (ResultSet rows = stmt.executeQuery(sql)) {
      long rowsSeen = 0;
      for (boolean hasRow = rows.next(); hasRow; hasRow = rows.next()) {
        rowsSeen += 1;
      }
      assertEquals(2, rowsSeen);
    }
  }
}
/**
 * Checks that the result set of the regular legacy query exposes the UTF-8 bytes of
 * {@code "foo"} as its application metadata.
 *
 * @throws Exception If the query fails.
 */
@Test
public void testResultSetAppMetadata() throws Exception {
  try (Statement statement = connection.createStatement();
      ResultSet resultSet =
          statement.executeQuery(CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) {
    final byte[] expected = "foo".getBytes(StandardCharsets.UTF_8);
    final byte[] actual = ((ArrowFlightJdbcFlightStreamResultSet) resultSet).getAppMetadata();
    // JUnit's contract is (expected, actual); the swapped order would make a failure message
    // describe the two arrays the wrong way round.
    assertArrayEquals(expected, actual);
  }
}
}