/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.Session;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.DoubleType;
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.RealType;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.parquet.ParquetTester;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableMap;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.Test;
import org.testng.log4testng.Logger;

import java.io.File;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static java.util.UUID.randomUUID;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

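/**
 * Tests how the Hive connector reads Parquet files whose physical type differs from the type
 * declared on the Hive table. Widening coercions (e.g. INTEGER to BIGINT, REAL to DOUBLE) are
 * expected to succeed, while narrowing ones (e.g. BIGINT to INTEGER, DOUBLE to REAL) are
 * expected to fail with a type mismatch error.
 */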
@Test
public class TestHiveTypeWidening
        extends AbstractTestQueryFramework
{
    private static final Logger logger = Logger.getLogger(TestHiveTypeWidening.class);
    private static final String CATALOG = "hive";
    private static final String SCHEMA = "type_widening_schema";
    private static final String INTEGER = "INTEGER";
    private static final String BIGINT = "BIGINT";
    private static final String REAL = "REAL";
    private static final String DOUBLE = "DOUBLE";
    private DistributedQueryRunner queryRunner;

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        logger.info("Creating 'QueryRunner'");
        Session session = testSessionBuilder()
                .setCatalog(CATALOG)
                .setSchema(SCHEMA)
                .setTimeZoneKey(TimeZoneKey.UTC_KEY)
                .build();
        this.queryRunner = DistributedQueryRunner.builder(session)
                .setExtraProperties(ImmutableMap.of())
                .build();

        logger.info("  |-- Installing Plugin: " + CATALOG);
        this.queryRunner.installPlugin(new HivePlugin(CATALOG));
        Path catalogDirectory = this.queryRunner.getCoordinator().getDataDirectory().resolve("hive_data").getParent().resolve("catalog");
        logger.info("  |-- Obtained catalog directory: " + catalogDirectory.toFile().toURI());
        Map<String, String> properties = ImmutableMap.<String, String>builder()
                .put("hive.metastore", "file")
                .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
                .put("hive.allow-drop-table", "true")
                .put("hive.non-managed-table-writes-enabled", "true")
                .put("hive.parquet.use-column-names", "true")
                .put("hive.compression-codec", "GZIP")
                .put("hive.storage-format", "PARQUET")
                .build();
        logger.info("  |-- Properties loaded");

        logger.info("  |-- Creating catalog '" + CATALOG + "' using plugin '" + CATALOG + '\'');
        this.queryRunner.createCatalog(CATALOG, CATALOG, properties);
        logger.info("  |-- Catalog '" + CATALOG + "' created");
        logger.info("  |-- Creating schema '" + SCHEMA + "' on catalog '" + CATALOG + '\'');
        this.queryRunner.execute(format("CREATE SCHEMA %s.%s", CATALOG, SCHEMA));
        logger.info("  |-- Schema '" + SCHEMA + "' created");

        logger.info("'QueryRunner' created succesfully");
        return this.queryRunner;
    }

    /**
     * Generates a temporary directory and creates two Parquet files inside it, one with data of each type
     * @param baseType a {@link String} containing the type name used for the first file
     * @param widenedType a {@link String} containing the type name used for the second file
     * @return a {@link File} pointing to the newly created temporary directory
     * @throws Exception if an error occurs
     */
    private static File generateMetadata(String baseType, String widenedType)
            throws Exception
    {
        // obtains the root resource directory in order to create temporary tables
        URL url = TestHiveTypeWidening.class.getClassLoader().getResource(".");
        if (url == null) {
            throw new RuntimeException("Could not obtain resource URL");
        }
        File temporaryDirectory = new File(url.getPath(), getTableName(baseType, widenedType));
        boolean created = temporaryDirectory.mkdirs();
        if (!created) {
            throw new RuntimeException("Could not create resource directory: " + temporaryDirectory.getPath());
        }
        logger.info("Created temporary directory: " + temporaryDirectory.toPath());
        File firstParquetFile = new File(temporaryDirectory, randomUUID().toString());
        ParquetTester.writeParquetFileFromPresto(firstParquetFile,
                Collections.singletonList(toType(baseType)),
                Collections.singletonList("field"),
                new Iterable[] {Collections.singletonList(getExpectedValueForType(baseType))},
                1,
                GZIP,
                PARQUET_1_0);
        logger.info("First file written");
        File secondParquetFile = new File(temporaryDirectory, randomUUID().toString());
        ParquetTester.writeParquetFileFromPresto(secondParquetFile,
                Collections.singletonList(toType(widenedType)),
                Collections.singletonList("field"),
                new Iterable[] {Collections.singletonList(getExpectedValueForType(widenedType))},
                1,
                GZIP,
                PARQUET_1_0);
        logger.info("Second file written");
        return temporaryDirectory;
    }

    /**
     * Returns the Presto type for the given type name
     * @param type a {@link String} containing the type name
     * @return a {@link Type} matching the given type name
     */
    private static Type toType(String type)
    {
        switch (type) {
            case INTEGER:
                return IntegerType.INTEGER;
            case BIGINT:
                return BigintType.BIGINT;
            case REAL:
                return RealType.REAL;
            case DOUBLE:
                return DoubleType.DOUBLE;
            default:
                throw new RuntimeException("Type not supported: " + type);
        }
    }

    /**
     * Deletes the given directory and all of its contents recursively
     * Does not follow symbolic links
     * @param temporaryDirectory a {@link File} pointing to the directory to delete
     */
    private static void deleteMetadata(File temporaryDirectory)
    {
        File[] data = temporaryDirectory.listFiles();
        if (data != null) {
            for (File f : data) {
                if (!Files.isSymbolicLink(f.toPath())) {
                    deleteMetadata(f);
                }
            }
        }
        deleteAndLog(temporaryDirectory);
    }

    private static void deleteAndLog(File file)
    {
        String filePath = file.getAbsolutePath();
        boolean isDirectory = file.isDirectory();
        if (file.delete()) {
            if (isDirectory) {
                logger.info("   deleted temporary directory: " + filePath);
            }
            else {
                logger.info("   deleted temporary file: " + filePath);
            }
        }
        else {
            logger.info("   could not delete temporary element: " + filePath);
        }
    }

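    // Each test below creates an external table over a directory containing two Parquet files,
    // one written with the base type and one with the widened type, and checks whether selecting
    // through the declared (widened) column type succeeds or fails.
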
    // Integer type widenings

    @Test
    public void testTypeWideningTableCreationIntegerToInteger()
            throws Exception
    {
        File resourcesLocation = generateMetadata(INTEGER, INTEGER);
        String tableName = getTableName(INTEGER, INTEGER);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), INTEGER, INTEGER, false, null);
        deleteMetadata(resourcesLocation);
    }

    @Test
    public void testTypeWideningTableCreationIntegerToBigint()
            throws Exception
    {
        File resourcesLocation = generateMetadata(INTEGER, BIGINT);
        String tableName = getTableName(INTEGER, BIGINT);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), INTEGER, BIGINT, false, null);
        deleteMetadata(resourcesLocation);
    }

    @Test
    public void testTypeWideningTableCreationIntegerToReal()
            throws Exception
    {
        File resourcesLocation = generateMetadata(INTEGER, REAL);
        String tableName = getTableName(INTEGER, REAL);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), INTEGER, REAL, false, null);
        deleteMetadata(resourcesLocation);
    }

    @Test
    public void testTypeWideningTableCreationIntegerToDouble()
            throws Exception
    {
        File resourcesLocation = generateMetadata(INTEGER, DOUBLE);
        String tableName = getTableName(INTEGER, DOUBLE);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), INTEGER, DOUBLE, false, null);
        deleteMetadata(resourcesLocation);
    }

    // Bigint type widenings

    @Test
    public void testTypeWideningTableCreationBigintToInteger()
            throws Exception
    {
        File resourcesLocation = generateMetadata(BIGINT, INTEGER);
        String tableName = getTableName(BIGINT, INTEGER);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), BIGINT, INTEGER, true,
                "The column field of table type_widening_schema\\.bigint_to_integer is declared as type int, but the Parquet file (.*) declares the column as type INT64");
        deleteMetadata(resourcesLocation);
    }

    @Test
    public void testTypeWideningTableCreationBigintToBigint()
            throws Exception
    {
        File resourcesLocation = generateMetadata(BIGINT, BIGINT);
        String tableName = getTableName(BIGINT, BIGINT);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), BIGINT, BIGINT, false, null);
        deleteMetadata(resourcesLocation);
    }

    @Test
    public void testTypeWideningTableCreationBigintToReal()
            throws Exception
    {
        File resourcesLocation = generateMetadata(BIGINT, REAL);
        String tableName = getTableName(BIGINT, REAL);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), BIGINT, REAL, false, null);
        deleteMetadata(resourcesLocation);
    }

    @Test
    public void testTypeWideningTableCreationBigintToDouble()
            throws Exception
    {
        File resourcesLocation = generateMetadata(BIGINT, DOUBLE);
        String tableName = getTableName(BIGINT, DOUBLE);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), BIGINT, DOUBLE, false, null);
        deleteMetadata(resourcesLocation);
    }

    // Real type widenings

    @Test
    public void testTypeWideningTableCreationRealToInteger()
            throws Exception
    {
        File resourcesLocation = generateMetadata(REAL, INTEGER);
        String tableName = getTableName(REAL, INTEGER);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), REAL, INTEGER, true,
                "The column field of table type_widening_schema\\.real_to_integer is declared as type int, but the Parquet file (.*) declares the column as type FLOAT");
        deleteMetadata(resourcesLocation);
    }

    @Test
    public void testTypeWideningTableCreationRealToBigint()
            throws Exception
    {
        File resourcesLocation = generateMetadata(REAL, BIGINT);
        String tableName = getTableName(REAL, BIGINT);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), REAL, BIGINT, true,
                "The column field of table type_widening_schema\\.real_to_bigint is declared as type bigint, but the Parquet file (.*) declares the column as type FLOAT");
        deleteMetadata(resourcesLocation);
    }

    @Test
    public void testTypeWideningTableCreationRealToReal()
            throws Exception
    {
        File resourcesLocation = generateMetadata(REAL, REAL);
        String tableName = getTableName(REAL, REAL);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), REAL, REAL, false, null);
        deleteMetadata(resourcesLocation);
    }

    @Test
    public void testTypeWideningTableCreationRealToDouble()
            throws Exception
    {
        File resourcesLocation = generateMetadata(REAL, DOUBLE);
        String tableName = getTableName(REAL, DOUBLE);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), REAL, DOUBLE, false, null);
        deleteMetadata(resourcesLocation);
    }

    // Double type widenings

    @Test
    public void testTypeWideningTableCreationDoubleToInteger()
            throws Exception
    {
        File resourcesLocation = generateMetadata(DOUBLE, INTEGER);
        String tableName = getTableName(DOUBLE, INTEGER);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), DOUBLE, INTEGER, true,
                "The column field of table type_widening_schema\\.double_to_integer is declared as type int, but the Parquet file (.*) declares the column as type DOUBLE");
        deleteMetadata(resourcesLocation);
    }

    @Test
    public void testTypeWideningTableCreationDoubleToBigint()
            throws Exception
    {
        File resourcesLocation = generateMetadata(DOUBLE, BIGINT);
        String tableName = getTableName(DOUBLE, BIGINT);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), DOUBLE, BIGINT, true,
                "The column field of table type_widening_schema\\.double_to_bigint is declared as type bigint, but the Parquet file (.*) declares the column as type DOUBLE");
        deleteMetadata(resourcesLocation);
    }

    @Test
    public void testTypeWideningTableCreationDoubleToReal()
            throws Exception
    {
        File resourcesLocation = generateMetadata(DOUBLE, REAL);
        String tableName = getTableName(DOUBLE, REAL);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), DOUBLE, REAL, true,
                "The column field of table type_widening_schema\\.double_to_real is declared as type float, but the Parquet file (.*) declares the column as type DOUBLE");
        deleteMetadata(resourcesLocation);
    }

    @Test
    public void testTypeWideningTableCreationDoubleToDouble()
            throws Exception
    {
        File resourcesLocation = generateMetadata(DOUBLE, DOUBLE);
        String tableName = getTableName(DOUBLE, DOUBLE);
        executeCreationTestAndDropCycle(tableName, getResourceUrl(tableName), DOUBLE, DOUBLE, false, null);
        deleteMetadata(resourcesLocation);
    }

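    /**
     * Builds the table (and resource directory) name for a type pair,
     * e.g. {@code getTableName("INTEGER", "BIGINT")} returns {@code "integer_to_bigint"}
     */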
    private static String getTableName(String baseType, String targetType)
    {
        return baseType.toLowerCase(Locale.ENGLISH) + "_to_" + targetType.toLowerCase(Locale.ENGLISH);
    }

    /**
     * Obtains the external location from the local resources directory of the project
     * @param tableName a {@link String} containing the directory name to search for
     * @return a {@link String} with the external location for the given table name
     */
    private static String getResourceUrl(String tableName)
    {
        URL resourceUrl = TestHiveTypeWidening.class.getClassLoader().getResource(tableName);
        if (resourceUrl == null) {
            throw new RuntimeException("Cannot find resource path for table name: " + tableName);
        }
        logger.info("resource url: " + resourceUrl.toString());
        return resourceUrl.toString();
    }

    /**
     * Tries to create a table with the type defined in {@code widenedType}. If the creation succeeds, verifies the output.
     * Finally, it drops the table.
     * @param tableName a {@link String} containing the desired table name
     * @param externalLocation a {@link String} with the external location to create the table against
     * @param baseType a {@link String} containing the type of the files to read
     * @param widenedType a {@link String} containing the type of the created table
     * @param shouldFail {@code true} if the table creation should fail, {@code false} otherwise
     * @param errorMessage a {@link String} containing the expected error message. Will be checked if {@code shouldFail} is {@code true}
     */
    private void executeCreationTestAndDropCycle(String tableName, String externalLocation, String baseType,
            String widenedType, boolean shouldFail, @Language("RegExp") String errorMessage)
    {
        logger.info("Executing Create - Test - Drop for: " + tableName);
        try {
            @Language("SQL") String createQuery = format(
                    "CREATE TABLE %s.\"%s\".\"%s\" (field %s) WITH (external_location = '%s')",
                    CATALOG,
                    SCHEMA,
                    tableName,
                    widenedType,
                    externalLocation);
            logger.info("Creating table: " + createQuery);
            this.queryRunner.execute(createQuery);
            @Language("SQL") String selectQuery = format("SELECT * FROM %s.\"%s\".\"%s\"", CATALOG,
                    SCHEMA, tableName);
            logger.info("Executing query: " + selectQuery);
            if (shouldFail) {
                assertQueryFails(selectQuery, errorMessage);
            }
            else {
                MaterializedResult result = this.queryRunner.execute(selectQuery);
                assertEquals(result.getTypes().size(), 1);
                assertEquals(result.getTypes().get(0).toString().toUpperCase(Locale.ENGLISH), widenedType);
                List<Object> fieldsValues = new ArrayList<>(0);
                for (MaterializedRow mr : result.getMaterializedRows()) {
                    fieldsValues.addAll(mr.getFields());
                }
                for (Object o : fieldsValues) {
                    logger.info(o.getClass().toString() + "     " + o);
                }
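                // one row comes from the file written with the widened type (kept as-is) and one
                // from the base-type file, whose value is coerced to the widened type on read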
                Number genericTypeValue = getExpectedValueForType(widenedType);
                Number specificTypeValue = getExpectedValueCastedForType(getExpectedValueForType(baseType), widenedType);
                logger.info("Checking for existence of type '" + widenedType + "' value: " + genericTypeValue.toString());
                assertTrue(fieldsValues.contains(genericTypeValue));
                logger.info("Checking for existence of type '" + widenedType + "' value: " + specificTypeValue.toString());
                assertTrue(fieldsValues.contains(specificTypeValue));
            }
        }
        finally {
            @Language("SQL") String dropQuery = format("DROP TABLE IF EXISTS %s.\"%s\".\"%s\"", CATALOG,
                    SCHEMA, tableName);
            logger.info("Dropping table: " + dropQuery);
            this.queryRunner.execute(dropQuery);
        }
    }

    /**
     * Returns the expected value for the given type
     * @param typeName a {@link String} with the target type
     * @return the expected value for that type
     */
    private static Number getExpectedValueForType(String typeName)
    {
        switch (typeName) {
            case INTEGER:
                return Integer.valueOf(1);
            case BIGINT:
                return Long.valueOf(1000000000000L);
            case REAL:
                return Float.valueOf(0.04f);
            case DOUBLE:
                return Double.valueOf(4124.1324213412341241242134243d);
            default:
                throw new RuntimeException("Type not supported: " + typeName);
        }
    }

    /**
     * Casts the given value to the desired type
     * @param value a {@link Number} with the value to cast
     * @param typeName a {@link String} with the target type
     * @return the given value converted to the target type
     */
    private static Number getExpectedValueCastedForType(Number value, String typeName)
    {
        switch (typeName) {
            case INTEGER:
                return value.intValue();
            case BIGINT:
                return value.longValue();
            case REAL:
                return value.floatValue();
            case DOUBLE:
                return value.doubleValue();
            default:
                throw new RuntimeException("Type not supported: " + typeName);
        }
    }
}