AbstractJdbcToArrowTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adapter.jdbc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Function;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.util.ValueVectorUtility;
import org.junit.jupiter.api.AfterEach;
/** Class to abstract out some common test functionality for testing JDBC to Arrow. */
public abstract class AbstractJdbcToArrowTest {
protected static final String BIGINT = "BIGINT_FIELD5";
protected static final String BINARY = "BINARY_FIELD12";
protected static final String BIT = "BIT_FIELD17";
protected static final String BLOB = "BLOB_FIELD14";
protected static final String BOOL = "BOOL_FIELD2";
protected static final String CHAR = "CHAR_FIELD16";
protected static final String CLOB = "CLOB_FIELD15";
protected static final String DATE = "DATE_FIELD10";
protected static final String DECIMAL = "DECIMAL_FIELD6";
protected static final String DOUBLE = "DOUBLE_FIELD7";
protected static final String INT = "INT_FIELD1";
protected static final String LIST = "LIST_FIELD19";
protected static final String MAP = "MAP_FIELD20";
protected static final String REAL = "REAL_FIELD8";
protected static final String SMALLINT = "SMALLINT_FIELD4";
protected static final String TIME = "TIME_FIELD9";
protected static final String TIMESTAMP = "TIMESTAMP_FIELD11";
protected static final String TINYINT = "TINYINT_FIELD3";
protected static final String VARCHAR = "VARCHAR_FIELD13";
protected static final String NULL = "NULL_FIELD18";
protected static final Map<String, JdbcFieldInfo> ARRAY_SUB_TYPE_BY_COLUMN_NAME_MAP =
new HashMap<>();
static {
ARRAY_SUB_TYPE_BY_COLUMN_NAME_MAP.put(LIST, new JdbcFieldInfo(Types.INTEGER));
}
protected Connection conn = null;
protected Table table;
protected boolean reuseVectorSchemaRoot;
/**
* This method creates Table object after reading YAML file.
*
* @param ymlFilePath path to file
* @return Table object
* @throws IOException on error
*/
protected static Table getTable(String ymlFilePath, @SuppressWarnings("rawtypes") Class clss)
throws IOException {
return new ObjectMapper(new YAMLFactory())
.readValue(clss.getClassLoader().getResourceAsStream(ymlFilePath), Table.class);
}
/**
* This method creates Connection object and DB table and also populate data into table for test.
*
* @throws SQLException on error
* @throws ClassNotFoundException on error
*/
protected void initializeDatabase(Table table) throws SQLException, ClassNotFoundException {
this.table = table;
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
String url = "jdbc:h2:mem:JdbcToArrowTest";
String driver = "org.h2.Driver";
Class.forName(driver);
conn = DriverManager.getConnection(url);
try (Statement stmt = conn.createStatement(); ) {
stmt.executeUpdate(table.getCreate());
for (String insert : table.getData()) {
stmt.executeUpdate(insert);
}
}
}
/**
* Clean up method to close connection after test completes.
*
* @throws SQLException on error
*/
@AfterEach
public void destroy() throws SQLException {
if (conn != null) {
conn.close();
conn = null;
}
}
/**
* Prepares test data and returns collection of Table object for each test iteration.
*
* @param testFiles files for test
* @param clss Class type
* @return Collection of Table objects
* @throws SQLException on error
* @throws ClassNotFoundException on error
* @throws IOException on error
*/
public static Object[][] prepareTestData(
String[] testFiles, @SuppressWarnings("rawtypes") Class clss)
throws SQLException, ClassNotFoundException, IOException {
Object[][] tableArr = new Object[testFiles.length][];
int i = 0;
for (String testFile : testFiles) {
tableArr[i++] = new Object[] {getTable(testFile, clss)};
}
return tableArr;
}
/**
* Abstract method to implement test Functionality to test JdbcToArrow methods.
*
* @param table Table object
* @throws SQLException on error
* @throws IOException on error
*/
public abstract void testJdbcToArrowValues(Table table)
throws SQLException, IOException, ClassNotFoundException;
/**
* Abstract method to implement logic to assert test various datatype values.
*
* @param root VectorSchemaRoot for test
* @param isIncludeMapVector is this dataset checks includes map column. Jdbc type to 'map'
* mapping declared in configuration only manually
*/
public abstract void testDataSets(VectorSchemaRoot root, boolean isIncludeMapVector);
/**
* For the given SQL query, execute and fetch the data from Relational DB and convert it to Arrow
* objects. This method uses the default Calendar instance with default TimeZone and Locale as
* returned by the JVM. If you wish to use specific TimeZone or Locale for any Date, Time and
* Timestamp datasets, you may want use overloaded API that taken Calendar object instance.
*
* <p>This method is for test only.
*
* @param connection Database connection to be used. This method will not close the passed
* connection object. Since the caller has passed the connection object it's the
* responsibility of the caller to close or return the connection to the pool.
* @param query The DB Query to fetch the data.
* @param allocator Memory allocator
* @return Arrow Data Objects {@link VectorSchemaRoot}
* @throws SQLException Propagate any SQL Exceptions to the caller after closing any resources
* opened such as ResultSet and Statement objects.
*/
public VectorSchemaRoot sqlToArrow(Connection connection, String query, BufferAllocator allocator)
throws SQLException, IOException {
Preconditions.checkNotNull(allocator, "Memory allocator object cannot be null");
JdbcToArrowConfig config =
new JdbcToArrowConfigBuilder(allocator, JdbcToArrowUtils.getUtcCalendar())
.setArraySubTypeByColumnNameMap(ARRAY_SUB_TYPE_BY_COLUMN_NAME_MAP)
.build();
return sqlToArrow(connection, query, config);
}
/**
* For the given SQL query, execute and fetch the data from Relational DB and convert it to Arrow
* objects.
*
* <p>This method is for test only.
*
* @param connection Database connection to be used. This method will not close the passed
* connection object. Since the caller has passed the connection object it's the
* responsibility of the caller to close or return the connection to the pool.
* @param query The DB Query to fetch the data.
* @param allocator Memory allocator
* @param calendar Calendar object to use to handle Date, Time and Timestamp datasets.
* @return Arrow Data Objects {@link VectorSchemaRoot}
* @throws SQLException Propagate any SQL Exceptions to the caller after closing any resources
* opened such as ResultSet and Statement objects.
*/
public VectorSchemaRoot sqlToArrow(
Connection connection, String query, BufferAllocator allocator, Calendar calendar)
throws SQLException, IOException {
Preconditions.checkNotNull(allocator, "Memory allocator object cannot be null");
Preconditions.checkNotNull(calendar, "Calendar object cannot be null");
JdbcToArrowConfig config =
new JdbcToArrowConfigBuilder(allocator, calendar)
.setArraySubTypeByColumnNameMap(ARRAY_SUB_TYPE_BY_COLUMN_NAME_MAP)
.build();
return sqlToArrow(connection, query, config);
}
/**
* For the given SQL query, execute and fetch the data from Relational DB and convert it to Arrow
* objects.
*
* <p>This method is for test only.
*
* @param connection Database connection to be used. This method will not close the passed
* connection object. Since the caller has passed the connection object it's the
* responsibility of the caller to close or return the connection to the pool.
* @param query The DB Query to fetch the data.
* @param config Configuration
* @return Arrow Data Objects {@link VectorSchemaRoot}
* @throws SQLException Propagate any SQL Exceptions to the caller after closing any resources
* opened such as ResultSet and Statement objects.
*/
public static VectorSchemaRoot sqlToArrow(
Connection connection, String query, JdbcToArrowConfig config)
throws SQLException, IOException {
Preconditions.checkNotNull(connection, "JDBC connection object cannot be null");
Preconditions.checkArgument(
query != null && query.length() > 0, "SQL query cannot be null or empty");
try (Statement stmt = connection.createStatement()) {
return sqlToArrow(stmt.executeQuery(query), config);
}
}
/**
* For the given JDBC {@link ResultSet}, fetch the data from Relational DB and convert it to Arrow
* objects. This method uses the default RootAllocator and Calendar object.
*
* <p>This method is for test only.
*
* @param resultSet ResultSet to use to fetch the data from underlying database
* @return Arrow Data Objects {@link VectorSchemaRoot}
* @throws SQLException on error
*/
public static VectorSchemaRoot sqlToArrow(ResultSet resultSet) throws SQLException, IOException {
Preconditions.checkNotNull(resultSet, "JDBC ResultSet object cannot be null");
return sqlToArrow(resultSet, JdbcToArrowUtils.getUtcCalendar());
}
/**
* For the given JDBC {@link ResultSet}, fetch the data from Relational DB and convert it to Arrow
* objects.
*
* <p>This method is for test only.
*
* @param resultSet ResultSet to use to fetch the data from underlying database
* @param allocator Memory allocator
* @return Arrow Data Objects {@link VectorSchemaRoot}
* @throws SQLException on error
*/
public static VectorSchemaRoot sqlToArrow(ResultSet resultSet, BufferAllocator allocator)
throws SQLException, IOException {
Preconditions.checkNotNull(allocator, "Memory Allocator object cannot be null");
JdbcToArrowConfig config =
new JdbcToArrowConfigBuilder(allocator, JdbcToArrowUtils.getUtcCalendar())
.setArraySubTypeByColumnNameMap(ARRAY_SUB_TYPE_BY_COLUMN_NAME_MAP)
.build();
return sqlToArrow(resultSet, config);
}
/**
* For the given JDBC {@link ResultSet}, fetch the data from Relational DB and convert it to Arrow
* objects.
*
* <p>This method is for test only.
*
* @param resultSet ResultSet to use to fetch the data from underlying database
* @param calendar Calendar instance to use for Date, Time and Timestamp datasets, or <code>null
* </code> if none.
* @return Arrow Data Objects {@link VectorSchemaRoot}
* @throws SQLException on error
*/
public static VectorSchemaRoot sqlToArrow(ResultSet resultSet, Calendar calendar)
throws SQLException, IOException {
Preconditions.checkNotNull(resultSet, "JDBC ResultSet object cannot be null");
JdbcToArrowConfig config =
new JdbcToArrowConfigBuilder(new RootAllocator(Integer.MAX_VALUE), calendar)
.setArraySubTypeByColumnNameMap(ARRAY_SUB_TYPE_BY_COLUMN_NAME_MAP)
.build();
return sqlToArrow(resultSet, config);
}
/**
* For the given JDBC {@link ResultSet}, fetch the data from Relational DB and convert it to Arrow
* objects.
*
* <p>This method is for test only.
*
* @param resultSet ResultSet to use to fetch the data from underlying database
* @param allocator Memory allocator to use.
* @param calendar Calendar instance to use for Date, Time and Timestamp datasets, or <code>null
* </code> if none.
* @return Arrow Data Objects {@link VectorSchemaRoot}
* @throws SQLException on error
*/
public static VectorSchemaRoot sqlToArrow(
ResultSet resultSet, BufferAllocator allocator, Calendar calendar)
throws SQLException, IOException {
Preconditions.checkNotNull(allocator, "Memory Allocator object cannot be null");
JdbcToArrowConfig config =
new JdbcToArrowConfigBuilder(allocator, calendar)
.setArraySubTypeByColumnNameMap(ARRAY_SUB_TYPE_BY_COLUMN_NAME_MAP)
.build();
return sqlToArrow(resultSet, config);
}
/**
* For the given JDBC {@link ResultSet}, fetch the data from Relational DB and convert it to Arrow
* objects.
*
* <p>This method is for test only.
*
* @param resultSet ResultSet to use to fetch the data from underlying database
* @param config Configuration of the conversion from JDBC to Arrow.
* @return Arrow Data Objects {@link VectorSchemaRoot}
* @throws SQLException on error
*/
public static VectorSchemaRoot sqlToArrow(ResultSet resultSet, JdbcToArrowConfig config)
throws SQLException, IOException {
Preconditions.checkNotNull(resultSet, "JDBC ResultSet object cannot be null");
Preconditions.checkNotNull(config, "The configuration cannot be null");
VectorSchemaRoot root =
VectorSchemaRoot.create(
JdbcToArrowUtils.jdbcToArrowSchema(resultSet.getMetaData(), config),
config.getAllocator());
if (config.getTargetBatchSize() != JdbcToArrowConfig.NO_LIMIT_BATCH_SIZE) {
ValueVectorUtility.preAllocate(root, config.getTargetBatchSize());
}
JdbcToArrowUtils.jdbcToArrowVectors(resultSet, root, config);
return root;
}
/**
* Register MAP_FIELD20 as ArrowType.Map
*
* @param calendar Calendar instance to use for Date, Time and Timestamp datasets, or <code>null
* </code> if none.
* @param rsmd ResultSetMetaData to lookup column name from result set metadata
* @return typeConverter instance with mapping column to Map type
*/
protected Function<JdbcFieldInfo, ArrowType> jdbcToArrowTypeConverter(
Calendar calendar, ResultSetMetaData rsmd) {
return (jdbcFieldInfo) -> {
String columnLabel = null;
try {
int columnIndex = jdbcFieldInfo.getColumn();
if (columnIndex != 0) {
columnLabel = rsmd.getColumnLabel(columnIndex);
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
if (MAP.equals(columnLabel)) {
return new ArrowType.Map(false);
} else {
return JdbcToArrowUtils.getArrowTypeFromJdbcType(jdbcFieldInfo, calendar);
}
};
}
protected ResultSetMetaData getQueryMetaData(String query) throws SQLException {
return conn.createStatement().executeQuery(query).getMetaData();
}
}