/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.facebook.presto.hive;

import com.facebook.presto.cache.CacheConfig;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.TestingTypeManager;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.PrestoTableType;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.BucketFunctionType.HIVE_COMPATIBLE;
import static com.facebook.presto.hive.HiveColumnHandle.MAX_PARTITION_KEY_COLUMN_INDEX;
import static com.facebook.presto.hive.HiveColumnHandle.bucketColumnHandle;
import static com.facebook.presto.hive.HiveMetadata.convertToPredicate;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.hive.HiveType.HIVE_INT;
import static com.facebook.presto.hive.HiveType.HIVE_STRING;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static io.airlift.slice.Slices.utf8Slice;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

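/**
 * Tests for {@link HivePartitionManager#getPartitions}: partition pruning and
 * the conditions under which the bucket handle and bucket filter are produced.
 */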
public class TestHivePartitionManager
{
    private static final String SCHEMA_NAME = "schema";
    private static final String TABLE_NAME = "table";
    private static final String TABLE_NAME_LARGE_PARTITIONS = "table_large_partitions";
    private static final String USER_NAME = "user";
    private static final String LOCATION = "somewhere/over/the/rainbow";
    private static final String LOCATION_LARGE_PARTITIONS = "large/partitions/over/the/rainbow";
    private static final Column PARTITION_COLUMN = new Column("ds", HIVE_STRING, Optional.empty(), Optional.empty());
    private static final Column PARTITION_COLUMN_TS = new Column("ts", HIVE_STRING, Optional.empty(), Optional.empty());
    private static final Column BUCKET_COLUMN = new Column("c1", HIVE_INT, Optional.empty(), Optional.empty());
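    // "table": bucketed into 100 buckets on c1, partitioned on ds.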
    private static final Table TABLE = new Table(
            Optional.of("catalogName"),
            SCHEMA_NAME,
            TABLE_NAME,
            USER_NAME,
            PrestoTableType.MANAGED_TABLE,
            new Storage(fromHiveStorageFormat(ORC),
                    LOCATION,
                    Optional.of(new HiveBucketProperty(
                            ImmutableList.of(BUCKET_COLUMN.getName()),
                            100,
                            ImmutableList.of(),
                            HIVE_COMPATIBLE,
                            Optional.empty())),
                    false,
                    ImmutableMap.of(),
                    ImmutableMap.of()),
            ImmutableList.of(BUCKET_COLUMN),
            ImmutableList.of(PARTITION_COLUMN),
            ImmutableMap.of(),
            Optional.empty(),
            Optional.empty());
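    // "table_large_partitions": same bucketing, partitioned on both ds and ts.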
    private static final Table TABLE_LARGE_PARTITIONS = new Table(
            Optional.of("catalogName"),
            SCHEMA_NAME,
            TABLE_NAME_LARGE_PARTITIONS,
            USER_NAME,
            PrestoTableType.MANAGED_TABLE,
            new Storage(fromHiveStorageFormat(ORC),
                    LOCATION_LARGE_PARTITIONS,
                    Optional.of(new HiveBucketProperty(
                            ImmutableList.of(BUCKET_COLUMN.getName()),
                            100,
                            ImmutableList.of(),
                            HIVE_COMPATIBLE,
                            Optional.empty())),
                    false,
                    ImmutableMap.of(),
                    ImmutableMap.of()),
            ImmutableList.of(BUCKET_COLUMN),
            ImmutableList.of(PARTITION_COLUMN, PARTITION_COLUMN_TS),
            ImmutableMap.of(),
            Optional.empty(),
            Optional.empty());

    private static final List<String> PARTITIONS = ImmutableList.of("ds=2019-07-23", "ds=2019-08-23");
    private static final List<String> PARTITIONS_LARGE_PARTITIONS = ImmutableList.of(
            "ds=2019-07-23/ts=2019-07-23:01:00:00",
            "ds=2019-07-23/ts=2019-07-23:10:00:00",
            "ds=2019-08-23/ts=2019-07-23:01:00:00",
            "ds=2019-08-23/ts=2019-08-23:05:00:00");

    private final HivePartitionManager hivePartitionManager = new HivePartitionManager(new TestingTypeManager(), new HiveClientConfig());
    private final TestingSemiTransactionalHiveMetastore metastore = TestingSemiTransactionalHiveMetastore.create();

    @BeforeClass
    public void setUp()
    {
        metastore.addTable(SCHEMA_NAME, TABLE_NAME, TABLE, PARTITIONS);
        metastore.addTable(SCHEMA_NAME, TABLE_NAME_LARGE_PARTITIONS, TABLE_LARGE_PARTITIONS, PARTITIONS_LARGE_PARTITIONS);
    }

    @Test
    public void testUsesBucketingIfSmallEnough()
    {
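        // Default config: 2 partitions x 100 buckets is comfortably below the
        // default max-buckets-for-grouped-execution limit, so bucketing is retained.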
        HiveTableHandle tableHandle = new HiveTableHandle(SCHEMA_NAME, TABLE_NAME);
        HivePartitionResult result = hivePartitionManager.getPartitions(
                metastore,
                tableHandle,
                Constraint.alwaysTrue(),
                new TestingConnectorSession(
                        new HiveSessionProperties(
                                new HiveClientConfig(),
                                new OrcFileWriterConfig(),
                                new ParquetFileWriterConfig(),
                                new CacheConfig()).getSessionProperties()));
        assertTrue(result.getBucketHandle().isPresent(), "bucketHandle is not present");
        assertFalse(result.getBucketFilter().isPresent(), "bucketFilter is present");
    }

    @Test
    public void testIgnoresBucketingWhenTooManyBuckets()
    {
        ConnectorSession session = new TestingConnectorSession(
                new HiveSessionProperties(
                        new HiveClientConfig().setMaxBucketsForGroupedExecution(100),
                        new OrcFileWriterConfig(),
                        new ParquetFileWriterConfig(),
                        new CacheConfig())
                        .getSessionProperties());
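        // 2 partitions x 100 buckets = 200 exceeds the limit of 100, so the bucket handle is dropped.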
        HivePartitionResult result = hivePartitionManager.getPartitions(metastore, new HiveTableHandle(SCHEMA_NAME, TABLE_NAME), Constraint.alwaysTrue(), session);
        assertFalse(result.getBucketHandle().isPresent(), "bucketHandle is present");
        assertFalse(result.getBucketFilter().isPresent(), "bucketFilter is present");
    }

    @Test
    public void testUsesBucketingWithPartitionFilters()
    {
        ConnectorSession session = new TestingConnectorSession(
                new HiveSessionProperties(
                        new HiveClientConfig().setMaxBucketsForGroupedExecution(100),
                        new OrcFileWriterConfig(),
                        new ParquetFileWriterConfig(),
                        new CacheConfig()).getSessionProperties());
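        // The ds predicate prunes to a single partition, so 1 x 100 buckets fits the limit of 100 again.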
        HiveTableHandle tableHandle = new HiveTableHandle(SCHEMA_NAME, TABLE_NAME);
        HivePartitionResult result = hivePartitionManager.getPartitions(
                metastore,
                tableHandle,
                new Constraint<>(TupleDomain.withColumnDomains(
                        ImmutableMap.of(
                                new HiveColumnHandle(
                                        PARTITION_COLUMN.getName(),
                                        PARTITION_COLUMN.getType(),
                                        parseTypeSignature(StandardTypes.VARCHAR),
                                        MAX_PARTITION_KEY_COLUMN_INDEX,
                                        PARTITION_KEY,
                                        Optional.empty(),
                                        Optional.empty()),
                                Domain.singleValue(VARCHAR, utf8Slice("2019-07-23"))))),
                session);
        assertTrue(result.getBucketHandle().isPresent(), "bucketHandle is not present");
        assertFalse(result.getBucketFilter().isPresent(), "bucketFilter is present");
    }

    @Test
    public void testUsesBucketingWithBucketFilters()
    {
        ConnectorSession session = new TestingConnectorSession(
                new HiveSessionProperties(
                        new HiveClientConfig().setMaxBucketsForGroupedExecution(100),
                        new OrcFileWriterConfig(),
                        new ParquetFileWriterConfig(),
                        new CacheConfig()).getSessionProperties());
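        // A predicate on the bucketed column c1 yields a bucket filter, and bucketing
        // is kept even though 2 partitions x 100 buckets exceed the limit.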
        HiveTableHandle tableHandle = new HiveTableHandle(SCHEMA_NAME, TABLE_NAME);
        HivePartitionResult result = hivePartitionManager.getPartitions(
                metastore,
                tableHandle,
                new Constraint<>(TupleDomain.withColumnDomains(
                        ImmutableMap.of(
                                new HiveColumnHandle(
                                        BUCKET_COLUMN.getName(),
                                        BUCKET_COLUMN.getType(),
                                        parseTypeSignature(StandardTypes.INTEGER),
                                        0,
                                        REGULAR,
                                        Optional.empty(),
                                        Optional.empty()),
                                Domain.singleValue(INTEGER, 1L)))),
                session);
        assertTrue(result.getBucketHandle().isPresent(), "bucketHandle is not present");
        assertTrue(result.getBucketFilter().isPresent(), "bucketFilter is present");
    }

    @Test
    public void testUsesBucketingWithBucketColumn()
    {
        ConnectorSession session = new TestingConnectorSession(
                new HiveSessionProperties(
                        new HiveClientConfig().setMaxBucketsForGroupedExecution(1),
                        new OrcFileWriterConfig(),
                        new ParquetFileWriterConfig(),
                        new CacheConfig()).getSessionProperties());
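        // A predicate on the synthetic bucket column also yields a bucket filter;
        // bucketing survives even with the limit set to 1.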
        HiveTableHandle tableHandle = new HiveTableHandle(SCHEMA_NAME, TABLE_NAME);
        HivePartitionResult result = hivePartitionManager.getPartitions(
                metastore,
                tableHandle,
                new Constraint<>(TupleDomain.withColumnDomains(
                        ImmutableMap.of(
                                bucketColumnHandle(),
                                Domain.singleValue(INTEGER, 1L)))),
                session);
        assertTrue(result.getBucketHandle().isPresent(), "bucketHandle is not present");
        assertTrue(result.getBucketFilter().isPresent(), "bucketFilter is present");
    }

    @Test
    public void testIgnoresBucketingWhenConfigured()
    {
        ConnectorSession session = new TestingConnectorSession(
                new HiveSessionProperties(
                        new HiveClientConfig().setIgnoreTableBucketing(true),
                        new OrcFileWriterConfig(),
                        new ParquetFileWriterConfig(),
                        new CacheConfig())
                        .getSessionProperties());
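        // Explicitly ignoring table bucketing drops the bucket handle regardless of table layout.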
        HivePartitionResult result = hivePartitionManager.getPartitions(metastore, new HiveTableHandle(SCHEMA_NAME, TABLE_NAME), Constraint.alwaysTrue(), session);
        assertFalse(result.getBucketHandle().isPresent(), "bucketHandle is present");
        assertFalse(result.getBucketFilter().isPresent(), "bucketFilter is present");
    }

    @Test
    public void testMultiplePartitions()
    {
        ConnectorSession session = new TestingConnectorSession(
                new HiveSessionProperties(
                        new HiveClientConfig().setIgnoreTableBucketing(true).setOptimizeParsingOfPartitionValues(true).setOptimizeParsingOfPartitionValuesThreshold(2),
                        new OrcFileWriterConfig(),
                        new ParquetFileWriterConfig(),
                        new CacheConfig())
                        .getSessionProperties());
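        // The predicate accepts only ds=2019-07-23, matching 2 of the 4 partitions;
        // with 4 partitions above the threshold of 2, the optimized
        // partition-value parsing path should be exercised as well.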
        ColumnHandle columnHandle = new HiveColumnHandle(
                PARTITION_COLUMN.getName(),
                PARTITION_COLUMN.getType(),
                parseTypeSignature(StandardTypes.VARCHAR),
                MAX_PARTITION_KEY_COLUMN_INDEX,
                PARTITION_KEY,
                Optional.empty(),
                Optional.empty());
        TupleDomain<ColumnHandle> tupleDomain =
                TupleDomain.withColumnDomains(
                        ImmutableMap.of(
                                columnHandle,
                                Domain.singleValue(VARCHAR, utf8Slice("2019-07-23"))));
        Predicate<Map<ColumnHandle, NullableValue>> predicate = convertToPredicate(tupleDomain);
        List<ColumnHandle> predicateInput = ImmutableList.of(columnHandle);
        Constraint<ColumnHandle> constraint = new Constraint<>(TupleDomain.all(), Optional.of(predicate), Optional.of(predicateInput));
        HivePartitionResult result = hivePartitionManager.getPartitions(metastore, new HiveTableHandle(SCHEMA_NAME, TABLE_NAME_LARGE_PARTITIONS), constraint, session);
        assertEquals(result.getPartitions().size(), 2);
    }
}