// TestParquetQuickStatsBuilder.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.facebook.presto.hive.statistics;

import com.facebook.presto.hive.FileFormatDataSourceStats;
import com.facebook.presto.hive.HdfsConfigurationInitializer;
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveColumnConverterProvider;
import com.facebook.presto.hive.HiveFileInfo;
import com.facebook.presto.hive.HiveHdfsConfiguration;
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.TestingExtendedHiveMetastore;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.testing.TestingConnectorSession;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.Slice;
import io.airlift.units.Duration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

import java.io.IOException;
import java.time.LocalDate;
import java.time.chrono.ChronoLocalDate;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.facebook.presto.hive.HiveCommonSessionProperties.READ_MASKED_VALUE_ENABLED;
import static com.facebook.presto.hive.HivePartition.UNPARTITIONED_ID;
import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
import static com.facebook.presto.hive.HiveTestUtils.createTestHdfsEnvironment;
import static com.facebook.presto.hive.metastore.PrestoTableType.MANAGED_TABLE;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.lang.System.exit;
import static java.time.LocalDate.parse;
import static java.util.stream.Collectors.toMap;
import static org.testng.Assert.assertEquals;

public class TestParquetQuickStatsBuilder
{
    // Connector session shared by the tests and the benchmark; it registers a single boolean
    // session property, READ_MASKED_VALUE_ENABLED, with both boolean arguments set to false.
    public static final ConnectorSession SESSION = new TestingConnectorSession(ImmutableList.of(booleanProperty(
            READ_MASKED_VALUE_ENABLED,
            "Return null when access is denied for an encrypted parquet column",
            false,
            false)));
    // Schema and table name of the table that setUp() registers in the testing metastore
    public static final String TEST_SCHEMA = "test_schema";
    public static final String TEST_TABLE = "quick_stats";
    // Fixtures below are all (re)assigned by setUp()
    private ParquetQuickStatsBuilder parquetQuickStatsBuilder;
    private MetastoreContext metastoreContext;
    private ExtendedHiveMetastore metastore;
    private HdfsEnvironment hdfsEnvironment;
    private HiveClientConfig hiveClientConfig;
    private MetastoreClientConfig metastoreClientConfig;

    /**
     * Command-line entry point that runs the S3 read micro benchmark outside of TestNG.
     *
     * @param args ignored
     * @throws Exception declared for callers; nothing in the body throws a checked exception
     */
    public static void main(String[] args)
            throws Exception
    {
        benchmarkS3ReadsDriver();

        // Explicitly end the JVM with a success status once the benchmark returns
        // (presumably so lingering non-daemon threads cannot keep the process alive)
        exit(0);
    }

    /**
     * Micro benchmark driver for the ParquetQuickStats builder: performs five warm-up passes
     * (each listed file added once) followed by a single reported pass in which each listed
     * file is added ten times. The S3 bucket URI and directory are hard-coded in the body.
     */
    private static void benchmarkS3ReadsDriver()
    {
        String bucketUri = "s3://some-bucket";
        String partitionDirectory = "/path/to/partition";
        TestParquetQuickStatsBuilder benchmark = new TestParquetQuickStatsBuilder();

        // Do some warmup reads
        for (int warmupRun = 1; warmupRun <= 5; warmupRun++) {
            benchmark.benchmarkS3Reads(1, true, bucketUri, partitionDirectory);
        }

        // The measured run
        benchmark.benchmarkS3Reads(10, false, bucketUri, partitionDirectory);
    }

    /**
     * Builds the expected quick stats for a date column.
     *
     * @param columnName name of the column
     * @param rowCount value added to the (fresh) stats' row count
     * @param nullsCount value added to the (fresh) stats' nulls count
     * @param minDate value passed to {@code setMinValue}
     * @param maxDate value passed to {@code setMaxValue}
     * @return stats typed on {@link ChronoLocalDate}
     */
    private static ColumnQuickStats<ChronoLocalDate> createDateStats(String columnName, long rowCount, long nullsCount, LocalDate minDate, LocalDate maxDate)
    {
        ColumnQuickStats<ChronoLocalDate> dateStats = new ColumnQuickStats<>(columnName, ChronoLocalDate.class);

        // Counts first, then the value range
        dateStats.addToRowCount(rowCount);
        dateStats.addToNullsCount(nullsCount);
        dateStats.setMinValue(minDate);
        dateStats.setMaxValue(maxDate);
        return dateStats;
    }

    /**
     * Builds the expected quick stats for a column whose values are {@code Long}s.
     *
     * @param columnName name of the column
     * @param rowCount value added to the (fresh) stats' row count
     * @param nullsCount value added to the (fresh) stats' nulls count
     * @param min value passed to {@code setMinValue}
     * @param max value passed to {@code setMaxValue}
     * @return stats typed on {@link Long}
     */
    private static ColumnQuickStats<Long> createLongStats(String columnName, long rowCount, long nullsCount, long min, long max)
    {
        ColumnQuickStats<Long> longStats = new ColumnQuickStats<>(columnName, Long.class);

        // Counts first, then the value range
        longStats.addToRowCount(rowCount);
        longStats.addToNullsCount(nullsCount);
        longStats.setMinValue(min);
        longStats.setMaxValue(max);
        return longStats;
    }

    /**
     * Builds the expected quick stats for a column whose values are {@code Integer}s.
     *
     * @param columnName name of the column
     * @param rowCount value added to the (fresh) stats' row count
     * @param nullsCount value added to the (fresh) stats' nulls count
     * @param min value passed to {@code setMinValue}
     * @param max value passed to {@code setMaxValue}
     * @return stats typed on {@link Integer}
     */
    private static ColumnQuickStats<Integer> createIntegerStats(String columnName, long rowCount, long nullsCount, int min, int max)
    {
        ColumnQuickStats<Integer> integerStats = new ColumnQuickStats<>(columnName, Integer.class);

        // Counts first, then the value range
        integerStats.addToRowCount(rowCount);
        integerStats.addToNullsCount(nullsCount);
        integerStats.setMinValue(min);
        integerStats.setMaxValue(max);
        return integerStats;
    }

    /**
     * Builds the expected quick stats for a binary ({@code Slice}) column. Only the row and
     * nulls counts are populated; no min/max value is set.
     *
     * @param columnName name of the column
     * @param rowCount value added to the (fresh) stats' row count
     * @param nullsCount value added to the (fresh) stats' nulls count
     * @return stats typed on {@code Slice}
     */
    private static ColumnQuickStats<Slice> createBinaryStats(String columnName, long rowCount, long nullsCount)
    {
        ColumnQuickStats<Slice> binaryStats = new ColumnQuickStats<>(columnName, Slice.class);

        binaryStats.addToRowCount(rowCount);
        binaryStats.addToNullsCount(nullsCount);
        return binaryStats;
    }

    /**
     * Builds the expected quick stats for a column whose values are {@code Double}s.
     *
     * @param columnName name of the column
     * @param rowCount value added to the (fresh) stats' row count
     * @param nullsCount value added to the (fresh) stats' nulls count
     * @param min value passed to {@code setMinValue}
     * @param max value passed to {@code setMaxValue}
     * @return stats typed on {@link Double}
     */
    private static ColumnQuickStats<Double> createDoubleStats(String columnName, long rowCount, long nullsCount, double min, double max)
    {
        ColumnQuickStats<Double> doubleStats = new ColumnQuickStats<>(columnName, Double.class);

        // Counts first, then the value range
        doubleStats.addToRowCount(rowCount);
        doubleStats.addToNullsCount(nullsCount);
        doubleStats.setMinValue(min);
        doubleStats.setMaxValue(max);
        return doubleStats;
    }

    /**
     * Recursively lists the files under {@code basePath + "/" + partitionDir} and returns a
     * {@link HiveFileInfo} for each of them, with every file appearing {@code repeatCount} times.
     *
     * @param basePath location used to obtain the file system
     * @param partitionDir directory, relative to {@code basePath}, whose files are listed
     * @param repeatCount number of entries to add per discovered file
     * @return the collected file infos, in listing order
     * @throws RuntimeException wrapping any exception raised while listing the files
     */
    private ImmutableList<HiveFileInfo> buildHiveFileInfos(String basePath, String partitionDir, int repeatCount)
    {
        Path partitionPath = new Path(basePath + "/" + partitionDir);
        ImmutableList.Builder<HiveFileInfo> collectedFiles = ImmutableList.builder();

        try (FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsContext(SESSION), new Path(basePath))) {
            for (RemoteIterator<LocatedFileStatus> listing = fileSystem.listFiles(partitionPath, true); listing.hasNext(); ) {
                LocatedFileStatus status = listing.next();
                // Add each discovered file repeatCount times - useful for simulating a large file test
                int copiesLeft = repeatCount;
                while (copiesLeft > 0) {
                    collectedFiles.add(HiveFileInfo.createHiveFileInfo(status, Optional.empty()));
                    copiesLeft--;
                }
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }

        return collectedFiles.build();
    }

    /**
     * Creates the fixtures shared by the tests: a metastore context derived from {@link #SESSION},
     * a {@code TestingExtendedHiveMetastore} holding the PARQUET test table, fresh client configs,
     * the test HDFS environment and the {@code ParquetQuickStatsBuilder} under test.
     */
    @BeforeTest
    private void setUp()
    {
        Storage parquetStorage = Storage.builder()
                .setStorageFormat(fromHiveStorageFormat(PARQUET))
                .setLocation("location")
                .build();
        Table testTable = new Table(
                Optional.of("catalogName"),
                TEST_SCHEMA,
                TEST_TABLE,
                "owner",
                MANAGED_TABLE,
                parquetStorage,
                ImmutableList.of(),
                ImmutableList.of(),
                ImmutableMap.of(),
                Optional.empty(),
                Optional.empty());

        metastoreContext = new MetastoreContext(
                SESSION.getUser(),
                SESSION.getQueryId(),
                Optional.empty(),
                Collections.emptySet(),
                Optional.empty(),
                Optional.empty(),
                false,
                HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER,
                SESSION.getWarningCollector(),
                SESSION.getRuntimeStats());

        // Register the table before publishing the metastore to the field
        ExtendedHiveMetastore testingMetastore = new TestingExtendedHiveMetastore();
        PrincipalPrivileges noPrivileges = new PrincipalPrivileges(ImmutableMultimap.of(), ImmutableMultimap.of());
        testingMetastore.createTable(metastoreContext, testTable, noPrivileges, ImmutableList.of());
        metastore = testingMetastore;

        hiveClientConfig = new HiveClientConfig();
        metastoreClientConfig = new MetastoreClientConfig();
        // HiveTestUtils#createTestHdfsEnvironment is meant to ensure that PrestoS3FileSystem is used for S3 paths
        hdfsEnvironment = createTestHdfsEnvironment(hiveClientConfig, metastoreClientConfig);

        FileFormatDataSourceStats dataSourceStats = new FileFormatDataSourceStats();
        parquetQuickStatsBuilder = new ParquetQuickStatsBuilder(dataSourceStats, hdfsEnvironment, hiveClientConfig);
    }

    /**
     * Runs one timed quick-stats build over the files listed under the given location.
     * The fixtures are re-created via {@code setUp()} on every call.
     *
     * @param mockedFileCount number of times each listed file is added to the input
     * @param isWarmup when true only {@code "Warmup.."} is printed; otherwise the elapsed
     *         time and the resulting stats are printed
     * @param s3BucketUri base location used to obtain the file system
     * @param partitionPath directory under {@code s3BucketUri} whose files are read
     */
    public void benchmarkS3Reads(int mockedFileCount, boolean isWarmup, String s3BucketUri, String partitionPath)
    {
        setUp();
        ImmutableList<HiveFileInfo> inputFiles = buildHiveFileInfos(s3BucketUri, partitionPath, mockedFileCount);
        SchemaTableName tableName = new SchemaTableName(TEST_SCHEMA, TEST_TABLE);
        String partitionName = UNPARTITIONED_ID.getPartitionName();

        Stopwatch timer = Stopwatch.createStarted();
        PartitionQuickStats builtStats = parquetQuickStatsBuilder.buildQuickStats(
                SESSION,
                metastore,
                tableName,
                metastoreContext,
                partitionName,
                inputFiles.iterator());
        timer.stop();

        if (isWarmup) {
            System.out.println("Warmup..");
            return;
        }

        System.out.printf("For %d files, took %d ms%n", mockedFileCount, timer.elapsed(TimeUnit.MILLISECONDS));
        System.out.println(builtStats);
    }

    /**
     * Verifies that a quick-stats build fails with a {@link TimeoutException} cause when the file
     * metadata fetch timeout (10 ms) is shorter than the delay (50 ms) that
     * {@link DelayingHdfsEnvironment} adds to every {@code getFileSystem} call.
     * The test fails if the build completes without throwing.
     */
    @Test
    public void testStatsBuildTimeIsBoundedUsingFooterFetchTimeout()
    {
        HiveClientConfig customHiveClientConfig = new HiveClientConfig().setParquetQuickStatsFileMetadataFetchTimeout(new Duration(10, TimeUnit.MILLISECONDS));
        HdfsEnvironment mockHdfsEnvironment = new DelayingHdfsEnvironment(hdfsEnvironment, hiveClientConfig, metastoreClientConfig);

        String resourceDir = TestParquetQuickStatsBuilder.class.getClassLoader().getResource("quick_stats").toString();
        ParquetQuickStatsBuilder customParquetQuickStatsBuilder = new ParquetQuickStatsBuilder(new FileFormatDataSourceStats(), mockHdfsEnvironment, customHiveClientConfig);
        // The file listing uses the regular (non-delaying) environment; only the builder is slowed down
        ImmutableList<HiveFileInfo> hiveFileInfos = buildHiveFileInfos(resourceDir, "tpcds_store_sales_sf_point_01", 1);

        try {
            customParquetQuickStatsBuilder.buildQuickStats(SESSION, metastore, new SchemaTableName(TEST_SCHEMA, TEST_TABLE),
                    metastoreContext, UNPARTITIONED_ID.getPartitionName(), hiveFileInfos.iterator());
        }
        catch (RuntimeException ex) {
            // TestNG's assertEquals takes (actual, expected); guard against a missing cause so
            // the failure is reported as an assertion error rather than a NullPointerException
            Throwable cause = ex.getCause();
            assertEquals(cause == null ? null : cause.getClass(), TimeoutException.class, "Unexpected cause of quick stats build failure: " + ex);
            return;
        }

        // AssertionError is not a RuntimeException, so it cannot be swallowed by the catch above.
        // Without this, the test would pass vacuously when no timeout is enforced.
        throw new AssertionError("Expected buildQuickStats to fail with a TimeoutException cause, but it completed normally");
    }

    /**
     * Builds quick stats from the parquet files under the {@code quick_stats} test resources
     * for two tables and checks the file count plus the stats of a few columns of each.
     */
    @Test
    public void testStatsAreBuiltFromFooters()
    {
        String resourceDir = TestParquetQuickStatsBuilder.class.getClassLoader().getResource("quick_stats").toString();

        // Table :  TPCDS SF 0.01 store_sales
        ImmutableList<HiveFileInfo> hiveFileInfos = buildHiveFileInfos(resourceDir, "tpcds_store_sales_sf_point_01", 1);
        PartitionQuickStats partitionQuickStats = parquetQuickStatsBuilder.buildQuickStats(SESSION, metastore, new SchemaTableName(TEST_SCHEMA, TEST_TABLE),
                metastoreContext, UNPARTITIONED_ID.getPartitionName(), hiveFileInfos.iterator());

        // TestNG's assertEquals takes (actual, expected)
        assertEquals(partitionQuickStats.getFileCount(), 8);
        // We check a few of the columns
        Map<String, ? extends ColumnQuickStats<?>> columnQuickStatsMap = partitionQuickStats.getStats().stream().collect(toMap(ColumnQuickStats::getColumnName, v -> v));
        assertEquals(columnQuickStatsMap.get("ss_promo_sk"), createLongStats("ss_promo_sk", 120527L, 5303L, 1L, 3L));
        assertEquals(columnQuickStatsMap.get("ss_sold_date_sk"), createLongStats("ss_sold_date_sk", 120527L, 5335L, 2450816L, 2452642L));
        assertEquals(columnQuickStatsMap.get("ss_quantity"), createIntegerStats("ss_quantity", 120527L, 5450L, 1, 100));
        // DECIMAL columns are stored as binary arrays in parquet
        assertEquals(columnQuickStatsMap.get("ss_wholesale_cost"), createBinaryStats("ss_wholesale_cost", 120527L, 5369L));

        // Table : TPCH orders table; 100 rows
        hiveFileInfos = buildHiveFileInfos(resourceDir, "tpch_orders_100_rows", 1);
        partitionQuickStats = parquetQuickStatsBuilder.buildQuickStats(SESSION, metastore, new SchemaTableName(TEST_SCHEMA, TEST_TABLE),
                metastoreContext, UNPARTITIONED_ID.getPartitionName(), hiveFileInfos.iterator());

        assertEquals(partitionQuickStats.getFileCount(), 1);
        columnQuickStatsMap = partitionQuickStats.getStats().stream().collect(toMap(ColumnQuickStats::getColumnName, v -> v));
        // VARCHAR columns are stored as binary arrays in parquet
        assertEquals(columnQuickStatsMap.get("comment"), createBinaryStats("comment", 100L, 0L));
        assertEquals(columnQuickStatsMap.get("orderdate"), createDateStats("orderdate", 100L, 0L, parse("1992-01-29"), parse("1998-07-24")));
        assertEquals(columnQuickStatsMap.get("totalprice"), createDoubleStats("totalprice", 100L, 0L, 1373.4, 352797.28));
    }

    /**
     * Checks that, for a table with nested (row / array-of-row) columns, quick stats are
     * produced only for the top-level {@code id} column.
     */
    @Test
    public void testStatsFromNestedColumnsAreNotIncluded()
    {
        String quickStatsResources = TestParquetQuickStatsBuilder.class.getClassLoader().getResource("quick_stats").toString();

        // Table definition :
        // CREATE TABLE nested_parquet(
        //     id bigint,
        //     x row(a bigint, b varchar, c double, d row(d1 bigint, d2 double)),
        //     y array(row(a bigint, b varchar, c double, d row(d1 bigint, d2 double))))
        //     with (format = 'PARQUET')
        // 3  rows were added to the table
        ImmutableList<HiveFileInfo> nestedTableFiles = buildHiveFileInfos(quickStatsResources, "nested_table", 1);
        PartitionQuickStats nestedTableStats = parquetQuickStatsBuilder.buildQuickStats(
                SESSION,
                metastore,
                new SchemaTableName(TEST_SCHEMA, TEST_TABLE),
                metastoreContext,
                UNPARTITIONED_ID.getPartitionName(),
                nestedTableFiles.iterator());

        assertEquals(nestedTableStats.getStats().size(), 1, "Expected stats for only non-nested column : 'id'");

        // The single remaining entry must describe the 'id' column
        ColumnQuickStats<?> onlyColumnStats = nestedTableStats.getStats().get(0);
        assertEquals(onlyColumnStats, createLongStats("id", 3L, 0L, 1L, 3L));
    }

    /**
     * {@link HdfsEnvironment} that sleeps for 50 ms on every
     * {@link #getFileSystem(String, Path, Configuration)} call before delegating to the wrapped
     * environment. Used to simulate slow file system access.
     */
    public static class DelayingHdfsEnvironment
            extends HdfsEnvironment
    {
        // Environment that actually serves the file system requests
        private final HdfsEnvironment delegate;

        /**
         * @param hdfsEnvironment environment that file system lookups are forwarded to
         * @param hiveClientConfig config used to build the superclass' HDFS configuration
         * @param metastoreClientConfig config passed to the superclass and its configuration initializer
         */
        public DelayingHdfsEnvironment(HdfsEnvironment hdfsEnvironment, HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig)
        {
            super(
                    new HiveHdfsConfiguration(
                            new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig),
                            ImmutableSet.of(),
                            hiveClientConfig),
                    metastoreClientConfig,
                    new NoHdfsAuthentication());
            delegate = hdfsEnvironment;
        }

        /**
         * Waits 50 ms (ignoring interrupts while sleeping) and then returns the delegate's file system.
         */
        @Override
        public ExtendedFileSystem getFileSystem(String user, Path path, Configuration configuration)
                throws IOException
        {
            sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
            ExtendedFileSystem fileSystem = delegate.getFileSystem(user, path, configuration);
            return fileSystem;
        }
    }
}