TestHiveFileFormats.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.RowType;
import com.facebook.presto.hive.datasink.OutputStreamDataSinkFactory;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.hive.orc.DwrfBatchPageSourceFactory;
import com.facebook.presto.hive.orc.OrcBatchPageSourceFactory;
import com.facebook.presto.hive.parquet.ParquetFileWriterFactory;
import com.facebook.presto.hive.parquet.ParquetPageSourceFactory;
import com.facebook.presto.hive.rcfile.RcFilePageSourceFactory;
import com.facebook.presto.orc.StorageStripeMetadataSource;
import com.facebook.presto.orc.StripeMetadataSourceFactory;
import com.facebook.presto.orc.cache.StorageOrcFileTailSource;
import com.facebook.presto.parquet.FileParquetDataSource;
import com.facebook.presto.parquet.cache.MetadataReader;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordPageSource;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import io.airlift.compress.lzo.LzoCodec;
import io.airlift.compress.lzo.LzopCodec;
import io.airlift.slice.Slices;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.TimeZone;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.HiveDwrfEncryptionProvider.NO_ENCRYPTION;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
import static com.facebook.presto.hive.HiveFileContext.DEFAULT_HIVE_FILE_CONTEXT;
import static com.facebook.presto.hive.HiveStorageFormat.AVRO;
import static com.facebook.presto.hive.HiveStorageFormat.CSV;
import static com.facebook.presto.hive.HiveStorageFormat.DWRF;
import static com.facebook.presto.hive.HiveStorageFormat.JSON;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
import static com.facebook.presto.hive.HiveStorageFormat.RCBINARY;
import static com.facebook.presto.hive.HiveStorageFormat.RCTEXT;
import static com.facebook.presto.hive.HiveStorageFormat.SEQUENCEFILE;
import static com.facebook.presto.hive.HiveStorageFormat.TEXTFILE;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_RESOLUTION;
import static com.facebook.presto.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static com.facebook.presto.hive.HiveTestUtils.HIVE_CLIENT_CONFIG;
import static com.facebook.presto.hive.HiveTestUtils.ROW_EXPRESSION_SERVICE;
import static com.facebook.presto.hive.HiveTestUtils.SESSION;
import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties;
import static com.facebook.presto.hive.HiveTestUtils.getTypes;
import static com.facebook.presto.tests.StructuralTestUtil.arrayBlockOf;
import static com.facebook.presto.tests.StructuralTestUtil.mapBlockOf;
import static com.facebook.presto.tests.StructuralTestUtil.rowBlockOf;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.filter;
import static io.airlift.slice.Slices.utf8Slice;
import static java.io.File.createTempFile;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardListObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardMapObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardStructObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaFloatObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaIntObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaLongObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaStringObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaTimestampObjectInspector;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class TestHiveFileFormats
        extends AbstractTestHiveFileFormats
{
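    // Round-trip tests for the Hive storage formats: columns are written with each format's writer
    // (or an optimized Presto writer) and read back through record-cursor and/or page-source readers.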
    private static final FileFormatDataSourceStats STATS = new FileFormatDataSourceStats();
    private static final MetadataReader METADATA_READER = new MetadataReader();
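    // two Parquet read sessions: parquetPageSourceSessionUseName is used where name-based
    // (rather than index-based) column access is expected (see testParquetPageSourceSchemaEvolution)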
    private static TestingConnectorSession parquetPageSourceSession = new TestingConnectorSession(getAllSessionProperties(new HiveClientConfig(), createParquetHiveCommonClientConfig(false)));
    private static TestingConnectorSession parquetPageSourceSessionUseName = new TestingConnectorSession(getAllSessionProperties(new HiveClientConfig(), createParquetHiveCommonClientConfig(true)));

    private static final DateTimeZone HIVE_STORAGE_TIME_ZONE = DateTimeZone.forID("America/Bahia_Banderas");

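    // every test bound to this provider runs against both an empty file and a 1000-row file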
    @DataProvider(name = "rowCount")
    public static Object[][] rowCountProvider()
    {
        return new Object[][] {{0}, {1000}};
    }

    @BeforeClass(alwaysRun = true)
    public void setUp()
    {
        // ensure the expected timezone is configured for this VM
        assertEquals(TimeZone.getDefault().getID(),
                "America/Bahia_Banderas",
                "Timezone not configured correctly. Add -Duser.timezone=America/Bahia_Banderas to your JVM arguments");
    }

    @Test(dataProvider = "rowCount")
    public void testTextFile(int rowCount)
            throws Exception
    {
        List<TestColumn> testColumns = TEST_COLUMNS.stream()
                .filter(column -> !column.getName().equals("t_map_null_key_complex_key_value"))
                .collect(toList());

        assertThatFileFormat(TEXTFILE)
                .withColumns(testColumns)
                .withRowsCount(rowCount)
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));
    }

    @Test(dataProvider = "rowCount")
    public void testCsvFile(int rowCount)
            throws Exception
    {
        List<TestColumn> testColumns = TEST_COLUMNS.stream()
                // CSV tables only support Hive string columns. Note that CSV cannot store nulls; it uses an empty string instead.
                .filter(column -> column.isPartitionKey() || ("string".equals(column.getType()) && !column.getName().contains("_null_")))
                .collect(toImmutableList());

        assertTrue(testColumns.size() > 5);

        assertThatFileFormat(CSV)
                .withColumns(testColumns)
                .withRowsCount(rowCount)
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));
    }

    @Test
    public void testCsvFileWithNullAndValue()
            throws Exception
    {
        assertThatFileFormat(CSV)
                .withColumns(ImmutableList.of(
                        new TestColumn("t_null_string", javaStringObjectInspector, null, Slices.utf8Slice("")), // null was converted to empty string!
                        new TestColumn("t_string", javaStringObjectInspector, "test", Slices.utf8Slice("test"))))
                .withRowsCount(2)
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));
    }

    @Test(dataProvider = "rowCount")
    public void testJson(int rowCount)
            throws Exception
    {
        List<TestColumn> testColumns = TEST_COLUMNS.stream()
                // binary is not supported
                .filter(column -> !column.getName().equals("t_binary"))
                // non-string map keys are not supported
                .filter(column -> !column.getName().equals("t_map_tinyint"))
                .filter(column -> !column.getName().equals("t_map_smallint"))
                .filter(column -> !column.getName().equals("t_map_int"))
                .filter(column -> !column.getName().equals("t_map_bigint"))
                .filter(column -> !column.getName().equals("t_map_float"))
                .filter(column -> !column.getName().equals("t_map_double"))
                // null map keys are not supported
                .filter(column -> !column.getName().equals("t_map_null_key"))
                .filter(column -> !column.getName().equals("t_map_null_key_complex_key_value"))
                .filter(column -> !column.getName().equals("t_map_null_key_complex_value"))
                // decimal(38) is broken or not supported
                .filter(column -> !column.getName().equals("t_decimal_precision_38"))
                .filter(column -> !column.getName().equals("t_map_decimal_precision_38"))
                .filter(column -> !column.getName().equals("t_array_decimal_precision_38"))
                .collect(toList());

        assertThatFileFormat(JSON)
                .withColumns(testColumns)
                .withRowsCount(rowCount)
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));
    }

    @Test(dataProvider = "rowCount")
    public void testRCText(int rowCount)
            throws Exception
    {
        List<TestColumn> testColumns = ImmutableList.copyOf(filter(TEST_COLUMNS, testColumn -> {
            // TODO: This is a bug in the RC text reader
            // RC files do not support complex types as map keys
            return !testColumn.getName().equals("t_struct_null")
                    && !testColumn.getName().equals("t_map_null_key_complex_key_value");
        }));
        assertThatFileFormat(RCTEXT)
                .withColumns(testColumns)
                .withRowsCount(rowCount)
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));
    }

    @Test(dataProvider = "rowCount")
    public void testRcTextPageSource(int rowCount)
            throws Exception
    {
        assertThatFileFormat(RCTEXT)
                .withColumns(TEST_COLUMNS)
                .withRowsCount(rowCount)
                .isReadableByPageSource(new RcFilePageSourceFactory(FUNCTION_AND_TYPE_MANAGER, HDFS_ENVIRONMENT, STATS));
    }

    @Test(dataProvider = "rowCount")
    public void testRcTextOptimizedWriter(int rowCount)
            throws Exception
    {
        List<TestColumn> testColumns = TEST_COLUMNS.stream()
                // t_map_null_key_* must be excluded because Presto cannot produce maps with null keys, so the writer would throw
                .filter(TestHiveFileFormats::withoutNullMapKeyTests)
                .collect(toImmutableList());

        List<PropertyMetadata<?>> allSessionProperties = getAllSessionProperties(createRcFileHiveClientConfig(true), new HiveCommonClientConfig());
        TestingConnectorSession session = new TestingConnectorSession(allSessionProperties);

        assertThatFileFormat(RCTEXT)
                .withColumns(testColumns)
                .withRowsCount(rowCount)
                .withSession(session)
                .withFileWriterFactory(new RcFileFileWriterFactory(HDFS_ENVIRONMENT, FUNCTION_AND_TYPE_MANAGER, new NodeVersion("test"), HIVE_STORAGE_TIME_ZONE, STATS))
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT))
                .isReadableByPageSource(new RcFilePageSourceFactory(FUNCTION_AND_TYPE_MANAGER, HDFS_ENVIRONMENT, STATS));
    }

    @Test(dataProvider = "rowCount")
    public void testRCBinary(int rowCount)
            throws Exception
    {
        // RCBinary does not support complex types as map keys and interprets an empty VARCHAR as null
        List<TestColumn> testColumns = TEST_COLUMNS.stream()
                .filter(testColumn -> {
                    String name = testColumn.getName();
                    return !name.equals("t_map_null_key_complex_key_value") && !name.equals("t_empty_varchar");
                }).collect(toList());
        assertThatFileFormat(RCBINARY)
                .withColumns(testColumns)
                .withRowsCount(rowCount)
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));
    }

    @Test(dataProvider = "rowCount")
    public void testRcBinaryPageSource(int rowCount)
            throws Exception
    {
        // RCBinary does not support complex types as map keys and interprets an empty VARCHAR as null
        List<TestColumn> testColumns = TEST_COLUMNS.stream()
                .filter(testColumn -> !testColumn.getName().equals("t_empty_varchar"))
                .collect(toList());

        assertThatFileFormat(RCBINARY)
                .withColumns(testColumns)
                .withRowsCount(rowCount)
                .isReadableByPageSource(new RcFilePageSourceFactory(FUNCTION_AND_TYPE_MANAGER, HDFS_ENVIRONMENT, STATS));
    }

    @Test(dataProvider = "rowCount")
    public void testRcBinaryOptimizedWriter(int rowCount)
            throws Exception
    {
        List<TestColumn> testColumns = TEST_COLUMNS.stream()
                // RCBinary interprets an empty VARCHAR as null
                .filter(testColumn -> !testColumn.getName().equals("t_empty_varchar"))
                // t_map_null_key_* must be excluded because Presto cannot produce maps with null keys, so the writer would throw
                .filter(TestHiveFileFormats::withoutNullMapKeyTests)
                .collect(toList());

        List<PropertyMetadata<?>> allSessionProperties = getAllSessionProperties(createRcFileHiveClientConfig(true), new HiveCommonClientConfig());
        TestingConnectorSession session = new TestingConnectorSession(allSessionProperties);

        assertThatFileFormat(RCBINARY)
                .withColumns(testColumns)
                .withRowsCount(rowCount)
                .withSession(session)
                .withFileWriterFactory(new RcFileFileWriterFactory(HDFS_ENVIRONMENT, FUNCTION_AND_TYPE_MANAGER, new NodeVersion("test"), HIVE_STORAGE_TIME_ZONE, STATS))
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT))
                .isReadableByPageSource(new RcFilePageSourceFactory(FUNCTION_AND_TYPE_MANAGER, HDFS_ENVIRONMENT, STATS));
    }

    @Test(dataProvider = "rowCount")
    public void testOrc(int rowCount)
            throws Exception
    {
        assertThatFileFormat(ORC)
                .withColumns(TEST_COLUMNS)
                .withRowsCount(rowCount)
                .isReadableByPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, false, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource())));
    }

    @Test(dataProvider = "rowCount")
    public void testOrcOptimizedWriter(int rowCount)
            throws Exception
    {
        List<PropertyMetadata<?>> allSessionProperties = getAllSessionProperties(
                new HiveClientConfig(),
                createOrcHiveCommonClientConfig(true, 100.0));
        TestingConnectorSession session = new TestingConnectorSession(allSessionProperties);

        // A Presto page cannot contain a map with null keys, so a page-based writer cannot write null keys
        List<TestColumn> testColumns = TEST_COLUMNS.stream()
                .filter(testColumn -> !testColumn.getName().equals("t_map_null_key") && !testColumn.getName().equals("t_map_null_key_complex_value") && !testColumn.getName().equals("t_map_null_key_complex_key_value"))
                .collect(toList());

        assertThatFileFormat(ORC)
                .withColumns(testColumns)
                .withRowsCount(rowCount)
                .withSession(session)
                .withFileWriterFactory(new OrcFileWriterFactory(HDFS_ENVIRONMENT, new OutputStreamDataSinkFactory(), FUNCTION_AND_TYPE_MANAGER, new NodeVersion("test"), HIVE_STORAGE_TIME_ZONE, STATS, new OrcFileWriterConfig(), NO_ENCRYPTION))
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT))
                .isReadableByPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, false, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource())));
    }

    @Test(dataProvider = "rowCount")
    public void testOptimizedParquetWriter(int rowCount)
            throws Exception
    {
        List<PropertyMetadata<?>> allSessionProperties = getAllSessionProperties(
                new HiveClientConfig(),
                new ParquetFileWriterConfig().setParquetOptimizedWriterEnabled(true),
                createOrcHiveCommonClientConfig(true, 100.0));
        TestingConnectorSession session = new TestingConnectorSession(allSessionProperties);

        // A Presto page cannot contain a map with null keys, so a page-based writer cannot write null keys
        List<TestColumn> testColumns = TEST_COLUMNS.stream()
                .filter(testColumn -> !testColumn.getName().equals("t_map_null_key") && !testColumn.getName().equals("t_map_null_key_complex_value") && !testColumn.getName().equals("t_map_null_key_complex_key_value"))
                .collect(toList());

        assertThatFileFormat(PARQUET)
                .withSession(session)
                .withColumns(testColumns)
                .withRowsCount(rowCount)
                .withFileWriterFactory(new ParquetFileWriterFactory(HDFS_ENVIRONMENT, FUNCTION_AND_TYPE_MANAGER, new NodeVersion("test"), HIVE_STORAGE_TIME_ZONE))
                .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER));
    }

    @Test(dataProvider = "rowCount")
    public void testOrcUseColumnNames(int rowCount)
            throws Exception
    {
        TestingConnectorSession session = new TestingConnectorSession(getAllSessionProperties(
                new HiveClientConfig(),
                new HiveCommonClientConfig()));

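        // reading the columns in reverse order only works if the reader resolves columns by name rather than by position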
        assertThatFileFormat(ORC)
                .withWriteColumns(TEST_COLUMNS)
                .withRowsCount(rowCount)
                .withReadColumns(Lists.reverse(TEST_COLUMNS))
                .withSession(session)
                .isReadableByPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, true, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource())));
    }

    @Test(dataProvider = "rowCount")
    public void testOrcUseColumnNamesCompatibility(int rowCount)
            throws Exception
    {
        // test that hive.orc.use-column-names falls back to the Hive column names when the ORC file
        // has no real column names and only has the old Hive-style names _col0, _col1, _col2, ...
        TestingConnectorSession session = new TestingConnectorSession(getAllSessionProperties(
                new HiveClientConfig(),
                new HiveCommonClientConfig()));

        assertThatFileFormat(ORC)
                .withWriteColumns(getHiveColumnNameColumns())
                .withRowsCount(rowCount)
                .withReadColumns(TEST_COLUMNS)
                .withSession(session)
                .isReadableByPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, true, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource())));
    }

    private static List<TestColumn> getHiveColumnNameColumns()
    {
        // Creates a new list of TestColumn objects with Hive-style column names based on their indices.
        // Each column's name is replaced with "_col" followed by its index in the list.
        return IntStream.range(0, TEST_COLUMNS.size())
                .mapToObj(index -> TEST_COLUMNS.get(index).withName("_col" + index))
                .collect(toList());
    }

    @Test(dataProvider = "rowCount")
    public void testAvro(int rowCount)
            throws Exception
    {
        assertThatFileFormat(AVRO)
                .withColumns(getTestColumnsSupportedByAvro())
                .withRowsCount(rowCount)
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));
    }

    private static List<TestColumn> getTestColumnsSupportedByAvro()
    {
        // Avro only supports String for Map keys, and doesn't support smallint or tinyint.
        return TEST_COLUMNS.stream()
                .filter(column -> !column.getName().startsWith("t_map_") || column.getName().equals("t_map_string"))
                .filter(column -> !column.getName().endsWith("_smallint"))
                .filter(column -> !column.getName().endsWith("_tinyint"))
                .collect(toList());
    }

    @Test(dataProvider = "rowCount")
    public void testParquetPageSource(int rowCount)
            throws Exception
    {
        List<TestColumn> testColumns = getTestColumnsSupportedByParquet();
        assertThatFileFormat(PARQUET)
                .withColumns(testColumns)
                .withSession(parquetPageSourceSession)
                .withRowsCount(rowCount)
                .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER));
    }

    @Test(dataProvider = "rowCount")
    public void testParquetPageSourceGzip(int rowCount)
            throws Exception
    {
        List<TestColumn> testColumns = getTestColumnsSupportedByParquet();
        assertThatFileFormat(PARQUET)
                .withColumns(testColumns)
                .withSession(parquetPageSourceSession)
                .withCompressionCodec(HiveCompressionCodec.GZIP)
                .withRowsCount(rowCount)
                .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER));
    }

    @Test(dataProvider = "rowCount")
    public void testParquetPageSourceSchemaEvolution(int rowCount)
            throws Exception
    {
        List<TestColumn> writeColumns = getTestColumnsSupportedByParquet();

        // test index-based access
        List<TestColumn> readColumns = writeColumns.stream()
                .map(column -> new TestColumn(
                        column.getName() + "_new",
                        column.getObjectInspector(),
                        column.getWriteValue(),
                        column.getExpectedValue(),
                        column.isPartitionKey()))
                .collect(toList());
        assertThatFileFormat(PARQUET)
                .withWriteColumns(writeColumns)
                .withReadColumns(readColumns)
                .withSession(parquetPageSourceSession)
                .withRowsCount(rowCount)
                .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER));

        // test name-based access
        readColumns = Lists.reverse(writeColumns);
        assertThatFileFormat(PARQUET)
                .withWriteColumns(writeColumns)
                .withReadColumns(readColumns)
                .withSession(parquetPageSourceSessionUseName)
                .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER));
    }

    @Test
    public void testParquetLogicalTypes() throws IOException
    {
        HiveFileWriterFactory parquetFileWriterFactory = new ParquetFileWriterFactory(HDFS_ENVIRONMENT, FUNCTION_AND_TYPE_MANAGER, new NodeVersion("test"), HIVE_STORAGE_TIME_ZONE);

        List<PropertyMetadata<?>> allSessionProperties = getAllSessionProperties(
                new HiveClientConfig(),
                new ParquetFileWriterConfig().setParquetOptimizedWriterEnabled(true),
                createOrcHiveCommonClientConfig(true, 100.0));

        TestingConnectorSession session = new TestingConnectorSession(allSessionProperties);
        File file = createTempFile("logicaltest", ".parquet");
        long timestamp = new DateTime(2011, 5, 6, 7, 8, 9, 123).getMillis();

        try {
            createTestFile(
                    file.getAbsolutePath(),
                    PARQUET,
                    HiveCompressionCodec.NONE,
                    ImmutableList.of(new TestColumn("t_timestamp", javaTimestampObjectInspector, new Timestamp(timestamp), timestamp)),
                    session,
                    3,
                    parquetFileWriterFactory);

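            // read the footer back and verify the written timestamp logical type is not adjusted to UTC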
            FileParquetDataSource dataSource = new FileParquetDataSource(file);
            ParquetMetadata parquetMetadata = MetadataReader.readFooter(
                    dataSource,
                    file.length(),
                    Optional.empty(),
                    false).getParquetMetadata();

            MessageType writtenSchema = parquetMetadata.getFileMetaData().getSchema();
            Type timestampType = writtenSchema.getType("t_timestamp");
            if (timestampType.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
                LogicalTypeAnnotation.TimestampLogicalTypeAnnotation annotation = (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) timestampType.getLogicalTypeAnnotation();
                assertFalse(annotation.isAdjustedToUTC());
            }
            else {
                fail("the logical type annotation saved was not of type TimestampLogicalTypeAnnotation");
            }
        }
        finally {
            file.delete();
        }
    }

    private static List<TestColumn> getTestColumnsSupportedByParquet()
    {
        // Writing complex Hive data to Parquet is broken
        // TODO: empty arrays or maps with null keys don't seem to work
        // Parquet does not support DATE
        return TEST_COLUMNS.stream()
                .filter(column -> !ImmutableSet.of("t_null_array_int", "t_array_empty", "t_map_null_key", "t_map_null_key_complex_value", "t_map_null_key_complex_key_value")
                        .contains(column.getName()))
                .filter(column -> column.isPartitionKey() || (
                        !hasType(column.getObjectInspector(), PrimitiveCategory.DATE) &&
                        !hasType(column.getObjectInspector(), PrimitiveCategory.SHORT) &&
                        !hasType(column.getObjectInspector(), PrimitiveCategory.BYTE)))
                .collect(toList());
    }

    @Test(dataProvider = "rowCount")
    public void testDwrf(int rowCount)
            throws Exception
    {
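        // DWRF does not support DATE, VARCHAR, CHAR, or DECIMAL columns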
        List<TestColumn> testColumns = TEST_COLUMNS.stream()
                .filter(testColumn -> !hasType(testColumn.getObjectInspector(), PrimitiveCategory.DATE, PrimitiveCategory.VARCHAR, PrimitiveCategory.CHAR, PrimitiveCategory.DECIMAL))
                .collect(Collectors.toList());
        assertThatFileFormat(DWRF)
                .withColumns(testColumns)
                .withRowsCount(rowCount)
                .isReadableByPageSource(new DwrfBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HIVE_CLIENT_CONFIG, HDFS_ENVIRONMENT, STATS, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()), NO_ENCRYPTION));
    }

    @Test(dataProvider = "rowCount")
    public void testDwrfOptimizedWriter(int rowCount)
            throws Exception
    {
        List<PropertyMetadata<?>> allSessionProperties = getAllSessionProperties(
                new HiveClientConfig(),
                createOrcHiveCommonClientConfig(true, 100.0));
        TestingConnectorSession session = new TestingConnectorSession(allSessionProperties);

        // DWRF does not support modern Hive types (DATE, VARCHAR, CHAR, DECIMAL)
        // A Presto page cannot contain a map with null keys, so a page-based writer cannot write null keys
        List<TestColumn> testColumns = TEST_COLUMNS.stream()
                .filter(testColumn -> !hasType(testColumn.getObjectInspector(), PrimitiveCategory.DATE, PrimitiveCategory.VARCHAR, PrimitiveCategory.CHAR, PrimitiveCategory.DECIMAL))
                .filter(testColumn -> !testColumn.getName().equals("t_map_null_key") && !testColumn.getName().equals("t_map_null_key_complex_value") && !testColumn.getName().equals("t_map_null_key_complex_key_value"))
                .collect(toList());

        assertThatFileFormat(DWRF)
                .withColumns(testColumns)
                .withRowsCount(rowCount)
                .withSession(session)
                .withFileWriterFactory(new OrcFileWriterFactory(HDFS_ENVIRONMENT, new OutputStreamDataSinkFactory(), FUNCTION_AND_TYPE_MANAGER, new NodeVersion("test"), HIVE_STORAGE_TIME_ZONE, STATS, new OrcFileWriterConfig(), NO_ENCRYPTION))
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT))
                .isReadableByPageSource(new DwrfBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HIVE_CLIENT_CONFIG, HDFS_ENVIRONMENT, STATS, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()), NO_ENCRYPTION));
    }

    @Test
    public void testTruncateVarcharColumn()
            throws Exception
    {
        TestColumn writeColumn = new TestColumn("varchar_column", getPrimitiveJavaObjectInspector(new VarcharTypeInfo(4)), new HiveVarchar("test", 4), utf8Slice("test"));
        TestColumn readColumn = new TestColumn("varchar_column", getPrimitiveJavaObjectInspector(new VarcharTypeInfo(3)), new HiveVarchar("tes", 3), utf8Slice("tes"));

        assertThatFileFormat(RCTEXT)
                .withWriteColumns(ImmutableList.of(writeColumn))
                .withReadColumns(ImmutableList.of(readColumn))
                .isReadableByPageSource(new RcFilePageSourceFactory(FUNCTION_AND_TYPE_MANAGER, HDFS_ENVIRONMENT, STATS))
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));

        assertThatFileFormat(RCBINARY)
                .withWriteColumns(ImmutableList.of(writeColumn))
                .withReadColumns(ImmutableList.of(readColumn))
                .isReadableByPageSource(new RcFilePageSourceFactory(FUNCTION_AND_TYPE_MANAGER, HDFS_ENVIRONMENT, STATS))
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));

        assertThatFileFormat(ORC)
                .withWriteColumns(ImmutableList.of(writeColumn))
                .withReadColumns(ImmutableList.of(readColumn))
                .isReadableByPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, false, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource())));

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(writeColumn))
                .withReadColumns(ImmutableList.of(readColumn))
                .withSession(parquetPageSourceSession)
                .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER));

        assertThatFileFormat(AVRO)
                .withWriteColumns(ImmutableList.of(writeColumn))
                .withReadColumns(ImmutableList.of(readColumn))
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));

        assertThatFileFormat(SEQUENCEFILE)
                .withWriteColumns(ImmutableList.of(writeColumn))
                .withReadColumns(ImmutableList.of(readColumn))
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));

        assertThatFileFormat(TEXTFILE)
                .withWriteColumns(ImmutableList.of(writeColumn))
                .withReadColumns(ImmutableList.of(readColumn))
                .isReadableByRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT));
    }

    @Test
    public void testFailForLongVarcharPartitionColumn()
            throws Exception
    {
        TestColumn partitionColumn = new TestColumn("partition_column", getPrimitiveJavaObjectInspector(new VarcharTypeInfo(3)), "test", utf8Slice("tes"), true);
        TestColumn varcharColumn = new TestColumn("varchar_column", getPrimitiveJavaObjectInspector(new VarcharTypeInfo(3)), new HiveVarchar("tes", 3), utf8Slice("tes"));

        List<TestColumn> columns = ImmutableList.of(partitionColumn, varcharColumn);

        HiveErrorCode expectedErrorCode = HIVE_INVALID_PARTITION_VALUE;
        String expectedMessage = "Invalid partition value 'test' for varchar\\(3\\) partition key: partition_column";

        assertThatFileFormat(RCTEXT)
                .withColumns(columns)
                .isFailingForPageSource(new RcFilePageSourceFactory(FUNCTION_AND_TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessage)
                .isFailingForRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT), expectedErrorCode, expectedMessage);

        assertThatFileFormat(RCBINARY)
                .withColumns(columns)
                .isFailingForPageSource(new RcFilePageSourceFactory(FUNCTION_AND_TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessage)
                .isFailingForRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT), expectedErrorCode, expectedMessage);

        assertThatFileFormat(ORC)
                .withColumns(columns)
                .isFailingForPageSource(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, false, HDFS_ENVIRONMENT, STATS, 100, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource())), expectedErrorCode, expectedMessage);

        assertThatFileFormat(PARQUET)
                .withColumns(columns)
                .withSession(parquetPageSourceSession)
                .isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessage);

        assertThatFileFormat(SEQUENCEFILE)
                .withColumns(columns)
                .isFailingForRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT), expectedErrorCode, expectedMessage);

        assertThatFileFormat(TEXTFILE)
                .withColumns(columns)
                .isFailingForRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT), expectedErrorCode, expectedMessage);
    }

    @Test
    public void testSchemaMismatch()
            throws Exception
    {
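        // the columns below all share the name "column_name" but differ in type; writing with one
        // and reading with another exercises the readers' schema-mismatch handling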
        TestColumn floatColumn = new TestColumn("column_name", javaFloatObjectInspector, 5.1f, 5.1f);
        TestColumn doubleColumn = new TestColumn("column_name", javaDoubleObjectInspector, 5.1, 5.1);
        TestColumn booleanColumn = new TestColumn("column_name", javaBooleanObjectInspector, true, true);
        TestColumn stringColumn = new TestColumn("column_name", javaStringObjectInspector, "test", utf8Slice("test"));
        TestColumn intColumn = new TestColumn("column_name", javaIntObjectInspector, 3, 3);
        TestColumn longColumn = new TestColumn("column_name", javaLongObjectInspector, 4L, 4L);
        TestColumn longStoredAsIntColumn = new TestColumn("column_name", javaIntObjectInspector, 4, 4);
        TestColumn timestampColumn = new TestColumn("column_name", javaTimestampObjectInspector, 4L, 4L);
        TestColumn mapLongColumn = new TestColumn("column_name",
                getStandardMapObjectInspector(javaLongObjectInspector, javaLongObjectInspector),
                ImmutableMap.of(4L, 4L),
                mapBlockOf(BIGINT, BIGINT, 4L, 4L));
        TestColumn mapDoubleColumn = new TestColumn("column_name",
                getStandardMapObjectInspector(javaDoubleObjectInspector, javaDoubleObjectInspector),
                ImmutableMap.of(5.1, 5.2),
                mapBlockOf(DOUBLE, DOUBLE, 5.1, 5.2));
        TestColumn arrayStringColumn = new TestColumn("column_name",
                getStandardListObjectInspector(javaStringObjectInspector),
                ImmutableList.of("test"),
                arrayBlockOf(createUnboundedVarcharType(), "test"));
        TestColumn arrayBooleanColumn = new TestColumn("column_name",
                getStandardListObjectInspector(javaBooleanObjectInspector),
                ImmutableList.of(true),
                arrayBlockOf(BOOLEAN, true));
        TestColumn rowLongColumn = new TestColumn("column_name",
                getStandardStructObjectInspector(ImmutableList.of("s_bigint"), ImmutableList.of(javaLongObjectInspector)),
                new Long[] {1L},
                rowBlockOf(ImmutableList.of(BIGINT), 1));
        TestColumn nestColumn = new TestColumn("column_name",
                getStandardMapObjectInspector(
                        javaStringObjectInspector,
                        getStandardListObjectInspector(
                                getStandardStructObjectInspector(
                                        ImmutableList.of("s_int"),
                                        ImmutableList.of(javaIntObjectInspector)))),
                ImmutableMap.of("test", ImmutableList.<Object>of(new Integer[] {1})),
                mapBlockOf(createUnboundedVarcharType(), new ArrayType(RowType.anonymous(ImmutableList.of(INTEGER))),
                        "test", arrayBlockOf(RowType.anonymous(ImmutableList.of(INTEGER)), rowBlockOf(ImmutableList.of(INTEGER), 1L))));

        HiveErrorCode expectedErrorCode = HIVE_PARTITION_SCHEMA_MISMATCH;

        // Make sure INT64 is still readable as Timestamp see https://github.com/prestodb/presto/issues/13855
        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(longColumn))
                .withReadColumns(ImmutableList.of(timestampColumn))
                .withSession(parquetPageSourceSession)
                .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER));

        // make sure INT64 (declared in Hive schema) stored as INT32 in file is still readable
        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(longStoredAsIntColumn))
                .withReadColumns(ImmutableList.of(longColumn))
                .withSession(parquetPageSourceSession)
                .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER));

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(floatColumn))
                .withReadColumns(ImmutableList.of(doubleColumn))
                .withSession(parquetPageSourceSession)
                .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER));

        String expectedMessageDoubleLong = "The column column_name of table schema.table is declared as type bigint, but the Parquet file ((.*?)) declares the column as type DOUBLE";

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(doubleColumn))
                .withReadColumns(ImmutableList.of(longColumn))
                .withSession(parquetPageSourceSession)
                .isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessageDoubleLong);

        String expectedMessageFloatInt = "The column column_name of table schema.table is declared as type int, but the Parquet file ((.*?)) declares the column as type FLOAT";

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(floatColumn))
                .withReadColumns(ImmutableList.of(intColumn))
                .withSession(parquetPageSourceSession)
                .isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessageFloatInt);

        String expectedMessageIntBoolean = "The column column_name of table schema.table is declared as type boolean, but the Parquet file ((.*?)) declares the column as type INT32";

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(intColumn))
                .withReadColumns(ImmutableList.of(booleanColumn))
                .withSession(parquetPageSourceSession)
                .isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessageIntBoolean);

        String expectedMessageStringLong = "The column column_name of table schema.table is declared as type string, but the Parquet file ((.*?)) declares the column as type INT64";

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(longColumn))
                .withReadColumns(ImmutableList.of(stringColumn))
                .withSession(parquetPageSourceSession)
                .isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessageStringLong);

        String expectedMessageIntString = "The column column_name of table schema.table is declared as type int, but the Parquet file ((.*?)) declares the column as type BINARY";

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(stringColumn))
                .withReadColumns(ImmutableList.of(intColumn))
                .withSession(parquetPageSourceSession)
                .isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessageIntString);

        String expectedMessageMapLongLong = "The column column_name of table schema.table is declared as type map<bigint,bigint>, but the Parquet file ((.*?)) declares the column as type INT64";

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(longColumn))
                .withReadColumns(ImmutableList.of(mapLongColumn))
                .withSession(parquetPageSourceSession)
                .isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessageMapLongLong);

        String expectedMessageMapLongMapDouble = "The column column_name of table schema.table is declared as type map<bigint,bigint>, but the Parquet file ((.*?)) declares the column as type optional group column_name \\(MAP\\) \\{\n"
                + "  repeated group key_value \\(MAP_KEY_VALUE\\) \\{\n"
                + "    required double key;\n"
                + "    optional double value;\n"
                + "  \\}\n"
                + "\\}";

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(mapDoubleColumn))
                .withReadColumns(ImmutableList.of(mapLongColumn))
                .withSession(parquetPageSourceSession)
                .isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessageMapLongMapDouble);

        String expectedMessageArrayStringArrayBoolean = "The column column_name of table schema.table is declared as type array<string>, but the Parquet file ((.*?)) declares the column as type optional group column_name \\(LIST\\) \\{\n"
                + "  repeated group bag \\{\n"
                + "    optional boolean array_element;\n"
                + "  \\}\n"
                + "\\}";

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(arrayBooleanColumn))
                .withReadColumns(ImmutableList.of(arrayStringColumn))
                .withSession(parquetPageSourceSession)
                .isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessageArrayStringArrayBoolean);

        String expectedMessageBooleanArrayBoolean = "The column column_name of table schema.table is declared as type array<boolean>, but the Parquet file ((.*?)) declares the column as type BOOLEAN";

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(booleanColumn))
                .withReadColumns(ImmutableList.of(arrayBooleanColumn))
                .withSession(parquetPageSourceSession)
                .isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessageBooleanArrayBoolean);

        String expectedMessageRowLongLong = "The column column_name of table schema.table is declared as type bigint, but the Parquet file ((.*?)) declares the column as type optional group column_name \\{\n"
                + "  optional int64 s_bigint;\n"
                + "\\}";

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(rowLongColumn))
                .withReadColumns(ImmutableList.of(longColumn))
                .withSession(parquetPageSourceSession)
                .isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessageRowLongLong);

        TestColumn rowLongColumnReadOnMap = new TestColumn("column_name",
                getStandardStructObjectInspector(ImmutableList.of("s_bigint"), ImmutableList.of(javaLongObjectInspector)),
                new Long[] {1L},
                null);
        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(mapLongColumn))
                .withReadColumns(ImmutableList.of(rowLongColumnReadOnMap))
                .withSession(parquetPageSourceSession)
                .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER));

        String expectedMessageRowLongNest = "The column column_name of table schema.table is declared as type map<string,array<struct<s_int:int>>>, but the Parquet file ((.*?)) declares the column as type optional group column_name \\{\n"
                + "  optional int64 s_bigint;\n"
                + "\\}";

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(rowLongColumn))
                .withReadColumns(ImmutableList.of(nestColumn))
                .withSession(parquetPageSourceSession)
                .isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessageRowLongNest);
    }

    @Test
    public void testSchemaMismatchOnNestedStruct()
            throws Exception
    {
        //test out of order fields in nested Row type
        TestColumn writeColumn = new TestColumn("column_name",
                getStandardMapObjectInspector(
                        javaStringObjectInspector,
                        getStandardStructObjectInspector(
                                ImmutableList.of("s_int", "s_double"),
                                ImmutableList.of(javaIntObjectInspector, javaDoubleObjectInspector))),
                ImmutableMap.of("test", Arrays.asList(1, 5.0)),
                mapBlockOf(createUnboundedVarcharType(), RowType.anonymous(ImmutableList.of(INTEGER, DOUBLE)),
                        "test", rowBlockOf(ImmutableList.of(INTEGER, DOUBLE), 1L, 5.0)));
        TestColumn readColumn = new TestColumn("column_name",
                getStandardMapObjectInspector(
                        javaStringObjectInspector,
                        getStandardStructObjectInspector(
                                ImmutableList.of("s_double", "s_int"),  //out of order
                                ImmutableList.of(javaDoubleObjectInspector, javaIntObjectInspector))),
                ImmutableMap.of("test", Arrays.asList(5.0, 1)),
                mapBlockOf(createUnboundedVarcharType(), RowType.anonymous(ImmutableList.of(DOUBLE, INTEGER)),
                        "test", rowBlockOf(ImmutableList.of(DOUBLE, INTEGER), 5.0, 1L)));
        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(writeColumn))
                .withReadColumns(ImmutableList.of(readColumn))
                .withRowsCount(1)
                .withSession(parquetPageSourceSession)
                .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER));

        //test add/remove sub-fields
        readColumn = new TestColumn("column_name",
                getStandardMapObjectInspector(
                        javaStringObjectInspector,
                        getStandardStructObjectInspector(
                                ImmutableList.of("s_int", "s_int_new"),  //add sub-field s_int_new, remove s_double
                                ImmutableList.of(javaIntObjectInspector, javaIntObjectInspector))),
                ImmutableMap.of("test", Arrays.asList(1, 5.0)),
                mapBlockOf(createUnboundedVarcharType(), RowType.anonymous(ImmutableList.of(INTEGER, INTEGER)),
                        "test", rowBlockOf(ImmutableList.of(INTEGER, INTEGER), 1L, null))); //expected null for s_int_new

        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(writeColumn))
                .withReadColumns(ImmutableList.of(readColumn))
                .withRowsCount(1)
                .withSession(parquetPageSourceSession)
                .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER));

        //test field name case sensitivity in nested Row type
        readColumn = new TestColumn("column_name",
                getStandardMapObjectInspector(
                        javaStringObjectInspector,
                        getStandardStructObjectInspector(
                                ImmutableList.of("s_DOUBLE", "s_INT"),  //out of order
                                ImmutableList.of(javaDoubleObjectInspector, javaIntObjectInspector))),
                ImmutableMap.of("test", Arrays.asList(5.0, 1)),
                mapBlockOf(createUnboundedVarcharType(), RowType.anonymous(ImmutableList.of(DOUBLE, INTEGER)),
                        "test", rowBlockOf(ImmutableList.of(DOUBLE, INTEGER), 5.0, 1L)));
        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(writeColumn))
                .withReadColumns(ImmutableList.of(readColumn))
                .withRowsCount(1)
                .withSession(parquetPageSourceSession)
                .isReadableByPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER));

        //test sub-field type mismatch in nested Row type
        readColumn = new TestColumn("column_name",
                getStandardMapObjectInspector(
                        javaStringObjectInspector,
                        getStandardStructObjectInspector(
                                ImmutableList.of("s_double", "s_int"),
                                ImmutableList.of(javaIntObjectInspector, javaIntObjectInspector))), //re-type a sub-field
                ImmutableMap.of("test", Arrays.asList(5, 1)),
                mapBlockOf(createUnboundedVarcharType(), RowType.anonymous(ImmutableList.of(INTEGER, INTEGER)),
                        "test", rowBlockOf(ImmutableList.of(INTEGER, INTEGER), 5L, 1L)));

        HiveErrorCode expectedErrorCode = HIVE_PARTITION_SCHEMA_MISMATCH;
        String expectedMessageRowLongNest = "The column column_name of table schema.table is declared as type map<string,struct<s_double:int,s_int:int>>, but the Parquet file ((.*?)) declares the column as type optional group column_name \\(MAP\\) \\{\n" +
                "  repeated group key_value \\(MAP_KEY_VALUE\\) \\{\n" +
                "    required binary key \\(STRING\\);\n" +
                "    optional group value \\{\n" +
                "      optional int32 s_int;\n" +
                "      optional double s_double;\n" +
                "    \\}\n" +
                "  \\}\n" +
                "\\}";
        assertThatFileFormat(PARQUET)
                .withWriteColumns(ImmutableList.of(writeColumn))
                .withReadColumns(ImmutableList.of(readColumn))
                .withRowsCount(1)
                .withSession(parquetPageSourceSession)
                .isFailingForPageSource(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, HDFS_ENVIRONMENT, STATS, METADATA_READER), expectedErrorCode, expectedMessageRowLongNest);
    }

    private void testCursorProvider(HiveRecordCursorProvider cursorProvider,
            FileSplit split,
            HiveStorageFormat storageFormat,
            List<TestColumn> testColumns,
            ConnectorSession session,
            int rowCount)
    {
        List<HivePartitionKey> partitionKeys = testColumns.stream()
                .filter(TestColumn::isPartitionKey)
                .map(TestColumn::toHivePartitionKey)
                .collect(toList());

        List<HiveColumnHandle> partitionKeyColumnHandles = getColumnHandles(testColumns.stream().filter(TestColumn::isPartitionKey).collect(toImmutableList()));
        List<Column> tableDataColumns = testColumns.stream()
                .filter(column -> !column.isPartitionKey())
                .map(column -> new Column(column.getName(), HiveType.valueOf(column.getType()), Optional.empty(), Optional.empty()))
                .collect(toImmutableList());

        HiveFileSplit hiveFileSplit = new HiveFileSplit(
                split.getPath().toString(),
                split.getStart(),
                split.getLength(),
                split.getLength(),
                Instant.now().toEpochMilli(),
                Optional.empty(),
                ImmutableMap.of(),
                0);

        Configuration configuration = new Configuration();
        configuration.set("io.compression.codecs", LzoCodec.class.getName() + "," + LzopCodec.class.getName());
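        // Only the record cursor provider is registered (no batch page source factories), so the
        // returned page source is backed by a RecordCursor.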
        Optional<ConnectorPageSource> pageSource = HivePageSourceProvider.createHivePageSource(
                ImmutableSet.of(cursorProvider),
                ImmutableSet.of(),
                configuration,
                session,
                hiveFileSplit,
                OptionalInt.empty(),
                new Storage(
                        StorageFormat.create(storageFormat.getSerDe(), storageFormat.getInputFormat(), storageFormat.getOutputFormat()),
                        "location",
                        Optional.empty(),
                        false,
                        ImmutableMap.of(),
                        ImmutableMap.of()),
                TupleDomain.all(),
                getColumnHandles(testColumns),
                ImmutableMap.of(),
                partitionKeys,
                DateTimeZone.getDefault(),
                FUNCTION_AND_TYPE_MANAGER,
                new SchemaTableName("schema", "table"),
                partitionKeyColumnHandles,
                tableDataColumns,
                ImmutableMap.of(),
                tableDataColumns.size(),
                TableToPartitionMapping.empty(),
                Optional.empty(),
                false,
                DEFAULT_HIVE_FILE_CONTEXT,
                TRUE_CONSTANT,
                false,
                ROW_EXPRESSION_SERVICE,
                Optional.empty(),
                Optional.empty());

        assertTrue(pageSource.isPresent());
        RecordCursor cursor = ((RecordPageSource) pageSource.get()).getCursor();

        checkCursor(cursor, testColumns, rowCount);
    }

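    /**
     * Reads the given split through {@link HivePageSourceProvider} with only the supplied batch
     * page source factory registered, then verifies that the resulting page source yields
     * {@code rowCount} rows matching {@code testColumns}.
     */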
    private void testPageSourceFactory(HiveBatchPageSourceFactory sourceFactory,
            FileSplit split,
            HiveStorageFormat storageFormat,
            List<TestColumn> testColumns,
            ConnectorSession session,
            int rowCount)
            throws IOException
    {
        List<HivePartitionKey> partitionKeys = testColumns.stream()
                .filter(TestColumn::isPartitionKey)
                .map(TestColumn::toHivePartitionKey)
                .collect(toList());

        List<HiveColumnHandle> partitionKeyColumnHandles = getColumnHandles(testColumns.stream().filter(TestColumn::isPartitionKey).collect(toImmutableList()));
        List<Column> tableDataColumns = testColumns.stream()
                .filter(column -> !column.isPartitionKey())
                .map(column -> new Column(column.getName(), HiveType.valueOf(column.getType()), Optional.empty(), Optional.empty()))
                .collect(toImmutableList());

        List<HiveColumnHandle> columnHandles = getColumnHandles(testColumns);

        HiveFileSplit hiveFileSplit = new HiveFileSplit(
                split.getPath().toString(),
                split.getStart(),
                split.getLength(),
                split.getLength(),
                Instant.now().toEpochMilli(),
                Optional.empty(),
                ImmutableMap.of(),
                0);

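        // Only the batch page source factory is registered (no record cursor providers).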
        Optional<ConnectorPageSource> pageSource = HivePageSourceProvider.createHivePageSource(
                ImmutableSet.of(),
                ImmutableSet.of(sourceFactory),
                new Configuration(),
                session,
                hiveFileSplit,
                OptionalInt.empty(),
                new Storage(
                        StorageFormat.create(storageFormat.getSerDe(), storageFormat.getInputFormat(), storageFormat.getOutputFormat()),
                        "location",
                        Optional.empty(),
                        false,
                        ImmutableMap.of(),
                        ImmutableMap.of()),
                TupleDomain.all(),
                columnHandles,
                ImmutableMap.of(),
                partitionKeys,
                DateTimeZone.getDefault(),
                FUNCTION_AND_TYPE_MANAGER,
                new SchemaTableName("schema", "table"),
                partitionKeyColumnHandles,
                tableDataColumns,
                ImmutableMap.of(),
                tableDataColumns.size(),
                TableToPartitionMapping.empty(),
                Optional.empty(),
                false,
                DEFAULT_HIVE_FILE_CONTEXT,
                TRUE_CONSTANT,
                false,
                ROW_EXPRESSION_SERVICE,
                Optional.empty(),
                Optional.empty());

        assertTrue(pageSource.isPresent());

        checkPageSource(pageSource.get(), testColumns, getTypes(columnHandles), rowCount);
    }

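    /**
     * Returns true if the object inspector tree contains a primitive of any of the given
     * categories, recursing into list elements, map keys and values, and struct fields.
     */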
    public static boolean hasType(ObjectInspector objectInspector, PrimitiveCategory... types)
    {
        if (objectInspector instanceof PrimitiveObjectInspector) {
            PrimitiveObjectInspector primitiveInspector = (PrimitiveObjectInspector) objectInspector;
            PrimitiveCategory primitiveCategory = primitiveInspector.getPrimitiveCategory();
            for (PrimitiveCategory type : types) {
                if (primitiveCategory == type) {
                    return true;
                }
            }
            return false;
        }
        if (objectInspector instanceof ListObjectInspector) {
            ListObjectInspector listInspector = (ListObjectInspector) objectInspector;
            return hasType(listInspector.getListElementObjectInspector(), types);
        }
        if (objectInspector instanceof MapObjectInspector) {
            MapObjectInspector mapInspector = (MapObjectInspector) objectInspector;
            return hasType(mapInspector.getMapKeyObjectInspector(), types) ||
                    hasType(mapInspector.getMapValueObjectInspector(), types);
        }
        if (objectInspector instanceof StructObjectInspector) {
            for (StructField field : ((StructObjectInspector) objectInspector).getAllStructFieldRefs()) {
                if (hasType(field.getFieldObjectInspector(), types)) {
                    return true;
                }
            }
            return false;
        }
        throw new IllegalArgumentException("Unknown object inspector type " + objectInspector);
    }

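    // Predicate that excludes the test columns whose maps contain null keys.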
    private static boolean withoutNullMapKeyTests(TestColumn testColumn)
    {
        String name = testColumn.getName();
        return !name.equals("t_map_null_key") &&
                !name.equals("t_map_null_key_complex_key_value") &&
                !name.equals("t_map_null_key_complex_value");
    }

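    // Entry point for the fluent file-format assertions below.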
    private FileFormatAssertion assertThatFileFormat(HiveStorageFormat hiveStorageFormat)
    {
        return new FileFormatAssertion(hiveStorageFormat.name())
                .withStorageFormat(hiveStorageFormat);
    }

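    // Helpers that build client configs with format-specific writer settings.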
    private static HiveClientConfig createRcFileHiveClientConfig(boolean rcfileOptimizedWriterEnabled)
    {
        HiveClientConfig config = new HiveClientConfig();
        config.setRcfileOptimizedWriterEnabled(rcfileOptimizedWriterEnabled);
        return config;
    }

    private static HiveCommonClientConfig createParquetHiveCommonClientConfig(boolean useParquetColumnNames)
    {
        HiveCommonClientConfig config = new HiveCommonClientConfig();
        config.setUseParquetColumnNames(useParquetColumnNames);
        return config;
    }

    private static HiveCommonClientConfig createOrcHiveCommonClientConfig(boolean orcOptimizedWriterEnabled, double orcWriterValidationPercentage)
    {
        HiveCommonClientConfig config = new HiveCommonClientConfig();
        config.setOrcOptimizedWriterEnabled(orcOptimizedWriterEnabled);
        config.setOrcWriterValidationPercentage(orcWriterValidationPercentage);
        return config;
    }

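    /**
     * Fluent assertion helper: writes a temporary file with the configured storage format,
     * compression codec, and write columns, then checks that the file is readable (or fails as
     * expected) through a batch page source factory or a record cursor provider. Defaults to no
     * compression, 1000 rows, and the shared test session.
     */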
    private class FileFormatAssertion
    {
        private final String formatName;
        private HiveStorageFormat storageFormat;
        private HiveCompressionCodec compressionCodec = HiveCompressionCodec.NONE;
        private List<TestColumn> writeColumns;
        private List<TestColumn> readColumns;
        private ConnectorSession session = SESSION;
        private int rowsCount = 1000;
        private HiveFileWriterFactory fileWriterFactory;

        private FileFormatAssertion(String formatName)
        {
            this.formatName = requireNonNull(formatName, "formatName is null");
        }

        public FileFormatAssertion withStorageFormat(HiveStorageFormat storageFormat)
        {
            this.storageFormat = requireNonNull(storageFormat, "storageFormat is null");
            return this;
        }

        public FileFormatAssertion withCompressionCodec(HiveCompressionCodec compressionCodec)
        {
            this.compressionCodec = requireNonNull(compressionCodec, "compressionCodec is null");
            return this;
        }

        public FileFormatAssertion withFileWriterFactory(HiveFileWriterFactory fileWriterFactory)
        {
            this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
            return this;
        }

        public FileFormatAssertion withColumns(List<TestColumn> inputColumns)
        {
            withWriteColumns(inputColumns);
            withReadColumns(inputColumns);
            return this;
        }

        public FileFormatAssertion withWriteColumns(List<TestColumn> writeColumns)
        {
            this.writeColumns = requireNonNull(writeColumns, "writeColumns is null");
            return this;
        }

        public FileFormatAssertion withReadColumns(List<TestColumn> readColumns)
        {
            this.readColumns = requireNonNull(readColumns, "readColumns is null");
            return this;
        }

        public FileFormatAssertion withRowsCount(int rowsCount)
        {
            this.rowsCount = rowsCount;
            return this;
        }

        public FileFormatAssertion withSession(ConnectorSession session)
        {
            this.session = requireNonNull(session, "session is null");
            return this;
        }

        public FileFormatAssertion isReadableByPageSource(HiveBatchPageSourceFactory pageSourceFactory)
                throws Exception
        {
            assertRead(Optional.of(pageSourceFactory), Optional.empty());
            return this;
        }

        public FileFormatAssertion isReadableByRecordCursor(HiveRecordCursorProvider cursorProvider)
                throws Exception
        {
            assertRead(Optional.empty(), Optional.of(cursorProvider));
            return this;
        }

        public FileFormatAssertion isFailingForPageSource(HiveBatchPageSourceFactory pageSourceFactory, ErrorCodeSupplier expectedErrorCode, String expectedMessage)
                throws Exception
        {
            assertFailure(Optional.of(pageSourceFactory), Optional.empty(), expectedErrorCode, expectedMessage);
            return this;
        }

        public FileFormatAssertion isFailingForRecordCursor(HiveRecordCursorProvider cursorProvider, ErrorCodeSupplier expectedErrorCode, String expectedMessage)
                throws Exception
        {
            assertFailure(Optional.empty(), Optional.of(cursorProvider), expectedErrorCode, expectedMessage);
            return this;
        }

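        /**
         * Writes a temporary test file (via the configured file writer factory when one is set,
         * otherwise the default writer path) and verifies it with whichever of the page source
         * factory and record cursor provider is present. The temporary file is deleted afterwards.
         */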
        private void assertRead(Optional<HiveBatchPageSourceFactory> pageSourceFactory, Optional<HiveRecordCursorProvider> cursorProvider)
                throws Exception
        {
            assertNotNull(storageFormat, "storageFormat must be specified");
            assertNotNull(writeColumns, "writeColumns must be specified");
            assertNotNull(readColumns, "readColumns must be specified");
            assertNotNull(session, "session must be specified");
            assertTrue(rowsCount >= 0, "rowsCount must be non-negative");

            String compressionSuffix = compressionCodec.getCodec()
                    .map(codec -> {
                        try {
                            return codec.getConstructor().newInstance().getDefaultExtension();
                        }
                        catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    })
                    .orElse("");

            File file = File.createTempFile("presto_test", formatName + compressionSuffix);
            file.delete();
            try {
                FileSplit split;
                if (fileWriterFactory != null) {
                    split = createTestFile(file.getAbsolutePath(), storageFormat, compressionCodec, writeColumns, session, rowsCount, fileWriterFactory);
                }
                else {
                    split = createTestFile(file.getAbsolutePath(), storageFormat, compressionCodec, writeColumns, rowsCount);
                }
                if (pageSourceFactory.isPresent()) {
                    testPageSourceFactory(pageSourceFactory.get(), split, storageFormat, readColumns, session, rowsCount);
                }
                if (cursorProvider.isPresent()) {
                    testCursorProvider(cursorProvider.get(), split, storageFormat, readColumns, session, rowsCount);
                }
            }
            finally {
                //noinspection ResultOfMethodCallIgnored
                file.delete();
            }
        }

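        /**
         * Expects {@link #assertRead} to fail with a {@link PrestoException} carrying the given
         * error code and a message matching the {@code expectedMessage} regular expression.
         */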
        private void assertFailure(
                Optional<HiveBatchPageSourceFactory> pageSourceFactory,
                Optional<HiveRecordCursorProvider> cursorProvider,
                ErrorCodeSupplier expectedErrorCode,
                String expectedMessage)
                throws Exception
        {
            try {
                assertRead(pageSourceFactory, cursorProvider);
                fail("failure is expected");
            }
            catch (PrestoException prestoException) {
                assertEquals(prestoException.getErrorCode(), expectedErrorCode.toErrorCode());
                assertTrue(prestoException.getMessage().matches(expectedMessage));
            }
        }
    }
}