ArrowToAvroUtils.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adapter.avro;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.arrow.adapter.avro.producers.AvroBigIntProducer;
import org.apache.arrow.adapter.avro.producers.AvroBooleanProducer;
import org.apache.arrow.adapter.avro.producers.AvroBytesProducer;
import org.apache.arrow.adapter.avro.producers.AvroEnumProducer;
import org.apache.arrow.adapter.avro.producers.AvroFixedSizeBinaryProducer;
import org.apache.arrow.adapter.avro.producers.AvroFixedSizeListProducer;
import org.apache.arrow.adapter.avro.producers.AvroFloat2Producer;
import org.apache.arrow.adapter.avro.producers.AvroFloat4Producer;
import org.apache.arrow.adapter.avro.producers.AvroFloat8Producer;
import org.apache.arrow.adapter.avro.producers.AvroIntProducer;
import org.apache.arrow.adapter.avro.producers.AvroListProducer;
import org.apache.arrow.adapter.avro.producers.AvroMapProducer;
import org.apache.arrow.adapter.avro.producers.AvroNullProducer;
import org.apache.arrow.adapter.avro.producers.AvroNullableProducer;
import org.apache.arrow.adapter.avro.producers.AvroSmallIntProducer;
import org.apache.arrow.adapter.avro.producers.AvroStringProducer;
import org.apache.arrow.adapter.avro.producers.AvroStructProducer;
import org.apache.arrow.adapter.avro.producers.AvroTinyIntProducer;
import org.apache.arrow.adapter.avro.producers.AvroUint1Producer;
import org.apache.arrow.adapter.avro.producers.AvroUint2Producer;
import org.apache.arrow.adapter.avro.producers.AvroUint4Producer;
import org.apache.arrow.adapter.avro.producers.AvroUint8Producer;
import org.apache.arrow.adapter.avro.producers.BaseAvroProducer;
import org.apache.arrow.adapter.avro.producers.CompositeAvroProducer;
import org.apache.arrow.adapter.avro.producers.DictionaryDecodingProducer;
import org.apache.arrow.adapter.avro.producers.Producer;
import org.apache.arrow.adapter.avro.producers.logical.AvroDateDayProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroDateMilliProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroDecimal256Producer;
import org.apache.arrow.adapter.avro.producers.logical.AvroDecimalProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroTimeMicroProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroTimeMilliProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroTimeNanoProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroTimeSecProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroTimestampMicroProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroTimestampMicroTzProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroTimestampMilliProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroTimestampMilliTzProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroTimestampNanoProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroTimestampNanoTzProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroTimestampSecProducer;
import org.apache.arrow.adapter.avro.producers.logical.AvroTimestampSecTzProducer;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BaseIntVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DateMilliVector;
import org.apache.arrow.vector.Decimal256Vector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float2Vector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeMilliVector;
import org.apache.arrow.vector.TimeNanoVector;
import org.apache.arrow.vector.TimeSecVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampMilliVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TimeStampSecTZVector;
import org.apache.arrow.vector.TimeStampSecVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt1Vector;
import org.apache.arrow.vector.UInt2Vector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.UInt8Vector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.Text;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
public class ArrowToAvroUtils {
/**
 * Name given to the top-level Avro record type by the {@code createAvroSchema} overloads that do
 * not take an explicit type name.
 */
public static final String GENERIC_RECORD_TYPE_NAME = "GenericRecord";
/**
 * Create an Avro record schema for a given list of Arrow fields.
 *
 * <p>Arrow data types are mapped to Avro types as follows.
 *
 * <table>
 * <thead><tr><th>Arrow type</th><th>Avro encoding</th></tr></thead>
 * <tbody>
 * <tr><td>ArrowType.Null</td><td>NULL</td></tr>
 * <tr><td>ArrowType.Bool</td><td>BOOLEAN</td></tr>
 * <tr><td>ArrowType.Int(64 bit, unsigned 32 bit)</td><td>LONG</td></tr>
 * <tr><td>ArrowType.Int(signed 32 bit, &lt; 32 bit)</td><td>INT</td></tr>
 * <tr><td>ArrowType.FloatingPoint(double)</td><td>DOUBLE</td></tr>
 * <tr><td>ArrowType.FloatingPoint(single, half)</td><td>FLOAT</td></tr>
 * <tr><td>ArrowType.Utf8</td><td>STRING</td></tr>
 * <tr><td>ArrowType.Binary</td><td>BYTES</td></tr>
 * <tr><td>ArrowType.FixedSizeBinary</td><td>FIXED</td></tr>
 * <tr><td>ArrowType.Decimal</td><td>decimal (FIXED)</td></tr>
 * <tr><td>ArrowType.Date</td><td>date (INT)</td></tr>
 * <tr><td>ArrowType.Time (SEC | MILLI)</td><td>time-millis (INT)</td></tr>
 * <tr><td>ArrowType.Time (MICRO | NANO)</td><td>time-micros (LONG)</td></tr>
 * <tr><td>ArrowType.Timestamp (NANOSECONDS, TZ != NULL)</td><td>timestamp-nanos (LONG)</td></tr>
 * <tr><td>ArrowType.Timestamp (MICROSECONDS, TZ != NULL)</td><td>timestamp-micros (LONG)</td></tr>
 * <tr><td>ArrowType.Timestamp (MILLISECONDS | SECONDS, TZ != NULL)</td><td>timestamp-millis (LONG)</td></tr>
 * <tr><td>ArrowType.Timestamp (NANOSECONDS, TZ == NULL)</td><td>local-timestamp-nanos (LONG)</td></tr>
 * <tr><td>ArrowType.Timestamp (MICROSECONDS, TZ == NULL)</td><td>local-timestamp-micros (LONG)</td></tr>
 * <tr><td>ArrowType.Timestamp (MILLISECONDS | SECONDS, TZ == NULL)</td><td>local-timestamp-millis (LONG)</td></tr>
 * <tr><td>ArrowType.Struct</td><td>record</td></tr>
 * <tr><td>ArrowType.List</td><td>array</td></tr>
 * <tr><td>ArrowType.FixedSizeList</td><td>array</td></tr>
 * <tr><td>ArrowType.Map</td><td>map</td></tr>
 * <tr><td>ArrowType.Union</td><td>union</td></tr>
 * </tbody>
 * </table>
 *
 * <p>Any other Arrow type (at present this includes LargeUtf8, LargeBinary, LargeList, Duration
 * and Interval) is not handled by the schema builder and is rejected with an {@link
 * IllegalArgumentException}.
 *
 * <p>Nullable fields are represented as a union of [base-type | null]. Special treatment is given
 * to nullability of unions - a union is considered nullable if any of its child fields are
 * nullable. The schema for a nullable union will always contain a null type as its first member,
 * with none of the child types being nullable.
 *
 * <p>List fields must contain precisely one child field, which may be nullable. An Arrow map
 * field holds a single entries child whose own children are the key followed by the value. The
 * key field must always be of type STRING (Utf8) and cannot be nullable. The value can be of any
 * type and may be nullable. Record types must contain at least one child field and cannot contain
 * multiple fields with the same name.
 *
 * <p>String fields that are dictionary-encoded will be represented as an Avro enum, so long as
 * all the values meet the restrictions on Avro enums (non-null, unique, valid identifiers). Other
 * data types that are dictionary encoded, or string fields that do not meet the Avro
 * requirements, will be output as their decoded type.
 *
 * @param arrowFields The arrow fields used to generate the Avro schema
 * @param typeName Name of the top level Avro record type
 * @param namespace Namespace of the top level Avro record type
 * @param dictionaries A dictionary provider is required if any fields use dictionary encoding
 * @return An Avro record schema for the given list of fields, with the specified name and
 *     namespace
 * @throws IllegalArgumentException if the field list is empty, a field has an unsupported type or
 *     an invalid child layout, or a dictionary-encoded field cannot be resolved
 */
public static Schema createAvroSchema(
    List<Field> arrowFields, String typeName, String namespace, DictionaryProvider dictionaries) {
  return buildRecordSchema(
      SchemaBuilder.record(typeName).namespace(namespace), arrowFields, namespace, dictionaries);
}
/**
 * Convenience overload for schemas without dictionary-encoded fields; passes {@code null} as the
 * dictionary provider.
 *
 * @param arrowFields The arrow fields used to generate the Avro schema
 * @param typeName Name of the top level Avro record type
 * @param namespace Namespace of the top level Avro record type
 * @return An Avro record schema for the given list of fields
 */
public static Schema createAvroSchema(
    List<Field> arrowFields, String typeName, String namespace) {
  final DictionaryProvider noDictionaries = null;
  return createAvroSchema(arrowFields, typeName, namespace, noDictionaries);
}
/**
 * Convenience overload that passes {@code null} as the namespace.
 *
 * @param arrowFields The arrow fields used to generate the Avro schema
 * @param typeName Name of the top level Avro record type
 * @return An Avro record schema for the given list of fields
 */
public static Schema createAvroSchema(List<Field> arrowFields, String typeName) {
  final String noNamespace = null;
  return createAvroSchema(arrowFields, typeName, noNamespace);
}
/**
 * Convenience overload that names the top level record {@link #GENERIC_RECORD_TYPE_NAME}.
 *
 * @param arrowFields The arrow fields used to generate the Avro schema
 * @return An Avro record schema for the given list of fields
 */
public static Schema createAvroSchema(List<Field> arrowFields) {
  final String defaultTypeName = GENERIC_RECORD_TYPE_NAME;
  return createAvroSchema(arrowFields, defaultTypeName);
}
/**
 * Convenience overload for dictionary-encoded data that names the top level record {@link
 * #GENERIC_RECORD_TYPE_NAME} and passes {@code null} as the namespace.
 *
 * @param arrowFields The arrow fields used to generate the Avro schema
 * @param dictionaries Provider used to resolve dictionary-encoded fields
 * @return An Avro record schema for the given list of fields
 */
public static Schema createAvroSchema(List<Field> arrowFields, DictionaryProvider dictionaries) {
  final String noNamespace = null;
  return createAvroSchema(arrowFields, GENERIC_RECORD_TYPE_NAME, noNamespace, dictionaries);
}
/**
 * Complete an Avro record by adding one Avro field per Arrow field, in list order.
 *
 * @param builder Record builder to populate; its namespace is set to {@code namespace}
 * @param fields Arrow fields that become the record members, must not be empty
 * @param namespace Namespace applied to the record and passed on to its members
 * @param dictionaries Provider used to resolve dictionary-encoded fields, may be null
 * @return The result of ending the record on the supplied builder
 * @throws IllegalArgumentException if {@code fields} is empty
 */
private static <T> T buildRecordSchema(
    SchemaBuilder.RecordBuilder<T> builder,
    List<Field> fields,
    String namespace,
    DictionaryProvider dictionaries) {

  // A record without members is rejected before the builder is touched
  if (fields.isEmpty()) {
    throw new IllegalArgumentException("Record field must have at least one child field");
  }

  SchemaBuilder.FieldAssembler<T> members = builder.namespace(namespace).fields();
  for (Field member : fields) {
    // Each call hands back the assembler to use for the next member
    members = buildFieldSchema(members, member, namespace, dictionaries);
  }

  return members.endRecord();
}
/**
 * Append a single record member, named after the Arrow field and declared without a default.
 *
 * @param assembler Field assembler of the record being built
 * @param field Arrow field to add
 * @param namespace Namespace passed on to the field's type
 * @param dictionaries Provider used to resolve dictionary-encoded fields, may be null
 * @return The assembler to use for the next member
 */
private static <T> SchemaBuilder.FieldAssembler<T> buildFieldSchema(
    SchemaBuilder.FieldAssembler<T> assembler,
    Field field,
    String namespace,
    DictionaryProvider dictionaries) {

  // The member is named first, then its type schema is built with a fresh standalone builder
  final SchemaBuilder.FieldBuilder<T> member = assembler.name(field.getName());
  final Schema memberType =
      buildTypeSchema(SchemaBuilder.builder(), field, namespace, dictionaries);

  return member.type(memberType).noDefault();
}
/**
 * Build the Avro type for an Arrow field, taking nullability into account.
 *
 * <p>Non-union fields use the nullable form of the builder when the field is nullable. Union
 * fields are treated separately because Avro unions cannot be nested directly: if any member is
 * nullable the union starts with a null type followed by every member, otherwise the first member
 * seeds the union and the remaining members are appended.
 *
 * @param builder Type builder that receives the resulting type
 * @param field Arrow field to convert
 * @param namespace Namespace passed on to nested types
 * @param dictionaries Provider used to resolve dictionary-encoded fields, may be null
 * @return The result of completing the type on the supplied builder
 */
private static <T> T buildTypeSchema(
    SchemaBuilder.TypeBuilder<T> builder,
    Field field,
    String namespace,
    DictionaryProvider dictionaries) {

  final boolean isUnion = field.getType().getTypeID() == ArrowType.ArrowTypeID.Union;

  if (!isUnion) {
    if (field.isNullable()) {
      return buildBaseTypeSchema(builder.nullable(), field, namespace, dictionaries);
    }
    return buildBaseTypeSchema(builder, field, namespace, dictionaries);
  }

  // Nullable unions need special handling, since union types cannot be directly nested
  final List<Field> members = field.getChildren();
  boolean anyMemberNullable = false;
  for (Field member : members) {
    if (member.isNullable()) {
      anyMemberNullable = true;
      break;
    }
  }

  if (anyMemberNullable) {
    final SchemaBuilder.UnionAccumulator<T> nullFirst = builder.unionOf().nullType();
    return addTypesToUnion(nullFirst, members, namespace, dictionaries);
  }

  final Field firstMember = members.get(0);
  final List<Field> otherMembers = members.subList(1, members.size());
  final SchemaBuilder.UnionAccumulator<T> seeded =
      buildBaseTypeSchema(builder.unionOf(), firstMember, namespace, dictionaries);
  return addTypesToUnion(seeded, otherMembers, namespace, dictionaries);
}
/**
 * Build an Avro array whose item type comes from the single child of an Arrow list field.
 *
 * @param builder Array builder that receives the item type
 * @param listField Arrow list field, must have exactly one child
 * @param namespace Namespace passed on to the item type
 * @param dictionaries Provider used to resolve dictionary-encoded fields, may be null
 * @return The result of completing the array on the supplied builder
 * @throws IllegalArgumentException if the list field does not have exactly one child
 */
private static <T> T buildArraySchema(
    SchemaBuilder.ArrayBuilder<T> builder,
    Field listField,
    String namespace,
    DictionaryProvider dictionaries) {

  final List<Field> children = listField.getChildren();
  if (children.size() == 1) {
    final Field item = children.get(0);
    return buildTypeSchema(builder.items(), item, namespace, dictionaries);
  }

  throw new IllegalArgumentException("List field must have exactly one child field");
}
/**
 * Build an Avro map whose value type comes from an Arrow map field.
 *
 * <p>The map field is expected to hold a single entries child, whose own children are the key
 * field followed by the value field. Only the value contributes to the Avro schema; the key is
 * validated to be a non-nullable Utf8 field.
 *
 * @param builder Map builder that receives the value type
 * @param mapField Arrow map field to convert
 * @param namespace Namespace passed on to the value type
 * @param dictionaries Provider used to resolve dictionary-encoded fields, may be null
 * @return The result of completing the map on the supplied builder
 * @throws IllegalArgumentException if the map does not have exactly one entries child, the
 *     entries child does not have exactly two children, or the key is not a non-nullable string
 */
private static <T> T buildMapSchema(
    SchemaBuilder.MapBuilder<T> builder,
    Field mapField,
    String namespace,
    DictionaryProvider dictionaries) {
  if (mapField.getChildren().size() != 1) {
    throw new IllegalArgumentException("Map field must have exactly one child field");
  }
  Field entriesField = mapField.getChildren().get(0);
  // Validate the entries struct itself (not the map field again) before indexing key and value
  if (entriesField.getChildren().size() != 2) {
    throw new IllegalArgumentException("Map entries must have exactly two child fields");
  }
  Field keyField = entriesField.getChildren().get(0);
  Field valueField = entriesField.getChildren().get(1);
  if (keyField.getType().getTypeID() != ArrowType.ArrowTypeID.Utf8 || keyField.isNullable()) {
    throw new IllegalArgumentException(
        "Map keys must be of type string and cannot be nullable for conversion to Avro");
  }
  return buildTypeSchema(builder.values(), valueField, namespace, dictionaries);
}
/**
 * Build the Avro schema for the base type of an Arrow field, ignoring the field's own
 * nullability (which is handled by the caller).
 *
 * <p>Named Avro types (enum, fixed, record) take their name from the Arrow field name. Nested
 * records are placed in a child namespace formed from the current namespace and the field name.
 *
 * @param builder Builder that receives the resulting type
 * @param field Arrow field to convert
 * @param namespace Current namespace, may be null
 * @param dictionaries Provider used to resolve dictionary-encoded fields, may be null if no
 *     field is dictionary-encoded
 * @return The result of completing the type on the supplied builder
 * @throws IllegalArgumentException if the Arrow type is not supported, a list or map has an
 *     invalid child layout, or a dictionary-encoded field cannot be resolved
 */
private static <T> T buildBaseTypeSchema(
    SchemaBuilder.BaseTypeBuilder<T> builder,
    Field field,
    String namespace,
    DictionaryProvider dictionaries) {
  ArrowType.ArrowTypeID typeID = field.getType().getTypeID();
  switch (typeID) {
    case Null:
      return builder.nullType();
    case Bool:
      return builder.booleanType();
    case Int:
      // Dictionary-encoded fields carry the (integer) index type plus a dictionary encoding
      if (field.getDictionary() != null) {
        return buildDictionaryTypeSchema(builder, field, namespace, dictionaries);
      }
      ArrowType.Int intType = (ArrowType.Int) field.getType();
      // 64 bit values and unsigned 32 bit values do not fit in an Avro INT
      if (intType.getBitWidth() > 32 || (intType.getBitWidth() == 32 && !intType.getIsSigned())) {
        return builder.longType();
      } else {
        return builder.intType();
      }
    case FloatingPoint:
      ArrowType.FloatingPoint floatType = (ArrowType.FloatingPoint) field.getType();
      if (floatType.getPrecision() == FloatingPointPrecision.DOUBLE) {
        return builder.doubleType();
      } else {
        return builder.floatType();
      }
    case Utf8:
      return builder.stringType();
    case Binary:
      return builder.bytesType();
    case FixedSizeBinary:
      ArrowType.FixedSizeBinary fixedType = (ArrowType.FixedSizeBinary) field.getType();
      String fixedTypeName = field.getName();
      int fixedTypeWidth = fixedType.getByteWidth();
      return builder.fixed(fixedTypeName).size(fixedTypeWidth);
    case Decimal:
      ArrowType.Decimal decimalType = (ArrowType.Decimal) field.getType();
      // Schema.createFixed takes (name, doc, namespace, size): the namespace belongs in the
      // third position, and no doc string is attached. The fixed width is the bit width in bytes.
      return builder.type(
          LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
              .addToSchema(
                  Schema.createFixed(
                      field.getName(), null, namespace, decimalType.getBitWidth() / 8)));
    case Date:
      return builder.type(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)));
    case Time:
      ArrowType.Time timeType = (ArrowType.Time) field.getType();
      if ((timeType.getUnit() == TimeUnit.SECOND || timeType.getUnit() == TimeUnit.MILLISECOND)) {
        // Second and millisecond time types are encoded as time-millis (INT)
        return builder.type(
            LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT)));
      } else {
        // All other time types (micro, nano) are encoded as time-micros (LONG)
        return builder.type(
            LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)));
      }
    case Timestamp:
      ArrowType.Timestamp timestampType = (ArrowType.Timestamp) field.getType();
      LogicalType timestampLogicalType = timestampLogicalType(timestampType);
      return builder.type(timestampLogicalType.addToSchema(Schema.create(Schema.Type.LONG)));
    case Struct:
      String childNamespace =
          namespace == null ? field.getName() : namespace + "." + field.getName();
      return buildRecordSchema(
          builder.record(field.getName()), field.getChildren(), childNamespace, dictionaries);
    case List:
    case FixedSizeList:
      {
        List<Field> listChildren = field.getChildren();
        // Arrow uses "$data$" as the field name for list items, that is not a valid Avro name.
        // Only rename when the list is well formed (exactly one child); any other child count
        // is passed through untouched so that buildArraySchema rejects it.
        if (listChildren.size() == 1
            && ListVector.DATA_VECTOR_NAME.equals(listChildren.get(0).getName())) {
          Field itemField = listChildren.get(0);
          Field safeItemField =
              new Field("item", itemField.getFieldType(), itemField.getChildren());
          Field safeListField =
              new Field(field.getName(), field.getFieldType(), List.of(safeItemField));
          return buildArraySchema(builder.array(), safeListField, namespace, dictionaries);
        }
        return buildArraySchema(builder.array(), field, namespace, dictionaries);
      }
    case Map:
      return buildMapSchema(builder.map(), field, namespace, dictionaries);
    default:
      throw new IllegalArgumentException(
          "Element type not supported for Avro conversion: " + typeID.name());
  }
}

/**
 * Build the schema for a dictionary-encoded field: an Avro enum when the dictionary values
 * qualify as enum symbols, otherwise the schema of the decoded value type.
 */
private static <T> T buildDictionaryTypeSchema(
    SchemaBuilder.BaseTypeBuilder<T> builder,
    Field field,
    String namespace,
    DictionaryProvider dictionaries) {
  if (dictionaries == null) {
    throw new IllegalArgumentException(
        "Field references a dictionary but no dictionaries were provided: " + field.getName());
  }
  Dictionary dictionary = dictionaries.lookup(field.getDictionary().getId());
  if (dictionary == null) {
    throw new IllegalArgumentException(
        "Field references a dictionary that does not exist: "
            + field.getName()
            + ", dictionary ID = "
            + field.getDictionary().getId());
  }
  if (dictionaryIsValidEnum(dictionary)) {
    String[] symbols = dictionarySymbols(dictionary);
    return builder.enumeration(field.getName()).symbols(symbols);
  }
  // Keep the encoded field's name but describe it with the dictionary's value type
  Field dictionaryField = dictionary.getVector().getField();
  Field decodedField =
      new Field(field.getName(), dictionaryField.getFieldType(), dictionaryField.getChildren());
  return buildBaseTypeSchema(builder, decodedField, namespace, dictionaries);
}
/**
 * Append the base type of each Arrow field to an Avro union and close the union.
 *
 * @param accumulator Union that already holds at least one member
 * @param unionFields Arrow fields whose base types are appended, in order
 * @param namespace Namespace passed on to nested types
 * @param dictionaries Provider used to resolve dictionary-encoded fields, may be null
 * @return The result of ending the union
 */
private static <T> T addTypesToUnion(
    SchemaBuilder.UnionAccumulator<T> accumulator,
    List<Field> unionFields,
    String namespace,
    DictionaryProvider dictionaries) {

  SchemaBuilder.UnionAccumulator<T> union = accumulator;
  for (Field member : unionFields) {
    // Members are added as base types; nullability is expressed by the union itself
    union = buildBaseTypeSchema(union.and(), member, namespace, dictionaries);
  }

  return union.endUnion();
}
/**
 * Select the Avro timestamp logical type for an Arrow timestamp type.
 *
 * <p>Timestamps with a timezone use the zone-aware logical types, those without use the local
 * variants. Nanosecond and microsecond units keep their precision; every other unit maps to the
 * millisecond logical type.
 *
 * @param timestampType Arrow timestamp type to convert
 * @return The matching Avro logical type
 */
private static LogicalType timestampLogicalType(ArrowType.Timestamp timestampType) {
  final boolean hasZone = timestampType.getTimezone() != null;
  final TimeUnit unit = timestampType.getUnit();

  if (hasZone) {
    if (unit == TimeUnit.NANOSECOND) {
      return LogicalTypes.timestampNanos();
    }
    if (unit == TimeUnit.MICROSECOND) {
      return LogicalTypes.timestampMicros();
    }
    // Timestamp in seconds will be cast to milliseconds, Avro does not support seconds
    return LogicalTypes.timestampMillis();
  }

  if (unit == TimeUnit.NANOSECOND) {
    return LogicalTypes.localTimestampNanos();
  }
  if (unit == TimeUnit.MICROSECOND) {
    return LogicalTypes.localTimestampMicros();
  }
  // Timestamp in seconds will be cast to milliseconds, Avro does not support seconds
  return LogicalTypes.localTimestampMillis();
}
/**
 * Decide whether a dictionary can be written as an Avro enum.
 *
 * <p>The dictionary must hold Utf8 values, and every value must be non-null, match {@code
 * ENUM_REGEX} and be unique within the dictionary.
 *
 * @param dictionary Dictionary to inspect
 * @return true if every value qualifies as an enum symbol, false otherwise
 */
private static boolean dictionaryIsValidEnum(Dictionary dictionary) {
  final boolean holdsStrings =
      dictionary.getVectorType().getTypeID() == ArrowType.ArrowTypeID.Utf8;
  if (!holdsStrings) {
    return false;
  }

  final VarCharVector values = (VarCharVector) dictionary.getVector();
  final Set<String> seen = new HashSet<>();

  for (int index = 0; index < values.getValueCount(); index++) {
    final Text value = values.isNull(index) ? null : values.getObject(index);
    if (value == null) {
      return false;
    }

    final String candidate = value.toString();
    final boolean validName = ENUM_REGEX.matcher(candidate).matches();
    // Set.add returns false when the symbol was already present
    if (!validName || !seen.add(candidate)) {
      return false;
    }
  }

  return true;
}
/**
 * Read the values of a string dictionary as Avro enum symbols, in dictionary order.
 *
 * @param dictionary Dictionary backed by a VarCharVector
 * @return One symbol per dictionary value
 * @throws IllegalArgumentException if a dictionary value is null
 */
private static String[] dictionarySymbols(Dictionary dictionary) {
  final VarCharVector values = (VarCharVector) dictionary.getVector();
  final String[] result = new String[values.getValueCount()];

  int index = 0;
  while (index < values.getValueCount()) {
    final Text value = values.getObject(index);
    if (value == null) {
      // This should never happen if dictionaryIsValidEnum() succeeded
      throw new IllegalArgumentException("Illegal null value in enum");
    }
    result[index] = value.toString();
    index++;
  }

  return result;
}
/**
 * Pattern a dictionary value must match to be used as an Avro enum symbol: an ASCII letter or
 * underscore, followed by any number of ASCII letters, digits or underscores.
 */
private static final Pattern ENUM_REGEX = Pattern.compile("^[A-Za-z_][A-Za-z0-9_]*$");
/**
 * Create a composite Avro producer for a set of field vectors (typically the root set of a VSR).
 *
 * <p>One producer is created per vector, in list order.
 *
 * @param vectors The vectors that will be used to produce Avro data
 * @param dictionaries Provider used to resolve dictionary-encoded vectors, may be null if no
 *     vector is dictionary-encoded
 * @return The resulting composite Avro producer
 */
public static CompositeAvroProducer createCompositeProducer(
    List<FieldVector> vectors, DictionaryProvider dictionaries) {
  final List<Producer<? extends FieldVector>> columnProducers = new ArrayList<>(vectors.size());
  vectors.forEach(
      column -> {
        BaseAvroProducer<? extends FieldVector> columnProducer =
            createProducer(column, dictionaries);
        columnProducers.add(columnProducer);
      });
  return new CompositeAvroProducer(columnProducers);
}
/**
 * Convenience overload for vectors without dictionary encoding; passes {@code null} as the
 * dictionary provider.
 *
 * @param vectors The vectors that will be used to produce Avro data
 * @return The resulting composite Avro producer
 */
public static CompositeAvroProducer createCompositeProducer(List<FieldVector> vectors) {
  final DictionaryProvider noDictionaries = null;
  return createCompositeProducer(vectors, noDictionaries);
}
/**
 * Create the producer for a vector, taking nullability from the vector's own field.
 *
 * @param vector Arrow vector to produce Avro data from, must not be null
 * @param dictionaries Provider used to resolve dictionary-encoded vectors, may be null
 * @return A producer for the vector
 * @throws NullPointerException if {@code vector} is null
 */
private static BaseAvroProducer<?> createProducer(
    FieldVector vector, DictionaryProvider dictionaries) {
  // Check before dereferencing, so a null vector is reported with a descriptive message
  Preconditions.checkNotNull(vector, "Arrow vector object can't be null");
  boolean nullable = vector.getField().isNullable();
  return createProducer(vector, nullable, dictionaries);
}
/**
 * Create the Avro producer for a single Arrow vector.
 *
 * <p>Nullable non-union vectors are wrapped in a nullable producer around the producer for the
 * same vector treated as non-nullable. Dictionary-encoded vectors are handled next, and all
 * remaining vectors are dispatched on their minor type.
 *
 * @param vector Arrow vector to produce Avro data from, must not be null
 * @param nullable Whether the vector should be treated as nullable
 * @param dictionaries Provider used to resolve dictionary-encoded vectors, may be null if no
 *     vector is dictionary-encoded
 * @return A producer for the vector
 * @throws IllegalArgumentException if a dictionary cannot be resolved or a map key is not VARCHAR
 * @throws UnsupportedOperationException if the vector's minor type is not supported
 */
private static BaseAvroProducer<?> createProducer(
    FieldVector vector, boolean nullable, DictionaryProvider dictionaries) {

  Preconditions.checkNotNull(vector, "Arrow vector object can't be null");

  final Types.MinorType minorType = vector.getMinorType();

  // Avro understands nullable types as a union of type | null
  // Most nullable fields in a VSR will not be unions, so provide a special wrapper
  final boolean wrapAsNullable = nullable && minorType != Types.MinorType.UNION;
  if (wrapAsNullable) {
    final BaseAvroProducer<?> nonNullProducer = createProducer(vector, false, dictionaries);
    return new AvroNullableProducer<>(nonNullProducer);
  }

  if (vector.getField().getDictionary() != null) {
    return createDictionaryProducer(vector, dictionaries);
  }

  switch (minorType) {
      // Primitive types
    case NULL:
      return new AvroNullProducer((NullVector) vector);
    case BIT:
      return new AvroBooleanProducer((BitVector) vector);
    case TINYINT:
      return new AvroTinyIntProducer((TinyIntVector) vector);
    case SMALLINT:
      return new AvroSmallIntProducer((SmallIntVector) vector);
    case INT:
      return new AvroIntProducer((IntVector) vector);
    case BIGINT:
      return new AvroBigIntProducer((BigIntVector) vector);
    case UINT1:
      return new AvroUint1Producer((UInt1Vector) vector);
    case UINT2:
      return new AvroUint2Producer((UInt2Vector) vector);
    case UINT4:
      return new AvroUint4Producer((UInt4Vector) vector);
    case UINT8:
      return new AvroUint8Producer((UInt8Vector) vector);
    case FLOAT2:
      return new AvroFloat2Producer((Float2Vector) vector);
    case FLOAT4:
      return new AvroFloat4Producer((Float4Vector) vector);
    case FLOAT8:
      return new AvroFloat8Producer((Float8Vector) vector);
    case VARBINARY:
      return new AvroBytesProducer((VarBinaryVector) vector);
    case FIXEDSIZEBINARY:
      return new AvroFixedSizeBinaryProducer((FixedSizeBinaryVector) vector);
    case VARCHAR:
      return new AvroStringProducer((VarCharVector) vector);

      // Logical types
    case DECIMAL:
      return new AvroDecimalProducer((DecimalVector) vector);
    case DECIMAL256:
      return new AvroDecimal256Producer((Decimal256Vector) vector);
    case DATEDAY:
      return new AvroDateDayProducer((DateDayVector) vector);
    case DATEMILLI:
      return new AvroDateMilliProducer((DateMilliVector) vector);
    case TIMESEC:
      return new AvroTimeSecProducer((TimeSecVector) vector);
    case TIMEMILLI:
      return new AvroTimeMilliProducer((TimeMilliVector) vector);
    case TIMEMICRO:
      return new AvroTimeMicroProducer((TimeMicroVector) vector);
    case TIMENANO:
      return new AvroTimeNanoProducer((TimeNanoVector) vector);
    case TIMESTAMPSEC:
      return new AvroTimestampSecProducer((TimeStampSecVector) vector);
    case TIMESTAMPMILLI:
      return new AvroTimestampMilliProducer((TimeStampMilliVector) vector);
    case TIMESTAMPMICRO:
      return new AvroTimestampMicroProducer((TimeStampMicroVector) vector);
    case TIMESTAMPNANO:
      return new AvroTimestampNanoProducer((TimeStampNanoVector) vector);
    case TIMESTAMPSECTZ:
      return new AvroTimestampSecTzProducer((TimeStampSecTZVector) vector);
    case TIMESTAMPMILLITZ:
      return new AvroTimestampMilliTzProducer((TimeStampMilliTZVector) vector);
    case TIMESTAMPMICROTZ:
      return new AvroTimestampMicroTzProducer((TimeStampMicroTZVector) vector);
    case TIMESTAMPNANOTZ:
      return new AvroTimestampNanoTzProducer((TimeStampNanoTZVector) vector);

      // Complex types
    case STRUCT:
      {
        final StructVector struct = (StructVector) vector;
        final List<FieldVector> members = struct.getChildrenFromFields();
        final Producer<?>[] memberProducers = new Producer<?>[members.size()];
        int slot = 0;
        for (FieldVector member : members) {
          memberProducers[slot] =
              createProducer(member, member.getField().isNullable(), dictionaries);
          slot++;
        }
        return new AvroStructProducer(struct, memberProducers);
      }
    case LIST:
      {
        final ListVector list = (ListVector) vector;
        final FieldVector items = list.getDataVector();
        final Producer<?> itemsProducer =
            createProducer(items, items.getField().isNullable(), dictionaries);
        return new AvroListProducer(list, itemsProducer);
      }
    case FIXED_SIZE_LIST:
      {
        final FixedSizeListVector fixedList = (FixedSizeListVector) vector;
        final FieldVector items = fixedList.getDataVector();
        final Producer<?> itemsProducer =
            createProducer(items, items.getField().isNullable(), dictionaries);
        return new AvroFixedSizeListProducer(fixedList, itemsProducer);
      }
    case MAP:
      return createMapProducer((MapVector) vector, dictionaries);

      // Support for UNION and DENSEUNION is not currently available
      // This is pending fixes in the implementation of the union vectors themselves
      // https://github.com/apache/arrow-java/issues/108

    default:
      // Not all Arrow types are supported for encoding (yet)!
      throw new UnsupportedOperationException(
          "Encoding Arrow type " + minorType.name() + " to Avro is not currently supported");
  }
}

/**
 * Create the producer for a dictionary-encoded vector: an enum producer when the dictionary
 * qualifies as an Avro enum, otherwise a producer that decodes values through the dictionary.
 */
private static BaseAvroProducer<?> createDictionaryProducer(
    FieldVector vector, DictionaryProvider dictionaries) {

  if (dictionaries == null) {
    throw new IllegalArgumentException(
        "Field references a dictionary but no dictionaries were provided: "
            + vector.getField().getName());
  }

  final Dictionary dictionary = dictionaries.lookup(vector.getField().getDictionary().getId());
  if (dictionary == null) {
    throw new IllegalArgumentException(
        "Field references a dictionary that does not exist: "
            + vector.getField().getName()
            + ", dictionary ID = "
            + vector.getField().getDictionary().getId());
  }

  if (dictionaryIsValidEnum(dictionary)) {
    return new AvroEnumProducer((BaseIntVector) vector);
  }

  // If a field is dictionary-encoded but cannot be represented as an Avro enum,
  // then decode it before writing
  final BaseAvroProducer<?> valuesProducer = createProducer(dictionary.getVector(), false, null);
  return new DictionaryDecodingProducer<>((BaseIntVector) vector, valuesProducer);
}

/**
 * Create the producer for a map vector, whose entries struct holds the key vector followed by the
 * value vector. Keys must be VARCHAR.
 */
private static BaseAvroProducer<?> createMapProducer(
    MapVector mapVector, DictionaryProvider dictionaries) {

  final StructVector entries = (StructVector) mapVector.getDataVector();

  final Types.MinorType keyType = entries.getChildrenFromFields().get(0).getMinorType();
  if (keyType != Types.MinorType.VARCHAR) {
    throw new IllegalArgumentException("MAP key type must be VARCHAR for Avro encoding");
  }

  final VarCharVector keys = (VarCharVector) entries.getChildrenFromFields().get(0);
  final FieldVector values = entries.getChildrenFromFields().get(1);

  final Producer<?> keysProducer = new AvroStringProducer(keys);
  final Producer<?> valuesProducer =
      createProducer(values, values.getField().isNullable(), dictionaries);
  final Producer<?> entriesProducer =
      new AvroStructProducer(entries, new Producer<?>[] {keysProducer, valuesProducer});

  return new AvroMapProducer(mapVector, entriesProducer);
}
}