/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.Session;
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.hive.parquet.ParquetTester;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;

import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.google.common.io.Files.createTempDir;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static java.lang.String.format;
import static java.nio.file.Files.createTempFile;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
import static org.testng.Assert.assertEquals;

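/**
 * Tests for the {@code hive.skip-empty-files} configuration property. When the property is enabled,
 * zero-byte files in a table or partition directory are skipped while reading; when it is disabled,
 * reading such a directory fails, since an empty file is not a valid Parquet file.
 */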
@Test
public class TestHiveSkipEmptyFiles
        extends AbstractTestQueryFramework
{
    private static final String CATALOG = "hive";
    private static final String SCHEMA = "skip_empty_files_schema";
    private DistributedQueryRunner queryRunner;
    private DistributedQueryRunner queryFailRunner;
    private DistributedQueryRunner queryBucketRunner;
    private DistributedQueryRunner queryBucketFailRunner;
    private File temporaryDirectory;

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        Session session = testSessionBuilder().setCatalog(CATALOG).setSchema(SCHEMA).setTimeZoneKey(TimeZoneKey.UTC_KEY).build();
        this.queryRunner = DistributedQueryRunner.builder(session).build();

        this.queryRunner.installPlugin(new HivePlugin(CATALOG));
        Path catalogDirectory = this.queryRunner.getCoordinator().getDataDirectory().resolve("hive_data").getParent().resolve("catalog");
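        // All four runners use a file-based metastore rooted in the runner's data directory; they
        // differ only in the hive.bucket-execution and hive.skip-empty-files property values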
        Map<String, String> properties = ImmutableMap.<String, String>builder()
                .put("hive.metastore", "file")
                .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
                .put("hive.allow-drop-table", "true")
                .put("hive.non-managed-table-writes-enabled", "true")
                .put("hive.parquet.use-column-names", "true")
                .put("hive.compression-codec", "GZIP")
                .put("hive.storage-format", "PARQUET")
                .put("hive.bucket-execution", "false")
                .put("hive.skip-empty-files", "true")
                .build();

        this.queryRunner.createCatalog(CATALOG, CATALOG, properties);
        this.queryRunner.execute(format("CREATE SCHEMA %s.%s", CATALOG, SCHEMA));

        return this.queryRunner;
    }

    @BeforeClass(dependsOnMethods = {"createQueryFailRunner", "createQueryBucketRunner", "createQueryBucketFailRunner"})
    private void generateMetadataDirectory()
            throws Exception
    {
        temporaryDirectory = createTempDir();
        generateMetadata(queryRunner, "skip_empty_files_success");
        generateMetadata(queryFailRunner, "skip_empty_files_fail");
        generateBucketedMetadataWithEmptyFiles(queryBucketRunner, "skip_empty_files_bucket_success", false);
        generateBucketedMetadataWithEmptyFiles(queryBucketFailRunner, "skip_empty_files_bucket_insert_fail", false);
        generateBucketedMetadataWithEmptyFiles(queryBucketFailRunner, "skip_empty_files_bucket_replace_fail", true);
    }

    @AfterClass(alwaysRun = true)
    private void tearDown()
            throws IOException
    {
        queryFailRunner.close();
        queryBucketRunner.close();
        queryBucketFailRunner.close();
        deleteRecursively(temporaryDirectory.toPath(), ALLOW_INSECURE);
    }

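    /**
     * Creates a runner identical to the default one except that {@code hive.skip-empty-files} is
     * disabled, so queries against directories containing empty files are expected to fail
     */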
    @BeforeClass
    private void createQueryFailRunner()
            throws Exception
    {
        Session session = testSessionBuilder().setCatalog(CATALOG).setSchema(SCHEMA).setTimeZoneKey(TimeZoneKey.UTC_KEY).build();
        this.queryFailRunner = DistributedQueryRunner.builder(session).build();

        this.queryFailRunner.installPlugin(new HivePlugin(CATALOG));
        Path catalogDirectory = this.queryFailRunner.getCoordinator().getDataDirectory().resolve("hive_data").getParent().resolve("catalog");
        Map<String, String> properties = ImmutableMap.<String, String>builder()
                .put("hive.metastore", "file")
                .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
                .put("hive.allow-drop-table", "true")
                .put("hive.non-managed-table-writes-enabled", "true")
                .put("hive.parquet.use-column-names", "true")
                .put("hive.compression-codec", "GZIP")
                .put("hive.storage-format", "PARQUET")
                .put("hive.bucket-execution", "false")
                .put("hive.skip-empty-files", "false")
                .build();

        this.queryFailRunner.createCatalog(CATALOG, CATALOG, properties);
        this.queryFailRunner.execute(format("CREATE SCHEMA %s.%s", CATALOG, SCHEMA));
    }

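    /**
     * Creates a runner with bucket execution enabled and {@code hive.skip-empty-files} enabled,
     * used to verify that empty files in a bucketed table are skipped
     */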
    @BeforeClass
    private void createQueryBucketRunner()
            throws Exception
    {
        Session session = testSessionBuilder().setCatalog(CATALOG).setSchema(SCHEMA).setTimeZoneKey(TimeZoneKey.UTC_KEY).build();
        this.queryBucketRunner = DistributedQueryRunner.builder(session).build();

        this.queryBucketRunner.installPlugin(new HivePlugin(CATALOG));
        Path catalogDirectory = this.queryBucketRunner.getCoordinator().getDataDirectory().resolve("hive_data").getParent().resolve("catalog");
        Map<String, String> properties = ImmutableMap.<String, String>builder()
                .put("hive.metastore", "file")
                .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
                .put("hive.allow-drop-table", "true")
                .put("hive.non-managed-table-writes-enabled", "true")
                .put("hive.parquet.use-column-names", "true")
                .put("hive.compression-codec", "GZIP")
                .put("hive.storage-format", "PARQUET")
                .put("hive.bucket-execution", "true")
                .put("hive.skip-empty-files", "true")
                .build();

        this.queryBucketRunner.createCatalog(CATALOG, CATALOG, properties);
        this.queryBucketRunner.execute(format("CREATE SCHEMA %s.%s", CATALOG, SCHEMA));
    }

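    /**
     * Creates a runner with bucket execution enabled but {@code hive.skip-empty-files} disabled,
     * used to verify the expected failures on bucketed tables
     */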
    @BeforeClass
    private void createQueryBucketFailRunner()
            throws Exception
    {
        Session session = testSessionBuilder().setCatalog(CATALOG).setSchema(SCHEMA).setTimeZoneKey(TimeZoneKey.UTC_KEY).build();
        this.queryBucketFailRunner = DistributedQueryRunner.builder(session).build();

        this.queryBucketFailRunner.installPlugin(new HivePlugin(CATALOG));
        Path catalogDirectory = this.queryBucketFailRunner.getCoordinator().getDataDirectory().resolve("hive_data").getParent().resolve("catalog");
        Map<String, String> properties = ImmutableMap.<String, String>builder()
                .put("hive.metastore", "file")
                .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
                .put("hive.allow-drop-table", "true")
                .put("hive.non-managed-table-writes-enabled", "true")
                .put("hive.parquet.use-column-names", "true")
                .put("hive.compression-codec", "GZIP")
                .put("hive.storage-format", "PARQUET")
                .put("hive.bucket-execution", "true")
                .put("hive.skip-empty-files", "false")
                .build();

        this.queryBucketFailRunner.createCatalog(CATALOG, CATALOG, properties);
        this.queryBucketFailRunner.execute(format("CREATE SCHEMA %s.%s", CATALOG, SCHEMA));
    }

    /**
     * Creates an external table backed by a temporary directory and writes two Parquet files into it:
     * one containing a single row and one empty
     *
     * @param queryRunner a {@link QueryRunner} with the desired configuration properties
     * @param tableName a {@link String} containing the desired table name
     */
    private void generateMetadata(DistributedQueryRunner queryRunner, String tableName)
            throws Exception
    {
        Path tempDirectory = Files.createDirectory(Paths.get(temporaryDirectory.getPath(), tableName));
        @Language("SQL") String createQuery = format(
                "CREATE TABLE %s.\"%s\".\"%s\" (field %s) WITH (external_location = '%s')",
                CATALOG,
                SCHEMA,
                tableName,
                IntegerType.INTEGER,
                temporaryDirectory.toURI() + tableName);
        queryRunner.execute(createQuery);
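        // Write a single-row Parquet file so the table contains exactly one non-empty data file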
        Path firstParquetFile = createTempFile(tempDirectory, randomUUID().toString(), randomUUID().toString());
        ParquetTester.writeParquetFileFromPresto(firstParquetFile.toFile(),
                ImmutableList.of(IntegerType.INTEGER),
                Collections.singletonList("field"),
                new Iterable[] {Collections.singleton(1)},
                1,
                GZIP,
                PARQUET_1_0);
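        // Create a zero-byte file next to the valid one; it is skipped only when hive.skip-empty-files is enabled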
        createTempFile(tempDirectory, randomUUID().toString(), randomUUID().toString());
    }

    /**
     * Creates an external bucketed table backed by a temporary directory, inserts one row into each of
     * five partitions, and adds an empty file to the first partition
     *
     * @param queryRunner a {@link QueryRunner} with the desired configuration properties
     * @param tableName a {@link String} containing the desired table name
     * @param replaceDataFileByEmptyFile {@code true} to delete one of the first partition's data files,
     * so that the empty file replaces it rather than merely accompanying it
     */
    private void generateBucketedMetadataWithEmptyFiles(DistributedQueryRunner queryRunner, String tableName, boolean replaceDataFileByEmptyFile)
            throws Exception
    {
        Path tempDirectory = Files.createDirectory(Paths.get(temporaryDirectory.getPath(), tableName));
        @Language("SQL") String createQuery = format("CREATE TABLE %s.\"%s\".\"%s\" (id %s, field %s) WITH (external_location = '%s'," +
                        "format = 'Parquet',partitioned_by = ARRAY['field']," +
                        "  bucketed_by = ARRAY['id']," +
                        "  bucket_count = 3)",
                CATALOG, SCHEMA, tableName, IntegerType.INTEGER, VarcharType.VARCHAR, tempDirectory.toUri());
        queryRunner.execute(createQuery);
        Path partitionDirectory = tempDirectory.resolve("field=field1");
        String partitionName = "field";
        @Language("SQL") String insertQuery;
        for (int i = 1; i <= 5; i++) {
            insertQuery = format("INSERT INTO %s.\"%s\".\"%s\" VALUES (%s,'%s')",
                    CATALOG, SCHEMA, tableName, i, partitionName + i);
            queryRunner.execute(insertQuery);
        }
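        // When requested, delete one data file from the first partition (ignoring .crc checksum
        // files) so that the empty file created below replaces it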
        if (replaceDataFileByEmptyFile) {
            FilenameFilter filenameFilter = (dir, name) -> !name.endsWith(".crc");
            File[] filteredFiles = partitionDirectory.toFile().listFiles(filenameFilter);
            Files.delete(requireNonNull(filteredFiles)[0].toPath());
        }
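        // Add a zero-byte file to the first partition (field=field1)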
        createTempFile(partitionDirectory, randomUUID().toString(), randomUUID().toString());
    }

    /**
     * Queries the table with the desired configuration property. If the query succeeds, verifies the output.
     * Finally, drops the table.
     */
    @Test
    public void testSkipEmptyFilesSuccessful()
    {
        try {
            @Language("SQL") String selectQuery = format("SELECT \"$path\" FROM %s.\"%s\".\"%s\"", CATALOG,
                    SCHEMA, "skip_empty_files_success");
            MaterializedResult result = queryRunner.execute(selectQuery);
            assertEquals(result.getRowCount(), 1);
        }
        finally {
            dropTable(queryRunner, "skip_empty_files_success");
        }
    }

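    /**
     * Verifies that the same directory layout fails to read when {@code hive.skip-empty-files} is disabled
     */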
    @Test
    public void testSkipEmptyFilesError()
    {
        try {
            @Language("SQL") String selectQuery = format("SELECT * FROM %s.\"%s\".\"%s\"", CATALOG,
                    SCHEMA, "skip_empty_files_fail");
            assertQueryFails(queryFailRunner, selectQuery, ".* is not a valid Parquet File");
        }
        finally {
            dropTable(queryFailRunner, "skip_empty_files_fail");
        }
    }

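    /**
     * Verifies that all five rows of the bucketed table are returned when the empty file is skipped
     */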
    @Test
    public void testSkipEmptyFilesBucketSuccessful()
    {
        try {
            @Language("SQL") String selectQuery = format("SELECT * FROM %s.\"%s\".\"%s\"", CATALOG,
                    SCHEMA, "skip_empty_files_bucket_success");
            MaterializedResult result = queryBucketRunner.execute(selectQuery);
            assertEquals(result.getRowCount(), 5);
        }
        finally {
            dropTable(queryBucketRunner, "skip_empty_files_bucket_success");
        }
    }

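    /**
     * Verifies that, with skipping disabled, the extra empty file violates the bucket-count validation
     */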
    @Test
    public void testSkipEmptyFilesBucketInsertFileFail()
    {
        try {
            @Language("SQL") String selectQuery = format("SELECT * FROM %s.\"%s\".\"%s\"", CATALOG,
                    SCHEMA, "skip_empty_files_bucket_insert_fail");
            assertQueryFails(queryBucketFailRunner, selectQuery, ".* is corrupt.* does not match the standard naming pattern," +
                    " and the number of files in the directory .* does not match the declared bucket count.*");
        }
        finally {
            dropTable(queryBucketFailRunner, "skip_empty_files_bucket_insert_fail");
        }
    }

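    /**
     * Verifies that, with skipping disabled, reading the empty file that replaced a bucket data file fails
     */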
    @Test
    public void testSkipEmptyFilesBucketReplaceFileFail()
    {
        try {
            @Language("SQL") String selectQuery = format("SELECT * FROM %s.\"%s\".\"%s\"", CATALOG,
                    SCHEMA, "skip_empty_files_bucket_replace_fail");
            assertQueryFails(queryBucketFailRunner, selectQuery, ".* is not a valid Parquet File");
        }
        finally {
            dropTable(queryBucketFailRunner, "skip_empty_files_bucket_replace_fail");
        }
    }

    private void dropTable(DistributedQueryRunner queryRunner, String tableName)
    {
        @Language("SQL") String dropQuery = format("DROP TABLE IF EXISTS %s.\"%s\".\"%s\"", CATALOG,
                SCHEMA, tableName);
        queryRunner.execute(dropQuery);
    }
}