ArrowToAvroDataTest.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.arrow.adapter.avro;

import static org.junit.jupiter.api.Assertions.*;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.arrow.adapter.avro.producers.CompositeAvroProducer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.memory.util.Float16;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DateMilliVector;
import org.apache.arrow.vector.Decimal256Vector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float2Vector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.NullVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeMilliVector;
import org.apache.arrow.vector.TimeNanoVector;
import org.apache.arrow.vector.TimeSecVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampMilliVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TimeStampSecTZVector;
import org.apache.arrow.vector.TimeStampSecVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt1Vector;
import org.apache.arrow.vector.UInt2Vector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.UInt8Vector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.complex.writer.BaseWriter;
import org.apache.arrow.vector.complex.writer.FieldWriter;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryEncoder;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.JsonStringArrayList;
import org.apache.arrow.vector.util.JsonStringHashMap;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.Utf8;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class ArrowToAvroDataTest {

  @TempDir public static File TMP;

  // Data production for primitive types, nullable and non-nullable

  @Test
  public void testWriteNullColumn() throws Exception {

    // Field definition
    FieldType nullField = new FieldType(false, new ArrowType.Null(), null);

    // Create empty vector
    NullVector nullVector = new NullVector(new Field("nullColumn", nullField, null));

    int rowCount = 10;

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(nullVector);

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set all values to null
      for (int row = 0; row < rowCount; row++) {
        nullVector.setNull(row);
      }

      File dataFile = new File(TMP, "testWriteNullColumn.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertNull(record.get("nullColumn"));
        }
      }
    }
  }

  @Test
  public void testWriteBooleans() throws Exception {

    // Field definition
    FieldType booleanField = new FieldType(false, new ArrowType.Bool(), null);

    // Create empty vector
    BufferAllocator allocator = new RootAllocator();
    BitVector booleanVector = new BitVector(new Field("boolean", booleanField, null), allocator);

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(booleanVector);
    int rowCount = 10;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      for (int row = 0; row < rowCount; row++) {
        booleanVector.set(row, row % 2 == 0 ? 1 : 0);
      }

      File dataFile = new File(TMP, "testWriteBooleans.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(booleanVector.get(row) == 1, record.get("boolean"));
        }
      }
    }
  }

  @Test
  public void testWriteNullableBooleans() throws Exception {

    // Field definition
    FieldType booleanField = new FieldType(true, new ArrowType.Bool(), null);

    // Create empty vector
    BufferAllocator allocator = new RootAllocator();
    BitVector booleanVector = new BitVector(new Field("boolean", booleanField, null), allocator);

    int rowCount = 3;

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(booleanVector);

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Null value
      booleanVector.setNull(0);

      // False value
      booleanVector.set(1, 0);

      // True value
      booleanVector.set(2, 1);

      File dataFile = new File(TMP, "testWriteNullableBooleans.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

        // Read and check values
        GenericRecord record = datumReader.read(null, decoder);
        assertNull(record.get("boolean"));

        for (int row = 1; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(booleanVector.get(row) == 1, record.get("boolean"));
        }
      }
    }
  }

  @Test
  public void testWriteIntegers() throws Exception {

    // Field definitions
    FieldType int8Field = new FieldType(false, new ArrowType.Int(8, true), null);
    FieldType int16Field = new FieldType(false, new ArrowType.Int(16, true), null);
    FieldType int32Field = new FieldType(false, new ArrowType.Int(32, true), null);
    FieldType int64Field = new FieldType(false, new ArrowType.Int(64, true), null);
    FieldType uint8Field = new FieldType(false, new ArrowType.Int(8, false), null);
    FieldType uint16Field = new FieldType(false, new ArrowType.Int(16, false), null);
    FieldType uint32Field = new FieldType(false, new ArrowType.Int(32, false), null);
    FieldType uint64Field = new FieldType(false, new ArrowType.Int(64, false), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    TinyIntVector int8Vector = new TinyIntVector(new Field("int8", int8Field, null), allocator);
    SmallIntVector int16Vector =
        new SmallIntVector(new Field("int16", int16Field, null), allocator);
    IntVector int32Vector = new IntVector(new Field("int32", int32Field, null), allocator);
    BigIntVector int64Vector = new BigIntVector(new Field("int64", int64Field, null), allocator);
    UInt1Vector uint8Vector = new UInt1Vector(new Field("uint8", uint8Field, null), allocator);
    UInt2Vector uint16Vector = new UInt2Vector(new Field("uint16", uint16Field, null), allocator);
    UInt4Vector uint32Vector = new UInt4Vector(new Field("uint32", uint32Field, null), allocator);
    UInt8Vector uint64Vector = new UInt8Vector(new Field("uint64", uint64Field, null), allocator);

    // Set up VSR
    List<FieldVector> vectors =
        Arrays.asList(
            int8Vector,
            int16Vector,
            int32Vector,
            int64Vector,
            uint8Vector,
            uint16Vector,
            uint32Vector,
            uint64Vector);

    int rowCount = 12;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      for (int row = 0; row < 10; row++) {
        int8Vector.set(row, 11 * row * (row % 2 == 0 ? 1 : -1));
        int16Vector.set(row, 63 * row * (row % 2 == 0 ? 1 : -1));
        int32Vector.set(row, 513 * row * (row % 2 == 0 ? 1 : -1));
        int64Vector.set(row, 3791L * row * (row % 2 == 0 ? 1 : -1));
        uint8Vector.set(row, 11 * row);
        uint16Vector.set(row, 63 * row);
        uint32Vector.set(row, 513 * row);
        uint64Vector.set(row, 3791L * row);
      }

      // Min values
      int8Vector.set(10, Byte.MIN_VALUE);
      int16Vector.set(10, Short.MIN_VALUE);
      int32Vector.set(10, Integer.MIN_VALUE);
      int64Vector.set(10, Long.MIN_VALUE);
      uint8Vector.set(10, 0);
      uint16Vector.set(10, 0);
      uint32Vector.set(10, 0);
      uint64Vector.set(10, 0);

      // Max values
      int8Vector.set(11, Byte.MAX_VALUE);
      int16Vector.set(11, Short.MAX_VALUE);
      int32Vector.set(11, Integer.MAX_VALUE);
      int64Vector.set(11, Long.MAX_VALUE);
      uint8Vector.set(11, 0xff);
      uint16Vector.set(11, 0xffff);
      uint32Vector.set(11, 0xffffffff);
      uint64Vector.set(11, Long.MAX_VALUE); // Max that can be encoded

      File dataFile = new File(TMP, "testWriteIntegers.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals((int) int8Vector.get(row), record.get("int8"));
          assertEquals((int) int16Vector.get(row), record.get("int16"));
          assertEquals(int32Vector.get(row), record.get("int32"));
          assertEquals(int64Vector.get(row), record.get("int64"));
          assertEquals(Byte.toUnsignedInt(uint8Vector.get(row)), record.get("uint8"));
          assertEquals(Short.toUnsignedInt((short) uint16Vector.get(row)), record.get("uint16"));
          assertEquals(Integer.toUnsignedLong(uint32Vector.get(row)), record.get("uint32"));
          assertEquals(uint64Vector.get(row), record.get("uint64"));
        }
      }
    }
  }

  @Test
  public void testWriteNullableIntegers() throws Exception {

    // Field definitions
    FieldType int8Field = new FieldType(true, new ArrowType.Int(8, true), null);
    FieldType int16Field = new FieldType(true, new ArrowType.Int(16, true), null);
    FieldType int32Field = new FieldType(true, new ArrowType.Int(32, true), null);
    FieldType int64Field = new FieldType(true, new ArrowType.Int(64, true), null);
    FieldType uint8Field = new FieldType(true, new ArrowType.Int(8, false), null);
    FieldType uint16Field = new FieldType(true, new ArrowType.Int(16, false), null);
    FieldType uint32Field = new FieldType(true, new ArrowType.Int(32, false), null);
    FieldType uint64Field = new FieldType(true, new ArrowType.Int(64, false), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    TinyIntVector int8Vector = new TinyIntVector(new Field("int8", int8Field, null), allocator);
    SmallIntVector int16Vector =
        new SmallIntVector(new Field("int16", int16Field, null), allocator);
    IntVector int32Vector = new IntVector(new Field("int32", int32Field, null), allocator);
    BigIntVector int64Vector = new BigIntVector(new Field("int64", int64Field, null), allocator);
    UInt1Vector uint8Vector = new UInt1Vector(new Field("uint8", uint8Field, null), allocator);
    UInt2Vector uint16Vector = new UInt2Vector(new Field("uint16", uint16Field, null), allocator);
    UInt4Vector uint32Vector = new UInt4Vector(new Field("uint32", uint32Field, null), allocator);
    UInt8Vector uint64Vector = new UInt8Vector(new Field("uint64", uint64Field, null), allocator);

    int rowCount = 3;

    // Set up VSR
    List<FieldVector> vectors =
        Arrays.asList(
            int8Vector,
            int16Vector,
            int32Vector,
            int64Vector,
            uint8Vector,
            uint16Vector,
            uint32Vector,
            uint64Vector);

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Null values
      int8Vector.setNull(0);
      int16Vector.setNull(0);
      int32Vector.setNull(0);
      int64Vector.setNull(0);
      uint8Vector.setNull(0);
      uint16Vector.setNull(0);
      uint32Vector.setNull(0);
      uint64Vector.setNull(0);

      // Zero values
      int8Vector.set(1, 0);
      int16Vector.set(1, 0);
      int32Vector.set(1, 0);
      int64Vector.set(1, 0);
      uint8Vector.set(1, 0);
      uint16Vector.set(1, 0);
      uint32Vector.set(1, 0);
      uint64Vector.set(1, 0);

      // Non-zero values
      int8Vector.set(2, Byte.MAX_VALUE);
      int16Vector.set(2, Short.MAX_VALUE);
      int32Vector.set(2, Integer.MAX_VALUE);
      int64Vector.set(2, Long.MAX_VALUE);
      uint8Vector.set(2, Byte.MAX_VALUE);
      uint16Vector.set(2, Short.MAX_VALUE);
      uint32Vector.set(2, Integer.MAX_VALUE);
      uint64Vector.set(2, Long.MAX_VALUE);

      File dataFile = new File(TMP, "testWriteNullableIntegers.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

        // Read and check values
        GenericRecord record = datumReader.read(null, decoder);
        assertNull(record.get("int8"));
        assertNull(record.get("int16"));
        assertNull(record.get("int32"));
        assertNull(record.get("int64"));
        assertNull(record.get("uint8"));
        assertNull(record.get("uint16"));
        assertNull(record.get("uint32"));
        assertNull(record.get("uint64"));

        for (int row = 1; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals((int) int8Vector.get(row), record.get("int8"));
          assertEquals((int) int16Vector.get(row), record.get("int16"));
          assertEquals(int32Vector.get(row), record.get("int32"));
          assertEquals(int64Vector.get(row), record.get("int64"));
          assertEquals(Byte.toUnsignedInt(uint8Vector.get(row)), record.get("uint8"));
          assertEquals(Short.toUnsignedInt((short) uint16Vector.get(row)), record.get("uint16"));
          assertEquals(Integer.toUnsignedLong(uint32Vector.get(row)), record.get("uint32"));
          assertEquals(uint64Vector.get(row), record.get("uint64"));
        }
      }
    }
  }

  @Test
  public void testWriteFloatingPoints() throws Exception {

    // Field definitions
    FieldType float16Field =
        new FieldType(false, new ArrowType.FloatingPoint(FloatingPointPrecision.HALF), null);
    FieldType float32Field =
        new FieldType(false, new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE), null);
    FieldType float64Field =
        new FieldType(false, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    Float2Vector float16Vector =
        new Float2Vector(new Field("float16", float16Field, null), allocator);
    Float4Vector float32Vector =
        new Float4Vector(new Field("float32", float32Field, null), allocator);
    Float8Vector float64Vector =
        new Float8Vector(new Field("float64", float64Field, null), allocator);

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(float16Vector, float32Vector, float64Vector);
    int rowCount = 15;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      for (int row = 0; row < 10; row++) {
        float16Vector.set(row, Float16.toFloat16(3.6f * row * (row % 2 == 0 ? 1.0f : -1.0f)));
        float32Vector.set(row, 37.6f * row * (row % 2 == 0 ? 1 : -1));
        float64Vector.set(row, 37.6d * row * (row % 2 == 0 ? 1 : -1));
      }

      float16Vector.set(10, Float16.toFloat16(Float.MIN_VALUE));
      float32Vector.set(10, Float.MIN_VALUE);
      float64Vector.set(10, Double.MIN_VALUE);

      float16Vector.set(11, Float16.toFloat16(Float.MAX_VALUE));
      float32Vector.set(11, Float.MAX_VALUE);
      float64Vector.set(11, Double.MAX_VALUE);

      float16Vector.set(12, Float16.toFloat16(Float.NaN));
      float32Vector.set(12, Float.NaN);
      float64Vector.set(12, Double.NaN);

      float16Vector.set(13, Float16.toFloat16(Float.POSITIVE_INFINITY));
      float32Vector.set(13, Float.POSITIVE_INFINITY);
      float64Vector.set(13, Double.POSITIVE_INFINITY);

      float16Vector.set(14, Float16.toFloat16(Float.NEGATIVE_INFINITY));
      float32Vector.set(14, Float.NEGATIVE_INFINITY);
      float64Vector.set(14, Double.NEGATIVE_INFINITY);

      File dataFile = new File(TMP, "testWriteFloatingPoints.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(float16Vector.getValueAsFloat(row), record.get("float16"));
          assertEquals(float32Vector.get(row), record.get("float32"));
          assertEquals(float64Vector.get(row), record.get("float64"));
        }
      }
    }
  }

  @Test
  public void testWriteNullableFloatingPoints() throws Exception {

    // Field definitions
    FieldType float16Field =
        new FieldType(true, new ArrowType.FloatingPoint(FloatingPointPrecision.HALF), null);
    FieldType float32Field =
        new FieldType(true, new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE), null);
    FieldType float64Field =
        new FieldType(true, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    Float2Vector float16Vector =
        new Float2Vector(new Field("float16", float16Field, null), allocator);
    Float4Vector float32Vector =
        new Float4Vector(new Field("float32", float32Field, null), allocator);
    Float8Vector float64Vector =
        new Float8Vector(new Field("float64", float64Field, null), allocator);

    int rowCount = 3;

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(float16Vector, float32Vector, float64Vector);

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Null values
      float16Vector.setNull(0);
      float32Vector.setNull(0);
      float64Vector.setNull(0);

      // Zero values
      float16Vector.setSafeWithPossibleTruncate(1, 0.0f);
      float32Vector.set(1, 0.0f);
      float64Vector.set(1, 0.0);

      // Non-zero values
      float16Vector.setSafeWithPossibleTruncate(2, 1.0f);
      float32Vector.set(2, 1.0f);
      float64Vector.set(2, 1.0);

      File dataFile = new File(TMP, "testWriteNullableFloatingPoints.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

        // Read and check values
        GenericRecord record = datumReader.read(null, decoder);
        assertNull(record.get("float16"));
        assertNull(record.get("float32"));
        assertNull(record.get("float64"));

        for (int row = 1; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(float16Vector.getValueAsFloat(row), record.get("float16"));
          assertEquals(float32Vector.get(row), record.get("float32"));
          assertEquals(float64Vector.get(row), record.get("float64"));
        }
      }
    }
  }

  @Test
  public void testWriteStrings() throws Exception {

    // Field definition
    FieldType stringField = new FieldType(false, new ArrowType.Utf8(), null);

    // Create empty vector
    BufferAllocator allocator = new RootAllocator();
    VarCharVector stringVector =
        new VarCharVector(new Field("string", stringField, null), allocator);

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(stringVector);
    int rowCount = 5;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      stringVector.setSafe(0, "Hello world!".getBytes());
      stringVector.setSafe(1, "<%**\r\n\t\\abc\0$$>".getBytes());
      stringVector.setSafe(2, "������������".getBytes());
      stringVector.setSafe(3, "���������� ��������������".getBytes());
      stringVector.setSafe(4, "(P ��� P ��� Q) ��� Q".getBytes());

      File dataFile = new File(TMP, "testWriteStrings.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(stringVector.getObject(row).toString(), record.get("string").toString());
        }
      }
    }
  }

  @Test
  public void testWriteNullableStrings() throws Exception {

    // Field definition
    FieldType stringField = new FieldType(true, new ArrowType.Utf8(), null);

    // Create empty vector
    BufferAllocator allocator = new RootAllocator();
    VarCharVector stringVector =
        new VarCharVector(new Field("string", stringField, null), allocator);

    int rowCount = 3;

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(stringVector);

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      stringVector.setNull(0);
      stringVector.setSafe(1, "".getBytes());
      stringVector.setSafe(2, "not empty".getBytes());

      File dataFile = new File(TMP, "testWriteNullableStrings.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

        // Read and check values
        GenericRecord record = datumReader.read(null, decoder);
        assertNull(record.get("string"));

        for (int row = 1; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(stringVector.getObject(row).toString(), record.get("string").toString());
        }
      }
    }
  }

  @Test
  public void testWriteBinary() throws Exception {

    // Field definition
    FieldType binaryField = new FieldType(false, new ArrowType.Binary(), null);
    FieldType fixedField = new FieldType(false, new ArrowType.FixedSizeBinary(5), null);

    // Create empty vector
    BufferAllocator allocator = new RootAllocator();
    VarBinaryVector binaryVector =
        new VarBinaryVector(new Field("binary", binaryField, null), allocator);
    FixedSizeBinaryVector fixedVector =
        new FixedSizeBinaryVector(new Field("fixed", fixedField, null), allocator);

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(binaryVector, fixedVector);
    int rowCount = 3;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      binaryVector.setSafe(0, new byte[] {1, 2, 3});
      binaryVector.setSafe(1, new byte[] {4, 5, 6, 7});
      binaryVector.setSafe(2, new byte[] {8, 9});

      fixedVector.setSafe(0, new byte[] {1, 2, 3, 4, 5});
      fixedVector.setSafe(1, new byte[] {4, 5, 6, 7, 8, 9});
      fixedVector.setSafe(2, new byte[] {8, 9, 10, 11, 12});

      File dataFile = new File(TMP, "testWriteBinary.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          ByteBuffer buf = ((ByteBuffer) record.get("binary"));
          byte[] bytes = new byte[buf.remaining()];
          buf.get(bytes);
          byte[] fixedBytes = ((GenericData.Fixed) record.get("fixed")).bytes();
          assertArrayEquals(binaryVector.getObject(row), bytes);
          assertArrayEquals(fixedVector.getObject(row), fixedBytes);
        }
      }
    }
  }

  @Test
  public void testWriteNullableBinary() throws Exception {

    // Field definition
    FieldType binaryField = new FieldType(true, new ArrowType.Binary(), null);
    FieldType fixedField = new FieldType(true, new ArrowType.FixedSizeBinary(5), null);

    // Create empty vector
    BufferAllocator allocator = new RootAllocator();
    VarBinaryVector binaryVector =
        new VarBinaryVector(new Field("binary", binaryField, null), allocator);
    FixedSizeBinaryVector fixedVector =
        new FixedSizeBinaryVector(new Field("fixed", fixedField, null), allocator);

    int rowCount = 3;

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(binaryVector, fixedVector);

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      binaryVector.setNull(0);
      binaryVector.setSafe(1, new byte[] {});
      binaryVector.setSafe(2, new byte[] {10, 11, 12});

      fixedVector.setNull(0);
      fixedVector.setSafe(1, new byte[] {0, 0, 0, 0, 0});
      fixedVector.setSafe(2, new byte[] {10, 11, 12, 13, 14});

      File dataFile = new File(TMP, "testWriteNullableBinary.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

        // Read and check values
        GenericRecord record = datumReader.read(null, decoder);
        assertNull(record.get("binary"));
        assertNull(record.get("fixed"));

        for (int row = 1; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          ByteBuffer buf = ((ByteBuffer) record.get("binary"));
          byte[] bytes = new byte[buf.remaining()];
          buf.get(bytes);
          byte[] fixedBytes = ((GenericData.Fixed) record.get("fixed")).bytes();
          assertArrayEquals(binaryVector.getObject(row), bytes);
          assertArrayEquals(fixedVector.getObject(row), fixedBytes);
        }
      }
    }
  }

  // Data production for logical types, nullable and non-nullable

  @Test
  public void testWriteDecimals() throws Exception {

    // Field definitions
    FieldType decimal128Field1 = new FieldType(false, new ArrowType.Decimal(38, 10, 128), null);
    FieldType decimal128Field2 = new FieldType(false, new ArrowType.Decimal(38, 5, 128), null);
    FieldType decimal256Field1 = new FieldType(false, new ArrowType.Decimal(76, 20, 256), null);
    FieldType decimal256Field2 = new FieldType(false, new ArrowType.Decimal(76, 10, 256), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    DecimalVector decimal128Vector1 =
        new DecimalVector(new Field("decimal128_1", decimal128Field1, null), allocator);
    DecimalVector decimal128Vector2 =
        new DecimalVector(new Field("decimal128_2", decimal128Field2, null), allocator);
    Decimal256Vector decimal256Vector1 =
        new Decimal256Vector(new Field("decimal256_1", decimal256Field1, null), allocator);
    Decimal256Vector decimal256Vector2 =
        new Decimal256Vector(new Field("decimal256_2", decimal256Field2, null), allocator);

    // Set up VSR
    List<FieldVector> vectors =
        Arrays.asList(decimal128Vector1, decimal128Vector2, decimal256Vector1, decimal256Vector2);
    int rowCount = 3;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      decimal128Vector1.setSafe(
          0, new BigDecimal("12345.67890").setScale(10, RoundingMode.UNNECESSARY));
      decimal128Vector1.setSafe(
          1, new BigDecimal("-98765.43210").setScale(10, RoundingMode.UNNECESSARY));
      decimal128Vector1.setSafe(
          2, new BigDecimal("54321.09876").setScale(10, RoundingMode.UNNECESSARY));

      decimal128Vector2.setSafe(
          0, new BigDecimal("12345.67890").setScale(5, RoundingMode.UNNECESSARY));
      decimal128Vector2.setSafe(
          1, new BigDecimal("-98765.43210").setScale(5, RoundingMode.UNNECESSARY));
      decimal128Vector2.setSafe(
          2, new BigDecimal("54321.09876").setScale(5, RoundingMode.UNNECESSARY));

      decimal256Vector1.setSafe(
          0,
          new BigDecimal("12345678901234567890.12345678901234567890")
              .setScale(20, RoundingMode.UNNECESSARY));
      decimal256Vector1.setSafe(
          1,
          new BigDecimal("-98765432109876543210.98765432109876543210")
              .setScale(20, RoundingMode.UNNECESSARY));
      decimal256Vector1.setSafe(
          2,
          new BigDecimal("54321098765432109876.54321098765432109876")
              .setScale(20, RoundingMode.UNNECESSARY));

      decimal256Vector2.setSafe(
          0,
          new BigDecimal("12345678901234567890.1234567890").setScale(10, RoundingMode.UNNECESSARY));
      decimal256Vector2.setSafe(
          1,
          new BigDecimal("-98765432109876543210.9876543210")
              .setScale(10, RoundingMode.UNNECESSARY));
      decimal256Vector2.setSafe(
          2,
          new BigDecimal("54321098765432109876.5432109876").setScale(10, RoundingMode.UNNECESSARY));

      File dataFile = new File(TMP, "testWriteDecimals.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(
              decimal128Vector1.getObject(row), decodeFixedDecimal(record, "decimal128_1"));
          assertEquals(
              decimal128Vector2.getObject(row), decodeFixedDecimal(record, "decimal128_2"));
          assertEquals(
              decimal256Vector1.getObject(row), decodeFixedDecimal(record, "decimal256_1"));
          assertEquals(
              decimal256Vector2.getObject(row), decodeFixedDecimal(record, "decimal256_2"));
        }
      }
    }
  }

  @Test
  public void testWriteNullableDecimals() throws Exception {

    // Field definitions
    FieldType decimal128Field1 = new FieldType(true, new ArrowType.Decimal(38, 10, 128), null);
    FieldType decimal128Field2 = new FieldType(true, new ArrowType.Decimal(38, 5, 128), null);
    FieldType decimal256Field1 = new FieldType(true, new ArrowType.Decimal(76, 20, 256), null);
    FieldType decimal256Field2 = new FieldType(true, new ArrowType.Decimal(76, 10, 256), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    DecimalVector decimal128Vector1 =
        new DecimalVector(new Field("decimal128_1", decimal128Field1, null), allocator);
    DecimalVector decimal128Vector2 =
        new DecimalVector(new Field("decimal128_2", decimal128Field2, null), allocator);
    Decimal256Vector decimal256Vector1 =
        new Decimal256Vector(new Field("decimal256_1", decimal256Field1, null), allocator);
    Decimal256Vector decimal256Vector2 =
        new Decimal256Vector(new Field("decimal256_2", decimal256Field2, null), allocator);

    int rowCount = 3;

    // Set up VSR
    List<FieldVector> vectors =
        Arrays.asList(decimal128Vector1, decimal128Vector2, decimal256Vector1, decimal256Vector2);

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      decimal128Vector1.setNull(0);
      decimal128Vector1.setSafe(1, BigDecimal.ZERO.setScale(10, RoundingMode.UNNECESSARY));
      decimal128Vector1.setSafe(
          2, new BigDecimal("12345.67890").setScale(10, RoundingMode.UNNECESSARY));

      decimal128Vector2.setNull(0);
      decimal128Vector2.setSafe(1, BigDecimal.ZERO.setScale(5, RoundingMode.UNNECESSARY));
      decimal128Vector2.setSafe(
          2, new BigDecimal("98765.43210").setScale(5, RoundingMode.UNNECESSARY));

      decimal256Vector1.setNull(0);
      decimal256Vector1.setSafe(1, BigDecimal.ZERO.setScale(20, RoundingMode.UNNECESSARY));
      decimal256Vector1.setSafe(
          2,
          new BigDecimal("12345678901234567890.12345678901234567890")
              .setScale(20, RoundingMode.UNNECESSARY));

      decimal256Vector2.setNull(0);
      decimal256Vector2.setSafe(1, BigDecimal.ZERO.setScale(10, RoundingMode.UNNECESSARY));
      decimal256Vector2.setSafe(
          2,
          new BigDecimal("98765432109876543210.9876543210").setScale(10, RoundingMode.UNNECESSARY));

      File dataFile = new File(TMP, "testWriteNullableDecimals.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

        // Read and check values
        GenericRecord record = datumReader.read(null, decoder);
        assertNull(record.get("decimal128_1"));
        assertNull(record.get("decimal128_2"));
        assertNull(record.get("decimal256_1"));
        assertNull(record.get("decimal256_2"));

        for (int row = 1; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(
              decimal128Vector1.getObject(row), decodeFixedDecimal(record, "decimal128_1"));
          assertEquals(
              decimal128Vector2.getObject(row), decodeFixedDecimal(record, "decimal128_2"));
          assertEquals(
              decimal256Vector1.getObject(row), decodeFixedDecimal(record, "decimal256_1"));
          assertEquals(
              decimal256Vector2.getObject(row), decodeFixedDecimal(record, "decimal256_2"));
        }
      }
    }
  }

  private static BigDecimal decodeFixedDecimal(GenericRecord record, String fieldName) {
    GenericData.Fixed fixed = (GenericData.Fixed) record.get(fieldName);
    var logicalType = LogicalTypes.fromSchema(fixed.getSchema());
    return new Conversions.DecimalConversion().fromFixed(fixed, fixed.getSchema(), logicalType);
  }

  @Test
  public void testWriteDates() throws Exception {

    // Field definitions
    FieldType dateDayField = new FieldType(false, new ArrowType.Date(DateUnit.DAY), null);
    FieldType dateMillisField =
        new FieldType(false, new ArrowType.Date(DateUnit.MILLISECOND), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    DateDayVector dateDayVector =
        new DateDayVector(new Field("dateDay", dateDayField, null), allocator);
    DateMilliVector dateMillisVector =
        new DateMilliVector(new Field("dateMillis", dateMillisField, null), allocator);

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(dateDayVector, dateMillisVector);
    int rowCount = 3;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      dateDayVector.setSafe(0, (int) LocalDate.now().toEpochDay());
      dateDayVector.setSafe(1, (int) LocalDate.now().toEpochDay() + 1);
      dateDayVector.setSafe(2, (int) LocalDate.now().toEpochDay() + 2);

      dateMillisVector.setSafe(0, LocalDate.now().toEpochDay() * 86400000L);
      dateMillisVector.setSafe(1, (LocalDate.now().toEpochDay() + 1) * 86400000L);
      dateMillisVector.setSafe(2, (LocalDate.now().toEpochDay() + 2) * 86400000L);

      File dataFile = new File(TMP, "testWriteDates.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(dateDayVector.get(row), record.get("dateDay"));
          assertEquals(
              dateMillisVector.get(row), ((long) (Integer) record.get("dateMillis")) * 86400000L);
        }
      }
    }
  }

  @Test
  public void testWriteNullableDates() throws Exception {

    // Field definitions
    FieldType dateDayField = new FieldType(true, new ArrowType.Date(DateUnit.DAY), null);
    FieldType dateMillisField = new FieldType(true, new ArrowType.Date(DateUnit.MILLISECOND), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    DateDayVector dateDayVector =
        new DateDayVector(new Field("dateDay", dateDayField, null), allocator);
    DateMilliVector dateMillisVector =
        new DateMilliVector(new Field("dateMillis", dateMillisField, null), allocator);

    int rowCount = 3;

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(dateDayVector, dateMillisVector);

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      dateDayVector.setNull(0);
      dateDayVector.setSafe(1, 0);
      dateDayVector.setSafe(2, (int) LocalDate.now().toEpochDay());

      dateMillisVector.setNull(0);
      dateMillisVector.setSafe(1, 0);
      dateMillisVector.setSafe(2, LocalDate.now().toEpochDay() * 86400000L);

      File dataFile = new File(TMP, "testWriteNullableDates.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

        // Read and check values
        GenericRecord record = datumReader.read(null, decoder);
        assertNull(record.get("dateDay"));
        assertNull(record.get("dateMillis"));

        for (int row = 1; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(dateDayVector.get(row), record.get("dateDay"));
          assertEquals(
              dateMillisVector.get(row), ((long) (Integer) record.get("dateMillis")) * 86400000L);
        }
      }
    }
  }

  @Test
  public void testWriteTimes() throws Exception {

    // Field definitions
    FieldType timeSecField = new FieldType(false, new ArrowType.Time(TimeUnit.SECOND, 32), null);
    FieldType timeMillisField =
        new FieldType(false, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null);
    FieldType timeMicrosField =
        new FieldType(false, new ArrowType.Time(TimeUnit.MICROSECOND, 64), null);
    FieldType timeNanosField =
        new FieldType(false, new ArrowType.Time(TimeUnit.NANOSECOND, 64), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    TimeSecVector timeSecVector =
        new TimeSecVector(new Field("timeSec", timeSecField, null), allocator);
    TimeMilliVector timeMillisVector =
        new TimeMilliVector(new Field("timeMillis", timeMillisField, null), allocator);
    TimeMicroVector timeMicrosVector =
        new TimeMicroVector(new Field("timeMicros", timeMicrosField, null), allocator);
    TimeNanoVector timeNanosVector =
        new TimeNanoVector(new Field("timeNanos", timeNanosField, null), allocator);

    // Set up VSR
    List<FieldVector> vectors =
        Arrays.asList(timeSecVector, timeMillisVector, timeMicrosVector, timeNanosVector);
    int rowCount = 3;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      timeSecVector.setSafe(0, ZonedDateTime.now().toLocalTime().toSecondOfDay());
      timeSecVector.setSafe(1, ZonedDateTime.now().toLocalTime().toSecondOfDay() - 1);
      timeSecVector.setSafe(2, ZonedDateTime.now().toLocalTime().toSecondOfDay() - 2);

      timeMillisVector.setSafe(
          0, (int) (ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000000));
      timeMillisVector.setSafe(
          1, (int) (ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000000) - 1000);
      timeMillisVector.setSafe(
          2, (int) (ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000000) - 2000);

      timeMicrosVector.setSafe(0, ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000);
      timeMicrosVector.setSafe(1, ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000 - 1000000);
      timeMicrosVector.setSafe(2, ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000 - 2000000);

      timeNanosVector.setSafe(0, ZonedDateTime.now().toLocalTime().toNanoOfDay());
      timeNanosVector.setSafe(1, ZonedDateTime.now().toLocalTime().toNanoOfDay() - 1000000000);
      timeNanosVector.setSafe(2, ZonedDateTime.now().toLocalTime().toNanoOfDay() - 2000000000);

      File dataFile = new File(TMP, "testWriteTimes.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(timeSecVector.get(row), (int) (record.get("timeSec")) / 1000);
          assertEquals(timeMillisVector.get(row), record.get("timeMillis"));
          assertEquals(timeMicrosVector.get(row), record.get("timeMicros"));
          // Avro doesn't have time-nanos (mar 2025), so expect column to be saved as micros
          long nanosAsMicros = (timeNanosVector.get(row) / 1000);
          assertEquals(nanosAsMicros, (long) record.get("timeNanos"));
        }
      }
    }
  }

  @Test
  public void testWriteNullableTimes() throws Exception {

    // Field definitions
    FieldType timeSecField = new FieldType(true, new ArrowType.Time(TimeUnit.SECOND, 32), null);
    FieldType timeMillisField =
        new FieldType(true, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null);
    FieldType timeMicrosField =
        new FieldType(true, new ArrowType.Time(TimeUnit.MICROSECOND, 64), null);
    FieldType timeNanosField =
        new FieldType(true, new ArrowType.Time(TimeUnit.NANOSECOND, 64), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    TimeSecVector timeSecVector =
        new TimeSecVector(new Field("timeSec", timeSecField, null), allocator);
    TimeMilliVector timeMillisVector =
        new TimeMilliVector(new Field("timeMillis", timeMillisField, null), allocator);
    TimeMicroVector timeMicrosVector =
        new TimeMicroVector(new Field("timeMicros", timeMicrosField, null), allocator);
    TimeNanoVector timeNanosVector =
        new TimeNanoVector(new Field("timeNanos", timeNanosField, null), allocator);

    int rowCount = 3;

    // Set up VSR
    List<FieldVector> vectors =
        Arrays.asList(timeSecVector, timeMillisVector, timeMicrosVector, timeNanosVector);

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      timeSecVector.setNull(0);
      timeSecVector.setSafe(1, 0);
      timeSecVector.setSafe(2, ZonedDateTime.now().toLocalTime().toSecondOfDay());

      timeMillisVector.setNull(0);
      timeMillisVector.setSafe(1, 0);
      timeMillisVector.setSafe(
          2, (int) (ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000000));

      timeMicrosVector.setNull(0);
      timeMicrosVector.setSafe(1, 0);
      timeMicrosVector.setSafe(2, ZonedDateTime.now().toLocalTime().toNanoOfDay() / 1000);

      timeNanosVector.setNull(0);
      timeNanosVector.setSafe(1, 0);
      timeNanosVector.setSafe(2, ZonedDateTime.now().toLocalTime().toNanoOfDay());

      File dataFile = new File(TMP, "testWriteNullableTimes.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

        // Read and check values
        GenericRecord record = datumReader.read(null, decoder);
        assertNull(record.get("timeSec"));
        assertNull(record.get("timeMillis"));
        assertNull(record.get("timeMicros"));
        assertNull(record.get("timeNanos"));

        for (int row = 1; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(timeSecVector.get(row), ((int) record.get("timeSec") / 1000));
          assertEquals(timeMillisVector.get(row), record.get("timeMillis"));
          assertEquals(timeMicrosVector.get(row), record.get("timeMicros"));
          // Avro doesn't have time-nanos (mar 2025), so expect column to be saved as micros
          long nanosAsMicros = (timeNanosVector.get(row) / 1000);
          assertEquals(nanosAsMicros, (long) record.get("timeNanos"));
        }
      }
    }
  }

  @Test
  public void testWriteZoneAwareTimestamps() throws Exception {

    // Field definitions
    FieldType timestampSecField =
        new FieldType(false, new ArrowType.Timestamp(TimeUnit.SECOND, "UTC"), null);
    FieldType timestampMillisField =
        new FieldType(false, new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"), null);
    FieldType timestampMicrosField =
        new FieldType(false, new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"), null);
    FieldType timestampNanosField =
        new FieldType(false, new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    TimeStampSecTZVector timestampSecVector =
        new TimeStampSecTZVector(new Field("timestampSec", timestampSecField, null), allocator);
    TimeStampMilliTZVector timestampMillisVector =
        new TimeStampMilliTZVector(
            new Field("timestampMillis", timestampMillisField, null), allocator);
    TimeStampMicroTZVector timestampMicrosVector =
        new TimeStampMicroTZVector(
            new Field("timestampMicros", timestampMicrosField, null), allocator);
    TimeStampNanoTZVector timestampNanosVector =
        new TimeStampNanoTZVector(
            new Field("timestampNanos", timestampNanosField, null), allocator);

    // Set up VSR
    List<FieldVector> vectors =
        Arrays.asList(
            timestampSecVector, timestampMillisVector, timestampMicrosVector, timestampNanosVector);
    int rowCount = 3;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      timestampSecVector.setSafe(0, (int) Instant.now().getEpochSecond());
      timestampSecVector.setSafe(1, (int) Instant.now().getEpochSecond() - 1);
      timestampSecVector.setSafe(2, (int) Instant.now().getEpochSecond() - 2);

      timestampMillisVector.setSafe(0, (int) Instant.now().toEpochMilli());
      timestampMillisVector.setSafe(1, (int) Instant.now().toEpochMilli() - 1000);
      timestampMillisVector.setSafe(2, (int) Instant.now().toEpochMilli() - 2000);

      timestampMicrosVector.setSafe(0, Instant.now().toEpochMilli() * 1000);
      timestampMicrosVector.setSafe(1, (Instant.now().toEpochMilli() - 1000) * 1000);
      timestampMicrosVector.setSafe(2, (Instant.now().toEpochMilli() - 2000) * 1000);

      timestampNanosVector.setSafe(0, Instant.now().toEpochMilli() * 1000000);
      timestampNanosVector.setSafe(1, (Instant.now().toEpochMilli() - 1000) * 1000000);
      timestampNanosVector.setSafe(2, (Instant.now().toEpochMilli() - 2000) * 1000000);

      File dataFile = new File(TMP, "testWriteZoneAwareTimestamps.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(
              timestampSecVector.get(row), (int) ((long) record.get("timestampSec") / 1000));
          assertEquals(timestampMillisVector.get(row), (int) (long) record.get("timestampMillis"));
          assertEquals(timestampMicrosVector.get(row), record.get("timestampMicros"));
          assertEquals(timestampNanosVector.get(row), record.get("timestampNanos"));
        }
      }
    }
  }

  @Test
  public void testWriteNullableZoneAwareTimestamps() throws Exception {

    // Field definitions
    FieldType timestampSecField =
        new FieldType(true, new ArrowType.Timestamp(TimeUnit.SECOND, "UTC"), null);
    FieldType timestampMillisField =
        new FieldType(true, new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"), null);
    FieldType timestampMicrosField =
        new FieldType(true, new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"), null);
    FieldType timestampNanosField =
        new FieldType(true, new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    TimeStampSecTZVector timestampSecVector =
        new TimeStampSecTZVector(new Field("timestampSec", timestampSecField, null), allocator);
    TimeStampMilliTZVector timestampMillisVector =
        new TimeStampMilliTZVector(
            new Field("timestampMillis", timestampMillisField, null), allocator);
    TimeStampMicroTZVector timestampMicrosVector =
        new TimeStampMicroTZVector(
            new Field("timestampMicros", timestampMicrosField, null), allocator);
    TimeStampNanoTZVector timestampNanosVector =
        new TimeStampNanoTZVector(
            new Field("timestampNanos", timestampNanosField, null), allocator);

    int rowCount = 3;

    // Set up VSR
    List<FieldVector> vectors =
        Arrays.asList(
            timestampSecVector, timestampMillisVector, timestampMicrosVector, timestampNanosVector);

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      timestampSecVector.setNull(0);
      timestampSecVector.setSafe(1, 0);
      timestampSecVector.setSafe(2, (int) Instant.now().getEpochSecond());

      timestampMillisVector.setNull(0);
      timestampMillisVector.setSafe(1, 0);
      timestampMillisVector.setSafe(2, (int) Instant.now().toEpochMilli());

      timestampMicrosVector.setNull(0);
      timestampMicrosVector.setSafe(1, 0);
      timestampMicrosVector.setSafe(2, Instant.now().toEpochMilli() * 1000);

      timestampNanosVector.setNull(0);
      timestampNanosVector.setSafe(1, 0);
      timestampNanosVector.setSafe(2, Instant.now().toEpochMilli() * 1000000);

      File dataFile = new File(TMP, "testWriteNullableZoneAwareTimestamps.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

        // Read and check values
        GenericRecord record = datumReader.read(null, decoder);
        assertNull(record.get("timestampSec"));
        assertNull(record.get("timestampMillis"));
        assertNull(record.get("timestampMicros"));
        assertNull(record.get("timestampNanos"));

        for (int row = 1; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(
              timestampSecVector.get(row), (int) ((long) record.get("timestampSec") / 1000));
          assertEquals(timestampMillisVector.get(row), (int) (long) record.get("timestampMillis"));
          assertEquals(timestampMicrosVector.get(row), record.get("timestampMicros"));
          assertEquals(timestampNanosVector.get(row), record.get("timestampNanos"));
        }
      }
    }
  }

  @Test
  public void testWriteLocalTimestamps() throws Exception {

    // Field definitions
    FieldType timestampSecField =
        new FieldType(false, new ArrowType.Timestamp(TimeUnit.SECOND, null), null);
    FieldType timestampMillisField =
        new FieldType(false, new ArrowType.Timestamp(TimeUnit.MILLISECOND, null), null);
    FieldType timestampMicrosField =
        new FieldType(false, new ArrowType.Timestamp(TimeUnit.MICROSECOND, null), null);
    FieldType timestampNanosField =
        new FieldType(false, new ArrowType.Timestamp(TimeUnit.NANOSECOND, null), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    TimeStampSecVector timestampSecVector =
        new TimeStampSecVector(new Field("timestampSec", timestampSecField, null), allocator);
    TimeStampMilliVector timestampMillisVector =
        new TimeStampMilliVector(
            new Field("timestampMillis", timestampMillisField, null), allocator);
    TimeStampMicroVector timestampMicrosVector =
        new TimeStampMicroVector(
            new Field("timestampMicros", timestampMicrosField, null), allocator);
    TimeStampNanoVector timestampNanosVector =
        new TimeStampNanoVector(new Field("timestampNanos", timestampNanosField, null), allocator);

    // Set up VSR
    List<FieldVector> vectors =
        Arrays.asList(
            timestampSecVector, timestampMillisVector, timestampMicrosVector, timestampNanosVector);
    int rowCount = 3;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      timestampSecVector.setSafe(0, (int) Instant.now().getEpochSecond());
      timestampSecVector.setSafe(1, (int) Instant.now().getEpochSecond() - 1);
      timestampSecVector.setSafe(2, (int) Instant.now().getEpochSecond() - 2);

      timestampMillisVector.setSafe(0, (int) Instant.now().toEpochMilli());
      timestampMillisVector.setSafe(1, (int) Instant.now().toEpochMilli() - 1000);
      timestampMillisVector.setSafe(2, (int) Instant.now().toEpochMilli() - 2000);

      timestampMicrosVector.setSafe(0, Instant.now().toEpochMilli() * 1000);
      timestampMicrosVector.setSafe(1, (Instant.now().toEpochMilli() - 1000) * 1000);
      timestampMicrosVector.setSafe(2, (Instant.now().toEpochMilli() - 2000) * 1000);

      timestampNanosVector.setSafe(0, Instant.now().toEpochMilli() * 1000000);
      timestampNanosVector.setSafe(1, (Instant.now().toEpochMilli() - 1000) * 1000000);
      timestampNanosVector.setSafe(2, (Instant.now().toEpochMilli() - 2000) * 1000000);

      File dataFile = new File(TMP, "testWriteZoneAwareTimestamps.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(
              timestampSecVector.get(row), (int) ((long) record.get("timestampSec") / 1000));
          assertEquals(timestampMillisVector.get(row), (int) (long) record.get("timestampMillis"));
          assertEquals(timestampMicrosVector.get(row), record.get("timestampMicros"));
          assertEquals(timestampNanosVector.get(row), record.get("timestampNanos"));
        }
      }
    }
  }

  @Test
  public void testWriteNullableLocalTimestamps() throws Exception {

    // Field definitions
    FieldType timestampSecField =
        new FieldType(true, new ArrowType.Timestamp(TimeUnit.SECOND, null), null);
    FieldType timestampMillisField =
        new FieldType(true, new ArrowType.Timestamp(TimeUnit.MILLISECOND, null), null);
    FieldType timestampMicrosField =
        new FieldType(true, new ArrowType.Timestamp(TimeUnit.MICROSECOND, null), null);
    FieldType timestampNanosField =
        new FieldType(true, new ArrowType.Timestamp(TimeUnit.NANOSECOND, null), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    TimeStampSecVector timestampSecVector =
        new TimeStampSecVector(new Field("timestampSec", timestampSecField, null), allocator);
    TimeStampMilliVector timestampMillisVector =
        new TimeStampMilliVector(
            new Field("timestampMillis", timestampMillisField, null), allocator);
    TimeStampMicroVector timestampMicrosVector =
        new TimeStampMicroVector(
            new Field("timestampMicros", timestampMicrosField, null), allocator);
    TimeStampNanoVector timestampNanosVector =
        new TimeStampNanoVector(new Field("timestampNanos", timestampNanosField, null), allocator);

    int rowCount = 3;

    // Set up VSR
    List<FieldVector> vectors =
        Arrays.asList(
            timestampSecVector, timestampMillisVector, timestampMicrosVector, timestampNanosVector);

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      timestampSecVector.setNull(0);
      timestampSecVector.setSafe(1, 0);
      timestampSecVector.setSafe(2, (int) Instant.now().getEpochSecond());

      timestampMillisVector.setNull(0);
      timestampMillisVector.setSafe(1, 0);
      timestampMillisVector.setSafe(2, (int) Instant.now().toEpochMilli());

      timestampMicrosVector.setNull(0);
      timestampMicrosVector.setSafe(1, 0);
      timestampMicrosVector.setSafe(2, Instant.now().toEpochMilli() * 1000);

      timestampNanosVector.setNull(0);
      timestampNanosVector.setSafe(1, 0);
      timestampNanosVector.setSafe(2, Instant.now().toEpochMilli() * 1000000);

      File dataFile = new File(TMP, "testWriteNullableZoneAwareTimestamps.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);

        // Read and check values
        GenericRecord record = datumReader.read(null, decoder);
        assertNull(record.get("timestampSec"));
        assertNull(record.get("timestampMillis"));
        assertNull(record.get("timestampMicros"));
        assertNull(record.get("timestampNanos"));

        for (int row = 1; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(
              timestampSecVector.get(row), (int) ((long) record.get("timestampSec") / 1000));
          assertEquals(timestampMillisVector.get(row), (int) (long) record.get("timestampMillis"));
          assertEquals(timestampMicrosVector.get(row), record.get("timestampMicros"));
          assertEquals(timestampNanosVector.get(row), record.get("timestampNanos"));
        }
      }
    }
  }

  // Data production for containers of primitive and logical types, nullable and non-nullable

  @Test
  public void testWriteLists() throws Exception {

    // Field definitions
    FieldType intListField = new FieldType(false, new ArrowType.List(), null);
    FieldType stringListField = new FieldType(false, new ArrowType.List(), null);
    FieldType dateListField = new FieldType(false, new ArrowType.List(), null);

    Field intField = new Field("item", FieldType.notNullable(new ArrowType.Int(32, true)), null);
    Field stringField = new Field("item", FieldType.notNullable(new ArrowType.Utf8()), null);
    Field dateField =
        new Field("item", FieldType.notNullable(new ArrowType.Date(DateUnit.DAY)), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    ListVector intListVector = new ListVector("intList", allocator, intListField, null);
    ListVector stringListVector = new ListVector("stringList", allocator, stringListField, null);
    ListVector dateListVector = new ListVector("dateList", allocator, dateListField, null);

    intListVector.initializeChildrenFromFields(Arrays.asList(intField));
    stringListVector.initializeChildrenFromFields(Arrays.asList(stringField));
    dateListVector.initializeChildrenFromFields(Arrays.asList(dateField));

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(intListVector, stringListVector, dateListVector);
    int rowCount = 3;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      FieldWriter intListWriter = intListVector.getWriter();
      FieldWriter stringListWriter = stringListVector.getWriter();
      FieldWriter dateListWriter = dateListVector.getWriter();

      // Set test data for intList
      for (int i = 0; i < rowCount; i++) {
        intListWriter.startList();
        for (int j = 0; j < 5 - i; j++) {
          intListWriter.writeInt(j);
        }
        intListWriter.endList();
      }

      // Set test data for stringList
      for (int i = 0; i < rowCount; i++) {
        stringListWriter.startList();
        for (int j = 0; j < 5 - i; j++) {
          stringListWriter.writeVarChar("string" + j);
        }
        stringListWriter.endList();
      }

      // Set test data for dateList
      for (int i = 0; i < rowCount; i++) {
        dateListWriter.startList();
        for (int j = 0; j < 5 - i; j++) {
          dateListWriter.writeDateDay((int) LocalDate.now().plusDays(j).toEpochDay());
        }
        dateListWriter.endList();
      }

      // Update count for the vectors
      intListVector.setValueCount(rowCount);
      stringListVector.setValueCount(rowCount);
      dateListVector.setValueCount(rowCount);

      File dataFile = new File(TMP, "testWriteLists.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(intListVector.getObject(row), record.get("intList"));
          assertEquals(dateListVector.getObject(row), record.get("dateList"));
          // Handle conversion from Arrow Text type
          List<?> vectorList = stringListVector.getObject(row);
          List<?> recordList = (List<?>) record.get("stringList");
          assertEquals(vectorList.size(), recordList.size());
          for (int i = 0; i < vectorList.size(); i++) {
            assertEquals(vectorList.get(i).toString(), recordList.get(i).toString());
          }
        }
      }
    }
  }

  @Test
  public void testWriteNullableLists() throws Exception {

    // Field definitions
    FieldType nullListType = new FieldType(true, new ArrowType.List(), null);
    FieldType nonNullListType = new FieldType(false, new ArrowType.List(), null);

    Field nullFieldType = new Field("item", FieldType.nullable(new ArrowType.Int(32, true)), null);
    Field nonNullFieldType =
        new Field("item", FieldType.notNullable(new ArrowType.Int(32, true)), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    ListVector nullEntriesVector =
        new ListVector("nullEntriesVector", allocator, nonNullListType, null);
    ListVector nullListVector = new ListVector("nullListVector", allocator, nullListType, null);
    ListVector nullBothVector = new ListVector("nullBothVector", allocator, nullListType, null);

    nullEntriesVector.initializeChildrenFromFields(Arrays.asList(nullFieldType));
    nullListVector.initializeChildrenFromFields(Arrays.asList(nonNullFieldType));
    nullBothVector.initializeChildrenFromFields(Arrays.asList(nullFieldType));

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(nullEntriesVector, nullListVector, nullBothVector);
    int rowCount = 4;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data for nullEntriesVector
      FieldWriter nullEntriesWriter = nullEntriesVector.getWriter();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeNull();
      nullEntriesWriter.integer().writeNull();
      nullEntriesWriter.endList();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeInt(0);
      nullEntriesWriter.integer().writeInt(0);
      nullEntriesWriter.endList();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeInt(123);
      nullEntriesWriter.integer().writeInt(456);
      nullEntriesWriter.endList();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeInt(789);
      nullEntriesWriter.integer().writeInt(789);
      nullEntriesWriter.endList();

      // Set test data for nullListVector
      FieldWriter nullListWriter = nullListVector.getWriter();
      nullListWriter.writeNull();
      nullListWriter.setPosition(1); // writeNull() does not inc. idx() on list vector
      nullListWriter.startList();
      nullListWriter.integer().writeInt(0);
      nullListWriter.integer().writeInt(0);
      nullListWriter.endList();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeInt(123);
      nullEntriesWriter.integer().writeInt(456);
      nullEntriesWriter.endList();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeInt(789);
      nullEntriesWriter.integer().writeInt(789);
      nullEntriesWriter.endList();

      // Set test data for nullBothVector
      FieldWriter nullBothWriter = nullBothVector.getWriter();
      nullBothWriter.writeNull();
      nullBothWriter.setPosition(1);
      nullBothWriter.startList();
      nullBothWriter.integer().writeNull();
      nullBothWriter.integer().writeNull();
      nullBothWriter.endList();
      nullListWriter.startList();
      nullListWriter.integer().writeInt(0);
      nullListWriter.integer().writeInt(0);
      nullListWriter.endList();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeInt(123);
      nullEntriesWriter.integer().writeInt(456);
      nullEntriesWriter.endList();

      // Update count for the vectors
      nullListVector.setValueCount(4);
      nullEntriesVector.setValueCount(4);
      nullBothVector.setValueCount(4);

      File dataFile = new File(TMP, "testWriteNullableLists.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          for (String list :
              Arrays.asList("nullEntriesVector", "nullListVector", "nullBothVector")) {
            ListVector vector = (ListVector) root.getVector(list);
            Object recordField = record.get(list);
            if (vector.isNull(row)) {
              assertNull(recordField);
            } else {
              assertEquals(vector.getObject(row), recordField);
            }
          }
        }
      }
    }
  }

  @Test
  public void testWriteFixedLists() throws Exception {

    // Field definitions
    FieldType intListField = new FieldType(false, new ArrowType.FixedSizeList(5), null);
    FieldType stringListField = new FieldType(false, new ArrowType.FixedSizeList(5), null);
    FieldType dateListField = new FieldType(false, new ArrowType.FixedSizeList(5), null);

    Field intField = new Field("item", FieldType.notNullable(new ArrowType.Int(32, true)), null);
    Field stringField = new Field("item", FieldType.notNullable(new ArrowType.Utf8()), null);
    Field dateField =
        new Field("item", FieldType.notNullable(new ArrowType.Date(DateUnit.DAY)), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    FixedSizeListVector intListVector =
        new FixedSizeListVector("intList", allocator, intListField, null);
    FixedSizeListVector stringListVector =
        new FixedSizeListVector("stringList", allocator, stringListField, null);
    FixedSizeListVector dateListVector =
        new FixedSizeListVector("dateList", allocator, dateListField, null);

    intListVector.initializeChildrenFromFields(Arrays.asList(intField));
    stringListVector.initializeChildrenFromFields(Arrays.asList(stringField));
    dateListVector.initializeChildrenFromFields(Arrays.asList(dateField));

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(intListVector, stringListVector, dateListVector);
    int rowCount = 3;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      FieldWriter intListWriter = intListVector.getWriter();
      FieldWriter stringListWriter = stringListVector.getWriter();
      FieldWriter dateListWriter = dateListVector.getWriter();

      // Set test data for intList
      for (int i = 0; i < rowCount; i++) {
        intListWriter.startList();
        for (int j = 0; j < 5; j++) {
          intListWriter.writeInt(j);
        }
        intListWriter.endList();
      }

      // Set test data for stringList
      for (int i = 0; i < rowCount; i++) {
        stringListWriter.startList();
        for (int j = 0; j < 5; j++) {
          stringListWriter.writeVarChar("string" + j);
        }
        stringListWriter.endList();
      }

      // Set test data for dateList
      for (int i = 0; i < rowCount; i++) {
        dateListWriter.startList();
        for (int j = 0; j < 5; j++) {
          dateListWriter.writeDateDay((int) LocalDate.now().plusDays(j).toEpochDay());
        }
        dateListWriter.endList();
      }
      File dataFile = new File(TMP, "testWriteFixedLists.avro");

      // Update count for the vectors
      intListVector.setValueCount(rowCount);
      stringListVector.setValueCount(rowCount);
      dateListVector.setValueCount(rowCount);

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertEquals(intListVector.getObject(row), record.get("intList"));
          assertEquals(dateListVector.getObject(row), record.get("dateList"));
          // Handle conversion from Arrow Text type
          List<?> vectorList = stringListVector.getObject(row);
          List<?> recordList = (List<?>) record.get("stringList");
          assertEquals(vectorList.size(), recordList.size());
          for (int i = 0; i < vectorList.size(); i++) {
            assertEquals(vectorList.get(i).toString(), recordList.get(i).toString());
          }
        }
      }
    }
  }

  @Test
  public void testWriteNullableFixedLists() throws Exception {

    // Field definitions
    FieldType nullListType = new FieldType(true, new ArrowType.FixedSizeList(2), null);
    FieldType nonNullListType = new FieldType(false, new ArrowType.FixedSizeList(2), null);

    Field nullFieldType = new Field("item", FieldType.nullable(new ArrowType.Int(32, true)), null);
    Field nonNullFieldType =
        new Field("item", FieldType.notNullable(new ArrowType.Int(32, true)), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    FixedSizeListVector nullEntriesVector =
        new FixedSizeListVector("nullEntriesVector", allocator, nonNullListType, null);
    FixedSizeListVector nullListVector =
        new FixedSizeListVector("nullListVector", allocator, nullListType, null);
    FixedSizeListVector nullBothVector =
        new FixedSizeListVector("nullBothVector", allocator, nullListType, null);

    nullEntriesVector.initializeChildrenFromFields(Arrays.asList(nullFieldType));
    nullListVector.initializeChildrenFromFields(Arrays.asList(nonNullFieldType));
    nullBothVector.initializeChildrenFromFields(Arrays.asList(nullFieldType));

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(nullEntriesVector, nullListVector, nullBothVector);
    int rowCount = 4;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data for nullEntriesVector
      FieldWriter nullEntriesWriter = nullEntriesVector.getWriter();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeNull();
      nullEntriesWriter.integer().writeNull();
      nullEntriesWriter.endList();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeInt(0);
      nullEntriesWriter.integer().writeInt(0);
      nullEntriesWriter.endList();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeInt(123);
      nullEntriesWriter.integer().writeInt(456);
      nullEntriesWriter.endList();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeInt(789);
      nullEntriesWriter.integer().writeInt(789);
      nullEntriesWriter.endList();

      // Set test data for nullListVector
      FieldWriter nullListWriter = nullListVector.getWriter();
      nullListWriter.writeNull();
      nullListWriter.setPosition(1); // writeNull() does not inc. idx() on list vector
      nullListWriter.startList();
      nullListWriter.integer().writeInt(123);
      nullListWriter.integer().writeInt(456);
      nullListWriter.endList();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeInt(789);
      nullEntriesWriter.integer().writeInt(456);
      nullEntriesWriter.endList();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeInt(12345);
      nullEntriesWriter.integer().writeInt(67891);
      nullEntriesWriter.endList();

      // Set test data for nullBothVector
      FieldWriter nullBothWriter = nullBothVector.getWriter();
      nullBothWriter.writeNull();
      nullBothWriter.setPosition(1);
      nullBothWriter.startList();
      nullListWriter.integer().writeNull();
      nullListWriter.integer().writeNull();
      nullBothWriter.endList();
      nullListWriter.startList();
      nullListWriter.integer().writeInt(123);
      nullListWriter.integer().writeInt(456);
      nullListWriter.endList();
      nullEntriesWriter.startList();
      nullEntriesWriter.integer().writeInt(789);
      nullEntriesWriter.integer().writeInt(456);
      nullEntriesWriter.endList();

      // Update count for the vectors
      nullListVector.setValueCount(4);
      nullEntriesVector.setValueCount(4);
      nullBothVector.setValueCount(4);

      File dataFile = new File(TMP, "testWriteNullableFixedLists.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          for (String list :
              Arrays.asList("nullEntriesVector", "nullListVector", "nullBothVector")) {
            FixedSizeListVector vector = (FixedSizeListVector) root.getVector(list);
            Object recordField = record.get(list);
            if (vector.isNull(row)) {
              assertNull(recordField);
            } else {
              assertEquals(vector.getObject(row), recordField);
            }
          }
        }
      }
    }
  }

  @Test
  public void testWriteMap() throws Exception {

    // Field definitions
    FieldType intMapField = new FieldType(false, new ArrowType.Map(false), null);
    FieldType stringMapField = new FieldType(false, new ArrowType.Map(false), null);
    FieldType dateMapField = new FieldType(false, new ArrowType.Map(false), null);

    Field keyField = new Field("key", FieldType.notNullable(new ArrowType.Utf8()), null);
    Field intField = new Field("value", FieldType.notNullable(new ArrowType.Int(32, true)), null);
    Field stringField = new Field("value", FieldType.notNullable(new ArrowType.Utf8()), null);
    Field dateField =
        new Field("value", FieldType.notNullable(new ArrowType.Date(DateUnit.DAY)), null);

    Field intEntryField =
        new Field(
            "entries",
            FieldType.notNullable(new ArrowType.Struct()),
            Arrays.asList(keyField, intField));
    Field stringEntryField =
        new Field(
            "entries",
            FieldType.notNullable(new ArrowType.Struct()),
            Arrays.asList(keyField, stringField));
    Field dateEntryField =
        new Field(
            "entries",
            FieldType.notNullable(new ArrowType.Struct()),
            Arrays.asList(keyField, dateField));

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    MapVector intMapVector = new MapVector("intMap", allocator, intMapField, null);
    MapVector stringMapVector = new MapVector("stringMap", allocator, stringMapField, null);
    MapVector dateMapVector = new MapVector("dateMap", allocator, dateMapField, null);

    intMapVector.initializeChildrenFromFields(Arrays.asList(intEntryField));
    stringMapVector.initializeChildrenFromFields(Arrays.asList(stringEntryField));
    dateMapVector.initializeChildrenFromFields(Arrays.asList(dateEntryField));

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(intMapVector, stringMapVector, dateMapVector);
    int rowCount = 3;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Total number of entries that will be writen to each vector
      int entryCount = 5 + 4 + 3;

      // Set test data for intList
      BaseWriter.MapWriter writer = intMapVector.getWriter();
      for (int i = 0; i < rowCount; i++) {
        writer.startMap();
        for (int j = 0; j < 5 - i; j++) {
          writer.startEntry();
          writer.key().varChar().writeVarChar("key" + j);
          writer.value().integer().writeInt(j);
          writer.endEntry();
        }
        writer.endMap();
      }

      // Update count for data vector (map writer does not do this)
      intMapVector.getDataVector().setValueCount(entryCount);

      // Set test data for stringList
      BaseWriter.MapWriter stringWriter = stringMapVector.getWriter();
      for (int i = 0; i < rowCount; i++) {
        stringWriter.startMap();
        for (int j = 0; j < 5 - i; j++) {
          stringWriter.startEntry();
          stringWriter.key().varChar().writeVarChar("key" + j);
          stringWriter.value().varChar().writeVarChar("string" + j);
          stringWriter.endEntry();
        }
        stringWriter.endMap();
      }

      // Update count for the vectors
      intMapVector.setValueCount(rowCount);
      stringMapVector.setValueCount(rowCount);
      dateMapVector.setValueCount(rowCount);

      // Update count for data vector (map writer does not do this)
      stringMapVector.getDataVector().setValueCount(entryCount);

      // Set test data for dateList
      BaseWriter.MapWriter dateWriter = dateMapVector.getWriter();
      for (int i = 0; i < rowCount; i++) {
        dateWriter.startMap();
        for (int j = 0; j < 5 - i; j++) {
          dateWriter.startEntry();
          dateWriter.key().varChar().writeVarChar("key" + j);
          dateWriter.value().dateDay().writeDateDay((int) LocalDate.now().plusDays(j).toEpochDay());
          dateWriter.endEntry();
        }
        dateWriter.endMap();
      }

      // Update count for data vector (map writer does not do this)
      dateMapVector.getDataVector().setValueCount(entryCount);

      File dataFile = new File(TMP, "testWriteMap.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          Map<String, Object> intMap = convertMap(intMapVector.getObject(row));
          Map<String, Object> stringMap = convertMap(stringMapVector.getObject(row));
          Map<String, Object> dateMap = convertMap(dateMapVector.getObject(row));
          compareMaps(intMap, (Map) record.get("intMap"));
          compareMaps(stringMap, (Map) record.get("stringMap"));
          compareMaps(dateMap, (Map) record.get("dateMap"));
        }
      }
    }
  }

  @Test
  public void testWriteNullableMap() throws Exception {

    // Field definitions
    FieldType nullMapType = new FieldType(true, new ArrowType.Map(false), null);
    FieldType nonNullMapType = new FieldType(false, new ArrowType.Map(false), null);

    Field keyField = new Field("key", FieldType.notNullable(new ArrowType.Utf8()), null);
    Field nullFieldType = new Field("value", FieldType.nullable(new ArrowType.Int(32, true)), null);
    Field nonNullFieldType =
        new Field("value", FieldType.notNullable(new ArrowType.Int(32, true)), null);
    Field nullEntryField =
        new Field(
            "entries",
            FieldType.notNullable(new ArrowType.Struct()),
            Arrays.asList(keyField, nullFieldType));
    Field nonNullEntryField =
        new Field(
            "entries",
            FieldType.notNullable(new ArrowType.Struct()),
            Arrays.asList(keyField, nonNullFieldType));

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    MapVector nullEntriesVector =
        new MapVector("nullEntriesVector", allocator, nonNullMapType, null);
    MapVector nullMapVector = new MapVector("nullMapVector", allocator, nullMapType, null);
    MapVector nullBothVector = new MapVector("nullBothVector", allocator, nullMapType, null);

    nullEntriesVector.initializeChildrenFromFields(Arrays.asList(nullEntryField));
    nullMapVector.initializeChildrenFromFields(Arrays.asList(nonNullEntryField));
    nullBothVector.initializeChildrenFromFields(Arrays.asList(nullEntryField));

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(nullEntriesVector, nullMapVector, nullBothVector);
    int rowCount = 3;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data for intList
      BaseWriter.MapWriter writer = nullEntriesVector.getWriter();
      writer.startMap();
      writer.startEntry();
      writer.key().varChar().writeVarChar("key0");
      writer.value().integer().writeNull();
      writer.endEntry();
      writer.startEntry();
      writer.key().varChar().writeVarChar("key1");
      writer.value().integer().writeNull();
      writer.endEntry();
      writer.endMap();
      writer.startMap();
      writer.startEntry();
      writer.key().varChar().writeVarChar("key2");
      writer.value().integer().writeInt(0);
      writer.endEntry();
      writer.startEntry();
      writer.key().varChar().writeVarChar("key3");
      writer.value().integer().writeInt(0);
      writer.endEntry();
      writer.endMap();
      writer.startMap();
      writer.startEntry();
      writer.key().varChar().writeVarChar("key4");
      writer.value().integer().writeInt(123);
      writer.endEntry();
      writer.startEntry();
      writer.key().varChar().writeVarChar("key5");
      writer.value().integer().writeInt(456);
      writer.endEntry();
      writer.endMap();

      // Set test data for stringList
      BaseWriter.MapWriter nullMapWriter = nullMapVector.getWriter();
      nullMapWriter.writeNull();
      nullMapWriter.setPosition(1); // writeNull() does not inc. idx() on map (list) vector
      nullMapWriter.startMap();
      nullMapWriter.startEntry();
      nullMapWriter.key().varChar().writeVarChar("key2");
      nullMapWriter.value().integer().writeInt(0);
      nullMapWriter.endEntry();
      writer.startMap();
      writer.startEntry();
      writer.key().varChar().writeVarChar("key3");
      writer.value().integer().writeInt(0);
      writer.endEntry();
      nullMapWriter.endMap();
      nullMapWriter.startMap();
      writer.startEntry();
      writer.key().varChar().writeVarChar("key4");
      writer.value().integer().writeInt(123);
      writer.endEntry();
      writer.startEntry();
      writer.key().varChar().writeVarChar("key5");
      writer.value().integer().writeInt(456);
      writer.endEntry();
      nullMapWriter.endMap();

      // Set test data for dateList
      BaseWriter.MapWriter nullBothWriter = nullBothVector.getWriter();
      nullBothWriter.writeNull();
      nullBothWriter.setPosition(1);
      nullBothWriter.startMap();
      nullBothWriter.startEntry();
      nullBothWriter.key().varChar().writeVarChar("key2");
      nullBothWriter.value().integer().writeNull();
      nullBothWriter.endEntry();
      nullBothWriter.startEntry();
      nullBothWriter.key().varChar().writeVarChar("key3");
      nullBothWriter.value().integer().writeNull();
      nullBothWriter.endEntry();
      nullBothWriter.endMap();
      nullBothWriter.startMap();
      writer.startEntry();
      writer.key().varChar().writeVarChar("key4");
      writer.value().integer().writeInt(123);
      writer.endEntry();
      writer.startEntry();
      writer.key().varChar().writeVarChar("key5");
      writer.value().integer().writeInt(456);
      writer.endEntry();
      nullBothWriter.endMap();

      // Update count for the vectors
      nullEntriesVector.setValueCount(3);
      nullMapVector.setValueCount(3);
      nullBothVector.setValueCount(3);

      File dataFile = new File(TMP, "testWriteNullableMap.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          Map<String, Object> intMap = convertMap(nullEntriesVector.getObject(row));
          Map<String, Object> stringMap = convertMap(nullMapVector.getObject(row));
          Map<String, Object> dateMap = convertMap(nullBothVector.getObject(row));
          compareMaps(intMap, (Map) record.get("nullEntriesVector"));
          compareMaps(stringMap, (Map) record.get("nullMapVector"));
          compareMaps(dateMap, (Map) record.get("nullBothVector"));
        }
      }
    }
  }

  private Map<String, Object> convertMap(List<?> entryList) {

    if (entryList == null) {
      return null;
    }

    Map<String, Object> map = new HashMap<>();
    JsonStringArrayList<?> structList = (JsonStringArrayList<?>) entryList;
    for (Object entry : structList) {
      JsonStringHashMap<String, ?> structEntry = (JsonStringHashMap<String, ?>) entry;
      String key = structEntry.get(MapVector.KEY_NAME).toString();
      Object value = structEntry.get(MapVector.VALUE_NAME);
      map.put(key, value);
    }
    return map;
  }

  private void compareMaps(Map<String, ?> expected, Map<?, ?> actual) {
    if (expected == null) {
      assertNull(actual);
    } else {
      assertEquals(expected.size(), actual.size());
      for (Object key : actual.keySet()) {
        assertTrue(expected.containsKey(key.toString()));
        Object actualValue = actual.get(key);
        if (actualValue instanceof Utf8) {
          assertEquals(expected.get(key.toString()).toString(), actualValue.toString());
        } else {
          assertEquals(expected.get(key.toString()), actual.get(key));
        }
      }
    }
  }

  @Test
  public void testWriteStruct() throws Exception {

    // Field definitions
    FieldType structFieldType = new FieldType(false, new ArrowType.Struct(), null);
    Field intField =
        new Field("intField", FieldType.notNullable(new ArrowType.Int(32, true)), null);
    Field stringField = new Field("stringField", FieldType.notNullable(new ArrowType.Utf8()), null);
    Field dateField =
        new Field("dateField", FieldType.notNullable(new ArrowType.Date(DateUnit.DAY)), null);

    // Create empty vector
    BufferAllocator allocator = new RootAllocator();
    StructVector structVector = new StructVector("struct", allocator, structFieldType, null);
    structVector.initializeChildrenFromFields(Arrays.asList(intField, stringField, dateField));

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(structVector);
    int rowCount = 3;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data
      BaseWriter.StructWriter structWriter = structVector.getWriter();

      for (int i = 0; i < rowCount; i++) {
        structWriter.start();
        structWriter.integer("intField").writeInt(i);
        structWriter.varChar("stringField").writeVarChar("string" + i);
        structWriter.dateDay("dateField").writeDateDay((int) LocalDate.now().toEpochDay() + i);
        structWriter.end();
      }

      File dataFile = new File(TMP, "testWriteStruct.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Update count for the vector
      structVector.setValueCount(rowCount);

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          assertNotNull(record.get("struct"));
          GenericRecord structRecord = (GenericRecord) record.get("struct");
          assertEquals(row, structRecord.get("intField"));
          assertEquals("string" + row, structRecord.get("stringField").toString());
          assertEquals((int) LocalDate.now().toEpochDay() + row, structRecord.get("dateField"));
        }
      }
    }
  }

  @Test
  public void testWriteNullableStructs() throws Exception {

    // Field definitions
    FieldType structFieldType = new FieldType(false, new ArrowType.Struct(), null);
    FieldType nullableStructFieldType = new FieldType(true, new ArrowType.Struct(), null);
    Field intField =
        new Field("intField", FieldType.notNullable(new ArrowType.Int(32, true)), null);
    Field nullableIntField =
        new Field("nullableIntField", FieldType.nullable(new ArrowType.Int(32, true)), null);

    // Create empty vectors
    BufferAllocator allocator = new RootAllocator();
    StructVector structVector = new StructVector("struct", allocator, structFieldType, null);
    StructVector nullableStructVector =
        new StructVector("nullableStruct", allocator, nullableStructFieldType, null);
    structVector.initializeChildrenFromFields(Arrays.asList(intField, nullableIntField));
    nullableStructVector.initializeChildrenFromFields(Arrays.asList(intField, nullableIntField));

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(structVector, nullableStructVector);
    int rowCount = 4;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      root.setRowCount(rowCount);
      root.allocateNew();

      // Set test data for structVector
      BaseWriter.StructWriter structWriter = structVector.getWriter();
      for (int i = 0; i < rowCount; i++) {
        structWriter.setPosition(i);
        structWriter.start();
        structWriter.integer("intField").writeInt(i);
        if (i % 2 == 0) {
          structWriter.integer("nullableIntField").writeInt(i * 10);
        } else {
          structWriter.integer("nullableIntField").writeNull();
        }
        structWriter.end();
      }

      // Set test data for nullableStructVector
      BaseWriter.StructWriter nullableStructWriter = nullableStructVector.getWriter();
      for (int i = 0; i < rowCount; i++) {
        nullableStructWriter.setPosition(i);
        if (i >= 2) {
          nullableStructWriter.start();
          nullableStructWriter.integer("intField").writeInt(i);
          if (i % 2 == 0) {
            nullableStructWriter.integer("nullableIntField").writeInt(i * 10);
          } else {
            nullableStructWriter.integer("nullableIntField").writeNull();
          }
          nullableStructWriter.end();
        } else {
          nullableStructWriter.writeNull();
        }
      }

      // Update count for the vector
      structVector.setValueCount(rowCount);
      nullableStructVector.setValueCount(rowCount);

      File dataFile = new File(TMP, "testWriteNullableStructs.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer = ArrowToAvroUtils.createCompositeProducer(vectors);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields());
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          if (row % 2 == 0) {
            assertNotNull(record.get("struct"));
            GenericRecord structRecord = (GenericRecord) record.get("struct");
            assertEquals(row, structRecord.get("intField"));
            assertEquals(row * 10, structRecord.get("nullableIntField"));
          } else {
            assertNotNull(record.get("struct"));
            GenericRecord structRecord = (GenericRecord) record.get("struct");
            assertEquals(row, structRecord.get("intField"));
            assertNull(structRecord.get("nullableIntField"));
          }
          if (row >= 2) {
            assertNotNull(record.get("nullableStruct"));
            GenericRecord nullableStructRecord = (GenericRecord) record.get("nullableStruct");
            assertEquals(row, nullableStructRecord.get("intField"));
            if (row % 2 == 0) {
              assertEquals(row * 10, nullableStructRecord.get("nullableIntField"));
            } else {
              assertNull(nullableStructRecord.get("nullableIntField"));
            }
          } else {
            assertNull(record.get("nullableStruct"));
          }
        }
      }
    }
  }

  @Test
  public void testWriteDictEnumEncoded() throws Exception {

    BufferAllocator allocator = new RootAllocator();

    // Create a dictionary
    FieldType dictionaryField = new FieldType(false, new ArrowType.Utf8(), null);
    VarCharVector dictionaryVector =
        new VarCharVector(new Field("dictionary", dictionaryField, null), allocator);

    dictionaryVector.allocateNew(3);
    dictionaryVector.set(0, "apple".getBytes());
    dictionaryVector.set(1, "banana".getBytes());
    dictionaryVector.set(2, "cherry".getBytes());
    dictionaryVector.setValueCount(3);

    Dictionary dictionary =
        new Dictionary(dictionaryVector, new DictionaryEncoding(1L, false, null));
    DictionaryProvider dictionaries = new DictionaryProvider.MapDictionaryProvider(dictionary);

    // Field definition
    FieldType stringField = new FieldType(false, new ArrowType.Utf8(), null);
    VarCharVector stringVector =
        new VarCharVector(new Field("enumField", stringField, null), allocator);
    stringVector.allocateNew(10);
    stringVector.setSafe(0, "apple".getBytes());
    stringVector.setSafe(1, "banana".getBytes());
    stringVector.setSafe(2, "cherry".getBytes());
    stringVector.setSafe(3, "cherry".getBytes());
    stringVector.setSafe(4, "apple".getBytes());
    stringVector.setSafe(5, "banana".getBytes());
    stringVector.setSafe(6, "apple".getBytes());
    stringVector.setSafe(7, "cherry".getBytes());
    stringVector.setSafe(8, "banana".getBytes());
    stringVector.setSafe(9, "apple".getBytes());
    stringVector.setValueCount(10);

    IntVector encodedVector = (IntVector) DictionaryEncoder.encode(stringVector, dictionary);

    // Set up VSR
    List<FieldVector> vectors = Arrays.asList(encodedVector);
    int rowCount = 10;

    try (VectorSchemaRoot root = new VectorSchemaRoot(vectors)) {

      File dataFile = new File(TMP, "testWriteEnumEncoded.avro");

      // Write an AVRO block using the producer classes
      try (FileOutputStream fos = new FileOutputStream(dataFile)) {
        BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(fos, null);
        CompositeAvroProducer producer =
            ArrowToAvroUtils.createCompositeProducer(vectors, dictionaries);
        for (int row = 0; row < rowCount; row++) {
          producer.produce(encoder);
        }
        encoder.flush();
      }

      // Set up reading the AVRO block as a GenericRecord
      Schema schema = ArrowToAvroUtils.createAvroSchema(root.getSchema().getFields(), dictionaries);
      GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

      try (InputStream inputStream = new FileInputStream(dataFile)) {

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        GenericRecord record = null;

        // Read and check values
        for (int row = 0; row < rowCount; row++) {
          record = datumReader.read(record, decoder);
          // Values read from Avro should be the decoded enum values
          assertEquals(stringVector.getObject(row).toString(), record.get("enumField").toString());
        }
      }
    }
  }
}