// RoundTripDataTest.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adapter.avro;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.arrow.adapter.avro.producers.CompositeAvroProducer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.Decimal256Vector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeMilliVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampMilliVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.complex.writer.BaseWriter;
import org.apache.arrow.vector.complex.writer.FieldWriter;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryEncoder;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
public class RoundTripDataTest {
// Directory annotated with @TempDir; the tests below create their Avro data files inside it.
@TempDir public static File TMP;
/**
 * Builds the reader configuration used by {@code readDataFile}.
 *
 * <p>The literals {@code 1000}, an empty set and {@code false} are passed positionally; see the
 * {@link AvroToArrowConfig} constructor for what each of them controls.
 *
 * @param allocator allocator passed through to the configuration
 * @param dictionaries dictionary provider passed through to the configuration (may be null)
 * @return a freshly constructed configuration
 */
private static AvroToArrowConfig basicConfig(
    BufferAllocator allocator, DictionaryProvider.MapDictionaryProvider dictionaries) {
  AvroToArrowConfig readerConfig =
      new AvroToArrowConfig(allocator, 1000, dictionaries, Collections.emptySet(), false);
  return readerConfig;
}
/**
 * Reads an Avro-encoded data file back into Arrow vectors.
 *
 * @param schema Avro schema describing the records stored in {@code dataFile}
 * @param dataFile file to decode
 * @param allocator allocator used for the reader configuration
 * @param dictionaries dictionary provider used for the reader configuration (may be null)
 * @return the root produced by {@code AvroToArrow.avroToArrow}; the caller must close it
 * @throws Exception if the file cannot be opened or decoding fails
 */
private static VectorSchemaRoot readDataFile(
    Schema schema,
    File dataFile,
    BufferAllocator allocator,
    DictionaryProvider.MapDictionaryProvider dictionaries)
    throws Exception {
  AvroToArrowConfig readerConfig = basicConfig(allocator, dictionaries);
  try (FileInputStream input = new FileInputStream(dataFile)) {
    DecoderFactory decoderFactory = new DecoderFactory();
    BinaryDecoder avroDecoder = decoderFactory.directBinaryDecoder(input, null);
    return AvroToArrow.avroToArrow(schema, avroDecoder, readerConfig);
  }
}
/**
 * Convenience overload of the round-trip check for data that uses no dictionaries.
 *
 * @param root vectors to write and then compare against
 * @param allocator allocator used when reading the data back
 * @param dataFile file the Avro data is written to and read from
 * @param rowCount number of rows to write and compare
 * @throws Exception if writing, reading or comparison fails
 */
private static void roundTripTest(
    VectorSchemaRoot root, BufferAllocator allocator, File dataFile, int rowCount)
    throws Exception {
  // Delegate to the full overload with a null dictionary provider
  final DictionaryProvider noDictionaries = null;
  roundTripTest(root, allocator, dataFile, rowCount, noDictionaries);
}
/**
 * Writes {@code rowCount} rows from {@code root} to {@code dataFile} using the Avro producers,
 * reads the file back with the Avro-to-Arrow reader and asserts that schema, row count, the
 * values of every column and (when supplied) the dictionaries are equal.
 *
 * @param root vectors to write and then compare against
 * @param allocator allocator used when reading the data back
 * @param dataFile file the Avro data is written to and read from
 * @param rowCount number of rows to write and compare
 * @param dictionaries dictionaries referenced by {@code root}, or null when there are none
 * @throws Exception if writing, reading or comparison fails
 */
private static void roundTripTest(
    VectorSchemaRoot root,
    BufferAllocator allocator,
    File dataFile,
    int rowCount,
    DictionaryProvider dictionaries)
    throws Exception {
  // Write an AVRO block using the producer classes
  try (FileOutputStream fos = new FileOutputStream(dataFile)) {
    BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
    CompositeAvroProducer producer =
        ArrowToAvroUtils.createCompositeProducer(root.getFieldVectors(), dictionaries);
    for (int row = 0; row < rowCount; row++) {
      producer.produce(encoder);
    }
    encoder.flush();
  }
  // Generate AVRO schema
  Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields(), dictionaries);
  DictionaryProvider.MapDictionaryProvider roundTripDictionaries =
      new DictionaryProvider.MapDictionaryProvider();
  // Read back in and compare
  try (VectorSchemaRoot roundTrip =
      readDataFile(schema, dataFile, allocator, roundTripDictionaries)) {
    assertEquals(root.getSchema(), roundTrip.getSchema());
    assertEquals(rowCount, roundTrip.getRowCount());
    // Check the values of every column, not just the first one, so that multi-column tests
    // actually verify all of the types they set up
    int columnCount = root.getFieldVectors().size();
    for (int col = 0; col < columnCount; col++) {
      FieldVector expected = root.getVector(col);
      FieldVector actual = roundTrip.getVector(col);
      for (int row = 0; row < rowCount; row++) {
        assertEquals(
            expected.getObject(row),
            actual.getObject(row),
            "column '" + expected.getName() + "', row " + row);
      }
    }
    if (dictionaries != null) {
      for (long id : dictionaries.getDictionaryIds()) {
        Dictionary originalDictionary = dictionaries.lookup(id);
        Dictionary roundTripDictionary = roundTripDictionaries.lookup(id);
        assertEquals(
            originalDictionary.getVector().getValueCount(),
            roundTripDictionary.getVector().getValueCount());
        for (int j = 0; j < originalDictionary.getVector().getValueCount(); j++) {
          assertEquals(
              originalDictionary.getVector().getObject(j),
              roundTripDictionary.getVector().getObject(j));
        }
      }
    }
  }
}
/**
 * Round-trip check for columns whose {@code getObject} result is cast to {@code byte[]}: writes
 * {@code rowCount} rows to {@code dataFile}, reads them back and compares schema, row count and
 * the byte content of every column with {@code assertArrayEquals}.
 *
 * @param root vectors to write and then compare against
 * @param allocator allocator used when reading the data back
 * @param dataFile file the Avro data is written to and read from
 * @param rowCount number of rows to write and compare
 * @throws Exception if writing, reading or comparison fails
 */
private static void roundTripByteArrayTest(
    VectorSchemaRoot root, BufferAllocator allocator, File dataFile, int rowCount)
    throws Exception {
  // Write an AVRO block using the producer classes
  try (FileOutputStream fos = new FileOutputStream(dataFile)) {
    BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
    CompositeAvroProducer producer =
        ArrowToAvroUtils.createCompositeProducer(root.getFieldVectors());
    for (int row = 0; row < rowCount; row++) {
      producer.produce(encoder);
    }
    encoder.flush();
  }
  // Generate AVRO schema
  Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
  // Read back in and compare
  try (VectorSchemaRoot roundTrip = readDataFile(schema, dataFile, allocator, null)) {
    assertEquals(root.getSchema(), roundTrip.getSchema());
    assertEquals(rowCount, roundTrip.getRowCount());
    // Check every column; comparing only the first one would leave the remaining binary
    // columns of a multi-column root unverified
    int columnCount = root.getFieldVectors().size();
    for (int col = 0; col < columnCount; col++) {
      FieldVector expected = root.getVector(col);
      FieldVector actual = roundTrip.getVector(col);
      for (int row = 0; row < rowCount; row++) {
        byte[] rootBytes = (byte[]) expected.getObject(row);
        byte[] roundTripBytes = (byte[]) actual.getObject(row);
        assertArrayEquals(
            rootBytes, roundTripBytes, "column '" + expected.getName() + "', row " + row);
      }
    }
  }
}
// Data round trip for primitive types, nullable and non-nullable
/** Round-trips a null-typed column together with a non-nullable boolean column. */
@Test
public void testRoundTripNullColumn() throws Exception {
  // The current read implementation expects EOF, which never happens for a single null vector
  // Include a boolean vector with this test for now, so that EOF exception will be triggered
  final int rowCount = 10;
  BufferAllocator allocator = new RootAllocator();
  // Column definitions
  Field nullColumn =
      new Field("nullColumn", new FieldType(false, new ArrowType.Null(), null), null);
  Field booleanColumn = new Field("boolean", new FieldType(false, new ArrowType.Bool(), null), null);
  // Empty vectors
  NullVector nulls = new NullVector(nullColumn);
  BitVector flags = new BitVector(booleanColumn, allocator);
  List<FieldVector> columns = Arrays.asList(nulls, flags);
  try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Every row: null in the null column, 0 in the boolean column
    int row = 0;
    while (row < rowCount) {
      nulls.setNull(row);
      flags.set(row, 0);
      row++;
    }
    roundTripTest(root, allocator, new File(TMP, "testRoundTripNullColumn.avro"), rowCount);
  }
}
/** Round-trips a non-nullable boolean column whose values alternate between 1 and 0. */
@Test
public void testRoundTripBooleans() throws Exception {
  final int rowCount = 10;
  BufferAllocator allocator = new RootAllocator();
  // Single non-nullable boolean column
  Field booleanColumn = new Field("boolean", new FieldType(false, new ArrowType.Bool(), null), null);
  BitVector flags = new BitVector(booleanColumn, allocator);
  List<FieldVector> columns = Arrays.asList(flags);
  try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Even rows hold 1, odd rows hold 0
    for (int row = 0; row < rowCount; row++) {
      if (row % 2 == 0) {
        flags.set(row, 1);
      } else {
        flags.set(row, 0);
      }
    }
    roundTripTest(root, allocator, new File(TMP, "testRoundTripBooleans.avro"), rowCount);
  }
}
/** Round-trips a nullable boolean column holding null, false and true. */
@Test
public void testRoundTripNullableBooleans() throws Exception {
  final int rowCount = 3;
  BufferAllocator allocator = new RootAllocator();
  // Single nullable boolean column
  Field booleanColumn = new Field("boolean", new FieldType(true, new ArrowType.Bool(), null), null);
  BitVector flags = new BitVector(booleanColumn, allocator);
  List<FieldVector> columns = Arrays.asList(flags);
  try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    flags.setNull(0); // null
    flags.set(1, 0); // false
    flags.set(2, 1); // true
    roundTripTest(
        root, allocator, new File(TMP, "testRoundTripNullableBooleans.avro"), rowCount);
  }
}
/**
 * Round-trips non-nullable signed 32-bit and 64-bit integer columns, covering values of
 * alternating sign plus the minimum and maximum of each type.
 */
@Test
public void testRoundTripIntegers() throws Exception {
  final int rowCount = 12;
  BufferAllocator allocator = new RootAllocator();
  // Column definitions
  Field int32Column = new Field("int32", new FieldType(false, new ArrowType.Int(32, true), null), null);
  Field int64Column = new Field("int64", new FieldType(false, new ArrowType.Int(64, true), null), null);
  // Empty vectors
  IntVector ints = new IntVector(int32Column, allocator);
  BigIntVector longs = new BigIntVector(int64Column, allocator);
  List<FieldVector> columns = Arrays.asList(ints, longs);
  try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Rows 0-9: multiples of a constant, positive on even rows and negative on odd rows
    for (int row = 0; row < 10; row++) {
      int sign = (row % 2 == 0) ? 1 : -1;
      ints.set(row, 513 * row * sign);
      longs.set(row, 3791L * row * sign);
    }
    // Row 10: minimum values, row 11: maximum values
    ints.set(10, Integer.MIN_VALUE);
    ints.set(11, Integer.MAX_VALUE);
    longs.set(10, Long.MIN_VALUE);
    longs.set(11, Long.MAX_VALUE);
    roundTripTest(root, allocator, new File(TMP, "testRoundTripIntegers.avro"), rowCount);
  }
}
/** Round-trips nullable 32-bit and 64-bit integer columns holding null, zero and the maximum. */
@Test
public void testRoundTripNullableIntegers() throws Exception {
  final int rowCount = 3;
  BufferAllocator allocator = new RootAllocator();
  // Column definitions
  Field int32Column = new Field("int32", new FieldType(true, new ArrowType.Int(32, true), null), null);
  Field int64Column = new Field("int64", new FieldType(true, new ArrowType.Int(64, true), null), null);
  // Empty vectors
  IntVector ints = new IntVector(int32Column, allocator);
  BigIntVector longs = new BigIntVector(int64Column, allocator);
  List<FieldVector> columns = Arrays.asList(ints, longs);
  try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // 32-bit column: null, zero, non-zero
    ints.setNull(0);
    ints.set(1, 0);
    ints.set(2, Integer.MAX_VALUE);
    // 64-bit column: null, zero, non-zero
    longs.setNull(0);
    longs.set(1, 0);
    longs.set(2, Long.MAX_VALUE);
    roundTripTest(
        root, allocator, new File(TMP, "testRoundTripNullableIntegers.avro"), rowCount);
  }
}
/**
 * Round-trips non-nullable single and double precision columns, covering ordinary values of
 * alternating sign plus MIN_VALUE, MAX_VALUE, NaN and both infinities.
 */
@Test
public void testRoundTripFloatingPoints() throws Exception {
  final int rowCount = 15;
  BufferAllocator allocator = new RootAllocator();
  // Column definitions
  Field float32Column =
      new Field(
          "float32",
          new FieldType(false, new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE), null),
          null);
  Field float64Column =
      new Field(
          "float64",
          new FieldType(false, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), null),
          null);
  // Empty vectors
  Float4Vector floats = new Float4Vector(float32Column, allocator);
  Float8Vector doubles = new Float8Vector(float64Column, allocator);
  List<FieldVector> columns = Arrays.asList(floats, doubles);
  // Special values stored in rows 10-14, in this order
  float[] specialFloats = {
    Float.MIN_VALUE, Float.MAX_VALUE, Float.NaN, Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY
  };
  double[] specialDoubles = {
    Double.MIN_VALUE,
    Double.MAX_VALUE,
    Double.NaN,
    Double.POSITIVE_INFINITY,
    Double.NEGATIVE_INFINITY
  };
  try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Rows 0-9: multiples of 37.6, positive on even rows and negative on odd rows
    for (int row = 0; row < 10; row++) {
      int sign = (row % 2 == 0) ? 1 : -1;
      floats.set(row, 37.6f * row * sign);
      doubles.set(row, 37.6d * row * sign);
    }
    for (int i = 0; i < specialFloats.length; i++) {
      floats.set(10 + i, specialFloats[i]);
      doubles.set(10 + i, specialDoubles[i]);
    }
    roundTripTest(root, allocator, new File(TMP, "testRoundTripFloatingPoints.avro"), rowCount);
  }
}
/** Round-trips nullable single and double precision columns holding null, zero and one. */
@Test
public void testRoundTripNullableFloatingPoints() throws Exception {
  final int rowCount = 3;
  BufferAllocator allocator = new RootAllocator();
  // Column definitions
  Field float32Column =
      new Field(
          "float32",
          new FieldType(true, new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE), null),
          null);
  Field float64Column =
      new Field(
          "float64",
          new FieldType(true, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), null),
          null);
  // Empty vectors
  Float4Vector floats = new Float4Vector(float32Column, allocator);
  Float8Vector doubles = new Float8Vector(float64Column, allocator);
  List<FieldVector> columns = Arrays.asList(floats, doubles);
  try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Single precision column: null, zero, non-zero
    floats.setNull(0);
    floats.set(1, 0.0f);
    floats.set(2, 1.0f);
    // Double precision column: null, zero, non-zero
    doubles.setNull(0);
    doubles.set(1, 0.0);
    doubles.set(2, 1.0);
    roundTripTest(
        root, allocator, new File(TMP, "testRoundTripNullableFloatingPoints.avro"), rowCount);
  }
}
/**
 * Round-trips a non-nullable UTF-8 string column, including control characters and non-ASCII
 * text.
 */
@Test
public void testRoundTripStrings() throws Exception {
  // Field definition
  FieldType stringField = new FieldType(false, new ArrowType.Utf8(), null);
  // Create empty vector
  BufferAllocator allocator = new RootAllocator();
  VarCharVector stringVector =
      new VarCharVector(new Field("string", stringField, null), allocator);
  // Set up VSR
  List<FieldVector> vectors = Arrays.asList(stringVector);
  int rowCount = 5;
  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Set test data. The column type is Utf8, so encode explicitly as UTF-8 instead of relying
    // on the platform default charset.
    stringVector.setSafe(0, "Hello world!".getBytes(StandardCharsets.UTF_8));
    stringVector.setSafe(1, "<%**\r\n\t\\abc\0$$>".getBytes(StandardCharsets.UTF_8));
    stringVector.setSafe(2, "������������".getBytes(StandardCharsets.UTF_8));
    stringVector.setSafe(3, "���������� ��������������".getBytes(StandardCharsets.UTF_8));
    stringVector.setSafe(4, "(P ��� P ��� Q) ��� Q".getBytes(StandardCharsets.UTF_8));
    File dataFile = new File(TMP, "testRoundTripStrings.avro");
    roundTripTest(root, allocator, dataFile, rowCount);
  }
}
/** Round-trips a nullable UTF-8 string column holding null, an empty string and a value. */
@Test
public void testRoundTripNullableStrings() throws Exception {
  // Field definition
  FieldType stringField = new FieldType(true, new ArrowType.Utf8(), null);
  // Create empty vector
  BufferAllocator allocator = new RootAllocator();
  VarCharVector stringVector =
      new VarCharVector(new Field("string", stringField, null), allocator);
  int rowCount = 3;
  // Set up VSR
  List<FieldVector> vectors = Arrays.asList(stringVector);
  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Set test data. The column type is Utf8, so encode explicitly as UTF-8 instead of relying
    // on the platform default charset.
    stringVector.setNull(0);
    stringVector.setSafe(1, "".getBytes(StandardCharsets.UTF_8));
    stringVector.setSafe(2, "not empty".getBytes(StandardCharsets.UTF_8));
    File dataFile = new File(TMP, "testRoundTripNullableStrings.avro");
    roundTripTest(root, allocator, dataFile, rowCount);
  }
}
/** Round-trips non-nullable variable-width binary and 5-byte fixed-width binary columns. */
@Test
public void testRoundTripBinary() throws Exception {
  // Field definition
  FieldType binaryField = new FieldType(false, new ArrowType.Binary(), null);
  FieldType fixedField = new FieldType(false, new ArrowType.FixedSizeBinary(5), null);
  // Create empty vector
  BufferAllocator allocator = new RootAllocator();
  VarBinaryVector binaryVector =
      new VarBinaryVector(new Field("binary", binaryField, null), allocator);
  FixedSizeBinaryVector fixedVector =
      new FixedSizeBinaryVector(new Field("fixed", fixedField, null), allocator);
  // Set up VSR
  List<FieldVector> vectors = Arrays.asList(binaryVector, fixedVector);
  int rowCount = 3;
  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Set test data
    binaryVector.setSafe(0, new byte[] {1, 2, 3});
    binaryVector.setSafe(1, new byte[] {4, 5, 6, 7});
    binaryVector.setSafe(2, new byte[] {8, 9});
    // Every fixed-width value is exactly 5 bytes, matching FixedSizeBinary(5)
    fixedVector.setSafe(0, new byte[] {1, 2, 3, 4, 5});
    fixedVector.setSafe(1, new byte[] {4, 5, 6, 7, 8});
    fixedVector.setSafe(2, new byte[] {8, 9, 10, 11, 12});
    File dataFile = new File(TMP, "testRoundTripBinary.avro");
    roundTripByteArrayTest(root, allocator, dataFile, rowCount);
  }
}
/**
 * Round-trips nullable variable-width and 5-byte fixed-width binary columns holding null, an
 * empty or all-zero value, and a non-trivial value.
 */
@Test
public void testRoundTripNullableBinary() throws Exception {
  final int rowCount = 3;
  BufferAllocator allocator = new RootAllocator();
  // Column definitions
  Field binaryColumn = new Field("binary", new FieldType(true, new ArrowType.Binary(), null), null);
  Field fixedColumn =
      new Field("fixed", new FieldType(true, new ArrowType.FixedSizeBinary(5), null), null);
  // Empty vectors
  VarBinaryVector variableWidth = new VarBinaryVector(binaryColumn, allocator);
  FixedSizeBinaryVector fixedWidth = new FixedSizeBinaryVector(fixedColumn, allocator);
  List<FieldVector> columns = Arrays.asList(variableWidth, fixedWidth);
  try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Row 0: nulls
    variableWidth.setNull(0);
    fixedWidth.setNull(0);
    // Row 1: empty / all-zero values
    byte[] emptyValue = new byte[] {};
    byte[] zeroValue = new byte[] {0, 0, 0, 0, 0};
    variableWidth.setSafe(1, emptyValue);
    fixedWidth.setSafe(1, zeroValue);
    // Row 2: populated values
    byte[] variableValue = new byte[] {10, 11, 12};
    byte[] fixedValue = new byte[] {10, 11, 12, 13, 14};
    variableWidth.setSafe(2, variableValue);
    fixedWidth.setSafe(2, fixedValue);
    roundTripByteArrayTest(
        root, allocator, new File(TMP, "testRoundTripNullableBinary.avro"), rowCount);
  }
}
// Data round trip for logical types, nullable and non-nullable
/**
 * Round-trips four non-nullable decimal columns: 128-bit with scale 10 and 5, and 256-bit with
 * scale 20 and 10.
 */
@Test
public void testRoundTripDecimals() throws Exception {
  final int rowCount = 3;
  BufferAllocator allocator = new RootAllocator();
  // Column definitions
  Field dec128Scale10Column =
      new Field("decimal128_1", new FieldType(false, new ArrowType.Decimal(38, 10, 128), null), null);
  Field dec128Scale5Column =
      new Field("decimal128_2", new FieldType(false, new ArrowType.Decimal(38, 5, 128), null), null);
  Field dec256Scale20Column =
      new Field("decimal256_1", new FieldType(false, new ArrowType.Decimal(76, 20, 256), null), null);
  Field dec256Scale10Column =
      new Field("decimal256_2", new FieldType(false, new ArrowType.Decimal(76, 10, 256), null), null);
  // Empty vectors
  DecimalVector dec128Scale10 = new DecimalVector(dec128Scale10Column, allocator);
  DecimalVector dec128Scale5 = new DecimalVector(dec128Scale5Column, allocator);
  Decimal256Vector dec256Scale20 = new Decimal256Vector(dec256Scale20Column, allocator);
  Decimal256Vector dec256Scale10 = new Decimal256Vector(dec256Scale10Column, allocator);
  List<FieldVector> columns =
      Arrays.asList(dec128Scale10, dec128Scale5, dec256Scale20, dec256Scale10);
  // Source literals, one entry per row; both 128-bit columns share the same literals
  String[] literals128 = {"12345.67890", "-98765.43210", "54321.09876"};
  String[] literals256Scale20 = {
    "12345678901234567890.12345678901234567890",
    "-98765432109876543210.98765432109876543210",
    "54321098765432109876.54321098765432109876"
  };
  String[] literals256Scale10 = {
    "12345678901234567890.1234567890",
    "-98765432109876543210.9876543210",
    "54321098765432109876.5432109876"
  };
  try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Each literal is rescaled to the scale of the column it is stored in
    for (int row = 0; row < rowCount; row++) {
      BigDecimal value128 = new BigDecimal(literals128[row]);
      dec128Scale10.setSafe(row, value128.setScale(10, RoundingMode.UNNECESSARY));
      dec128Scale5.setSafe(row, value128.setScale(5, RoundingMode.UNNECESSARY));
      dec256Scale20.setSafe(
          row, new BigDecimal(literals256Scale20[row]).setScale(20, RoundingMode.UNNECESSARY));
      dec256Scale10.setSafe(
          row, new BigDecimal(literals256Scale10[row]).setScale(10, RoundingMode.UNNECESSARY));
    }
    roundTripTest(root, allocator, new File(TMP, "testRoundTripDecimals.avro"), rowCount);
  }
}
/**
 * Round-trips four nullable decimal columns (128-bit with scale 10 and 5, 256-bit with scale 20
 * and 10), each holding null, zero and a non-zero value.
 */
@Test
public void testRoundTripNullableDecimals() throws Exception {
  final int rowCount = 3;
  BufferAllocator allocator = new RootAllocator();
  // Column definitions
  Field dec128Scale10Column =
      new Field("decimal128_1", new FieldType(true, new ArrowType.Decimal(38, 10, 128), null), null);
  Field dec128Scale5Column =
      new Field("decimal128_2", new FieldType(true, new ArrowType.Decimal(38, 5, 128), null), null);
  Field dec256Scale20Column =
      new Field("decimal256_1", new FieldType(true, new ArrowType.Decimal(76, 20, 256), null), null);
  Field dec256Scale10Column =
      new Field("decimal256_2", new FieldType(true, new ArrowType.Decimal(76, 10, 256), null), null);
  // Empty vectors
  DecimalVector dec128Scale10 = new DecimalVector(dec128Scale10Column, allocator);
  DecimalVector dec128Scale5 = new DecimalVector(dec128Scale5Column, allocator);
  Decimal256Vector dec256Scale20 = new Decimal256Vector(dec256Scale20Column, allocator);
  Decimal256Vector dec256Scale10 = new Decimal256Vector(dec256Scale10Column, allocator);
  List<FieldVector> columns =
      Arrays.asList(dec128Scale10, dec128Scale5, dec256Scale20, dec256Scale10);
  try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Row 0: nulls
    dec128Scale10.setNull(0);
    dec128Scale5.setNull(0);
    dec256Scale20.setNull(0);
    dec256Scale10.setNull(0);
    // Row 1: zero at each column's scale
    dec128Scale10.setSafe(1, BigDecimal.ZERO.setScale(10, RoundingMode.UNNECESSARY));
    dec128Scale5.setSafe(1, BigDecimal.ZERO.setScale(5, RoundingMode.UNNECESSARY));
    dec256Scale20.setSafe(1, BigDecimal.ZERO.setScale(20, RoundingMode.UNNECESSARY));
    dec256Scale10.setSafe(1, BigDecimal.ZERO.setScale(10, RoundingMode.UNNECESSARY));
    // Row 2: non-zero values
    BigDecimal value128Scale10 = new BigDecimal("12345.67890");
    BigDecimal value128Scale5 = new BigDecimal("98765.43210");
    BigDecimal value256Scale20 = new BigDecimal("12345678901234567890.12345678901234567890");
    BigDecimal value256Scale10 = new BigDecimal("98765432109876543210.9876543210");
    dec128Scale10.setSafe(2, value128Scale10.setScale(10, RoundingMode.UNNECESSARY));
    dec128Scale5.setSafe(2, value128Scale5.setScale(5, RoundingMode.UNNECESSARY));
    dec256Scale20.setSafe(2, value256Scale20.setScale(20, RoundingMode.UNNECESSARY));
    dec256Scale10.setSafe(2, value256Scale10.setScale(10, RoundingMode.UNNECESSARY));
    roundTripTest(
        root, allocator, new File(TMP, "testRoundTripNullableDecimals.avro"), rowCount);
  }
}
/** Round-trips a non-nullable day-resolution date column holding today and the next two days. */
@Test
public void testRoundTripDates() throws Exception {
  final int rowCount = 3;
  BufferAllocator allocator = new RootAllocator();
  // Single non-nullable date column
  Field dateColumn =
      new Field("dateDay", new FieldType(false, new ArrowType.Date(DateUnit.DAY), null), null);
  DateDayVector dates = new DateDayVector(dateColumn, allocator);
  List<FieldVector> columns = Arrays.asList(dates);
  try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Row n holds the current epoch day plus n
    for (int offset = 0; offset < rowCount; offset++) {
      dates.setSafe(offset, (int) LocalDate.now().toEpochDay() + offset);
    }
    roundTripTest(root, allocator, new File(TMP, "testRoundTripDates.avro"), rowCount);
  }
}
/** Round-trips a nullable day-resolution date column holding null, epoch day 0 and today. */
@Test
public void testRoundTripNullableDates() throws Exception {
  final int rowCount = 3;
  BufferAllocator allocator = new RootAllocator();
  // Single nullable date column
  Field dateColumn =
      new Field("dateDay", new FieldType(true, new ArrowType.Date(DateUnit.DAY), null), null);
  DateDayVector dates = new DateDayVector(dateColumn, allocator);
  List<FieldVector> columns = Arrays.asList(dates);
  try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    dates.setNull(0);
    dates.setSafe(1, 0);
    int today = (int) LocalDate.now().toEpochDay();
    dates.setSafe(2, today);
    roundTripTest(root, allocator, new File(TMP, "testRoundTripNullableDates.avro"), rowCount);
  }
}
/**
 * Round-trips non-nullable time-of-day columns in millisecond (32-bit) and microsecond (64-bit)
 * resolution, holding the current local time and the times one and two seconds before it.
 */
@Test
public void testRoundTripTimes() throws Exception {
  // Field definitions
  FieldType timeMillisField =
      new FieldType(false, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null);
  FieldType timeMicrosField =
      new FieldType(false, new ArrowType.Time(TimeUnit.MICROSECOND, 64), null);
  // Create empty vectors
  BufferAllocator allocator = new RootAllocator();
  TimeMilliVector timeMillisVector =
      new TimeMilliVector(new Field("timeMillis", timeMillisField, null), allocator);
  TimeMicroVector timeMicrosVector =
      new TimeMicroVector(new Field("timeMicros", timeMicrosField, null), allocator);
  // Set up VSR
  List<FieldVector> vectors = Arrays.asList(timeMillisVector, timeMicrosVector);
  int rowCount = 3;
  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Set test data. The clock is read once so all rows are relative to the same instant, and
    // the offsets are applied with LocalTime.minusSeconds, which wraps around midnight, so the
    // stored time-of-day can never become negative when the test runs just after 00:00.
    ZonedDateTime now = ZonedDateTime.now();
    for (int row = 0; row < rowCount; row++) {
      long nanoOfDay = now.toLocalTime().minusSeconds(row).toNanoOfDay();
      timeMillisVector.setSafe(row, (int) (nanoOfDay / 1000000));
      timeMicrosVector.setSafe(row, nanoOfDay / 1000);
    }
    File dataFile = new File(TMP, "testRoundTripTimes.avro");
    roundTripTest(root, allocator, dataFile, rowCount);
  }
}
/**
 * Round-trips nullable time-of-day columns in millisecond (32-bit) and microsecond (64-bit)
 * resolution, each holding null, zero and the current local time.
 */
@Test
public void testRoundTripNullableTimes() throws Exception {
  final int rowCount = 3;
  BufferAllocator allocator = new RootAllocator();
  // Column definitions
  Field millisColumn =
      new Field(
          "timeMillis",
          new FieldType(true, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null),
          null);
  Field microsColumn =
      new Field(
          "timeMicros",
          new FieldType(true, new ArrowType.Time(TimeUnit.MICROSECOND, 64), null),
          null);
  // Empty vectors
  TimeMilliVector millis = new TimeMilliVector(millisColumn, allocator);
  TimeMicroVector micros = new TimeMicroVector(microsColumn, allocator);
  List<FieldVector> columns = Arrays.asList(millis, micros);
  try (VectorSchemaRoot root = new VectorSchemaRoot(columns)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Millisecond column: null, zero, now
    millis.setNull(0);
    millis.setSafe(1, 0);
    int nowMillisOfDay = (int) (ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000000);
    millis.setSafe(2, nowMillisOfDay);
    // Microsecond column: null, zero, now
    micros.setNull(0);
    micros.setSafe(1, 0);
    long nowMicrosOfDay = ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000;
    micros.setSafe(2, nowMicrosOfDay);
    roundTripTest(root, allocator, new File(TMP, "testRoundTripNullableTimes.avro"), rowCount);
  }
}
/**
 * Round-trips non-nullable UTC timestamp columns in millisecond, microsecond and nanosecond
 * resolution, holding the current instant and the instants one and two seconds before it.
 */
@Test
public void testRoundTripZoneAwareTimestamps() throws Exception {
  // Field definitions
  FieldType timestampMillisField =
      new FieldType(false, new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"), null);
  FieldType timestampMicrosField =
      new FieldType(false, new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"), null);
  FieldType timestampNanosField =
      new FieldType(false, new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"), null);
  // Create empty vectors
  BufferAllocator allocator = new RootAllocator();
  TimeStampMilliTZVector timestampMillisVector =
      new TimeStampMilliTZVector(
          new Field("timestampMillis", timestampMillisField, null), allocator);
  TimeStampMicroTZVector timestampMicrosVector =
      new TimeStampMicroTZVector(
          new Field("timestampMicros", timestampMicrosField, null), allocator);
  TimeStampNanoTZVector timestampNanosVector =
      new TimeStampNanoTZVector(
          new Field("timestampNanos", timestampNanosField, null), allocator);
  // Set up VSR
  List<FieldVector> vectors =
      Arrays.asList(timestampMillisVector, timestampMicrosVector, timestampNanosVector);
  int rowCount = 3;
  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Set test data. Epoch milliseconds are kept as long: narrowing them to int would discard
    // the high bits and store a value unrelated to the current time.
    long nowMillis = Instant.now().toEpochMilli();
    for (int row = 0; row < rowCount; row++) {
      long millis = nowMillis - 1000L * row;
      timestampMillisVector.setSafe(row, millis);
      timestampMicrosVector.setSafe(row, millis * 1000);
      timestampNanosVector.setSafe(row, millis * 1000000);
    }
    File dataFile = new File(TMP, "testRoundTripZoneAwareTimestamps.avro");
    roundTripTest(root, allocator, dataFile, rowCount);
  }
}
/**
 * Round-trips nullable UTC timestamp columns in millisecond, microsecond and nanosecond
 * resolution, each holding null, zero and the current instant.
 */
@Test
public void testRoundTripNullableZoneAwareTimestamps() throws Exception {
  // Field definitions
  FieldType timestampMillisField =
      new FieldType(true, new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"), null);
  FieldType timestampMicrosField =
      new FieldType(true, new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"), null);
  FieldType timestampNanosField =
      new FieldType(true, new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"), null);
  // Create empty vectors
  BufferAllocator allocator = new RootAllocator();
  TimeStampMilliTZVector timestampMillisVector =
      new TimeStampMilliTZVector(
          new Field("timestampMillis", timestampMillisField, null), allocator);
  TimeStampMicroTZVector timestampMicrosVector =
      new TimeStampMicroTZVector(
          new Field("timestampMicros", timestampMicrosField, null), allocator);
  TimeStampNanoTZVector timestampNanosVector =
      new TimeStampNanoTZVector(
          new Field("timestampNanos", timestampNanosField, null), allocator);
  int rowCount = 3;
  // Set up VSR
  List<FieldVector> vectors =
      Arrays.asList(timestampMillisVector, timestampMicrosVector, timestampNanosVector);
  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();
    // Set test data. Epoch milliseconds are kept as long: narrowing them to int would discard
    // the high bits and store a value unrelated to the current time.
    long nowMillis = Instant.now().toEpochMilli();
    timestampMillisVector.setNull(0);
    timestampMillisVector.setSafe(1, 0);
    timestampMillisVector.setSafe(2, nowMillis);
    timestampMicrosVector.setNull(0);
    timestampMicrosVector.setSafe(1, 0);
    timestampMicrosVector.setSafe(2, nowMillis * 1000);
    timestampNanosVector.setNull(0);
    timestampNanosVector.setSafe(1, 0);
    timestampNanosVector.setSafe(2, nowMillis * 1000000);
    File dataFile = new File(TMP, "testRoundTripNullableZoneAwareTimestamps.avro");
    roundTripTest(root, allocator, dataFile, rowCount);
  }
}
// Non-nullable local (no time zone) timestamp columns at milli / micro / nano precision
@Test
public void testRoundTripLocalTimestamps() throws Exception {
  // Field definitions
  FieldType timestampMillisField =
      new FieldType(false, new ArrowType.Timestamp(TimeUnit.MILLISECOND, null), null);
  FieldType timestampMicrosField =
      new FieldType(false, new ArrowType.Timestamp(TimeUnit.MICROSECOND, null), null);
  FieldType timestampNanosField =
      new FieldType(false, new ArrowType.Timestamp(TimeUnit.NANOSECOND, null), null);

  // Create empty vectors
  BufferAllocator allocator = new RootAllocator();
  TimeStampMilliVector timestampMillisVector =
      new TimeStampMilliVector(
          new Field("timestampMillis", timestampMillisField, null), allocator);
  TimeStampMicroVector timestampMicrosVector =
      new TimeStampMicroVector(
          new Field("timestampMicros", timestampMicrosField, null), allocator);
  TimeStampNanoVector timestampNanosVector =
      new TimeStampNanoVector(new Field("timestampNanos", timestampNanosField, null), allocator);

  // Set up VSR
  List<FieldVector> vectors =
      Arrays.asList(timestampMillisVector, timestampMicrosVector, timestampNanosVector);
  int rowCount = 3;

  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();

    // Read the clock once so that every row and column is derived from the same instant
    long nowMillis = Instant.now().toEpochMilli();

    // Set test data: now, one second ago, two seconds ago.
    // Values stay in long arithmetic; narrowing epoch millis to int would discard the
    // upper 32 bits and make the subsequent subtraction run in int arithmetic.
    timestampMillisVector.setSafe(0, nowMillis);
    timestampMillisVector.setSafe(1, nowMillis - 1000);
    timestampMillisVector.setSafe(2, nowMillis - 2000);

    timestampMicrosVector.setSafe(0, nowMillis * 1_000);
    timestampMicrosVector.setSafe(1, (nowMillis - 1000) * 1_000);
    timestampMicrosVector.setSafe(2, (nowMillis - 2000) * 1_000);

    timestampNanosVector.setSafe(0, nowMillis * 1_000_000);
    timestampNanosVector.setSafe(1, (nowMillis - 1000) * 1_000_000);
    timestampNanosVector.setSafe(2, (nowMillis - 2000) * 1_000_000);

    File dataFile = new File(TMP, "testRoundTripLocalTimestamps.avro");
    roundTripTest(root, allocator, dataFile, rowCount);
  }
}
// Nullable local (no time zone) timestamp columns at milli / micro / nano precision
@Test
public void testRoundTripNullableLocalTimestamps() throws Exception {
  // Field definitions
  FieldType timestampMillisField =
      new FieldType(true, new ArrowType.Timestamp(TimeUnit.MILLISECOND, null), null);
  FieldType timestampMicrosField =
      new FieldType(true, new ArrowType.Timestamp(TimeUnit.MICROSECOND, null), null);
  FieldType timestampNanosField =
      new FieldType(true, new ArrowType.Timestamp(TimeUnit.NANOSECOND, null), null);

  // Create empty vectors
  BufferAllocator allocator = new RootAllocator();
  TimeStampMilliVector timestampMillisVector =
      new TimeStampMilliVector(
          new Field("timestampMillis", timestampMillisField, null), allocator);
  TimeStampMicroVector timestampMicrosVector =
      new TimeStampMicroVector(
          new Field("timestampMicros", timestampMicrosField, null), allocator);
  TimeStampNanoVector timestampNanosVector =
      new TimeStampNanoVector(new Field("timestampNanos", timestampNanosField, null), allocator);

  int rowCount = 3;

  // Set up VSR
  List<FieldVector> vectors =
      Arrays.asList(timestampMillisVector, timestampMicrosVector, timestampNanosVector);

  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();

    // Read the clock once so that all three columns are derived from the same instant
    long nowMillis = Instant.now().toEpochMilli();

    // Set test data: row 0 is null, row 1 is zero, row 2 is the current time.
    // The millisecond value is passed as a long; narrowing it to int would discard the
    // upper 32 bits of the epoch value.
    timestampMillisVector.setNull(0);
    timestampMillisVector.setSafe(1, 0);
    timestampMillisVector.setSafe(2, nowMillis);

    timestampMicrosVector.setNull(0);
    timestampMicrosVector.setSafe(1, 0);
    timestampMicrosVector.setSafe(2, nowMillis * 1_000);

    timestampNanosVector.setNull(0);
    timestampNanosVector.setSafe(1, 0);
    timestampNanosVector.setSafe(2, nowMillis * 1_000_000);

    File dataFile = new File(TMP, "testRoundTripNullableLocalTimestamps.avro");
    roundTripTest(root, allocator, dataFile, rowCount);
  }
}
// Data round trip for containers of primitive and logical types, nullable and non-nullable
// Non-nullable list columns with int, string and date items; row i holds (5 - i) items
@Test
public void testRoundTripLists() throws Exception {
  BufferAllocator allocator = new RootAllocator();

  // List-level types: all three list columns are declared non-nullable
  FieldType intListType = new FieldType(false, new ArrowType.List(), null);
  FieldType stringListType = new FieldType(false, new ArrowType.List(), null);
  FieldType dateListType = new FieldType(false, new ArrowType.List(), null);

  // Item-level fields: each list has a single non-nullable child named "item"
  Field intItem = new Field("item", FieldType.notNullable(new ArrowType.Int(32, true)), null);
  Field stringItem = new Field("item", FieldType.notNullable(new ArrowType.Utf8()), null);
  Field dateItem =
      new Field("item", FieldType.notNullable(new ArrowType.Date(DateUnit.DAY)), null);

  // Build each list vector and attach its child
  ListVector intListVector = new ListVector("intList", allocator, intListType, null);
  intListVector.initializeChildrenFromFields(Collections.singletonList(intItem));

  ListVector stringListVector = new ListVector("stringList", allocator, stringListType, null);
  stringListVector.initializeChildrenFromFields(Collections.singletonList(stringItem));

  ListVector dateListVector = new ListVector("dateList", allocator, dateListType, null);
  dateListVector.initializeChildrenFromFields(Collections.singletonList(dateItem));

  int rowCount = 3;
  List<FieldVector> vectors = Arrays.asList(intListVector, stringListVector, dateListVector);

  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();

    // intList: items 0 .. (4 - row)
    FieldWriter ints = intListVector.getWriter();
    for (int row = 0; row < rowCount; row++) {
      int itemCount = 5 - row;
      ints.startList();
      for (int item = 0; item < itemCount; item++) {
        ints.writeInt(item);
      }
      ints.endList();
    }

    // stringList: "string0" .. "string<4 - row>"
    FieldWriter strings = stringListVector.getWriter();
    for (int row = 0; row < rowCount; row++) {
      int itemCount = 5 - row;
      strings.startList();
      for (int item = 0; item < itemCount; item++) {
        strings.writeVarChar("string" + item);
      }
      strings.endList();
    }

    // dateList: today, tomorrow, ... expressed as epoch days
    FieldWriter dates = dateListVector.getWriter();
    for (int row = 0; row < rowCount; row++) {
      int itemCount = 5 - row;
      dates.startList();
      for (int item = 0; item < itemCount; item++) {
        long epochDay = LocalDate.now().plusDays(item).toEpochDay();
        dates.writeDateDay((int) epochDay);
      }
      dates.endList();
    }

    // Finalize the value count on every list vector once all rows are written
    for (ListVector listVector :
        Arrays.asList(intListVector, stringListVector, dateListVector)) {
      listVector.setValueCount(rowCount);
    }

    roundTripTest(root, allocator, new File(TMP, "testRoundTripLists.avro"), rowCount);
  }
}
// Lists with nulls at each level: nullable items, nullable lists, and both at once
@Test
public void testRoundTripNullableLists() throws Exception {
  // Field definitions
  FieldType nullListType = new FieldType(true, new ArrowType.List(), null);
  FieldType nonNullListType = new FieldType(false, new ArrowType.List(), null);
  Field nullFieldType = new Field("item", FieldType.nullable(new ArrowType.Int(32, true)), null);
  Field nonNullFieldType =
      new Field("item", FieldType.notNullable(new ArrowType.Int(32, true)), null);

  // Create empty vectors
  BufferAllocator allocator = new RootAllocator();
  ListVector nullEntriesVector =
      new ListVector("nullEntriesVector", allocator, nonNullListType, null);
  ListVector nullListVector = new ListVector("nullListVector", allocator, nullListType, null);
  ListVector nullBothVector = new ListVector("nullBothVector", allocator, nullListType, null);
  nullEntriesVector.initializeChildrenFromFields(Arrays.asList(nullFieldType));
  nullListVector.initializeChildrenFromFields(Arrays.asList(nonNullFieldType));
  nullBothVector.initializeChildrenFromFields(Arrays.asList(nullFieldType));

  // Set up VSR
  List<FieldVector> vectors = Arrays.asList(nullEntriesVector, nullListVector, nullBothVector);
  int rowCount = 4;

  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();

    // Set test data for nullEntriesVector (non-null lists, nullable items):
    // [null, null], [0, 0], [123, 456], [789, 789]
    FieldWriter nullEntriesWriter = nullEntriesVector.getWriter();
    nullEntriesWriter.startList();
    nullEntriesWriter.integer().writeNull();
    nullEntriesWriter.integer().writeNull();
    nullEntriesWriter.endList();
    nullEntriesWriter.startList();
    nullEntriesWriter.integer().writeInt(0);
    nullEntriesWriter.integer().writeInt(0);
    nullEntriesWriter.endList();
    nullEntriesWriter.startList();
    nullEntriesWriter.integer().writeInt(123);
    nullEntriesWriter.integer().writeInt(456);
    nullEntriesWriter.endList();
    nullEntriesWriter.startList();
    nullEntriesWriter.integer().writeInt(789);
    nullEntriesWriter.integer().writeInt(789);
    nullEntriesWriter.endList();

    // Set test data for nullListVector (nullable lists, non-null items):
    // null, [0, 0], [123, 456], [789, 789]
    // Every row goes through this vector's own writer so that all four rows are populated.
    FieldWriter nullListWriter = nullListVector.getWriter();
    nullListWriter.writeNull();
    nullListWriter.setPosition(1); // writeNull() does not inc. idx() on list vector
    nullListWriter.startList();
    nullListWriter.integer().writeInt(0);
    nullListWriter.integer().writeInt(0);
    nullListWriter.endList();
    nullListWriter.startList();
    nullListWriter.integer().writeInt(123);
    nullListWriter.integer().writeInt(456);
    nullListWriter.endList();
    nullListWriter.startList();
    nullListWriter.integer().writeInt(789);
    nullListWriter.integer().writeInt(789);
    nullListWriter.endList();

    // Set test data for nullBothVector (nullable lists, nullable items):
    // null, [null, null], [0, 0], [123, 456]
    FieldWriter nullBothWriter = nullBothVector.getWriter();
    nullBothWriter.writeNull();
    nullBothWriter.setPosition(1); // same positioning as for nullListWriter above
    nullBothWriter.startList();
    nullBothWriter.integer().writeNull();
    nullBothWriter.integer().writeNull();
    nullBothWriter.endList();
    nullBothWriter.startList();
    nullBothWriter.integer().writeInt(0);
    nullBothWriter.integer().writeInt(0);
    nullBothWriter.endList();
    nullBothWriter.startList();
    nullBothWriter.integer().writeInt(123);
    nullBothWriter.integer().writeInt(456);
    nullBothWriter.endList();

    // Update count for the vectors
    nullListVector.setValueCount(rowCount);
    nullEntriesVector.setValueCount(rowCount);
    nullBothVector.setValueCount(rowCount);

    File dataFile = new File(TMP, "testRoundTripNullableLists.avro");
    roundTripTest(root, allocator, dataFile, rowCount);
  }
}
// Non-nullable map columns keyed by string with int, string and date values;
// row i holds (5 - i) entries
@Test
public void testRoundTripMap() throws Exception {
  // Field definitions
  FieldType intMapField = new FieldType(false, new ArrowType.Map(false), null);
  FieldType stringMapField = new FieldType(false, new ArrowType.Map(false), null);
  FieldType dateMapField = new FieldType(false, new ArrowType.Map(false), null);
  Field keyField = new Field("key", FieldType.notNullable(new ArrowType.Utf8()), null);
  Field intField = new Field("value", FieldType.notNullable(new ArrowType.Int(32, true)), null);
  Field stringField = new Field("value", FieldType.notNullable(new ArrowType.Utf8()), null);
  Field dateField =
      new Field("value", FieldType.notNullable(new ArrowType.Date(DateUnit.DAY)), null);
  Field intEntryField =
      new Field(
          "entries",
          FieldType.notNullable(new ArrowType.Struct()),
          Arrays.asList(keyField, intField));
  Field stringEntryField =
      new Field(
          "entries",
          FieldType.notNullable(new ArrowType.Struct()),
          Arrays.asList(keyField, stringField));
  Field dateEntryField =
      new Field(
          "entries",
          FieldType.notNullable(new ArrowType.Struct()),
          Arrays.asList(keyField, dateField));

  // Create empty vectors
  BufferAllocator allocator = new RootAllocator();
  MapVector intMapVector = new MapVector("intMap", allocator, intMapField, null);
  MapVector stringMapVector = new MapVector("stringMap", allocator, stringMapField, null);
  MapVector dateMapVector = new MapVector("dateMap", allocator, dateMapField, null);
  intMapVector.initializeChildrenFromFields(Arrays.asList(intEntryField));
  stringMapVector.initializeChildrenFromFields(Arrays.asList(stringEntryField));
  dateMapVector.initializeChildrenFromFields(Arrays.asList(dateEntryField));

  // Set up VSR
  List<FieldVector> vectors = Arrays.asList(intMapVector, stringMapVector, dateMapVector);
  int rowCount = 3;

  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();

    // Total number of entries that will be written to each vector
    int entryCount = 5 + 4 + 3;

    // Set test data for intMap
    BaseWriter.MapWriter intWriter = intMapVector.getWriter();
    for (int i = 0; i < rowCount; i++) {
      intWriter.startMap();
      for (int j = 0; j < 5 - i; j++) {
        intWriter.startEntry();
        intWriter.key().varChar().writeVarChar("key" + j);
        intWriter.value().integer().writeInt(j);
        intWriter.endEntry();
      }
      intWriter.endMap();
    }
    // Update count for data vector (map writer does not do this)
    intMapVector.getDataVector().setValueCount(entryCount);

    // Set test data for stringMap
    BaseWriter.MapWriter stringWriter = stringMapVector.getWriter();
    for (int i = 0; i < rowCount; i++) {
      stringWriter.startMap();
      for (int j = 0; j < 5 - i; j++) {
        stringWriter.startEntry();
        stringWriter.key().varChar().writeVarChar("key" + j);
        stringWriter.value().varChar().writeVarChar("string" + j);
        stringWriter.endEntry();
      }
      stringWriter.endMap();
    }
    // Update count for data vector (map writer does not do this)
    stringMapVector.getDataVector().setValueCount(entryCount);

    // Set test data for dateMap
    BaseWriter.MapWriter dateWriter = dateMapVector.getWriter();
    for (int i = 0; i < rowCount; i++) {
      dateWriter.startMap();
      for (int j = 0; j < 5 - i; j++) {
        dateWriter.startEntry();
        dateWriter.key().varChar().writeVarChar("key" + j);
        dateWriter.value().dateDay().writeDateDay((int) LocalDate.now().plusDays(j).toEpochDay());
        dateWriter.endEntry();
      }
      dateWriter.endMap();
    }
    // Update count for data vector (map writer does not do this)
    dateMapVector.getDataVector().setValueCount(entryCount);

    // Update count for the map vectors only after every vector has been populated,
    // in the same way testRoundTripLists finalizes its list vectors
    intMapVector.setValueCount(rowCount);
    stringMapVector.setValueCount(rowCount);
    dateMapVector.setValueCount(rowCount);

    File dataFile = new File(TMP, "testRoundTripMap.avro");
    roundTripTest(root, allocator, dataFile, rowCount);
  }
}
// Maps with nulls at each level: nullable values, nullable maps, and both at once
@Test
public void testRoundTripNullableMap() throws Exception {
  // Field definitions
  FieldType nullMapType = new FieldType(true, new ArrowType.Map(false), null);
  FieldType nonNullMapType = new FieldType(false, new ArrowType.Map(false), null);
  Field keyField = new Field("key", FieldType.notNullable(new ArrowType.Utf8()), null);
  Field nullFieldType = new Field("value", FieldType.nullable(new ArrowType.Int(32, true)), null);
  Field nonNullFieldType =
      new Field("value", FieldType.notNullable(new ArrowType.Int(32, true)), null);
  Field nullEntryField =
      new Field(
          "entries",
          FieldType.notNullable(new ArrowType.Struct()),
          Arrays.asList(keyField, nullFieldType));
  Field nonNullEntryField =
      new Field(
          "entries",
          FieldType.notNullable(new ArrowType.Struct()),
          Arrays.asList(keyField, nonNullFieldType));

  // Create empty vectors
  BufferAllocator allocator = new RootAllocator();
  MapVector nullEntriesVector =
      new MapVector("nullEntriesVector", allocator, nonNullMapType, null);
  MapVector nullMapVector = new MapVector("nullMapVector", allocator, nullMapType, null);
  MapVector nullBothVector = new MapVector("nullBothVector", allocator, nullMapType, null);
  nullEntriesVector.initializeChildrenFromFields(Arrays.asList(nullEntryField));
  nullMapVector.initializeChildrenFromFields(Arrays.asList(nonNullEntryField));
  nullBothVector.initializeChildrenFromFields(Arrays.asList(nullEntryField));

  // Set up VSR
  List<FieldVector> vectors = Arrays.asList(nullEntriesVector, nullMapVector, nullBothVector);
  int rowCount = 3;

  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();

    // Set test data for nullEntriesVector (non-null maps, nullable values):
    // {key0: null, key1: null}, {key2: 0, key3: 0}, {key4: 123, key5: 456}
    BaseWriter.MapWriter nullEntriesWriter = nullEntriesVector.getWriter();
    nullEntriesWriter.startMap();
    nullEntriesWriter.startEntry();
    nullEntriesWriter.key().varChar().writeVarChar("key0");
    nullEntriesWriter.value().integer().writeNull();
    nullEntriesWriter.endEntry();
    nullEntriesWriter.startEntry();
    nullEntriesWriter.key().varChar().writeVarChar("key1");
    nullEntriesWriter.value().integer().writeNull();
    nullEntriesWriter.endEntry();
    nullEntriesWriter.endMap();
    nullEntriesWriter.startMap();
    nullEntriesWriter.startEntry();
    nullEntriesWriter.key().varChar().writeVarChar("key2");
    nullEntriesWriter.value().integer().writeInt(0);
    nullEntriesWriter.endEntry();
    nullEntriesWriter.startEntry();
    nullEntriesWriter.key().varChar().writeVarChar("key3");
    nullEntriesWriter.value().integer().writeInt(0);
    nullEntriesWriter.endEntry();
    nullEntriesWriter.endMap();
    nullEntriesWriter.startMap();
    nullEntriesWriter.startEntry();
    nullEntriesWriter.key().varChar().writeVarChar("key4");
    nullEntriesWriter.value().integer().writeInt(123);
    nullEntriesWriter.endEntry();
    nullEntriesWriter.startEntry();
    nullEntriesWriter.key().varChar().writeVarChar("key5");
    nullEntriesWriter.value().integer().writeInt(456);
    nullEntriesWriter.endEntry();
    nullEntriesWriter.endMap();

    // Set test data for nullMapVector (nullable maps, non-null values):
    // null, {key2: 0, key3: 0}, {key4: 123, key5: 456}
    // Every entry goes through this vector's own writer so the entries land in this vector.
    BaseWriter.MapWriter nullMapWriter = nullMapVector.getWriter();
    nullMapWriter.writeNull();
    nullMapWriter.setPosition(1); // writeNull() does not inc. idx() on map (list) vector
    nullMapWriter.startMap();
    nullMapWriter.startEntry();
    nullMapWriter.key().varChar().writeVarChar("key2");
    nullMapWriter.value().integer().writeInt(0);
    nullMapWriter.endEntry();
    nullMapWriter.startEntry();
    nullMapWriter.key().varChar().writeVarChar("key3");
    nullMapWriter.value().integer().writeInt(0);
    nullMapWriter.endEntry();
    nullMapWriter.endMap();
    nullMapWriter.startMap();
    nullMapWriter.startEntry();
    nullMapWriter.key().varChar().writeVarChar("key4");
    nullMapWriter.value().integer().writeInt(123);
    nullMapWriter.endEntry();
    nullMapWriter.startEntry();
    nullMapWriter.key().varChar().writeVarChar("key5");
    nullMapWriter.value().integer().writeInt(456);
    nullMapWriter.endEntry();
    nullMapWriter.endMap();

    // Set test data for nullBothVector (nullable maps, nullable values):
    // null, {key2: null, key3: null}, {key4: 123, key5: 456}
    BaseWriter.MapWriter nullBothWriter = nullBothVector.getWriter();
    nullBothWriter.writeNull();
    nullBothWriter.setPosition(1); // same positioning as for nullMapWriter above
    nullBothWriter.startMap();
    nullBothWriter.startEntry();
    nullBothWriter.key().varChar().writeVarChar("key2");
    nullBothWriter.value().integer().writeNull();
    nullBothWriter.endEntry();
    nullBothWriter.startEntry();
    nullBothWriter.key().varChar().writeVarChar("key3");
    nullBothWriter.value().integer().writeNull();
    nullBothWriter.endEntry();
    nullBothWriter.endMap();
    nullBothWriter.startMap();
    nullBothWriter.startEntry();
    nullBothWriter.key().varChar().writeVarChar("key4");
    nullBothWriter.value().integer().writeInt(123);
    nullBothWriter.endEntry();
    nullBothWriter.startEntry();
    nullBothWriter.key().varChar().writeVarChar("key5");
    nullBothWriter.value().integer().writeInt(456);
    nullBothWriter.endEntry();
    nullBothWriter.endMap();

    // Update count for the vectors
    nullEntriesVector.setValueCount(rowCount);
    nullMapVector.setValueCount(rowCount);
    nullBothVector.setValueCount(rowCount);

    File dataFile = new File(TMP, "testRoundTripNullableMap.avro");
    roundTripTest(root, allocator, dataFile, rowCount);
  }
}
// A single non-nullable struct column with int, string and date children
@Test
public void testRoundTripStruct() throws Exception {
  BufferAllocator allocator = new RootAllocator();

  // Child fields of the struct, all non-nullable
  List<Field> childFields =
      Arrays.asList(
          new Field("intField", FieldType.notNullable(new ArrowType.Int(32, true)), null),
          new Field("stringField", FieldType.notNullable(new ArrowType.Utf8()), null),
          new Field("dateField", FieldType.notNullable(new ArrowType.Date(DateUnit.DAY)), null));

  // The struct vector itself is non-nullable as well
  StructVector structVector =
      new StructVector(
          "struct", allocator, new FieldType(false, new ArrowType.Struct(), null), null);
  structVector.initializeChildrenFromFields(childFields);

  int rowCount = 3;
  List<FieldVector> vectors = Arrays.asList(structVector);

  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();

    // One struct per row: the row index, "string<row>", and today's epoch day plus the row index
    BaseWriter.StructWriter writer = structVector.getWriter();
    int row = 0;
    while (row < rowCount) {
      writer.start();
      writer.integer("intField").writeInt(row);
      writer.varChar("stringField").writeVarChar("string" + row);
      int todayEpochDay = (int) LocalDate.now().toEpochDay();
      writer.dateDay("dateField").writeDateDay(todayEpochDay + row);
      writer.end();
      row++;
    }

    roundTripTest(root, allocator, new File(TMP, "testRoundTripStruct.avro"), rowCount);
  }
}
// Structs with a nullable child, in both a non-nullable and a nullable struct column
@Test
public void testRoundTripNullableStructs() throws Exception {
  BufferAllocator allocator = new RootAllocator();

  // Both struct columns share the same pair of children: one required int, one optional int
  Field requiredInt =
      new Field("intField", FieldType.notNullable(new ArrowType.Int(32, true)), null);
  Field optionalInt =
      new Field("nullableIntField", FieldType.nullable(new ArrowType.Int(32, true)), null);
  List<Field> children = Arrays.asList(requiredInt, optionalInt);

  // Non-nullable struct column
  StructVector structVector =
      new StructVector(
          "struct", allocator, new FieldType(false, new ArrowType.Struct(), null), null);
  // Nullable struct column
  StructVector nullableStructVector =
      new StructVector(
          "nullableStruct", allocator, new FieldType(true, new ArrowType.Struct(), null), null);
  structVector.initializeChildrenFromFields(children);
  nullableStructVector.initializeChildrenFromFields(children);

  int rowCount = 4;
  List<FieldVector> vectors = Arrays.asList(structVector, nullableStructVector);

  try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
    root.setRowCount(rowCount);
    root.allocateNew();

    // structVector: every row is written; the optional child is null on odd rows
    BaseWriter.StructWriter plainWriter = structVector.getWriter();
    for (int row = 0; row < rowCount; row++) {
      plainWriter.setPosition(row);
      plainWriter.start();
      plainWriter.integer("intField").writeInt(row);
      if (row % 2 != 0) {
        plainWriter.integer("nullableIntField").writeNull();
      } else {
        plainWriter.integer("nullableIntField").writeInt(row * 10);
      }
      plainWriter.end();
    }

    // nullableStructVector: rows 0 and 1 are null structs; later rows follow the same
    // pattern as structVector
    BaseWriter.StructWriter nullableWriter = nullableStructVector.getWriter();
    for (int row = 0; row < rowCount; row++) {
      nullableWriter.setPosition(row);
      if (row < 2) {
        nullableWriter.writeNull();
        continue;
      }
      nullableWriter.start();
      nullableWriter.integer("intField").writeInt(row);
      if (row % 2 != 0) {
        nullableWriter.integer("nullableIntField").writeNull();
      } else {
        nullableWriter.integer("nullableIntField").writeInt(row * 10);
      }
      nullableWriter.end();
    }

    // Finalize the value count on both struct vectors
    structVector.setValueCount(rowCount);
    nullableStructVector.setValueCount(rowCount);

    roundTripTest(
        root, allocator, new File(TMP, "testRoundTripNullableStructs.avro"), rowCount);
  }
}
// Dictionary-encoded strings: TinyInt indices over a three-entry string dictionary
@Test
public void testRoundTripEnum() throws Exception {
  BufferAllocator allocator = new RootAllocator();

  // Encode string data explicitly as UTF-8 instead of the platform default charset
  // (fully qualified so this method does not depend on an extra import)
  java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

  // Create a dictionary; try-with-resources releases the vector when the test finishes
  FieldType dictionaryField = new FieldType(false, new ArrowType.Utf8(), null);
  try (VarCharVector dictionaryVector =
      new VarCharVector(new Field("dictionary", dictionaryField, null), allocator)) {
    dictionaryVector.allocateNew(3);
    dictionaryVector.set(0, "apple".getBytes(utf8));
    dictionaryVector.set(1, "banana".getBytes(utf8));
    dictionaryVector.set(2, "cherry".getBytes(utf8));
    dictionaryVector.setValueCount(3);

    // For simplicity, ensure the index type matches what will be decoded during Avro enum decoding
    Dictionary dictionary =
        new Dictionary(
            dictionaryVector, new DictionaryEncoding(0L, false, new ArrowType.Int(8, true)));
    DictionaryProvider dictionaries = new DictionaryProvider.MapDictionaryProvider(dictionary);

    // Plain string values to be dictionary-encoded, one per row
    String[] values = {
      "apple", "banana", "cherry", "cherry", "apple", "banana", "apple", "cherry", "banana", "apple"
    };
    int rowCount = values.length;

    // Field definition; the un-encoded vector is only needed as encoder input,
    // so it is closed as soon as the encoded vector has been produced
    FieldType stringField = new FieldType(false, new ArrowType.Utf8(), null);
    TinyIntVector encodedVector;
    try (VarCharVector stringVector =
        new VarCharVector(new Field("enumField", stringField, null), allocator)) {
      stringVector.allocateNew(rowCount);
      for (int i = 0; i < rowCount; i++) {
        stringVector.setSafe(i, values[i].getBytes(utf8));
      }
      stringVector.setValueCount(rowCount);
      encodedVector = (TinyIntVector) DictionaryEncoder.encode(stringVector, dictionary);
    }

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(encodedVector);
    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {
      File dataFile = new File(TMP, "testRoundTripEnums.avro");
      roundTripTest(root, allocator, dataFile, rowCount, dictionaries);
    }
  }
}
}