/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.parquet.reader;

import com.facebook.presto.parquet.Field;
import com.facebook.presto.parquet.ParquetDataSourceId;
import com.facebook.presto.parquet.cache.MetadataReader;
import io.airlift.units.DataSize;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.parquet.ParquetTypeUtils.getColumnIO;
import static com.facebook.presto.parquet.ParquetTypeUtils.lookupColumnByName;
import static com.facebook.presto.parquet.reader.TestEncryption.constructField;
import static com.facebook.presto.parquet.reader.TestEncryption.createParquetReader;
import static com.facebook.presto.parquet.reader.TestEncryption.validateColumn;
import static java.lang.String.format;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class TestLargeRowGroup
{
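    //Upper bound on how much data the reader may fetch from the data source in a single read; passed to createParquetReader below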
    private static final DataSize MAX_DATA_SOURCE_BUFFER_SIZE = new DataSize(16, DataSize.Unit.MEGABYTE);
    private final Configuration conf = new Configuration(false);

    @Test
    public void testDataSourceIsReadInBoundedChunksForALargeColumnChunk()
            throws IOException
    {
        MessageType schema = new MessageType("schema", new PrimitiveType(OPTIONAL, BINARY, "col1"));
        //Create a parquet file with a columnChunk that has more than MAX_DATA_SOURCE_BUFFER_SIZE of data
        TestFile inputFile = new TestFileBuilder(conf, schema)
                .withCodec("UNCOMPRESSED")
                //200000 record count => more than 16 MiB of data
                .withNumRecord(200000)
                //Allow for a very large row group
                .withRowGroupSize(1024 * 1024 * 1024)
                .build();

        TestParquetFileProperties parquetFile = createTestParquetFile(inputFile);
        List<BlockMetaData> rowGroupsMetadata = parquetFile.getParquetMetadata().getBlocks();
        assertEquals(rowGroupsMetadata.size(), 1, "Test requires only 1 row group to be created");

        ColumnChunkMetaData col1ColChunkMetadata = rowGroupsMetadata.get(0).getColumns().get(0);
        long totalSize = col1ColChunkMetadata.getTotalSize();

        assertTrue(totalSize > MAX_DATA_SOURCE_BUFFER_SIZE.toBytes(), "Test setup requires a single ColumnChunk larger than MAX_DATA_SOURCE_BUFFER_SIZE");

        //Read all columns in the file
        validateFile(parquetFile.getParquetReader(), parquetFile.getMessageColumnIO(), inputFile);

        List<Integer> bytesFetchedPerCall = parquetFile.getDataSource().getDataSourceBytesFetchedPerCall();
        assertTrue(bytesFetchedPerCall.size() > 1, "Expected more than one call to dataSource");

        //Verify that we don't read more than MAX_DATA_SOURCE_BUFFER_SIZE in any 'read' from the dataSource
        for (Integer length : bytesFetchedPerCall) {
            assertTrue(length <= MAX_DATA_SOURCE_BUFFER_SIZE.toBytes());
        }
    }

    //We test that the total memory consumed to parse and read a ColumnChunk is NOT a function of the number of data pages
    //in the chunk. Instead, we use a bounded amount of memory to read any ColumnChunk, since we never materialize the full
    //chunk in memory at any time
    @Test
    public void testMemoryUsedIsNotAFunctionOfDataPageCount()
            throws IOException
    {
        MessageType schema = new MessageType("schema", new PrimitiveType(OPTIONAL, BINARY, "col1"));

        //The max memory used by the parquet reader will change based on implementation/memory accounting changes
        //Update this value as needed
        //We set the expected max to 30% more than MAX_DATA_SOURCE_BUFFER_SIZE
        long expectedMaxMemoryUsage = (long) (MAX_DATA_SOURCE_BUFFER_SIZE.toBytes() * 1.30);

        for (Integer testPageCount : Arrays.asList(1000, 5000, 10000, 20000)) {
            //Create an input file with 1 row group, 1 column and a pre-determined number of data pages
            TestParquetFileProperties parquetFile = createTestParquetFile(schema, testPageCount);
            long expectedRowCount = parquetFile.getParquetMetadata().getBlocks().get(0).getRowCount();

            //Read the column completely
            Field col1 = constructField(VARCHAR, lookupColumnByName(parquetFile.getMessageColumnIO(), "col1")).orElse(null);
            long maxSystemMemoryUsed = 0;

            ParquetReader parquetReader = parquetFile.getParquetReader();
            long totalRowsRead = 0;
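            //Read every batch in the file, tracking the peak memory usage reported by the reader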
            while (totalRowsRead < expectedRowCount) {
                parquetReader.nextBatch();
                int rowsRead = parquetReader.readBlock(col1).getPositionCount();
                totalRowsRead += rowsRead;
                maxSystemMemoryUsed = Math.max(maxSystemMemoryUsed, parquetReader.getSystemMemoryUsage());
            }

            assertTrue(maxSystemMemoryUsed < expectedMaxMemoryUsage,
                    format("[%d] pages :: %s :: actual [%d], expected < [%d]", testPageCount, "maxSystemMemoryUsed", maxSystemMemoryUsed, expectedMaxMemoryUsage));

            parquetReader.close();
        }
    }
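
    //Creates a single-row-group, single-column parquet file whose column chunk has exactly requestedPageCount PLAIN-encoded data pages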
    private TestParquetFileProperties createTestParquetFile(MessageType schema, int requestedPageCount)
            throws IOException
    {
        int pageSize = 100;
        TestFile inputFile = new TestFileBuilder(conf, schema)
                .withCodec("UNCOMPRESSED")
                .withPageSize(pageSize) //Keep page size small, so we get a column chunk with a lot of pages
                .withNumRecord(requestedPageCount * pageSize) //Since we're using an UNCOMPRESSED file, we can set the record count to an exact multiple of the pageSize
                //Keep row group size large, so we can fit all pages in one row group
                .withRowGroupSize(1024 * 1024 * 1024)
                .build();

        TestParquetFileProperties parquetFile = createTestParquetFile(inputFile);
        List<BlockMetaData> rowGroupsMetadata = parquetFile.getParquetMetadata().getBlocks();
        assertEquals(rowGroupsMetadata.size(), 1, "Test requires only 1 row group to be created");

        ColumnChunkMetaData col1ColChunkMetadata = rowGroupsMetadata.get(0).getColumns().get(0);
        int actualDataPageCount = col1ColChunkMetadata.getEncodingStats().getNumDataPagesEncodedAs(Encoding.PLAIN);
        assertEquals(actualDataPageCount, requestedPageCount);

        return parquetFile;
    }

    private TestParquetFileProperties createTestParquetFile(TestFile inputFile)
            throws IOException
    {
        Path path = new Path(inputFile.getFileName());
        FileSystem fileSystem = path.getFileSystem(conf);
        FSDataInputStream inputStream = fileSystem.open(path);
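        //Wrap the stream in a mock data source that records the number of bytes fetched by each read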
        MockParquetDataSource dataSource = new MockParquetDataSource(new ParquetDataSourceId(path.toString()), inputStream);
        ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, inputFile.getFileSize(), Optional.empty(), false).getParquetMetadata();
        FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
        MessageType fileSchema = fileMetaData.getSchema();
        MessageColumnIO messageColumn = getColumnIO(fileSchema, fileSchema);
        ParquetReader parquetReader = createParquetReader(parquetMetadata, messageColumn, dataSource, Optional.empty(), MAX_DATA_SOURCE_BUFFER_SIZE);

        return new TestParquetFileProperties(messageColumn, parquetReader, dataSource, parquetMetadata);
    }

    //Read the file batch by batch and validate the contents of col1 against the input file
    private static void validateFile(ParquetReader parquetReader, MessageColumnIO messageColumn, TestFile inputFile)
            throws IOException
    {
        int rowIndex = 0;
        int batchSize = parquetReader.nextBatch();
        while (batchSize > 0) {
            validateColumn("col1", VARCHAR, rowIndex, parquetReader, messageColumn, inputFile, new String[0]);
            rowIndex += batchSize;
            batchSize = parquetReader.nextBatch();
        }
    }
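
    //Simple holder for the objects needed to read and inspect a generated test parquet file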
    private static class TestParquetFileProperties
    {
        private final MessageColumnIO messageColumnIO;
        private final ParquetReader parquetReader;
        private final MockParquetDataSource dataSource;
        private final ParquetMetadata parquetMetadata;

        public TestParquetFileProperties(MessageColumnIO messageColumnIO, ParquetReader parquetReader, MockParquetDataSource dataSource, ParquetMetadata parquetMetadata)
        {
            this.messageColumnIO = messageColumnIO;
            this.parquetReader = parquetReader;
            this.dataSource = dataSource;
            this.parquetMetadata = parquetMetadata;
        }

        public MessageColumnIO getMessageColumnIO()
        {
            return messageColumnIO;
        }

        public ParquetReader getParquetReader()
        {
            return parquetReader;
        }

        public MockParquetDataSource getDataSource()
        {
            return dataSource;
        }

        public ParquetMetadata getParquetMetadata()
        {
            return parquetMetadata;
        }
    }
}