/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.cache.CacheConfig;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.SortedRangeSet;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.TupleDomain.ColumnDomain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.datasink.OutputStreamDataSinkFactory;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.PartitionWithStatistics;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.metastore.UnimplementedHiveMetastore;
import com.facebook.presto.hive.statistics.QuickStatsProvider;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.connector.ConnectorSplitManager.SplitSchedulingContext;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.joda.time.DateTimeZone;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalLong;
import java.util.Set;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.common.predicate.Range.range;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.Decimals.encodeScaledValue;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.hive.AbstractTestHiveClient.TEST_SERVER_VERSION;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.EncryptionProperties.DWRF_ENCRYPTION_ALGORITHM_KEY;
import static com.facebook.presto.hive.EncryptionProperties.DWRF_ENCRYPTION_PROVIDER_KEY;
import static com.facebook.presto.hive.EncryptionProperties.ENCRYPT_COLUMNS_KEY;
import static com.facebook.presto.hive.HiveFileInfo.createHiveFileInfo;
import static com.facebook.presto.hive.HiveStorageFormat.DWRF;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.HiveTestUtils.DO_NOTHING_DIRECTORY_LISTER;
import static com.facebook.presto.hive.HiveTestUtils.FILTER_STATS_CALCULATOR_SERVICE;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_AND_TYPE_MANAGER;
import static com.facebook.presto.hive.HiveTestUtils.FUNCTION_RESOLUTION;
import static com.facebook.presto.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static com.facebook.presto.hive.HiveTestUtils.ROW_EXPRESSION_SERVICE;
import static com.facebook.presto.hive.HiveTestUtils.getAllSessionProperties;
import static com.facebook.presto.hive.HiveType.HIVE_BYTE;
import static com.facebook.presto.hive.HiveType.HIVE_DATE;
import static com.facebook.presto.hive.HiveType.HIVE_DOUBLE;
import static com.facebook.presto.hive.HiveType.HIVE_FLOAT;
import static com.facebook.presto.hive.HiveType.HIVE_INT;
import static com.facebook.presto.hive.HiveType.HIVE_LONG;
import static com.facebook.presto.hive.HiveType.HIVE_SHORT;
import static com.facebook.presto.hive.HiveType.HIVE_STRING;
import static com.facebook.presto.hive.metastore.HiveColumnStatistics.createDateColumnStatistics;
import static com.facebook.presto.hive.metastore.HiveColumnStatistics.createDecimalColumnStatistics;
import static com.facebook.presto.hive.metastore.HiveColumnStatistics.createDoubleColumnStatistics;
import static com.facebook.presto.hive.metastore.HiveColumnStatistics.createIntegerColumnStatistics;
import static com.facebook.presto.hive.metastore.PrestoTableType.MANAGED_TABLE;
import static com.facebook.presto.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
import static com.facebook.presto.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.slice.Slices.utf8Slice;
import static java.lang.Float.floatToIntBits;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

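/**
 * Unit tests for {@link HiveSplitManager}.
 *
 * Covers two areas: (1) partition-statistics-based optimization, where per-column
 * min/max statistics from the metastore are used either to prune a partition whose
 * statistics range is disjoint from the query predicate, or to report a predicate
 * domain as redundant when the statistics range is fully contained in it; and
 * (2) propagation of DWRF encryption metadata from table parameters into the
 * generated splits.
 */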
public class TestHiveSplitManager
{
    private static final int MAX_PARTITION_KEY_COLUMN_INDEX = -13;
    private static final SplitSchedulingContext SPLIT_SCHEDULING_CONTEXT = new SplitSchedulingContext(UNGROUPED_SCHEDULING, false, WarningCollector.NOOP);
    private static final HiveType LONG_DECIMAL = HiveType.valueOf("decimal(38,10)");
    private static final HiveType SHORT_DECIMAL = HiveType.valueOf("decimal(10,0)");
    private static final List<Column> COLUMNS = ImmutableList.of(
            new Column("t_tinyint", HIVE_BYTE, Optional.empty(), Optional.empty()),
            new Column("t_smallint", HIVE_SHORT, Optional.empty(), Optional.empty()),
            new Column("t_int", HIVE_INT, Optional.empty(), Optional.empty()),
            new Column("t_bigint", HIVE_LONG, Optional.empty(), Optional.empty()),
            new Column("t_float", HIVE_FLOAT, Optional.empty(), Optional.empty()),
            new Column("t_double", HIVE_DOUBLE, Optional.empty(), Optional.empty()),
            new Column("t_short_decimal", SHORT_DECIMAL, Optional.empty(), Optional.empty()),
            new Column("t_long_decimal", LONG_DECIMAL, Optional.empty(), Optional.empty()),
            new Column("t_date", HIVE_DATE, Optional.empty(), Optional.empty()));
    private static final String PARTITION_VALUE = "2020-01-01";
    private static final String PARTITION_NAME = "ds=2020-01-01";
    private static final PartitionNameWithVersion PARTITION_NAME_WITH_VERSION = new PartitionNameWithVersion(PARTITION_NAME, Optional.empty());
    private static final Table TEST_TABLE = createTestTable(VIEW_STORAGE_FORMAT, ImmutableMap.of());

    private static final String TEST_CATALOG_NAME = "catalogName";

    private ListeningExecutorService executor;

    @BeforeClass
    public void setUp()
    {
        executor = MoreExecutors.listeningDecorator(newFixedThreadPool(10, daemonThreadsNamed("test-hive-split-manager-%s")));
    }

    @AfterClass(alwaysRun = true)
    public void shutdown()
    {
        executor.shutdownNow();
    }

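    /**
     * Creates a managed "test_db"."test_table" table with the shared data columns,
     * a single string partition column "ds", and the given storage format and
     * table parameters.
     */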
    private static Table createTestTable(StorageFormat storageFormat, Map<String, String> parameters)
    {
        return new Table(Optional.of(TEST_CATALOG_NAME),
                "test_db",
                "test_table",
                "test_owner",
                MANAGED_TABLE,
                new Storage(storageFormat,
                        "",
                        Optional.empty(),
                        false,
                        ImmutableMap.of(),
                        ImmutableMap.of()),
                COLUMNS,
                ImmutableList.of(
                        new Column("ds", HIVE_STRING,
                                Optional.empty(),
                                Optional.empty())),
                parameters,
                Optional.empty(),
                Optional.empty());
    }

    @Test
    public void testPartitionStatsBasedOptimizationForInteger()
            throws Exception
    {
        testPartitionStatsBasedOptimizationForInteger("t_tinyint", TINYINT, HIVE_BYTE);
        testPartitionStatsBasedOptimizationForInteger("t_smallint", SMALLINT, HIVE_SHORT);
        testPartitionStatsBasedOptimizationForInteger("t_int", INTEGER, HIVE_INT);
        testPartitionStatsBasedOptimizationForInteger("t_bigint", BIGINT, HIVE_LONG);
    }

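    /**
     * Exercises the stats-based optimization for an integer-typed column against a
     * [10, 20] predicate range: missing stats and partially overlapping stats ([5, 25])
     * leave the predicate untouched, disjoint stats ([1, 3]) prune the partition, and
     * fully contained stats ([13, 15]) mark the column domain redundant.
     */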
    private void testPartitionStatsBasedOptimizationForInteger(String columnName, Type type, HiveType hiveType)
            throws Exception
    {
        HiveColumnHandle columnHandle = new HiveColumnHandle(
                columnName,
                hiveType,
                type.getTypeSignature(),
                0,
                REGULAR,
                Optional.empty(),
                Optional.empty());
        Range partitionRange = range(type, 10L, true, 20L, true);

        // Test no partition stats
        assertRedundantColumnDomains(
                partitionRange,
                PartitionStatistics.empty(),
                ImmutableList.of(ImmutableSet.of()),
                columnHandle);

        // Test partition left unchanged
        assertRedundantColumnDomains(
                partitionRange,
                createIntegerPartitionStatistics(5, 25, columnName),
                ImmutableList.of(ImmutableSet.of()),
                columnHandle);

        // Test partition being pruned
        assertRedundantColumnDomains(
                partitionRange,
                createIntegerPartitionStatistics(1, 3, columnName),
                ImmutableList.of(),
                columnHandle);

        // Test partition having subfield domain stripped
        assertRedundantColumnDomains(
                partitionRange,
                createIntegerPartitionStatistics(13, 15, columnName),
                ImmutableList.of(ImmutableSet.of(columnHandle)),
                columnHandle);
    }

    private PartitionStatistics createIntegerPartitionStatistics(long min, long max, String columnName)
    {
        return PartitionStatistics.builder()
                .setColumnStatistics(ImmutableMap.of(
                        columnName, createIntegerColumnStatistics(OptionalLong.of(min), OptionalLong.of(max), OptionalLong.of(0), OptionalLong.of(max - min + 1))))
                .build();
    }

    @Test
    public void testPartitionStatsBasedOptimizationForReal()
            throws Exception
    {
        Type type = REAL;
        Range partitionRange = range(type, (long) floatToIntBits(10.0f), true, (long) floatToIntBits(20.0f), true);
        HiveColumnHandle columnHandle = new HiveColumnHandle(
                "t_real",
                HIVE_FLOAT,
                type.getTypeSignature(),
                0,
                REGULAR,
                Optional.empty(),
                Optional.empty());

        // Test no partition stats
        assertRedundantColumnDomains(
                partitionRange,
                PartitionStatistics.empty(),
                ImmutableList.of(ImmutableSet.of()),
                columnHandle);

        // Test partition left unchanged
        assertRedundantColumnDomains(
                partitionRange,
                createDoublePartitionStatistics(5.0, 25.0, columnHandle.getName()),
                ImmutableList.of(ImmutableSet.of()),
                columnHandle);

        // Test partition being pruned
        assertRedundantColumnDomains(
                partitionRange,
                createDoublePartitionStatistics(1.0, 3.0, columnHandle.getName()),
                ImmutableList.of(),
                columnHandle);

        // Test partition having subfield domain stripped
        assertRedundantColumnDomains(
                partitionRange,
                createDoublePartitionStatistics(13.0, 15.0, columnHandle.getName()),
                ImmutableList.of(ImmutableSet.of(columnHandle)),
                columnHandle);
    }

    @Test
    public void testPartitionStatsBasedOptimizationForDouble()
            throws Exception
    {
        Type type = DOUBLE;
        Range partitionRange = range(type, 10.0, true, 20.0, true);
        HiveColumnHandle columnHandle = new HiveColumnHandle(
                "t_double",
                HIVE_DOUBLE,
                type.getTypeSignature(),
                0,
                REGULAR,
                Optional.empty(),
                Optional.empty());

        // Test no partition stats
        assertRedundantColumnDomains(
                partitionRange,
                PartitionStatistics.empty(),
                ImmutableList.of(ImmutableSet.of()),
                columnHandle);

        // Test partition left unchanged
        assertRedundantColumnDomains(
                partitionRange,
                createDoublePartitionStatistics(5.0, 25.0, columnHandle.getName()),
                ImmutableList.of(ImmutableSet.of()),
                columnHandle);

        // Test partition being pruned
        assertRedundantColumnDomains(
                partitionRange,
                createDoublePartitionStatistics(1.0, 3.0, columnHandle.getName()),
                ImmutableList.of(),
                columnHandle);

        // Test partition having subfield domain stripped
        assertRedundantColumnDomains(
                partitionRange,
                createDoublePartitionStatistics(13.0, 15.0, columnHandle.getName()),
                ImmutableList.of(ImmutableSet.of(columnHandle)),
                columnHandle);
    }

    private PartitionStatistics createDoublePartitionStatistics(double min, double max, String columnName)
    {
        return PartitionStatistics.builder()
                .setColumnStatistics(ImmutableMap.of(
                        columnName, createDoubleColumnStatistics(OptionalDouble.of(min), OptionalDouble.of(max), OptionalLong.of(0), OptionalLong.empty())))
                .build();
    }

    @Test
    public void testPartitionStatsBasedOptimizationForDecimal()
            throws Exception
    {
        Type shortDecimal = HiveType.getPrimitiveType((PrimitiveTypeInfo) SHORT_DECIMAL.getTypeInfo());
        testPartitionStatsBasedOptimizationForDecimal(
                range(shortDecimal, 10L, true, 20L, true),
                new HiveColumnHandle(
                        "t_short_decimal",
                        SHORT_DECIMAL,
                        shortDecimal.getTypeSignature(),
                        0,
                        REGULAR,
                        Optional.empty(),
                        Optional.empty()));

        Type longDecimal = HiveType.getPrimitiveType((PrimitiveTypeInfo) LONG_DECIMAL.getTypeInfo());
        testPartitionStatsBasedOptimizationForDecimal(
                range(longDecimal, encodeScaledValue(BigDecimal.valueOf(10)), true, encodeScaledValue(BigDecimal.valueOf(20)), true),
                new HiveColumnHandle(
                        "t_long_decimal",
                        LONG_DECIMAL,
                        longDecimal.getTypeSignature(),
                        0,
                        REGULAR,
                        Optional.empty(),
                        Optional.empty()));
    }

    private void testPartitionStatsBasedOptimizationForDecimal(Range partitionRange, HiveColumnHandle columnHandle)
            throws Exception
    {
        // Test no partition stats
        assertRedundantColumnDomains(
                partitionRange,
                PartitionStatistics.empty(),
                ImmutableList.of(ImmutableSet.of()),
                columnHandle);

        // Test partition left unchanged
        assertRedundantColumnDomains(
                partitionRange,
                createDecimalPartitionStatistics(5, 25, columnHandle.getName()),
                ImmutableList.of(ImmutableSet.of()),
                columnHandle);

        // Test partition being pruned
        assertRedundantColumnDomains(
                partitionRange,
                createDecimalPartitionStatistics(1, 3, columnHandle.getName()),
                ImmutableList.of(),
                columnHandle);

        // Test partition having subfield domain stripped
        assertRedundantColumnDomains(
                partitionRange,
                createDecimalPartitionStatistics(13, 15, columnHandle.getName()),
                ImmutableList.of(ImmutableSet.of(columnHandle)),
                columnHandle);
    }

    private PartitionStatistics createDecimalPartitionStatistics(long min, long max, String columnName)
    {
        return PartitionStatistics.builder()
                .setColumnStatistics(ImmutableMap.of(
                        columnName, createDecimalColumnStatistics(Optional.of(BigDecimal.valueOf(min)), Optional.of(BigDecimal.valueOf(max)), OptionalLong.empty(), OptionalLong.empty())))
                .build();
    }

    @Test
    public void testPartitionStatsBasedOptimizationForDate()
            throws Exception
    {
        Type type = DATE;
        Range partitionRange = range(type, 10L, true, 20L, true);
        HiveColumnHandle columnHandle = new HiveColumnHandle(
                "t_date",
                HIVE_DATE,
                type.getTypeSignature(),
                0,
                REGULAR,
                Optional.empty(),
                Optional.empty());

        // Test no partition stats
        assertRedundantColumnDomains(
                partitionRange,
                PartitionStatistics.empty(),
                ImmutableList.of(ImmutableSet.of()),
                columnHandle);

        // Test partition left unchanged
        assertRedundantColumnDomains(
                partitionRange,
                createDatePartitionStatistics(5, 25, columnHandle.getName()),
                ImmutableList.of(ImmutableSet.of()),
                columnHandle);

        // Test partition being pruned
        assertRedundantColumnDomains(
                partitionRange,
                createDatePartitionStatistics(1, 3, columnHandle.getName()),
                ImmutableList.of(),
                columnHandle);

        // Test partition having subfield domain stripped
        assertRedundantColumnDomains(
                partitionRange,
                createDatePartitionStatistics(13, 15, columnHandle.getName()),
                ImmutableList.of(ImmutableSet.of(columnHandle)),
                columnHandle);
    }

    private PartitionStatistics createDatePartitionStatistics(long min, long max, String columnName)
    {
        return PartitionStatistics.builder()
                .setColumnStatistics(ImmutableMap.of(
                        columnName, createDateColumnStatistics(Optional.of(LocalDate.ofEpochDay(min)), Optional.of(LocalDate.ofEpochDay(max)), OptionalLong.empty(), OptionalLong.empty())))
                .build();
    }

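    /**
     * Runs split generation for the single test partition using the given predicate
     * range and partition statistics, then asserts the redundant column domains
     * reported by the resulting splits: an empty list means the partition was pruned,
     * an empty set means the predicate was kept, and a set containing the column
     * handle means the predicate was found redundant for the partition.
     */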
    private void assertRedundantColumnDomains(Range predicateRange, PartitionStatistics partitionStatistics, List<Set<ColumnHandle>> expectedRedundantColumnDomains, HiveColumnHandle columnHandle)
            throws Exception
    {
        // Prepare query predicate tuple domain
        TupleDomain<ColumnHandle> queryTupleDomain = TupleDomain.fromColumnDomains(
                Optional.of(ImmutableList.of(new ColumnDomain<>(
                        columnHandle,
                        Domain.create(
                                SortedRangeSet.copyOf(predicateRange.getType(), ImmutableList.of(predicateRange)),
                                false)))));

        // Prepare partition with stats
        PartitionWithStatistics partitionWithStatistics = new PartitionWithStatistics(
                new Partition("test_db",
                        "test_table",
                        ImmutableList.of(PARTITION_VALUE),
                        new Storage(
                                fromHiveStorageFormat(ORC),
                                "location",
                                Optional.empty(),
                                true,
                                ImmutableMap.of(),
                                ImmutableMap.of()),
                        COLUMNS,
                        ImmutableMap.of(),
                        Optional.empty(),
                        false,
                        true,
                        0,
                        0,
                        Optional.empty()),
                PARTITION_NAME,
                partitionStatistics);

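        // Set up an HDFS environment, a testing metastore holding the partition, and a
        // metadata factory/split manager with partition-statistics-based optimization enabled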
        HiveClientConfig hiveClientConfig = new HiveClientConfig().setPartitionStatisticsBasedOptimizationEnabled(true);
        HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(
                new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, new MetastoreClientConfig()), ImmutableSet.of(), hiveClientConfig),
                new MetastoreClientConfig(),
                new NoHdfsAuthentication());
        TestingExtendedHiveMetastore metastore = new TestingExtendedHiveMetastore(TEST_TABLE, partitionWithStatistics);
        HiveMetadataFactory metadataFactory = new HiveMetadataFactory(
                metastore,
                hdfsEnvironment,
                new HivePartitionManager(FUNCTION_AND_TYPE_MANAGER, hiveClientConfig),
                DateTimeZone.forOffsetHours(1),
                true,
                false,
                false,
                true,
                true,
                hiveClientConfig.getMaxPartitionBatchSize(),
                hiveClientConfig.getMaxPartitionsPerScan(),
                false,
                10_000,
                FUNCTION_AND_TYPE_MANAGER,
                new HiveLocationService(hdfsEnvironment),
                FUNCTION_RESOLUTION,
                ROW_EXPRESSION_SERVICE,
                FILTER_STATS_CALCULATOR_SERVICE,
                new TableParameterCodec(),
                HiveTestUtils.PARTITION_UPDATE_CODEC,
                HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
                executor,
                new HiveTypeTranslator(),
                new HiveStagingFileCommitter(hdfsEnvironment, executor),
                new HiveZeroRowFileCreator(hdfsEnvironment, new OutputStreamDataSinkFactory(), executor),
                TEST_SERVER_VERSION,
                new HivePartitionObjectBuilder(),
                new HiveEncryptionInformationProvider(ImmutableList.of()),
                new HivePartitionStats(),
                new HiveFileRenamer(),
                HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER,
                new QuickStatsProvider(metastore, HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
                new HiveTableWritabilityChecker(false));

        HiveSplitManager splitManager = new HiveSplitManager(
                new TestingHiveTransactionManager(metadataFactory),
                new NamenodeStats(),
                hdfsEnvironment,
                new TestingDirectoryLister(),
                directExecutor(),
                new HiveCoercionPolicy(FUNCTION_AND_TYPE_MANAGER),
                new CounterStat(),
                100,
                hiveClientConfig.getMaxOutstandingSplitsSize(),
                hiveClientConfig.getMinPartitionBatchSize(),
                hiveClientConfig.getMaxPartitionBatchSize(),
                hiveClientConfig.getSplitLoaderConcurrency(),
                false,
                new ConfigBasedCacheQuotaRequirementProvider(new CacheConfig()),
                new HiveEncryptionInformationProvider(ImmutableList.of()),
                new HivePartitionSkippabilityChecker());

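        // Describe the single test partition and build a table layout carrying the query predicate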
        HiveColumnHandle partitionColumn = new HiveColumnHandle(
                "ds",
                HIVE_STRING,
                parseTypeSignature(VARCHAR),
                MAX_PARTITION_KEY_COLUMN_INDEX,
                PARTITION_KEY,
                Optional.empty(),
                Optional.empty());
        List<HivePartition> partitions = ImmutableList.of(
                new HivePartition(
                        new SchemaTableName("test_schema", "test_table"),
                        PARTITION_NAME_WITH_VERSION,
                        ImmutableMap.of(partitionColumn, NullableValue.of(createUnboundedVarcharType(), utf8Slice(PARTITION_VALUE)))));
        TupleDomain<Subfield> domainPredicate = queryTupleDomain
                .transform(HiveColumnHandle.class::cast)
                .transform(column -> new Subfield(column.getName(), ImmutableList.of()));

        SchemaTableName schemaTableName = new SchemaTableName("test_schema", "test_table");
        HiveTableHandle hiveTableHandle = new HiveTableHandle(schemaTableName.getSchemaName(), schemaTableName.getTableName());
        HiveTableLayoutHandle layoutHandle = new HiveTableLayoutHandle.Builder()
                .setSchemaTableName(schemaTableName)
                .setTablePath("test_path")
                .setPartitionColumns(ImmutableList.of(partitionColumn))
                .setDataColumns(COLUMNS)
                .setTableParameters(ImmutableMap.of())
                .setDomainPredicate(domainPredicate)
                .setRemainingPredicate(TRUE_CONSTANT)
                .setPredicateColumns(ImmutableMap.of(partitionColumn.getName(), partitionColumn, columnHandle.getName(), columnHandle))
                .setPartitionColumnPredicate(queryTupleDomain)
                .setPartitions(partitions)
                .setBucketHandle(Optional.empty())
                .setBucketFilter(Optional.empty())
                .setPushdownFilterEnabled(false)
                .setLayoutString("layout")
                .setRequestedColumns(Optional.empty())
                .setPartialAggregationsPushedDown(false)
                .setAppendRowNumberEnabled(false)
                .setHiveTableHandle(hiveTableHandle)
                .build();

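        // Generate splits and collect the redundant column domains reported by each split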
        ConnectorSplitSource splitSource = splitManager.getSplits(
                new HiveTransactionHandle(),
                new TestingConnectorSession(getAllSessionProperties(hiveClientConfig, new HiveCommonClientConfig())),
                layoutHandle,
                SPLIT_SCHEDULING_CONTEXT);
        List<Set<ColumnHandle>> actualRedundantColumnDomains = splitSource.getNextBatch(NOT_PARTITIONED, 100).get().getSplits().stream()
                .map(HiveSplit.class::cast)
                .map(HiveSplit::getRedundantColumnDomains)
                .collect(toImmutableList());
        assertEquals(actualRedundantColumnDomains, expectedRedundantColumnDomains);
    }

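    /**
     * Verifies that DWRF encryption settings declared in the table parameters are
     * surfaced as {@link EncryptionInformation} on the generated splits, including
     * the algorithm, provider, and per-field key data.
     */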
    @Test
    public void testEncryptionInformation()
            throws Exception
    {
        StorageFormat storageFormat = fromHiveStorageFormat(DWRF);
        String testEncryptionAlgorithm = "test_encryption_algo";
        String testEncryptionProvider = "test_provider";
        Table testTable = createTestTable(storageFormat,
                ImmutableMap.of(
                        ENCRYPT_COLUMNS_KEY, "foo1:col_bigint,col_struct.b.b1;foo2:col_map;foo3:col_struct.a",
                        DWRF_ENCRYPTION_ALGORITHM_KEY, testEncryptionAlgorithm,
                        DWRF_ENCRYPTION_PROVIDER_KEY, testEncryptionProvider));
        PartitionWithStatistics partitionWithStatistics = new PartitionWithStatistics(
                new Partition(
                        "test_db",
                        "test_table",
                        ImmutableList.of(PARTITION_VALUE),
                        new Storage(
                                storageFormat,
                                "location",
                                Optional.empty(),
                                true,
                                ImmutableMap.of(),
                                ImmutableMap.of()),
                        COLUMNS,
                        ImmutableMap.of(),
                        Optional.empty(),
                        false,
                        true,
                        0,
                        0,
                        Optional.empty()),
                PARTITION_NAME,
                PartitionStatistics.empty());

        HiveClientConfig hiveClientConfig = new HiveClientConfig().setPartitionStatisticsBasedOptimizationEnabled(true);
        HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(
                new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, new MetastoreClientConfig()), ImmutableSet.of(), hiveClientConfig),
                new MetastoreClientConfig(),
                new NoHdfsAuthentication());
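        // Provider backed by the test DWRF source so encryption metadata can be resolved for the table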
        HiveEncryptionInformationProvider encryptionInformationProvider = new HiveEncryptionInformationProvider(ImmutableList.of(new TestDwrfEncryptionInformationSource()));

        TestingExtendedHiveMetastore metastore = new TestingExtendedHiveMetastore(testTable, partitionWithStatistics);
        HiveMetadataFactory metadataFactory = new HiveMetadataFactory(
                metastore,
                hdfsEnvironment,
                new HivePartitionManager(FUNCTION_AND_TYPE_MANAGER, hiveClientConfig),
                DateTimeZone.forOffsetHours(1),
                true,
                false,
                false,
                true,
                true,
                hiveClientConfig.getMaxPartitionBatchSize(),
                hiveClientConfig.getMaxPartitionsPerScan(),
                false,
                10_000,
                FUNCTION_AND_TYPE_MANAGER,
                new HiveLocationService(hdfsEnvironment),
                FUNCTION_RESOLUTION,
                ROW_EXPRESSION_SERVICE,
                FILTER_STATS_CALCULATOR_SERVICE,
                new TableParameterCodec(),
                HiveTestUtils.PARTITION_UPDATE_CODEC,
                HiveTestUtils.PARTITION_UPDATE_SMILE_CODEC,
                executor,
                new HiveTypeTranslator(),
                new HiveStagingFileCommitter(hdfsEnvironment, executor),
                new HiveZeroRowFileCreator(hdfsEnvironment, new OutputStreamDataSinkFactory(), executor),
                TEST_SERVER_VERSION,
                new HivePartitionObjectBuilder(),
                encryptionInformationProvider,
                new HivePartitionStats(),
                new HiveFileRenamer(),
                HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER,
                new QuickStatsProvider(metastore, HDFS_ENVIRONMENT, DO_NOTHING_DIRECTORY_LISTER, new HiveClientConfig(), new NamenodeStats(), ImmutableList.of()),
                new HiveTableWritabilityChecker(false));

        HiveSplitManager splitManager = new HiveSplitManager(
                new TestingHiveTransactionManager(metadataFactory),
                new NamenodeStats(),
                hdfsEnvironment,
                new TestingDirectoryLister(),
                directExecutor(),
                new HiveCoercionPolicy(FUNCTION_AND_TYPE_MANAGER),
                new CounterStat(),
                100,
                hiveClientConfig.getMaxOutstandingSplitsSize(),
                hiveClientConfig.getMinPartitionBatchSize(),
                hiveClientConfig.getMaxPartitionBatchSize(),
                hiveClientConfig.getSplitLoaderConcurrency(),
                false,
                new ConfigBasedCacheQuotaRequirementProvider(new CacheConfig()),
                encryptionInformationProvider,
                new HivePartitionSkippabilityChecker());

        HiveColumnHandle partitionColumn = new HiveColumnHandle(
                "ds",
                HIVE_STRING,
                parseTypeSignature(VARCHAR),
                MAX_PARTITION_KEY_COLUMN_INDEX,
                PARTITION_KEY,
                Optional.empty(),
                Optional.empty());
        List<HivePartition> partitions = ImmutableList.of(
                new HivePartition(
                        new SchemaTableName("test_schema", "test_table"),
                        PARTITION_NAME_WITH_VERSION,
                        ImmutableMap.of(partitionColumn, NullableValue.of(createUnboundedVarcharType(), utf8Slice(PARTITION_VALUE)))));

        SchemaTableName schemaTableName = new SchemaTableName("test_schema", "test_table");
        HiveTableHandle hiveTableHandle = new HiveTableHandle(schemaTableName.getSchemaName(), schemaTableName.getTableName());
        HiveTableLayoutHandle layoutHandle = new HiveTableLayoutHandle.Builder()
                .setSchemaTableName(schemaTableName)
                .setTablePath("test_path")
                .setPartitionColumns(ImmutableList.of(partitionColumn))
                .setDataColumns(COLUMNS)
                .setTableParameters(ImmutableMap.of())
                .setDomainPredicate(TupleDomain.all())
                .setRemainingPredicate(TRUE_CONSTANT)
                .setPredicateColumns(ImmutableMap.of())
                .setPartitionColumnPredicate(TupleDomain.all())
                .setPartitions(partitions)
                .setBucketHandle(Optional.empty())
                .setBucketFilter(Optional.empty())
                .setPushdownFilterEnabled(false)
                .setLayoutString("layout")
                .setRequestedColumns(Optional.empty())
                .setPartialAggregationsPushedDown(false)
                .setAppendRowNumberEnabled(false)
                .setHiveTableHandle(hiveTableHandle)
                .build();

        ConnectorSplitSource splitSource = splitManager.getSplits(
                new HiveTransactionHandle(),
                new TestingConnectorSession(getAllSessionProperties(hiveClientConfig, new HiveCommonClientConfig())),
                layoutHandle,
                SPLIT_SCHEDULING_CONTEXT);
        Optional<EncryptionInformation> encryptionInformation = splitSource.getNextBatch(NOT_PARTITIONED, 100).get().getSplits()
                .stream()
                .map(HiveSplit.class::cast)
                .map(HiveSplit::getEncryptionInformation)
                .findFirst()
                .get();
        assertTrue(encryptionInformation.isPresent());
        Optional<DwrfEncryptionMetadata> dwrfEncryptionMetadata = encryptionInformation.get().getDwrfEncryptionMetadata();
        assertTrue(dwrfEncryptionMetadata.isPresent());
        assertEquals(dwrfEncryptionMetadata.get().getEncryptionAlgorithm(), testEncryptionAlgorithm);
        assertEquals(dwrfEncryptionMetadata.get().getEncryptionProvider(), testEncryptionProvider);
        assertEquals(dwrfEncryptionMetadata.get().getFieldToKeyData().size(), 4);
    }

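    /**
     * Transaction manager stub that returns metadata from a fixed
     * {@link HiveMetadataFactory} regardless of the transaction handle.
     */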
    private static class TestingHiveTransactionManager
            extends HiveTransactionManager
    {
        private final HiveMetadataFactory metadataFactory;

        public TestingHiveTransactionManager(HiveMetadataFactory metadataFactory)
        {
            this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
        }

        @Override
        public TransactionalMetadata get(ConnectorTransactionHandle transactionHandle)
        {
            return metadataFactory.get();
        }
    }

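    /**
     * Metastore stub serving a single table and a single partition with its
     * statistics, which is all the split manager needs in these tests.
     */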
    private static class TestingExtendedHiveMetastore
            extends UnimplementedHiveMetastore
    {
        private final Table table;
        private final PartitionWithStatistics partitionWithStatistics;

        public TestingExtendedHiveMetastore(Table table, PartitionWithStatistics partitionWithStatistics)
        {
            this.table = requireNonNull(table, "table is null");
            this.partitionWithStatistics = requireNonNull(partitionWithStatistics, "partitionWithStatistics is null");
        }

        @Override
        public Optional<Table> getTable(MetastoreContext metastoreContext, String databaseName, String tableName)
        {
            return Optional.of(table);
        }

        @Override
        public Map<String, Optional<Partition>> getPartitionsByNames(MetastoreContext metastoreContext, String databaseName, String tableName, List<PartitionNameWithVersion> partitionNames)
        {
            return ImmutableMap.of(partitionWithStatistics.getPartitionName(), Optional.of(partitionWithStatistics.getPartition()));
        }

        @Override
        public Map<String, PartitionStatistics> getPartitionStatistics(MetastoreContext metastoreContext, String databaseName, String tableName, Set<String> partitionNames)
        {
            return ImmutableMap.of(partitionWithStatistics.getPartitionName(), partitionWithStatistics.getStatistics());
        }
    }

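    /**
     * Directory lister stub that returns one synthetic zero-length file for any
     * listed path, so split generation has a file to work with.
     */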
    private static class TestingDirectoryLister
            implements DirectoryLister
    {
        @Override
        public Iterator<HiveFileInfo> list(ExtendedFileSystem fileSystem, Table table, Path path, Optional<Partition> partition, NamenodeStats namenodeStats, HiveDirectoryContext hiveDirectoryContext)
        {
            try {
                return ImmutableList.of(
                        createHiveFileInfo(
                                new LocatedFileStatus(
                                        new FileStatus(0, false, 1, 0, 0, new Path(path, "test_file_name")),
                                        new BlockLocation[] {}),
                                Optional.empty()))
                        .iterator();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}