HiveTestUtils.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.smile.SmileCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.PagesIndexPageSorter;
import com.facebook.presto.cache.CacheConfig;
import com.facebook.presto.common.block.BlockEncodingManager;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.MapType;
import com.facebook.presto.common.type.NamedTypeSignature;
import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeSignatureParameter;
import com.facebook.presto.cost.ConnectorFilterStatsCalculatorService;
import com.facebook.presto.cost.FilterStatsCalculator;
import com.facebook.presto.cost.ScalarStatsCalculator;
import com.facebook.presto.cost.StatsNormalizer;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.datasink.OutputStreamDataSinkFactory;
import com.facebook.presto.hive.gcs.HiveGcsConfig;
import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer;
import com.facebook.presto.hive.orc.DwrfAggregatedPageSourceFactory;
import com.facebook.presto.hive.orc.DwrfBatchPageSourceFactory;
import com.facebook.presto.hive.orc.DwrfSelectivePageSourceFactory;
import com.facebook.presto.hive.orc.OrcAggregatedPageSourceFactory;
import com.facebook.presto.hive.orc.OrcBatchPageSourceFactory;
import com.facebook.presto.hive.orc.OrcSelectivePageSourceFactory;
import com.facebook.presto.hive.orc.TupleDomainFilterCache;
import com.facebook.presto.hive.pagefile.PageFilePageSourceFactory;
import com.facebook.presto.hive.pagefile.PageFileWriterFactory;
import com.facebook.presto.hive.parquet.ParquetAggregatedPageSourceFactory;
import com.facebook.presto.hive.parquet.ParquetPageSourceFactory;
import com.facebook.presto.hive.rcfile.RcFilePageSourceFactory;
import com.facebook.presto.hive.s3.HiveS3Config;
import com.facebook.presto.hive.s3.PrestoS3ClientFactory;
import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater;
import com.facebook.presto.hive.s3select.S3SelectRecordCursorProvider;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.operator.PagesIndex;
import com.facebook.presto.orc.StorageStripeMetadataSource;
import com.facebook.presto.orc.StripeMetadataSourceFactory;
import com.facebook.presto.orc.cache.StorageOrcFileTailSource;
import com.facebook.presto.parquet.cache.MetadataReader;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PageSorter;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
import com.facebook.presto.spi.relation.DeterminismEvaluator;
import com.facebook.presto.spi.relation.DomainTranslator;
import com.facebook.presto.spi.relation.ExpressionOptimizer;
import com.facebook.presto.spi.relation.PredicateCompiler;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.sql.gen.RowExpressionPredicateCompiler;
import com.facebook.presto.sql.planner.planPrinter.RowExpressionFormatter;
import com.facebook.presto.sql.relational.FunctionResolution;
import com.facebook.presto.sql.relational.RowExpressionDeterminismEvaluator;
import com.facebook.presto.sql.relational.RowExpressionDomainTranslator;
import com.facebook.presto.sql.relational.RowExpressionOptimizer;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;

import java.io.File;
import java.math.BigDecimal;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.facebook.airlift.json.JsonCodec.jsonCodec;
import static com.facebook.airlift.json.smile.SmileCodec.smileCodec;
import static com.facebook.presto.common.type.Decimals.encodeScaledValue;
import static com.facebook.presto.hive.HiveDwrfEncryptionProvider.NO_ENCRYPTION;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;

public final class HiveTestUtils
{
    private static final Logger log = Logger.get(HiveTestUtils.class);

    private HiveTestUtils()
    {
    }

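    // A stub DirectoryLister that ignores all arguments and returns null, for tests that need the
    // dependency but never list a directory.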
    public static final DirectoryLister DO_NOTHING_DIRECTORY_LISTER = (fileSystem, table, path, partition, namenodeStats, hiveDirectoryContext) -> null;

    public static final JsonCodec<PartitionUpdate> PARTITION_UPDATE_CODEC = jsonCodec(PartitionUpdate.class);
    public static final SmileCodec<PartitionUpdate> PARTITION_UPDATE_SMILE_CODEC = smileCodec(PartitionUpdate.class);

    public static final Set<String> TEST_CLIENT_TAGS = ImmutableSet.of("TAG1", "TAG2");

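    // A ConnectorSession carrying all default Hive session properties plus the test client tags above.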
    public static final ConnectorSession SESSION = new TestingConnectorSession(getAllSessionProperties(new HiveClientConfig(), new HiveCommonClientConfig()), TEST_CLIENT_TAGS);
    public static final MetadataManager METADATA = MetadataManager.createTestMetadataManager();

    public static final FunctionAndTypeManager FUNCTION_AND_TYPE_MANAGER = METADATA.getFunctionAndTypeManager();

    public static final StandardFunctionResolution FUNCTION_RESOLUTION = new FunctionResolution(METADATA.getFunctionAndTypeManager().getFunctionAndTypeResolver());

    public static final RowExpressionService ROW_EXPRESSION_SERVICE = new RowExpressionService()
    {
        @Override
        public DomainTranslator getDomainTranslator()
        {
            return new RowExpressionDomainTranslator(METADATA);
        }

        @Override
        public ExpressionOptimizer getExpressionOptimizer(ConnectorSession session)
        {
            return new RowExpressionOptimizer(METADATA);
        }

        @Override
        public PredicateCompiler getPredicateCompiler()
        {
            return new RowExpressionPredicateCompiler(METADATA);
        }

        @Override
        public DeterminismEvaluator getDeterminismEvaluator()
        {
            return new RowExpressionDeterminismEvaluator(METADATA);
        }

        @Override
        public String formatRowExpression(ConnectorSession session, RowExpression expression)
        {
            return new RowExpressionFormatter(METADATA.getFunctionAndTypeManager()).formatRowExpression(session, expression);
        }
    };

    public static final FilterStatsCalculatorService FILTER_STATS_CALCULATOR_SERVICE = new ConnectorFilterStatsCalculatorService(
            new FilterStatsCalculator(METADATA, new ScalarStatsCalculator(METADATA, ROW_EXPRESSION_SERVICE), new StatsNormalizer()));

    public static final HiveClientConfig HIVE_CLIENT_CONFIG = new HiveClientConfig();
    public static final MetastoreClientConfig METASTORE_CLIENT_CONFIG = new MetastoreClientConfig();

    public static final HdfsEnvironment HDFS_ENVIRONMENT = createTestHdfsEnvironment(HIVE_CLIENT_CONFIG, METASTORE_CLIENT_CONFIG);

    public static final PageSorter PAGE_SORTER = new PagesIndexPageSorter(new PagesIndex.TestingFactory(false));

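    /**
     * Returns batch page source factories for every file format read in these tests:
     * RCFile, ORC, DWRF, Parquet, and PageFile.
     */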
    public static Set<HiveBatchPageSourceFactory> getDefaultHiveBatchPageSourceFactories(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig)
    {
        FileFormatDataSourceStats stats = new FileFormatDataSourceStats();
        HdfsEnvironment testHdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig, metastoreClientConfig);
        return ImmutableSet.<HiveBatchPageSourceFactory>builder()
                .add(new RcFilePageSourceFactory(FUNCTION_AND_TYPE_MANAGER, testHdfsEnvironment, stats))
                .add(new OrcBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, hiveClientConfig, testHdfsEnvironment, stats, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource())))
                .add(new DwrfBatchPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, hiveClientConfig, testHdfsEnvironment, stats, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()), NO_ENCRYPTION))
                .add(new ParquetPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, testHdfsEnvironment, stats, new MetadataReader()))
                .add(new PageFilePageSourceFactory(testHdfsEnvironment, new BlockEncodingManager()))
                .build();
    }

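    /**
     * Returns selective page source factories, which evaluate filters while scanning instead of
     * returning all rows; only ORC and DWRF support selective reads here.
     */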
    public static Set<HiveSelectivePageSourceFactory> getDefaultHiveSelectivePageSourceFactories(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig)
    {
        FileFormatDataSourceStats stats = new FileFormatDataSourceStats();
        HdfsEnvironment testHdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig, metastoreClientConfig);
        return ImmutableSet.<HiveSelectivePageSourceFactory>builder()
                .add(new OrcSelectivePageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, ROW_EXPRESSION_SERVICE, hiveClientConfig, testHdfsEnvironment, stats, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()), new TupleDomainFilterCache()))
                .add(new DwrfSelectivePageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, ROW_EXPRESSION_SERVICE, hiveClientConfig, testHdfsEnvironment, stats, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource()), new TupleDomainFilterCache(), NO_ENCRYPTION))
                .build();
    }

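    /**
     * Returns aggregated page source factories, used when a query's aggregates can be answered from
     * file metadata rather than by scanning rows; ORC, DWRF, and Parquet are covered here.
     */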
    public static Set<HiveAggregatedPageSourceFactory> getDefaultHiveAggregatedPageSourceFactories(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig)
    {
        FileFormatDataSourceStats stats = new FileFormatDataSourceStats();
        HdfsEnvironment testHdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig, metastoreClientConfig);
        return ImmutableSet.<HiveAggregatedPageSourceFactory>builder()
                .add(new OrcAggregatedPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, hiveClientConfig, testHdfsEnvironment, stats, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource())))
                .add(new DwrfAggregatedPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, hiveClientConfig, testHdfsEnvironment, stats, new StorageOrcFileTailSource(), StripeMetadataSourceFactory.of(new StorageStripeMetadataSource())))
                .add(new ParquetAggregatedPageSourceFactory(FUNCTION_AND_TYPE_MANAGER, FUNCTION_RESOLUTION, testHdfsEnvironment, stats, new MetadataReader()))
                .build();
    }

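    /**
     * Returns the generic record cursor provider, the fallback used for formats without a dedicated
     * page source factory.
     */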
    public static Set<HiveRecordCursorProvider> getDefaultHiveRecordCursorProvider(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig)
    {
        HdfsEnvironment testHdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig, metastoreClientConfig);
        return ImmutableSet.<HiveRecordCursorProvider>builder()
                .add(new GenericHiveRecordCursorProvider(testHdfsEnvironment))
                .build();
    }

    public static Set<HiveRecordCursorProvider> getDefaultS3HiveRecordCursorProvider(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig)
    {
        HdfsEnvironment testHdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig, metastoreClientConfig);
        // Without S3SelectRecordCursorProvider nothing is pushed down to S3 Select and reads fall back to the regular GET path.
        // GenericHiveRecordCursorProvider is still needed to handle Hive splits when the query does not filter data,
        // since such scans are not pushed down to S3 Select.
        return ImmutableSet.<HiveRecordCursorProvider>builder()
                .add(new S3SelectRecordCursorProvider(testHdfsEnvironment, hiveClientConfig, new PrestoS3ClientFactory()))
                .add(new GenericHiveRecordCursorProvider(testHdfsEnvironment))
                .build();
    }

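    /**
     * Returns file writer factories for the formats these tests write directly: RCFile, PageFile,
     * and ORC/DWRF (via {@link #getDefaultOrcFileWriterFactory}).
     */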
    public static Set<HiveFileWriterFactory> getDefaultHiveFileWriterFactories(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig)
    {
        HdfsEnvironment testHdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig, metastoreClientConfig);
        return ImmutableSet.<HiveFileWriterFactory>builder()
                .add(new RcFileFileWriterFactory(testHdfsEnvironment, FUNCTION_AND_TYPE_MANAGER, new NodeVersion("test_version"), hiveClientConfig, new FileFormatDataSourceStats()))
                .add(new PageFileWriterFactory(testHdfsEnvironment, new OutputStreamDataSinkFactory(), new BlockEncodingManager()))
                .add(getDefaultOrcFileWriterFactory(hiveClientConfig, metastoreClientConfig))
                .build();
    }

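    /**
     * Builds an ORC file writer factory backed by a fresh test HDFS environment and a fixed
     * "test_version" node version.
     */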
    public static OrcFileWriterFactory getDefaultOrcFileWriterFactory(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig)
    {
        HdfsEnvironment testHdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig, metastoreClientConfig);
        return new OrcFileWriterFactory(
                testHdfsEnvironment,
                new OutputStreamDataSinkFactory(),
                FUNCTION_AND_TYPE_MANAGER,
                new NodeVersion("test_version"),
                hiveClientConfig,
                new FileFormatDataSourceStats(),
                new OrcFileWriterConfig(),
                NO_ENCRYPTION);
    }

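    /**
     * Resolves the Presto {@link Type} for each {@link HiveColumnHandle} in the given list,
     * preserving order.
     */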
    public static List<Type> getTypes(List<? extends ColumnHandle> columnHandles)
    {
        ImmutableList.Builder<Type> types = ImmutableList.builder();
        for (ColumnHandle columnHandle : columnHandles) {
            types.add(FUNCTION_AND_TYPE_MANAGER.getType(((HiveColumnHandle) columnHandle).getTypeSignature()));
        }
        return types.build();
    }

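    /**
     * Creates an {@link HdfsEnvironment} with default S3 and GCS configuration and no HDFS
     * authentication, suitable for local tests.
     */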
    public static HdfsEnvironment createTestHdfsEnvironment(HiveClientConfig config, MetastoreClientConfig metastoreClientConfig)
    {
        HdfsConfiguration hdfsConfig = new HiveHdfsConfiguration(
                new HdfsConfigurationInitializer(
                        config,
                        metastoreClientConfig,
                        new PrestoS3ConfigurationUpdater(new HiveS3Config()),
                        new HiveGcsConfigurationInitializer(new HiveGcsConfig())),
                ImmutableSet.of(),
                config);
        return new HdfsEnvironment(hdfsConfig, metastoreClientConfig, new NoHdfsAuthentication());
    }

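    /**
     * Resolves a {@code map(keyType, valueType)} type through the function and type manager;
     * {@link #arrayType} and {@link #rowType} below do the same for array and row types.
     */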
    public static MapType mapType(Type keyType, Type valueType)
    {
        return (MapType) FUNCTION_AND_TYPE_MANAGER.getParameterizedType(StandardTypes.MAP, ImmutableList.of(
                TypeSignatureParameter.of(keyType.getTypeSignature()),
                TypeSignatureParameter.of(valueType.getTypeSignature())));
    }

    public static ArrayType arrayType(Type elementType)
    {
        return (ArrayType) FUNCTION_AND_TYPE_MANAGER.getParameterizedType(
                StandardTypes.ARRAY,
                ImmutableList.of(TypeSignatureParameter.of(elementType.getTypeSignature())));
    }

    public static RowType rowType(List<NamedTypeSignature> elementTypeSignatures)
    {
        return (RowType) FUNCTION_AND_TYPE_MANAGER.getParameterizedType(
                StandardTypes.ROW,
                ImmutableList.copyOf(elementTypeSignatures.stream()
                        .map(TypeSignatureParameter::of)
                        .collect(toList())));
    }

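    /**
     * Returns the unscaled value of a short decimal (precision up to 18) as a {@code long};
     * for example, {@code shortDecimal("12.34")} yields {@code 1234L}. Use {@link #longDecimal}
     * for higher-precision values, which are encoded into a {@link Slice}.
     */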
    public static Long shortDecimal(String value)
    {
        return new BigDecimal(value).unscaledValue().longValueExact();
    }

    public static Slice longDecimal(String value)
    {
        return encodeScaledValue(new BigDecimal(value));
    }

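    /**
     * Looks up {@code name} as a Java system property and as an environment variable, returning
     * whichever is set (or empty if neither is) and failing if both are set to different values.
     */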
    public static Optional<String> getProperty(String name)
    {
        String systemPropertyValue = System.getProperty(name);
        String environmentVariableValue = System.getenv(name);
        if (systemPropertyValue == null) {
            if (environmentVariableValue == null) {
                return Optional.empty();
            }
            else {
                return Optional.of(environmentVariableValue);
            }
        }
        else {
            if (environmentVariableValue != null && !systemPropertyValue.equals(environmentVariableValue)) {
                throw new IllegalArgumentException(format(
                        "%s is set as both a Java system property and an environment variable, but the values differ. " +
                                "The Java system property value is %s, while the environment variable value is %s. Please set only one of them.",
                        name,
                        systemPropertyValue,
                        environmentVariableValue));
            }
            return Optional.of(systemPropertyValue);
        }
    }

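    /**
     * Resolves the data directory for a test run from the supplied argument, falling back to
     * DATA_DIR, and exits the JVM if the resolved path is not usable.
     */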
    public static Optional<Path> getDataDirectoryPath(Optional<String> suppliedDataDirectoryPath)
    {
        Optional<Path> dataDirectory = Optional.empty();
        if (!suppliedDataDirectoryPath.isPresent()) {
            // If the path was not supplied as a program argument, read it from the DATA_DIR system property or environment variable.
            suppliedDataDirectoryPath = getProperty("DATA_DIR");
        }
        if (suppliedDataDirectoryPath.isPresent()) {
            File dataDirectoryFile = new File(suppliedDataDirectoryPath.get());
            if (dataDirectoryFile.exists()) {
                if (!dataDirectoryFile.isDirectory()) {
                    log.error("Error: " + dataDirectoryFile.getAbsolutePath() + " is not a directory.");
                    System.exit(1);
                }
                else if (!dataDirectoryFile.canRead() || !dataDirectoryFile.canWrite()) {
                    log.error("Error: " + dataDirectoryFile.getAbsolutePath() + " is not readable/writable.");
                    System.exit(1);
                }
            }
            else {
                // When the supplied directory does not exist yet, the Hadoop file system can only create it if the
                // nearest existing ancestor is readable and writable. For example, "/aaa/bbb" cannot be created when
                // "/" is not writable. Walk up to the nearest existing ancestor and check its permissions, without
                // losing the originally supplied path.
                File ancestorFile = dataDirectoryFile.getAbsoluteFile();
                while (!ancestorFile.exists()) {
                    ancestorFile = ancestorFile.getParentFile();
                }
                if (!ancestorFile.canRead() || !ancestorFile.canWrite()) {
                    log.error("Error: The ancestor directory " + ancestorFile.getAbsolutePath() + " is not readable/writable.");
                    System.exit(1);
                }
            }

            dataDirectory = Optional.of(dataDirectoryFile.toPath());
        }
        return dataDirectory;
    }

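    /**
     * Returns the combined Hive and Hive-common session properties derived from the given configs.
     */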
    public static List<PropertyMetadata<?>> getAllSessionProperties(HiveClientConfig hiveClientConfig, HiveCommonClientConfig hiveCommonClientConfig)
    {
        return getAllSessionProperties(hiveClientConfig, new ParquetFileWriterConfig(), hiveCommonClientConfig);
    }

    public static List<PropertyMetadata<?>> getAllSessionProperties(HiveClientConfig hiveClientConfig, ParquetFileWriterConfig parquetFileWriterConfig, HiveCommonClientConfig hiveCommonClientConfig)
    {
        HiveSessionProperties hiveSessionProperties = new HiveSessionProperties(
                hiveClientConfig,
                new OrcFileWriterConfig(),
                parquetFileWriterConfig,
                new CacheConfig());

        List<PropertyMetadata<?>> allSessionProperties = new ArrayList<>(hiveSessionProperties.getSessionProperties());

        HiveCommonSessionProperties hiveCommonSessionProperties = new HiveCommonSessionProperties(
                hiveCommonClientConfig);

        allSessionProperties.addAll(hiveCommonSessionProperties.getSessionProperties());
        return allSessionProperties;
    }
}