TestIcebergOrcMetricsCollection.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg;

import com.facebook.presto.Session;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.nio.file.Path;
import java.util.Map;

import static com.facebook.presto.SystemSessionProperties.MAX_DRIVERS_PER_TASK;
import static com.facebook.presto.SystemSessionProperties.TASK_CONCURRENCY;
import static com.facebook.presto.SystemSessionProperties.TASK_WRITER_COUNT;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.FileContent.DATA;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.iceberg.IcebergQueryRunner.getIcebergDataDirectoryPath;
import static com.facebook.presto.iceberg.TestIcebergOrcMetricsCollection.DataFileRecord.toDataFileRecord;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;

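/**
 * Tests that the Iceberg connector collects file-level metrics (record counts,
 * per-column value, null, and NaN counts, and lower/upper bounds) when writing
 * ORC data files, and exposes them through the {@code "$files"} metadata table.
 */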
public class TestIcebergOrcMetricsCollection
        extends AbstractTestQueryFramework
{
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        Session session = testSessionBuilder()
                .setCatalog(ICEBERG_CATALOG)
                .setSchema("test_schema")
                .setSystemProperty(TASK_CONCURRENCY, "1")
                .setSystemProperty(TASK_WRITER_COUNT, "1")
                .setSystemProperty(MAX_DRIVERS_PER_TASK, "1")
                .setCatalogSessionProperty(ICEBERG_CATALOG, "orc_string_statistics_limit", Integer.MAX_VALUE + "B")
                .build();
        DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session)
                .setNodeCount(1)
                .build();

        queryRunner.installPlugin(new TpchPlugin());
        queryRunner.createCatalog("tpch", "tpch");

        Path dataDirectory = queryRunner.getCoordinator().getDataDirectory();
        Path catalogDirectory = getIcebergDataDirectoryPath(dataDirectory, HIVE.name(), new IcebergConfig().getFileFormat(), false);

        queryRunner.installPlugin(new IcebergPlugin());
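        // Back the Iceberg catalog with a file-based Hive metastore rooted in
        // the query runner's data directory.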
        Map<String, String> icebergProperties = ImmutableMap.<String, String>builder()
                .put("hive.metastore", "file")
                .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
                .build();

        queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties);

        queryRunner.execute("CREATE SCHEMA test_schema");

        return queryRunner;
    }

    @Test
    public void testBasic()
    {
        assertUpdate("CREATE TABLE orders WITH (\"write.format.default\" = 'ORC') AS SELECT * FROM tpch.tiny.orders", 15000);
        MaterializedResult materializedResult = computeActual("SELECT * FROM \"orders$files\"");
        assertEquals(materializedResult.getRowCount(), 1);
        DataFileRecord datafile = toDataFileRecord(materializedResult.getMaterializedRows().get(0));

        // Check content
        assertEquals(datafile.getContent(), DATA.id());

        // Check file format
        assertEquals(datafile.getFileFormat(), "ORC");

        // Check file row count
        assertEquals(datafile.getRecordCount(), 15000L);

        // Check per-column value count
        datafile.getValueCounts().values().forEach(valueCount -> assertEquals(valueCount, (Long) 15000L));

        // Check per-column null value count
        datafile.getNullValueCounts().values().forEach(nullValueCount -> assertEquals(nullValueCount, (Long) 0L));

        // Check NaN value count
        // TODO: add more checks after NaN info is collected
        assertNull(datafile.getNanValueCounts());

        // Check per-column lower bound
        Map<Integer, String> lowerBounds = datafile.getLowerBounds();
        assertQuery("SELECT min(orderkey) FROM tpch.tiny.orders", "VALUES " + lowerBounds.get(1));
        assertQuery("SELECT min(custkey) FROM tpch.tiny.orders", "VALUES " + lowerBounds.get(2));
        assertQuery("SELECT min(orderstatus) FROM tpch.tiny.orders", "VALUES '" + lowerBounds.get(3) + "'");
        assertQuery("SELECT min(totalprice) FROM tpch.tiny.orders", "VALUES " + lowerBounds.get(4));
        assertQuery("SELECT min(orderdate) FROM tpch.tiny.orders", "VALUES DATE '" + lowerBounds.get(5) + "'");
        assertQuery("SELECT min(orderpriority) FROM tpch.tiny.orders", "VALUES '" + lowerBounds.get(6) + "'");
        assertQuery("SELECT min(clerk) FROM tpch.tiny.orders", "VALUES '" + lowerBounds.get(7) + "'");
        assertQuery("SELECT min(shippriority) FROM tpch.tiny.orders", "VALUES " + lowerBounds.get(8));
        assertQuery("SELECT min(comment) FROM tpch.tiny.orders", "VALUES '" + lowerBounds.get(9) + "'");

        // Check per-column upper bound
        Map<Integer, String> upperBounds = datafile.getUpperBounds();
        assertQuery("SELECT max(orderkey) FROM tpch.tiny.orders", "VALUES " + upperBounds.get(1));
        assertQuery("SELECT max(custkey) FROM tpch.tiny.orders", "VALUES " + upperBounds.get(2));
        assertQuery("SELECT max(orderstatus) FROM tpch.tiny.orders", "VALUES '" + upperBounds.get(3) + "'");
        assertQuery("SELECT max(totalprice) FROM tpch.tiny.orders", "VALUES " + upperBounds.get(4));
        assertQuery("SELECT max(orderdate) FROM tpch.tiny.orders", "VALUES DATE '" + upperBounds.get(5) + "'");
        assertQuery("SELECT max(orderpriority) FROM tpch.tiny.orders", "VALUES '" + upperBounds.get(6) + "'");
        assertQuery("SELECT max(clerk) FROM tpch.tiny.orders", "VALUES '" + upperBounds.get(7) + "'");
        assertQuery("SELECT max(shippriority) FROM tpch.tiny.orders", "VALUES " + upperBounds.get(8));
        assertQuery("SELECT max(comment) FROM tpch.tiny.orders", "VALUES '" + upperBounds.get(9) + "'");

        assertUpdate("DROP TABLE orders");
    }

    @Test
    public void testWithNulls()
    {
        assertUpdate("CREATE TABLE test_with_nulls (_integer INTEGER, _real REAL, _string VARCHAR)  WITH (\"write.format.default\" = 'ORC')");
        assertUpdate("INSERT INTO test_with_nulls VALUES (7, 3.4, 'aaa'), (3, 4.5, 'bbb'), (4, null, 'ccc'), (null, null, 'ddd')", 4);
        MaterializedResult materializedResult = computeActual("SELECT * FROM \"test_with_nulls$files\"");
        assertEquals(materializedResult.getRowCount(), 1);
        DataFileRecord datafile = toDataFileRecord(materializedResult.getMaterializedRows().get(0));

        // Check per-column value count
        datafile.getValueCounts().values().forEach(valueCount -> assertEquals(valueCount, (Long) 4L));

        // Check per-column null value count
        assertEquals(datafile.getNullValueCounts().get(1), (Long) 1L);
        assertEquals(datafile.getNullValueCounts().get(2), (Long) 2L);
        assertEquals(datafile.getNullValueCounts().get(3), (Long) 0L);

        // Check per-column lower bound
        assertEquals(datafile.getLowerBounds().get(1), "3");
        assertEquals(datafile.getLowerBounds().get(2), "3.4");
        assertEquals(datafile.getLowerBounds().get(3), "aaa");

        assertUpdate("DROP TABLE test_with_nulls");

        assertUpdate("CREATE TABLE test_all_nulls (_integer INTEGER) WITH (\"write.format.default\" = 'ORC')");
        assertUpdate("INSERT INTO test_all_nulls VALUES null, null, null", 3);
        materializedResult = computeActual("SELECT * FROM \"test_all_nulls$files\"");
        assertEquals(materializedResult.getRowCount(), 1);
        datafile = toDataFileRecord(materializedResult.getMaterializedRows().get(0));

        // Check per-column value count
        assertEquals(datafile.getValueCounts().get(1), (Long) 3L);

        // Check per-column null value count
        assertEquals(datafile.getNullValueCounts().get(1), (Long) 3L);

        // Check that lower bounds and upper bounds are null (there are no non-null records)
        assertNull(datafile.getLowerBounds());
        assertNull(datafile.getUpperBounds());

        assertUpdate("DROP TABLE test_all_nulls");
    }

    @Test
    public void testWithNaNs()
    {
        assertUpdate("CREATE TABLE test_with_nans (_int INTEGER, _real REAL, _double DOUBLE)");
        assertUpdate("INSERT INTO test_with_nans VALUES (1, 1.1, 1.1), (2, cast(nan() as real), 4.5), (3, 4.6, -nan())", 3);
        MaterializedResult materializedResult = computeActual("SELECT * FROM \"test_with_nans$files\"");
        assertEquals(materializedResult.getRowCount(), 1);
        DataFileRecord datafile = toDataFileRecord(materializedResult.getMaterializedRows().get(0));

        // Check per-column value count
        datafile.getValueCounts().values().forEach(valueCount -> assertEquals(valueCount, (Long) 3L));

        // TODO: add more checks after NaN info is collected
        assertEquals(datafile.getNanValueCounts().size(), 0);
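        // Lower/upper bounds are dropped for floating-point columns that
        // contain NaN, since min/max are not meaningful in that case.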
        assertNull(datafile.getLowerBounds().get(2));
        assertNull(datafile.getLowerBounds().get(3));
        assertNull(datafile.getUpperBounds().get(2));
        assertNull(datafile.getUpperBounds().get(3));

        assertUpdate("DROP TABLE test_with_nans");
    }

    @Test
    public void testNestedTypes()
    {
        assertUpdate("CREATE TABLE test_nested_types (col1 INTEGER, col2 ROW (f1 INTEGER, f2 ARRAY(INTEGER), f3 DOUBLE)) WITH (\"write.format.default\" = 'ORC')");
        assertUpdate("INSERT INTO test_nested_types VALUES " +
                "(7, ROW(3, ARRAY[10, 11, 19], 1.9)), " +
                "(-9, ROW(4, ARRAY[13, 16, 20], -2.9)), " +
                "(8, ROW(0, ARRAY[14, 17, 21], 3.9)), " +
                "(3, ROW(10, ARRAY[15, 18, 22], 4.9))", 4);
        MaterializedResult materializedResult = computeActual("SELECT * FROM \"test_nested_types$files\"");
        assertEquals(materializedResult.getRowCount(), 1);
        DataFileRecord datafile = toDataFileRecord(materializedResult.getMaterializedRows().get(0));

        Map<Integer, String> lowerBounds = datafile.getLowerBounds();
        Map<Integer, String> upperBounds = datafile.getUpperBounds();

        // Only
        // 1. top-level primitive columns, and
        // 2. nested primitive fields that are not descendants of LISTs or MAPs
        // should appear in lowerBounds or upperBounds
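        // Fresh Iceberg field IDs are assigned as col1 = 1, col2 = 2,
        // col2.f1 = 3, col2.f2 = 4, col2.f3 = 5 (the ARRAY element of f2 gets
        // its own ID), which is why the bound maps are keyed by 1, 3 and 5.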
        assertEquals(lowerBounds.size(), 3);
        assertEquals(upperBounds.size(), 3);

        // col1
        assertEquals(lowerBounds.get(1), "-9");
        assertEquals(upperBounds.get(1), "8");

        // col2.f1 (key in lowerBounds/upperBounds is Iceberg ID)
        assertEquals(lowerBounds.get(3), "0");
        assertEquals(upperBounds.get(3), "10");

        // col2.f3 (key in lowerBounds/upperBounds is Iceberg ID)
        assertEquals(lowerBounds.get(5), "-2.9");
        assertEquals(upperBounds.get(5), "4.9");

        assertUpdate("DROP TABLE test_nested_types");
    }

    public static class DataFileRecord
    {
        private final int content;
        private final String filePath;
        private final String fileFormat;
        private final long recordCount;
        private final long fileSizeInBytes;
        private final Map<Integer, Long> columnSizes;
        private final Map<Integer, Long> valueCounts;
        private final Map<Integer, Long> nullValueCounts;
        private final Map<Integer, Long> nanValueCounts;

        private final Map<Integer, String> lowerBounds;
        private final Map<Integer, String> upperBounds;

        @SuppressWarnings("unchecked")
        public static DataFileRecord toDataFileRecord(MaterializedRow row)
        {
            assertEquals(row.getFieldCount(), 14);
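            // Fields 0-10 map to the "$files" columns content, file_path,
            // file_format, record_count, file_size_in_bytes, column_sizes,
            // value_counts, null_value_counts, nan_value_counts, lower_bounds
            // and upper_bounds; the trailing columns are not captured here.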
            return new DataFileRecord(
                    (int) row.getField(0),
                    (String) row.getField(1),
                    (String) row.getField(2),
                    (long) row.getField(3),
                    (long) row.getField(4),
                    row.getField(5) != null ? ImmutableMap.copyOf((Map<Integer, Long>) row.getField(5)) : null,
                    row.getField(6) != null ? ImmutableMap.copyOf((Map<Integer, Long>) row.getField(6)) : null,
                    row.getField(7) != null ? ImmutableMap.copyOf((Map<Integer, Long>) row.getField(7)) : null,
                    row.getField(8) != null ? ImmutableMap.copyOf((Map<Integer, Long>) row.getField(8)) : null,
                    row.getField(9) != null ? ImmutableMap.copyOf((Map<Integer, String>) row.getField(9)) : null,
                    row.getField(10) != null ? ImmutableMap.copyOf((Map<Integer, String>) row.getField(10)) : null);
        }

        private DataFileRecord(
                int content,
                String filePath,
                String fileFormat,
                long recordCount,
                long fileSizeInBytes,
                Map<Integer, Long> columnSizes,
                Map<Integer, Long> valueCounts,
                Map<Integer, Long> nullValueCounts,
                Map<Integer, Long> nanValueCounts,
                Map<Integer, String> lowerBounds,
                Map<Integer, String> upperBounds)
        {
            this.content = content;
            this.filePath = filePath;
            this.fileFormat = fileFormat;
            this.recordCount = recordCount;
            this.fileSizeInBytes = fileSizeInBytes;
            this.columnSizes = columnSizes;
            this.valueCounts = valueCounts;
            this.nullValueCounts = nullValueCounts;
            this.nanValueCounts = nanValueCounts;
            this.lowerBounds = lowerBounds;
            this.upperBounds = upperBounds;
        }

        public int getContent()
        {
            return content;
        }

        public String getFileFormat()
        {
            return fileFormat;
        }

        public long getRecordCount()
        {
            return recordCount;
        }

        public Map<Integer, Long> getValueCounts()
        {
            return valueCounts;
        }

        public Map<Integer, Long> getNullValueCounts()
        {
            return nullValueCounts;
        }

        public Map<Integer, Long> getNanValueCounts()
        {
            return nanValueCounts;
        }

        public Map<Integer, String> getLowerBounds()
        {
            return lowerBounds;
        }

        public Map<Integer, String> getUpperBounds()
        {
            return upperBounds;
        }
    }
}